1use std::{sync::Arc, time::Instant};
20
21use url::Url;
22
23use darkfi::{
24 blockchain::BlockInfo,
25 rpc::{
26 client::RpcClient,
27 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
28 util::JsonValue,
29 },
30 system::{Publisher, StoppableTask},
31 tx::Transaction,
32 util::encoding::base64,
33 Error, Result,
34};
35use darkfi_sdk::{
36 crypto::{ContractId, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID, MONEY_CONTRACT_ID},
37 tx::TransactionHash,
38};
39use darkfi_serial::{deserialize_async, serialize_async};
40
41use crate::{
42 error::{WalletDbError, WalletDbResult},
43 Drk,
44};
45
46impl Drk {
47 pub async fn subscribe_blocks(
56 &self,
57 endpoint: Url,
58 ex: Arc<smol::Executor<'static>>,
59 ) -> Result<()> {
60 let (last_confirmed_height, _) = self.get_last_confirmed_block().await?;
62
63 if last_confirmed_height == 0 {
65 if let Err(e) = self.scan_blocks().await {
66 return Err(Error::DatabaseError(format!(
67 "[subscribe_blocks] Scanning from genesis block failed: {e:?}"
68 )))
69 }
70 }
71
72 let (last_confirmed_height, last_confirmed_hash) = self.get_last_confirmed_block().await?;
74
75 let (mut last_scanned_height, last_scanned_hash) = match self.get_last_scanned_block() {
77 Ok(last) => last,
78 Err(e) => {
79 return Err(Error::DatabaseError(format!(
80 "[subscribe_blocks] Retrieving last scanned block failed: {e:?}"
81 )))
82 }
83 };
84
85 if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash
87 {
88 eprintln!("Warning: Last scanned block is not the last confirmed block.");
89 eprintln!("You should first fully scan the blockchain, and then subscribe");
90 return Err(Error::DatabaseError(
91 "[subscribe_blocks] Blockchain not fully scanned".to_string(),
92 ))
93 }
94
95 println!("Subscribing to receive notifications of incoming blocks");
96 let publisher = Publisher::new();
97 let subscription = publisher.clone().subscribe().await;
98 let _publisher = publisher.clone();
99 let _ex = ex.clone();
100 StoppableTask::new().start(
101 async move {
103 let rpc_client = RpcClient::new(endpoint, _ex).await?;
104 let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
105 rpc_client.subscribe(req, _publisher).await
106 },
107 |res| async move {
108 match res {
109 Ok(()) => { }
110 Err(e) => {
111 eprintln!("[subscribe_blocks] JSON-RPC server error: {e:?}");
112 publisher
113 .notify(JsonResult::Error(JsonError::new(
114 ErrorCode::InternalError,
115 None,
116 0,
117 )))
118 .await;
119 }
120 }
121 },
122 Error::RpcServerStopped,
123 ex,
124 );
125 println!("Detached subscription to background");
126 println!("All is good. Waiting for block notifications...");
127
128 let e = loop {
129 match subscription.receive().await {
130 JsonResult::Notification(n) => {
131 println!("Got Block notification from darkfid subscription");
132 if n.method != "blockchain.subscribe_blocks" {
133 break Error::UnexpectedJsonRpc(format!(
134 "Got foreign notification from darkfid: {}",
135 n.method
136 ))
137 }
138
139 if !n.params.is_array() {
141 break Error::UnexpectedJsonRpc(
142 "Received notification params are not an array".to_string(),
143 )
144 }
145 let params = n.params.get::<Vec<JsonValue>>().unwrap();
146 if params.is_empty() {
147 break Error::UnexpectedJsonRpc(
148 "Notification parameters are empty".to_string(),
149 )
150 }
151
152 for param in params {
153 let param = param.get::<String>().unwrap();
154 let bytes = base64::decode(param).unwrap();
155
156 let block: BlockInfo = deserialize_async(&bytes).await?;
157 println!("Deserialized successfully. Scanning block...");
158
159 if block.header.height <= last_scanned_height {
161 let reset_height = block.header.height.saturating_sub(1);
162 if let Err(e) = self.reset_to_height(reset_height).await {
163 return Err(Error::DatabaseError(format!(
164 "[subscribe_blocks] Wallet state reset failed: {e:?}"
165 )))
166 }
167
168 if reset_height == 0 {
170 let genesis = match self.get_block_by_height(reset_height).await {
171 Ok(b) => b,
172 Err(e) => {
173 return Err(Error::Custom(format!(
174 "[subscribe_blocks] RPC client request failed: {e:?}"
175 )))
176 }
177 };
178 if let Err(e) = self.scan_block(&genesis).await {
179 return Err(Error::DatabaseError(format!(
180 "[subscribe_blocks] Scanning block failed: {e:?}"
181 )))
182 };
183 }
184 }
185
186 if let Err(e) = self.scan_block(&block).await {
187 return Err(Error::DatabaseError(format!(
188 "[subscribe_blocks] Scanning block failed: {e:?}"
189 )))
190 }
191
192 last_scanned_height = block.header.height;
194 }
195 }
196
197 JsonResult::Error(e) => {
198 break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
200 }
201
202 x => {
203 break Error::UnexpectedJsonRpc(format!(
205 "Got unexpected data from JSON-RPC: {x:?}"
206 ))
207 }
208 }
209 };
210
211 Err(e)
212 }
213
214 async fn scan_block(&self, block: &BlockInfo) -> Result<()> {
218 self.reset_inverse_cache().await?;
220
221 let mut wallet_txs = vec![];
223 println!("=======================================");
224 println!("{}", block.header);
225 println!("=======================================");
226 println!("[scan_block] Iterating over {} transactions", block.txs.len());
227 for tx in block.txs.iter() {
228 let tx_hash = tx.hash().to_string();
229 let mut wallet_tx = false;
230 println!("[scan_block] Processing transaction: {tx_hash}");
231 for (i, call) in tx.calls.iter().enumerate() {
232 if call.data.contract_id == *MONEY_CONTRACT_ID {
233 println!("[scan_block] Found Money contract in call {i}");
234 if self.apply_tx_money_data(i, &tx.calls, &tx_hash).await? {
235 wallet_tx = true;
236 };
237 continue
238 }
239
240 if call.data.contract_id == *DAO_CONTRACT_ID {
241 println!("[scan_block] Found DAO contract in call {i}");
242 if self
243 .apply_tx_dao_data(
244 &call.data.data,
245 TransactionHash::new(
246 *blake3::hash(&serialize_async(tx).await).as_bytes(),
247 ),
248 i as u8,
249 )
250 .await?
251 {
252 wallet_tx = true;
253 };
254 continue
255 }
256
257 if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
258 println!("[scan_block] Found DeployoOor contract in call {i}");
259 continue
261 }
262
263 println!("[scan_block] Found non-native contract in call {i}, skipping.");
265 }
266
267 if wallet_tx {
269 wallet_txs.push(tx);
270 }
271 }
272
273 if let Err(e) = self.put_tx_history_records(&wallet_txs, "Confirmed").await {
275 return Err(Error::DatabaseError(format!(
276 "[scan_block] Inserting transaction history records failed: {e:?}"
277 )))
278 }
279
280 self.store_inverse_cache(block.header.height, &block.hash().to_string())?;
282
283 Ok(())
284 }
285
286 pub async fn scan_blocks(&self) -> WalletDbResult<()> {
290 let (mut height, hash) = self.get_last_scanned_block()?;
292
293 let block = match self.get_block_by_height(height).await {
295 Ok(b) => Some(b),
296 Err(Error::JsonRpcError((-32121, _))) => None,
298 Err(e) => {
299 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
300 return Err(WalletDbError::GenericError)
301 }
302 };
303
304 if block.is_none() || hash != block.unwrap().hash().to_string() {
306 println!("A reorg has happened, finding last known common block...");
308 height = height.saturating_sub(1);
309 while height != 0 {
310 let (_, scanned_block_hash, _) = self.get_scanned_block_record(height)?;
312
313 let block = match self.get_block_by_height(height).await {
315 Ok(b) => Some(b),
316 Err(Error::JsonRpcError((-32121, _))) => None,
318 Err(e) => {
319 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
320 return Err(WalletDbError::GenericError)
321 }
322 };
323
324 if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
326 height = height.saturating_sub(1);
327 continue
328 }
329
330 println!("Last common block found: {height} - {scanned_block_hash}");
332 self.reset_to_height(height).await?;
333 break
334 }
335 }
336
337 if height == 0 {
340 self.reset().await?;
341 } else {
342 height += 1;
343 }
344
345 loop {
346 println!("Requested to scan from block number: {height}");
348 let (last_height, last_hash) = match self.get_last_confirmed_block().await {
349 Ok(last) => last,
350 Err(e) => {
351 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
352 return Err(WalletDbError::GenericError)
353 }
354 };
355 println!("Last confirmed block reported by darkfid: {last_height} - {last_hash}");
356
357 if height > last_height {
359 return Ok(())
360 }
361
362 while height <= last_height {
363 println!("Requesting block {height}...");
364 let block = match self.get_block_by_height(height).await {
365 Ok(b) => b,
366 Err(e) => {
367 eprintln!("[scan_blocks] RPC client request failed: {e:?}");
368 return Err(WalletDbError::GenericError)
369 }
370 };
371 println!("Block {height} received! Scanning block...");
372 if let Err(e) = self.scan_block(&block).await {
373 eprintln!("[scan_blocks] Scan block failed: {e:?}");
374 return Err(WalletDbError::GenericError)
375 };
376 height += 1;
377 }
378 }
379 }
380
381 async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
383 let rep = self
384 .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
385 .await?;
386 let params = rep.get::<Vec<JsonValue>>().unwrap();
387 let height = *params[0].get::<f64>().unwrap() as u32;
388 let hash = params[1].get::<String>().unwrap().clone();
389
390 Ok((height, hash))
391 }
392
393 async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
395 let params = self
396 .darkfid_daemon_request(
397 "blockchain.get_block",
398 &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
399 )
400 .await?;
401 let param = params.get::<String>().unwrap();
402 let bytes = base64::decode(param).unwrap();
403 let block = deserialize_async(&bytes).await?;
404 Ok(block)
405 }
406
407 pub async fn broadcast_tx(&self, tx: &Transaction) -> Result<String> {
410 println!("Broadcasting transaction...");
411
412 let params =
413 JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
414 let rep = self.darkfid_daemon_request("tx.broadcast", ¶ms).await?;
415
416 let txid = rep.get::<String>().unwrap().clone();
417
418 if let Err(e) = self.put_tx_history_record(tx, "Broadcasted").await {
420 return Err(Error::DatabaseError(format!(
421 "[broadcast_tx] Inserting transaction history record failed: {e:?}"
422 )))
423 }
424
425 Ok(txid)
426 }
427
428 pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
430 let tx_hash_str = tx_hash.to_string();
431 match self
432 .darkfid_daemon_request(
433 "blockchain.get_tx",
434 &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
435 )
436 .await
437 {
438 Ok(param) => {
439 let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
440 let tx = deserialize_async(&tx_bytes).await?;
441 Ok(Some(tx))
442 }
443
444 Err(_) => Ok(None),
445 }
446 }
447
448 pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
450 let tx_str = base64::encode(&serialize_async(tx).await);
451 let rep = self
452 .darkfid_daemon_request(
453 "tx.simulate",
454 &JsonValue::Array(vec![JsonValue::String(tx_str)]),
455 )
456 .await?;
457
458 let is_valid = *rep.get::<bool>().unwrap();
459 Ok(is_valid)
460 }
461
462 pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
464 let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
465 let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", ¶ms).await?;
466 let params = rep.get::<Vec<JsonValue>>().unwrap();
467
468 let mut ret = Vec::with_capacity(params.len());
469 for param in params {
470 let zkas_ns = param[0].get::<String>().unwrap().clone();
471 let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
472 ret.push((zkas_ns, zkas_bincode_bytes));
473 }
474
475 Ok(ret)
476 }
477
478 pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
480 let params = JsonValue::Array(vec![
481 JsonValue::String(base64::encode(&serialize_async(tx).await)),
482 JsonValue::Boolean(include_fee),
483 ]);
484 let rep = self.darkfid_daemon_request("tx.calculate_fee", ¶ms).await?;
485
486 let fee = *rep.get::<f64>().unwrap() as u64;
487
488 Ok(fee)
489 }
490
491 pub async fn get_next_block_height(&self) -> Result<u32> {
493 let rep = self
494 .darkfid_daemon_request(
495 "blockchain.best_fork_next_block_height",
496 &JsonValue::Array(vec![]),
497 )
498 .await?;
499
500 let next_height = *rep.get::<f64>().unwrap() as u32;
501
502 Ok(next_height)
503 }
504
505 pub async fn get_block_target(&self) -> Result<u32> {
507 let rep = self
508 .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
509 .await?;
510
511 let next_height = *rep.get::<f64>().unwrap() as u32;
512
513 Ok(next_height)
514 }
515
516 pub async fn ping(&self) -> Result<()> {
518 println!("Executing ping request to darkfid...");
519 let latency = Instant::now();
520 let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
521 let latency = latency.elapsed();
522 println!("Got reply: {rep:?}");
523 println!("Latency: {latency:?}");
524 Ok(())
525 }
526
527 pub async fn darkfid_daemon_request(
529 &self,
530 method: &str,
531 params: &JsonValue,
532 ) -> Result<JsonValue> {
533 let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
534 let req = JsonRequest::new(method, params.clone());
535 let rep = rpc_client.request(req).await?;
536 Ok(rep)
537 }
538
539 pub async fn stop_rpc_client(&self) -> Result<()> {
541 if let Some(ref rpc_client) = self.rpc_client {
542 rpc_client.stop().await;
543 };
544 Ok(())
545 }
546}