use std::{sync::Arc, time::Instant};
use url::Url;
use darkfi::{
blockchain::BlockInfo,
rpc::{
client::RpcClient,
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
util::JsonValue,
},
system::{Publisher, StoppableTask},
tx::Transaction,
util::encoding::base64,
Error, Result,
};
use darkfi_sdk::{
crypto::{ContractId, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID, MONEY_CONTRACT_ID},
tx::TransactionHash,
};
use darkfi_serial::{deserialize_async, serialize_async};
use crate::{
error::{WalletDbError, WalletDbResult},
money::{MONEY_INFO_COL_LAST_SCANNED_BLOCK, MONEY_INFO_TABLE},
Drk,
};
impl Drk {
pub async fn subscribe_blocks(
&self,
endpoint: Url,
ex: Arc<smol::Executor<'static>>,
) -> Result<()> {
let rep = self
.darkfid_daemon_request("blockchain.last_known_block", &JsonValue::Array(vec![]))
.await?;
let last_known = *rep.get::<f64>().unwrap() as u32;
let last_scanned = match self.last_scanned_block() {
Ok(l) => l,
Err(e) => {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Retrieving last scanned block failed: {e:?}"
)))
}
};
if last_known != last_scanned {
eprintln!("Warning: Last scanned block is not the last known block.");
eprintln!("You should first fully scan the blockchain, and then subscribe");
return Err(Error::DatabaseError(
"[subscribe_blocks] Blockchain not fully scanned".to_string(),
))
}
println!("Subscribing to receive notifications of incoming blocks");
let publisher = Publisher::new();
let subscription = publisher.clone().subscribe().await;
let _publisher = publisher.clone();
let _ex = ex.clone();
StoppableTask::new().start(
async move {
let rpc_client = RpcClient::new(endpoint, _ex).await?;
let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
rpc_client.subscribe(req, _publisher).await
},
|res| async move {
match res {
Ok(()) => { }
Err(e) => {
eprintln!("[subscribe_blocks] JSON-RPC server error: {e:?}");
publisher
.notify(JsonResult::Error(JsonError::new(
ErrorCode::InternalError,
None,
0,
)))
.await;
}
}
},
Error::RpcServerStopped,
ex,
);
println!("Detached subscription to background");
println!("All is good. Waiting for block notifications...");
let e = loop {
match subscription.receive().await {
JsonResult::Notification(n) => {
println!("Got Block notification from darkfid subscription");
if n.method != "blockchain.subscribe_blocks" {
break Error::UnexpectedJsonRpc(format!(
"Got foreign notification from darkfid: {}",
n.method
))
}
if !n.params.is_array() {
break Error::UnexpectedJsonRpc(
"Received notification params are not an array".to_string(),
)
}
let params = n.params.get::<Vec<JsonValue>>().unwrap();
if params.is_empty() {
break Error::UnexpectedJsonRpc(
"Notification parameters are empty".to_string(),
)
}
for param in params {
let param = param.get::<String>().unwrap();
let bytes = base64::decode(param).unwrap();
let block_data: BlockInfo = deserialize_async(&bytes).await?;
println!("Deserialized successfully. Scanning block...");
if let Err(e) = self.scan_block(&block_data).await {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Scanning block failed: {e:?}"
)))
}
let txs_hashes = match self.insert_tx_history_records(&block_data.txs).await {
Ok(hashes) => hashes,
Err(e) => {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Inserting transaction history records failed: {e:?}"
)))
},
};
if let Err(e) =
self.update_tx_history_records_status(&txs_hashes, "Finalized")
{
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Update transaction history record status failed: {e:?}"
)))
}
}
}
JsonResult::Error(e) => {
break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
}
x => {
break Error::UnexpectedJsonRpc(format!(
"Got unexpected data from JSON-RPC: {x:?}"
))
}
}
};
Err(e)
}
async fn scan_block(&self, block: &BlockInfo) -> Result<()> {
println!("=======================================");
println!("{}", block.header);
println!("=======================================");
println!("[scan_block] Iterating over {} transactions", block.txs.len());
for tx in block.txs.iter() {
let tx_hash = tx.hash().to_string();
println!("[scan_block] Processing transaction: {tx_hash}");
for (i, call) in tx.calls.iter().enumerate() {
if call.data.contract_id == *MONEY_CONTRACT_ID {
println!("[scan_block] Found Money contract in call {i}");
self.apply_tx_money_data(i, &tx.calls, &tx_hash).await?;
continue
}
if call.data.contract_id == *DAO_CONTRACT_ID {
println!("[scan_block] Found DAO contract in call {i}");
self.apply_tx_dao_data(
&call.data.data,
TransactionHash::new(*blake3::hash(&serialize_async(tx).await).as_bytes()),
i as u8,
)
.await?;
continue
}
if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
println!("[scan_block] Found DeployoOor contract in call {i}");
continue
}
println!("[scan_block] Found non-native contract in call {i}, skipping.");
}
}
let query =
format!("UPDATE {} SET {} = ?1;", *MONEY_INFO_TABLE, MONEY_INFO_COL_LAST_SCANNED_BLOCK);
if let Err(e) = self.wallet.exec_sql(&query, rusqlite::params![block.header.height]) {
return Err(Error::DatabaseError(format!(
"[scan_block] Update last scanned block failed: {e:?}"
)))
}
Ok(())
}
pub async fn scan_blocks(&self, reset: bool) -> WalletDbResult<()> {
let mut height = self.last_scanned_block()?;
if height == 0 || reset {
self.reset_money_tree().await?;
self.reset_money_smt()?;
self.reset_money_coins()?;
self.reset_dao_trees().await?;
self.reset_daos().await?;
self.reset_dao_proposals().await?;
self.reset_dao_votes()?;
self.update_all_tx_history_records_status("Rejected")?;
height = 0;
} else {
height += 1;
};
loop {
let rep = match self
.darkfid_daemon_request("blockchain.last_known_block", &JsonValue::Array(vec![]))
.await
{
Ok(r) => r,
Err(e) => {
eprintln!("[scan_blocks] RPC client request failed: {e:?}");
return Err(WalletDbError::GenericError)
}
};
let last = *rep.get::<f64>().unwrap() as u32;
println!("Requested to scan from block number: {height}");
println!("Last known block number reported by darkfid: {last}");
if height > last {
return Ok(())
}
while height <= last {
println!("Requesting block {height}...");
let block = match self.get_block_by_height(height).await {
Ok(r) => r,
Err(e) => {
eprintln!("[scan_blocks] RPC client request failed: {e:?}");
return Err(WalletDbError::GenericError)
}
};
println!("Block {height} received! Scanning block...");
if let Err(e) = self.scan_block(&block).await {
eprintln!("[scan_blocks] Scan block failed: {e:?}");
return Err(WalletDbError::GenericError)
};
let txs_hashes = self.insert_tx_history_records(&block.txs).await?;
self.update_tx_history_records_status(&txs_hashes, "Finalized")?;
height += 1;
}
}
}
async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
let params = self
.darkfid_daemon_request(
"blockchain.get_block",
&JsonValue::Array(vec![JsonValue::String(height.to_string())]),
)
.await?;
let param = params.get::<String>().unwrap();
let bytes = base64::decode(param).unwrap();
let block = deserialize_async(&bytes).await?;
Ok(block)
}
pub async fn broadcast_tx(&self, tx: &Transaction) -> Result<String> {
println!("Broadcasting transaction...");
let params =
JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
let rep = self.darkfid_daemon_request("tx.broadcast", ¶ms).await?;
let txid = rep.get::<String>().unwrap().clone();
if let Err(e) = self.insert_tx_history_record(tx).await {
return Err(Error::DatabaseError(format!(
"[broadcast_tx] Inserting transaction history record failed: {e:?}"
)))
}
Ok(txid)
}
pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
let tx_hash_str = tx_hash.to_string();
match self
.darkfid_daemon_request(
"blockchain.get_tx",
&JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
)
.await
{
Ok(param) => {
let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
let tx = deserialize_async(&tx_bytes).await?;
Ok(Some(tx))
}
Err(_) => Ok(None),
}
}
pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
let tx_str = base64::encode(&serialize_async(tx).await);
let rep = self
.darkfid_daemon_request(
"tx.simulate",
&JsonValue::Array(vec![JsonValue::String(tx_str)]),
)
.await?;
let is_valid = *rep.get::<bool>().unwrap();
Ok(is_valid)
}
pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", ¶ms).await?;
let params = rep.get::<Vec<JsonValue>>().unwrap();
let mut ret = Vec::with_capacity(params.len());
for param in params {
let zkas_ns = param[0].get::<String>().unwrap().clone();
let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
ret.push((zkas_ns, zkas_bincode_bytes));
}
Ok(ret)
}
pub async fn get_tx_gas(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
let params = JsonValue::Array(vec![
JsonValue::String(base64::encode(&serialize_async(tx).await)),
JsonValue::Boolean(include_fee),
]);
let rep = self.darkfid_daemon_request("tx.calculate_gas", ¶ms).await?;
let gas = *rep.get::<f64>().unwrap() as u64;
Ok(gas)
}
pub async fn get_next_block_height(&self) -> Result<u32> {
let rep = self
.darkfid_daemon_request(
"blockchain.best_fork_next_block_height",
&JsonValue::Array(vec![]),
)
.await?;
let next_height = *rep.get::<f64>().unwrap() as u32;
Ok(next_height)
}
pub async fn get_block_target(&self) -> Result<u32> {
let rep = self
.darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
.await?;
let next_height = *rep.get::<f64>().unwrap() as u32;
Ok(next_height)
}
pub async fn ping(&self) -> Result<()> {
println!("Executing ping request to darkfid...");
let latency = Instant::now();
let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
let latency = latency.elapsed();
println!("Got reply: {rep:?}");
println!("Latency: {latency:?}");
Ok(())
}
pub async fn darkfid_daemon_request(
&self,
method: &str,
params: &JsonValue,
) -> Result<JsonValue> {
let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
let req = JsonRequest::new(method, params.clone());
let rep = rpc_client.request(req).await?;
Ok(rep)
}
pub async fn stop_rpc_client(&self) -> Result<()> {
if let Some(ref rpc_client) = self.rpc_client {
rpc_client.stop().await;
};
Ok(())
}
}