1use std::{
20 collections::{BTreeMap, HashMap},
21 sync::Arc,
22 time::Instant,
23};
24
25use smol::channel::Sender;
26use url::Url;
27
28use darkfi::{
29 blockchain::BlockInfo,
30 rpc::{
31 client::RpcClient,
32 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
33 util::JsonValue,
34 },
35 system::{ExecutorPtr, Publisher, StoppableTaskPtr},
36 tx::Transaction,
37 util::encoding::base64,
38 Error, Result,
39};
40use darkfi_dao_contract::model::{DaoBulla, DaoProposalBulla};
41use darkfi_money_contract::model::TokenId;
42use darkfi_sdk::{
43 bridgetree::Position,
44 crypto::{
45 smt::{PoseidonFp, EMPTY_NODES_FP},
46 ContractId, MerkleTree, SecretKey, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID,
47 MONEY_CONTRACT_ID,
48 },
49 tx::TransactionHash,
50};
51use darkfi_serial::{deserialize_async, serialize_async};
52
53use crate::{
54 cache::{CacheOverlay, CacheSmt, CacheSmtStorage, SLED_MONEY_SMT_TREE},
55 cli_util::append_or_print,
56 dao::{SLED_MERKLE_TREES_DAO_DAOS, SLED_MERKLE_TREES_DAO_PROPOSALS},
57 error::{WalletDbError, WalletDbResult},
58 money::SLED_MERKLE_TREES_MONEY,
59 Drk, DrkPtr,
60};
61
62pub struct DarkfidRpcClient {
65 endpoint: Url,
66 ex: ExecutorPtr,
67 client: Option<RpcClient>,
68}
69
70impl DarkfidRpcClient {
71 pub async fn new(endpoint: Url, ex: ExecutorPtr) -> Self {
72 let client = RpcClient::new(endpoint.clone(), ex.clone()).await.ok();
73 Self { endpoint, ex, client }
74 }
75
76 pub async fn stop(&self) {
78 if let Some(ref client) = self.client {
79 client.stop().await
80 }
81 }
82}
83
84pub struct ScanCache {
86 pub money_tree: MerkleTree,
88 pub money_smt: CacheSmt,
90 pub notes_secrets: Vec<SecretKey>,
92 pub owncoins_nullifiers: BTreeMap<[u8; 32], ([u8; 32], Position)>,
94 pub own_tokens: Vec<TokenId>,
96 pub dao_daos_tree: MerkleTree,
98 pub dao_proposals_tree: MerkleTree,
100 pub own_daos: HashMap<DaoBulla, (Option<SecretKey>, Option<SecretKey>)>,
102 pub own_proposals: HashMap<DaoProposalBulla, DaoBulla>,
104 pub own_deploy_auths: HashMap<[u8; 32], SecretKey>,
106 pub messages_buffer: Vec<String>,
108}
109
110impl ScanCache {
111 pub fn log(&mut self, msg: String) {
113 self.messages_buffer.push(msg);
114 }
115
116 pub fn flush_messages(&mut self) -> Vec<String> {
118 self.messages_buffer.drain(..).collect()
119 }
120}
121
122impl Drk {
123 pub async fn scan_cache(&self) -> Result<ScanCache> {
126 let money_tree = self.get_money_tree().await?;
127 let smt_store = CacheSmtStorage::new(CacheOverlay::new(&self.cache)?, SLED_MONEY_SMT_TREE);
128 let money_smt = CacheSmt::new(smt_store, PoseidonFp::new(), &EMPTY_NODES_FP);
129 let mut notes_secrets = self.get_money_secrets().await?;
130 let mut owncoins_nullifiers = BTreeMap::new();
131 for coin in self.get_coins(true).await? {
132 owncoins_nullifiers.insert(
133 coin.0.nullifier().to_bytes(),
134 (coin.0.coin.to_bytes(), coin.0.leaf_position),
135 );
136 }
137 let mint_authorities = self.get_mint_authorities().await?;
138 let mut own_tokens = Vec::with_capacity(mint_authorities.len());
139 for (token, _, _, _, _) in mint_authorities {
140 own_tokens.push(token);
141 }
142 let (dao_daos_tree, dao_proposals_tree) = self.get_dao_trees().await?;
143 let mut own_daos = HashMap::new();
144 for dao in self.get_daos().await? {
145 own_daos.insert(
146 dao.bulla(),
147 (dao.params.proposals_secret_key, dao.params.votes_secret_key),
148 );
149 if let Some(secret_key) = dao.params.notes_secret_key {
150 notes_secrets.push(secret_key);
151 }
152 }
153 let mut own_proposals = HashMap::new();
154 for proposal in self.get_proposals().await? {
155 own_proposals.insert(proposal.bulla(), proposal.proposal.dao_bulla);
156 }
157 let own_deploy_auths = self.get_deploy_auths_keys_map().await?;
158
159 Ok(ScanCache {
160 money_tree,
161 money_smt,
162 notes_secrets,
163 owncoins_nullifiers,
164 own_tokens,
165 dao_daos_tree,
166 dao_proposals_tree,
167 own_daos,
168 own_proposals,
169 own_deploy_auths,
170 messages_buffer: vec![],
171 })
172 }
173
174 async fn scan_block(&self, scan_cache: &mut ScanCache, block: &BlockInfo) -> Result<()> {
177 let mut wallet_txs = vec![];
179
180 scan_cache.money_tree.checkpoint(block.header.height as usize);
182 scan_cache.dao_daos_tree.checkpoint(block.header.height as usize);
183 scan_cache.dao_proposals_tree.checkpoint(block.header.height as usize);
184
185 scan_cache.log(String::from("======================================="));
187 scan_cache.log(format!("{}", block.header));
188 scan_cache.log(String::from("======================================="));
189 scan_cache.log(format!("[scan_block] Iterating over {} transactions", block.txs.len()));
190 let mut block_signing_key = None;
191 for tx in block.txs.iter() {
192 let tx_hash = tx.hash();
193 let tx_hash_string = tx_hash.to_string();
194 let mut wallet_tx = false;
195 scan_cache.log(format!("[scan_block] Processing transaction: {tx_hash_string}"));
196 for (i, call) in tx.calls.iter().enumerate() {
197 if call.data.contract_id == *MONEY_CONTRACT_ID {
198 scan_cache.log(format!("[scan_block] Found Money contract in call {i}"));
199 let (is_wallet_tx, signing_key) = self
200 .apply_tx_money_data(
201 scan_cache,
202 &i,
203 &tx.calls,
204 &tx_hash_string,
205 &block.header.height,
206 )
207 .await?;
208 if is_wallet_tx {
209 wallet_tx = true;
210 if signing_key.is_some() {
212 block_signing_key = signing_key;
213 }
214 }
215 continue
216 }
217
218 if call.data.contract_id == *DAO_CONTRACT_ID {
219 scan_cache.log(format!("[scan_block] Found DAO contract in call {i}"));
220 if self
221 .apply_tx_dao_data(
222 scan_cache,
223 &call.data.data,
224 &tx_hash,
225 &(i as u8),
226 &block.header.height,
227 )
228 .await?
229 {
230 wallet_tx = true;
231 }
232 continue
233 }
234
235 if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
236 scan_cache.log(format!("[scan_block] Found DeployoOor contract in call {i}"));
237 if self
238 .apply_tx_deploy_data(
239 scan_cache,
240 &call.data.data,
241 &tx_hash,
242 &block.header.height,
243 )
244 .await?
245 {
246 wallet_tx = true;
247 }
248 continue
249 }
250
251 scan_cache
253 .log(format!("[scan_block] Found non-native contract in call {i}, skipping."));
254 }
255
256 if wallet_tx {
258 wallet_txs.push(tx);
259 }
260 }
261
262 scan_cache.money_smt.store.overlay.insert_scanned_block(
264 &block.header.height,
265 &block.header.hash(),
266 &block_signing_key,
267 )?;
268
269 let diff = scan_cache.money_smt.store.overlay.0.diff(&[])?;
271
272 scan_cache.money_smt.store.overlay.0.apply_diff(&diff)?;
274
275 self.cache.insert_state_inverse_diff(&block.header.height, &diff.inverse())?;
277
278 self.cache.insert_merkle_trees(&[
280 (SLED_MERKLE_TREES_MONEY, &scan_cache.money_tree),
281 (SLED_MERKLE_TREES_DAO_DAOS, &scan_cache.dao_daos_tree),
282 (SLED_MERKLE_TREES_DAO_PROPOSALS, &scan_cache.dao_proposals_tree),
283 ])?;
284
285 self.cache.sled_db.flush()?;
287
288 if let Err(e) =
290 self.put_tx_history_records(&wallet_txs, "Confirmed", Some(block.header.height)).await
291 {
292 return Err(Error::DatabaseError(format!(
293 "[scan_block] Inserting transaction history records failed: {e}"
294 )))
295 }
296
297 Ok(())
298 }
299
300 pub async fn scan_blocks(
304 &self,
305 output: &mut Vec<String>,
306 sender: Option<&Sender<Vec<String>>>,
307 print: &bool,
308 ) -> WalletDbResult<()> {
309 let (mut height, hash) = self.get_last_scanned_block()?;
311
312 let block = match self.get_block_by_height(height).await {
314 Ok(b) => Some(b),
315 Err(Error::JsonRpcError((-32121, _))) => None,
317 Err(e) => {
318 append_or_print(
319 output,
320 sender,
321 print,
322 vec![format!("[scan_blocks] RPC client request failed: {e}")],
323 )
324 .await;
325 return Err(WalletDbError::GenericError)
326 }
327 };
328
329 if block.is_none() || hash != block.unwrap().hash().to_string() {
331 let mut buf =
333 vec![String::from("A reorg has happened, finding last known common block...")];
334 height = height.saturating_sub(1);
335 while height != 0 {
336 let (scanned_block_hash, _) = self.get_scanned_block(&height)?;
338
339 let block = match self.get_block_by_height(height).await {
341 Ok(b) => Some(b),
342 Err(Error::JsonRpcError((-32121, _))) => None,
344 Err(e) => {
345 buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
346 append_or_print(output, sender, print, buf).await;
347 return Err(WalletDbError::GenericError)
348 }
349 };
350
351 if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
353 height = height.saturating_sub(1);
354 continue
355 }
356
357 buf.push(format!("Last common block found: {height} - {scanned_block_hash}"));
359 self.reset_to_height(height, &mut buf).await?;
360 append_or_print(output, sender, print, buf).await;
361 break
362 }
363 }
364
365 if height == 0 {
368 let mut buf = vec![];
369 self.reset(&mut buf)?;
370 append_or_print(output, sender, print, buf).await;
371 } else {
372 height += 1;
373 }
374
375 let mut scan_cache = match self.scan_cache().await {
377 Ok(c) => c,
378 Err(e) => {
379 append_or_print(
380 output,
381 sender,
382 print,
383 vec![format!("[scan_blocks] Generating scan cache failed: {e}")],
384 )
385 .await;
386 return Err(WalletDbError::GenericError)
387 }
388 };
389
390 loop {
391 let mut buf = vec![format!("Requested to scan from block number: {height}")];
393 let (last_height, last_hash) = match self.get_last_confirmed_block().await {
394 Ok(last) => last,
395 Err(e) => {
396 buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
397 append_or_print(output, sender, print, buf).await;
398 return Err(WalletDbError::GenericError)
399 }
400 };
401 buf.push(format!(
402 "Last confirmed block reported by darkfid: {last_height} - {last_hash}"
403 ));
404 append_or_print(output, sender, print, buf).await;
405
406 if height > last_height {
408 return Ok(())
409 }
410
411 while height <= last_height {
412 let mut buf = vec![format!("Requesting block {height}...")];
413 let block = match self.get_block_by_height(height).await {
414 Ok(b) => b,
415 Err(e) => {
416 buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
417 append_or_print(output, sender, print, buf).await;
418 return Err(WalletDbError::GenericError)
419 }
420 };
421 buf.push(format!("Block {height} received! Scanning block..."));
422 if let Err(e) = self.scan_block(&mut scan_cache, &block).await {
423 buf.push(format!("[scan_blocks] Scan block failed: {e}"));
424 append_or_print(output, sender, print, buf).await;
425 return Err(WalletDbError::GenericError)
426 };
427 for msg in scan_cache.flush_messages() {
428 buf.push(msg);
429 }
430 append_or_print(output, sender, print, buf).await;
431 height += 1;
432 }
433 }
434 }
435
436 async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
438 let rep = self
439 .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
440 .await?;
441 let params = rep.get::<Vec<JsonValue>>().unwrap();
442 let height = *params[0].get::<f64>().unwrap() as u32;
443 let hash = params[1].get::<String>().unwrap().clone();
444
445 Ok((height, hash))
446 }
447
448 async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
450 let params = self
451 .darkfid_daemon_request(
452 "blockchain.get_block",
453 &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
454 )
455 .await?;
456 let param = params.get::<String>().unwrap();
457 let bytes = base64::decode(param).unwrap();
458 let block = deserialize_async(&bytes).await?;
459 Ok(block)
460 }
461
462 pub async fn broadcast_tx(&self, tx: &Transaction, output: &mut Vec<String>) -> Result<String> {
465 output.push(String::from("Broadcasting transaction..."));
466
467 let params =
468 JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
469 let rep = self.darkfid_daemon_request("tx.broadcast", ¶ms).await?;
470
471 let txid = rep.get::<String>().unwrap().clone();
472
473 if let Err(e) = self.put_tx_history_record(tx, "Broadcasted", None).await {
475 return Err(Error::DatabaseError(format!(
476 "[broadcast_tx] Inserting transaction history record failed: {e}"
477 )))
478 }
479
480 Ok(txid)
481 }
482
483 pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
485 let tx_hash_str = tx_hash.to_string();
486 match self
487 .darkfid_daemon_request(
488 "blockchain.get_tx",
489 &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
490 )
491 .await
492 {
493 Ok(param) => {
494 let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
495 let tx = deserialize_async(&tx_bytes).await?;
496 Ok(Some(tx))
497 }
498
499 Err(_) => Ok(None),
500 }
501 }
502
503 pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
505 let tx_str = base64::encode(&serialize_async(tx).await);
506 let rep = self
507 .darkfid_daemon_request(
508 "tx.simulate",
509 &JsonValue::Array(vec![JsonValue::String(tx_str)]),
510 )
511 .await?;
512
513 let is_valid = *rep.get::<bool>().unwrap();
514 Ok(is_valid)
515 }
516
517 pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
519 let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
520 let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", ¶ms).await?;
521 let params = rep.get::<Vec<JsonValue>>().unwrap();
522
523 let mut ret = Vec::with_capacity(params.len());
524 for param in params {
525 let zkas_ns = param[0].get::<String>().unwrap().clone();
526 let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
527 ret.push((zkas_ns, zkas_bincode_bytes));
528 }
529
530 Ok(ret)
531 }
532
533 pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
535 let params = JsonValue::Array(vec![
536 JsonValue::String(base64::encode(&serialize_async(tx).await)),
537 JsonValue::Boolean(include_fee),
538 ]);
539 let rep = self.darkfid_daemon_request("tx.calculate_fee", ¶ms).await?;
540
541 let fee = *rep.get::<f64>().unwrap() as u64;
542
543 Ok(fee)
544 }
545
546 pub async fn get_next_block_height(&self) -> Result<u32> {
548 let rep = self
549 .darkfid_daemon_request(
550 "blockchain.best_fork_next_block_height",
551 &JsonValue::Array(vec![]),
552 )
553 .await?;
554
555 let next_height = *rep.get::<f64>().unwrap() as u32;
556
557 Ok(next_height)
558 }
559
560 pub async fn get_block_target(&self) -> Result<u32> {
562 let rep = self
563 .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
564 .await?;
565
566 let next_height = *rep.get::<f64>().unwrap() as u32;
567
568 Ok(next_height)
569 }
570
571 pub async fn ping(&self, output: &mut Vec<String>) -> Result<()> {
573 output.push(String::from("Executing ping request to darkfid..."));
574 let latency = Instant::now();
575 let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
576 let latency = latency.elapsed();
577 output.push(format!("Got reply: {rep:?}"));
578 output.push(format!("Latency: {latency:?}"));
579 Ok(())
580 }
581
582 pub async fn darkfid_daemon_request(
584 &self,
585 method: &str,
586 params: &JsonValue,
587 ) -> Result<JsonValue> {
588 let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
589 let mut lock = rpc_client.write().await;
590 let req = JsonRequest::new(method, params.clone());
591
592 if let Some(ref client) = lock.client {
594 if let Ok(rep) = client.request(req.clone()).await {
596 drop(lock);
597 return Ok(rep);
598 }
599 }
600
601 let client = RpcClient::new(lock.endpoint.clone(), lock.ex.clone()).await?;
603 let rep = client.request(req).await?;
604 lock.client = Some(client);
605 drop(lock);
606 Ok(rep)
607 }
608
609 pub async fn stop_rpc_client(&self) -> Result<()> {
611 if let Some(ref rpc_client) = self.rpc_client {
612 rpc_client.read().await.stop().await;
613 };
614 Ok(())
615 }
616}
617
618pub async fn subscribe_blocks(
627 drk: &DrkPtr,
628 rpc_task: StoppableTaskPtr,
629 shell_sender: Sender<Vec<String>>,
630 endpoint: Url,
631 ex: &ExecutorPtr,
632) -> Result<()> {
633 let lock = drk.read().await;
635 if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
636 let err_msg = format!("Failed during scanning: {e}");
637 shell_sender.send(vec![err_msg.clone()]).await?;
638 return Err(Error::Custom(err_msg))
639 }
640 shell_sender.send(vec![String::from("Finished scanning blockchain")]).await?;
641
642 let (last_confirmed_height, _) = lock.get_last_confirmed_block().await?;
644
645 if last_confirmed_height == 0 {
647 if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
648 let err_msg = format!("[subscribe_blocks] Scanning from genesis block failed: {e}");
649 shell_sender.send(vec![err_msg.clone()]).await?;
650 return Err(Error::Custom(err_msg))
651 }
652 }
653
654 let (last_confirmed_height, last_confirmed_hash) = lock.get_last_confirmed_block().await?;
656
657 let (mut last_scanned_height, last_scanned_hash) = match lock.get_last_scanned_block() {
659 Ok(last) => last,
660 Err(e) => {
661 let err_msg = format!("[subscribe_blocks] Retrieving last scanned block failed: {e}");
662 shell_sender.send(vec![err_msg.clone()]).await?;
663 return Err(Error::Custom(err_msg))
664 }
665 };
666 drop(lock);
667
668 if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash {
670 let err_msg = String::from("[subscribe_blocks] Blockchain not fully scanned");
671 shell_sender
672 .send(vec![
673 String::from("Warning: Last scanned block is not the last confirmed block."),
674 String::from("You should first fully scan the blockchain, and then subscribe"),
675 err_msg.clone(),
676 ])
677 .await?;
678 return Err(Error::Custom(err_msg))
679 }
680
681 let mut shell_message =
682 vec![String::from("Subscribing to receive notifications of incoming blocks")];
683 let publisher = Publisher::new();
684 let subscription = publisher.clone().subscribe().await;
685 let _publisher = publisher.clone();
686 let rpc_client = Arc::new(RpcClient::new(endpoint, ex.clone()).await?);
687 let rpc_client_ = rpc_client.clone();
688 rpc_task.start(
689 async move {
691 let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
692 rpc_client_.subscribe(req, _publisher).await
693 },
694 |res| async move {
695 rpc_client.stop().await;
696 match res {
697 Ok(()) | Err(Error::DetachedTaskStopped) | Err(Error::RpcServerStopped) => { }
698 Err(e) => {
699 eprintln!("[subscribe_blocks] JSON-RPC server error: {e}");
700 publisher
701 .notify(JsonResult::Error(JsonError::new(
702 ErrorCode::InternalError,
703 None,
704 0,
705 )))
706 .await;
707 }
708 }
709 },
710 Error::RpcServerStopped,
711 ex.clone(),
712 );
713 shell_message.push(String::from("Detached subscription to background"));
714 shell_message.push(String::from("All is good. Waiting for block notifications..."));
715 shell_sender.send(shell_message).await?;
716
717 let e = 'outer: loop {
718 match subscription.receive().await {
719 JsonResult::Notification(n) => {
720 let mut shell_message =
721 vec![String::from("Got Block notification from darkfid subscription")];
722 if n.method != "blockchain.subscribe_blocks" {
723 shell_sender.send(shell_message).await?;
724 break Error::UnexpectedJsonRpc(format!(
725 "Got foreign notification from darkfid: {}",
726 n.method
727 ))
728 }
729
730 if !n.params.is_array() {
732 shell_sender.send(shell_message).await?;
733 break Error::UnexpectedJsonRpc(
734 "Received notification params are not an array".to_string(),
735 )
736 }
737 let params = n.params.get::<Vec<JsonValue>>().unwrap();
738 if params.is_empty() {
739 shell_sender.send(shell_message).await?;
740 break Error::UnexpectedJsonRpc("Notification parameters are empty".to_string())
741 }
742
743 for param in params {
744 let param = param.get::<String>().unwrap();
745 let bytes = base64::decode(param).unwrap();
746
747 let block: BlockInfo = deserialize_async(&bytes).await?;
748 shell_message
749 .push(String::from("Deserialized successfully. Scanning block..."));
750
751 let lock = drk.read().await;
753 if block.header.height <= last_scanned_height {
754 let reset_height = block.header.height.saturating_sub(1);
755 if let Err(e) = lock.reset_to_height(reset_height, &mut shell_message).await
756 {
757 shell_sender.send(shell_message).await?;
758 break 'outer Error::Custom(format!(
759 "[subscribe_blocks] Wallet state reset failed: {e}"
760 ))
761 }
762
763 if reset_height == 0 {
765 let genesis = match lock.get_block_by_height(reset_height).await {
766 Ok(b) => b,
767 Err(e) => {
768 shell_sender.send(shell_message).await?;
769 break 'outer Error::Custom(format!(
770 "[subscribe_blocks] RPC client request failed: {e}"
771 ))
772 }
773 };
774 let mut scan_cache = lock.scan_cache().await?;
775 if let Err(e) = lock.scan_block(&mut scan_cache, &genesis).await {
776 shell_sender.send(shell_message).await?;
777 break 'outer Error::Custom(format!(
778 "[subscribe_blocks] Scanning block failed: {e}"
779 ))
780 };
781 for msg in scan_cache.flush_messages() {
782 shell_message.push(msg);
783 }
784 }
785 }
786
787 let mut scan_cache = lock.scan_cache().await?;
788 if let Err(e) = lock.scan_block(&mut scan_cache, &block).await {
789 shell_sender.send(shell_message).await?;
790 break 'outer Error::Custom(format!(
791 "[subscribe_blocks] Scanning block failed: {e}"
792 ))
793 }
794 for msg in scan_cache.flush_messages() {
795 shell_message.push(msg);
796 }
797 shell_sender.send(shell_message.clone()).await?;
798
799 last_scanned_height = block.header.height;
801 }
802 }
803
804 JsonResult::Error(e) => {
805 break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
807 }
808
809 x => {
810 break Error::UnexpectedJsonRpc(format!("Got unexpected data from JSON-RPC: {x:?}"))
812 }
813 }
814 };
815
816 shell_sender.send(vec![format!("[subscribe_blocks] Subscription loop break: {e}")]).await?;
817 Err(e)
818}