1use std::{sync::Arc, time::Instant};
20
21use url::Url;
22
23use darkfi::{
24 blockchain::BlockInfo,
25 rpc::{
26 client::RpcClient,
27 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
28 util::JsonValue,
29 },
30 system::{Publisher, StoppableTask},
31 tx::Transaction,
32 util::encoding::base64,
33 Error, Result,
34};
35use darkfi_sdk::{
36 crypto::{ContractId, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID, MONEY_CONTRACT_ID},
37 tx::TransactionHash,
38};
39use darkfi_serial::{deserialize_async, serialize_async};
40
41use crate::{
42 error::{WalletDbError, WalletDbResult},
43 Drk,
44};
45
46impl Drk {
47 pub async fn subscribe_blocks(
56 &self,
57 endpoint: Url,
58 ex: Arc<smol::Executor<'static>>,
59 ) -> Result<()> {
60 let (last_confirmed_height, _) = self.get_last_confirmed_block().await?;
62
63 if last_confirmed_height == 0 {
65 if let Err(e) = self.scan_blocks().await {
66 return Err(Error::DatabaseError(format!(
67 "[subscribe_blocks] Scanning from genesis block failed: {e:?}"
68 )))
69 }
70 }
71
72 let (last_confirmed_height, last_confirmed_hash) = self.get_last_confirmed_block().await?;
74
75 let (mut last_scanned_height, last_scanned_hash) = match self.get_last_scanned_block() {
77 Ok(last) => last,
78 Err(e) => {
79 return Err(Error::DatabaseError(format!(
80 "[subscribe_blocks] Retrieving last scanned block failed: {e:?}"
81 )))
82 }
83 };
84
85 if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash
87 {
88 eprintln!("Warning: Last scanned block is not the last confirmed block.");
89 eprintln!("You should first fully scan the blockchain, and then subscribe");
90 return Err(Error::DatabaseError(
91 "[subscribe_blocks] Blockchain not fully scanned".to_string(),
92 ))
93 }
94
95 println!("Subscribing to receive notifications of incoming blocks");
96 let publisher = Publisher::new();
97 let subscription = publisher.clone().subscribe().await;
98 let _publisher = publisher.clone();
99 let _ex = ex.clone();
100 StoppableTask::new().start(
101 async move {
103 let rpc_client = RpcClient::new(endpoint, _ex).await?;
104 let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
105 rpc_client.subscribe(req, _publisher).await
106 },
107 |res| async move {
108 match res {
109 Ok(()) => { }
110 Err(e) => {
111 eprintln!("[subscribe_blocks] JSON-RPC server error: {e:?}");
112 publisher
113 .notify(JsonResult::Error(JsonError::new(
114 ErrorCode::InternalError,
115 None,
116 0,
117 )))
118 .await;
119 }
120 }
121 },
122 Error::RpcServerStopped,
123 ex,
124 );
125 println!("Detached subscription to background");
126 println!("All is good. Waiting for block notifications...");
127
128 let e = loop {
129 match subscription.receive().await {
130 JsonResult::Notification(n) => {
131 println!("Got Block notification from darkfid subscription");
132 if n.method != "blockchain.subscribe_blocks" {
133 break Error::UnexpectedJsonRpc(format!(
134 "Got foreign notification from darkfid: {}",
135 n.method
136 ))
137 }
138
139 if !n.params.is_array() {
141 break Error::UnexpectedJsonRpc(
142 "Received notification params are not an array".to_string(),
143 )
144 }
145 let params = n.params.get::<Vec<JsonValue>>().unwrap();
146 if params.is_empty() {
147 break Error::UnexpectedJsonRpc(
148 "Notification parameters are empty".to_string(),
149 )
150 }
151
152 for param in params {
153 let param = param.get::<String>().unwrap();
154 let bytes = base64::decode(param).unwrap();
155
156 let block: BlockInfo = deserialize_async(&bytes).await?;
157 println!("Deserialized successfully. Scanning block...");
158
159 if block.header.height <= last_scanned_height {
161 let reset_height = block.header.height.saturating_sub(1);
162 if let Err(e) = self.reset_to_height(reset_height).await {
163 return Err(Error::DatabaseError(format!(
164 "[subscribe_blocks] Wallet state reset failed: {e:?}"
165 )))
166 }
167
168 if reset_height == 0 {
170 let genesis = match self.get_block_by_height(reset_height).await {
171 Ok(b) => b,
172 Err(e) => {
173 return Err(Error::Custom(format!(
174 "[subscribe_blocks] RPC client request failed: {e:?}"
175 )))
176 }
177 };
178 if let Err(e) = self.scan_block(&genesis).await {
179 return Err(Error::DatabaseError(format!(
180 "[subscribe_blocks] Scanning block failed: {e:?}"
181 )))
182 };
183 }
184 }
185
186 if let Err(e) = self.scan_block(&block).await {
187 return Err(Error::DatabaseError(format!(
188 "[subscribe_blocks] Scanning block failed: {e:?}"
189 )))
190 }
191
192 last_scanned_height = block.header.height;
194 }
195 }
196
197 JsonResult::Error(e) => {
198 break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
200 }
201
202 x => {
203 break Error::UnexpectedJsonRpc(format!(
205 "Got unexpected data from JSON-RPC: {x:?}"
206 ))
207 }
208 }
209 };
210
211 Err(e)
212 }
213
214 async fn scan_block(&self, block: &BlockInfo) -> Result<()> {
218 self.reset_inverse_cache().await?;
220
221 let mut wallet_txs = vec![];
223 println!("=======================================");
224 println!("{}", block.header);
225 println!("=======================================");
226 println!("[scan_block] Iterating over {} transactions", block.txs.len());
227 for tx in block.txs.iter() {
228 let tx_hash = tx.hash();
229 let tx_hash_string = tx_hash.to_string();
230 let mut wallet_tx = false;
231 println!("[scan_block] Processing transaction: {tx_hash_string}");
232 for (i, call) in tx.calls.iter().enumerate() {
233 if call.data.contract_id == *MONEY_CONTRACT_ID {
234 println!("[scan_block] Found Money contract in call {i}");
235 if self.apply_tx_money_data(i, &tx.calls, &tx_hash_string).await? {
236 wallet_tx = true;
237 };
238 continue
239 }
240
241 if call.data.contract_id == *DAO_CONTRACT_ID {
242 println!("[scan_block] Found DAO contract in call {i}");
243 if self.apply_tx_dao_data(&call.data.data, tx_hash, i as u8).await? {
244 wallet_tx = true;
245 };
246 continue
247 }
248
249 if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
250 println!("[scan_block] Found DeployoOor contract in call {i}");
251 continue
253 }
254
255 println!("[scan_block] Found non-native contract in call {i}, skipping.");
257 }
258
259 if wallet_tx {
261 wallet_txs.push(tx);
262 }
263 }
264
265 if let Err(e) = self.put_tx_history_records(&wallet_txs, "Confirmed").await {
267 return Err(Error::DatabaseError(format!(
268 "[scan_block] Inserting transaction history records failed: {e:?}"
269 )))
270 }
271
272 self.store_inverse_cache(block.header.height, &block.hash().to_string())?;
274
275 Ok(())
276 }
277
278 pub async fn scan_blocks(&self) -> WalletDbResult<()> {
282 let (mut height, hash) = self.get_last_scanned_block()?;
284
285 let block = match self.get_block_by_height(height).await {
287 Ok(b) => Some(b),
288 Err(Error::JsonRpcError((-32121, _))) => None,
290 Err(e) => {
291 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
292 return Err(WalletDbError::GenericError)
293 }
294 };
295
296 if block.is_none() || hash != block.unwrap().hash().to_string() {
298 println!("A reorg has happened, finding last known common block...");
300 height = height.saturating_sub(1);
301 while height != 0 {
302 let (_, scanned_block_hash, _) = self.get_scanned_block_record(height)?;
304
305 let block = match self.get_block_by_height(height).await {
307 Ok(b) => Some(b),
308 Err(Error::JsonRpcError((-32121, _))) => None,
310 Err(e) => {
311 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
312 return Err(WalletDbError::GenericError)
313 }
314 };
315
316 if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
318 height = height.saturating_sub(1);
319 continue
320 }
321
322 println!("Last common block found: {height} - {scanned_block_hash}");
324 self.reset_to_height(height).await?;
325 break
326 }
327 }
328
329 if height == 0 {
332 self.reset().await?;
333 } else {
334 height += 1;
335 }
336
337 loop {
338 println!("Requested to scan from block number: {height}");
340 let (last_height, last_hash) = match self.get_last_confirmed_block().await {
341 Ok(last) => last,
342 Err(e) => {
343 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
344 return Err(WalletDbError::GenericError)
345 }
346 };
347 println!("Last confirmed block reported by darkfid: {last_height} - {last_hash}");
348
349 if height > last_height {
351 return Ok(())
352 }
353
354 while height <= last_height {
355 println!("Requesting block {height}...");
356 let block = match self.get_block_by_height(height).await {
357 Ok(b) => b,
358 Err(e) => {
359 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
360 return Err(WalletDbError::GenericError)
361 }
362 };
363 println!("Block {height} received! Scanning block...");
364 if let Err(e) = self.scan_block(&block).await {
365 eprintln!("[scan_blocks] Scan block failed: {e:?}");
366 return Err(WalletDbError::GenericError)
367 };
368 height += 1;
369 }
370 }
371 }
372
373 async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
375 let rep = self
376 .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
377 .await?;
378 let params = rep.get::<Vec<JsonValue>>().unwrap();
379 let height = *params[0].get::<f64>().unwrap() as u32;
380 let hash = params[1].get::<String>().unwrap().clone();
381
382 Ok((height, hash))
383 }
384
385 async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
387 let params = self
388 .darkfid_daemon_request(
389 "blockchain.get_block",
390 &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
391 )
392 .await?;
393 let param = params.get::<String>().unwrap();
394 let bytes = base64::decode(param).unwrap();
395 let block = deserialize_async(&bytes).await?;
396 Ok(block)
397 }
398
399 pub async fn broadcast_tx(&self, tx: &Transaction) -> Result<String> {
402 println!("Broadcasting transaction...");
403
404 let params =
405 JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
406 let rep = self.darkfid_daemon_request("tx.broadcast", ¶ms).await?;
407
408 let txid = rep.get::<String>().unwrap().clone();
409
410 if let Err(e) = self.put_tx_history_record(tx, "Broadcasted").await {
412 return Err(Error::DatabaseError(format!(
413 "[broadcast_tx] Inserting transaction history record failed: {e:?}"
414 )))
415 }
416
417 Ok(txid)
418 }
419
420 pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
422 let tx_hash_str = tx_hash.to_string();
423 match self
424 .darkfid_daemon_request(
425 "blockchain.get_tx",
426 &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
427 )
428 .await
429 {
430 Ok(param) => {
431 let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
432 let tx = deserialize_async(&tx_bytes).await?;
433 Ok(Some(tx))
434 }
435
436 Err(_) => Ok(None),
437 }
438 }
439
440 pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
442 let tx_str = base64::encode(&serialize_async(tx).await);
443 let rep = self
444 .darkfid_daemon_request(
445 "tx.simulate",
446 &JsonValue::Array(vec![JsonValue::String(tx_str)]),
447 )
448 .await?;
449
450 let is_valid = *rep.get::<bool>().unwrap();
451 Ok(is_valid)
452 }
453
454 pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
456 let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
457 let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", ¶ms).await?;
458 let params = rep.get::<Vec<JsonValue>>().unwrap();
459
460 let mut ret = Vec::with_capacity(params.len());
461 for param in params {
462 let zkas_ns = param[0].get::<String>().unwrap().clone();
463 let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
464 ret.push((zkas_ns, zkas_bincode_bytes));
465 }
466
467 Ok(ret)
468 }
469
470 pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
472 let params = JsonValue::Array(vec![
473 JsonValue::String(base64::encode(&serialize_async(tx).await)),
474 JsonValue::Boolean(include_fee),
475 ]);
476 let rep = self.darkfid_daemon_request("tx.calculate_fee", ¶ms).await?;
477
478 let fee = *rep.get::<f64>().unwrap() as u64;
479
480 Ok(fee)
481 }
482
483 pub async fn get_next_block_height(&self) -> Result<u32> {
485 let rep = self
486 .darkfid_daemon_request(
487 "blockchain.best_fork_next_block_height",
488 &JsonValue::Array(vec![]),
489 )
490 .await?;
491
492 let next_height = *rep.get::<f64>().unwrap() as u32;
493
494 Ok(next_height)
495 }
496
497 pub async fn get_block_target(&self) -> Result<u32> {
499 let rep = self
500 .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
501 .await?;
502
503 let next_height = *rep.get::<f64>().unwrap() as u32;
504
505 Ok(next_height)
506 }
507
508 pub async fn ping(&self) -> Result<()> {
510 println!("Executing ping request to darkfid...");
511 let latency = Instant::now();
512 let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
513 let latency = latency.elapsed();
514 println!("Got reply: {rep:?}");
515 println!("Latency: {latency:?}");
516 Ok(())
517 }
518
519 pub async fn darkfid_daemon_request(
521 &self,
522 method: &str,
523 params: &JsonValue,
524 ) -> Result<JsonValue> {
525 let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
526 let req = JsonRequest::new(method, params.clone());
527 let rep = rpc_client.request(req).await?;
528 Ok(rep)
529 }
530
531 pub async fn stop_rpc_client(&self) -> Result<()> {
533 if let Some(ref rpc_client) = self.rpc_client {
534 rpc_client.stop().await;
535 };
536 Ok(())
537 }
538}