1use std::{
20 collections::{BTreeMap, HashMap},
21 sync::Arc,
22 time::Instant,
23};
24
25use smol::channel::Sender;
26use url::Url;
27
28use darkfi::{
29 blockchain::BlockInfo,
30 rpc::{
31 client::RpcClient,
32 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
33 util::JsonValue,
34 },
35 system::{ExecutorPtr, Publisher, StoppableTaskPtr},
36 tx::Transaction,
37 util::encoding::base64,
38 Error, Result,
39};
40use darkfi_dao_contract::model::{DaoBulla, DaoProposalBulla};
41use darkfi_money_contract::model::TokenId;
42use darkfi_sdk::{
43 bridgetree::Position,
44 crypto::{
45 smt::{PoseidonFp, EMPTY_NODES_FP},
46 ContractId, MerkleTree, SecretKey, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID,
47 MONEY_CONTRACT_ID,
48 },
49 tx::TransactionHash,
50};
51use darkfi_serial::{deserialize_async, serialize_async};
52
53use crate::{
54 cache::{CacheOverlay, CacheSmt, CacheSmtStorage, SLED_MONEY_SMT_TREE},
55 cli_util::append_or_print,
56 dao::{SLED_MERKLE_TREES_DAO_DAOS, SLED_MERKLE_TREES_DAO_PROPOSALS},
57 error::{WalletDbError, WalletDbResult},
58 money::SLED_MERKLE_TREES_MONEY,
59 Drk, DrkPtr,
60};
61
62pub struct DarkfidRpcClient {
65 endpoint: Url,
66 ex: ExecutorPtr,
67 client: Option<RpcClient>,
68}
69
70impl DarkfidRpcClient {
71 pub async fn new(endpoint: Url, ex: ExecutorPtr) -> Self {
72 let client = RpcClient::new(endpoint.clone(), ex.clone()).await.ok();
73 Self { endpoint, ex, client }
74 }
75
76 pub async fn stop(&self) {
78 if let Some(ref client) = self.client {
79 client.stop().await
80 }
81 }
82}
83
84pub struct ScanCache {
86 pub money_tree: MerkleTree,
88 pub money_smt: CacheSmt,
90 pub notes_secrets: Vec<SecretKey>,
92 pub owncoins_nullifiers: BTreeMap<[u8; 32], ([u8; 32], Position)>,
94 pub own_tokens: Vec<TokenId>,
96 pub dao_daos_tree: MerkleTree,
98 pub dao_proposals_tree: MerkleTree,
100 pub own_daos: HashMap<DaoBulla, (Option<SecretKey>, Option<SecretKey>)>,
102 pub own_proposals: HashMap<DaoProposalBulla, DaoBulla>,
104 pub own_deploy_auths: HashMap<[u8; 32], SecretKey>,
106 pub messages_buffer: Vec<String>,
108}
109
110impl ScanCache {
111 pub fn log(&mut self, msg: String) {
113 self.messages_buffer.push(msg);
114 }
115
116 pub fn flush_messages(&mut self) -> Vec<String> {
118 self.messages_buffer.drain(..).collect()
119 }
120}
121
122impl Drk {
123 pub async fn scan_cache(&self) -> Result<ScanCache> {
126 let money_tree = self.get_money_tree().await?;
127 let smt_store = CacheSmtStorage::new(CacheOverlay::new(&self.cache)?, SLED_MONEY_SMT_TREE);
128 let money_smt = CacheSmt::new(smt_store, PoseidonFp::new(), &EMPTY_NODES_FP);
129 let mut notes_secrets = self.get_money_secrets().await?;
130 let mut owncoins_nullifiers = BTreeMap::new();
131 for coin in self.get_coins(true).await? {
132 owncoins_nullifiers.insert(
133 coin.0.nullifier().to_bytes(),
134 (coin.0.coin.to_bytes(), coin.0.leaf_position),
135 );
136 }
137 let mint_authorities = self.get_mint_authorities().await?;
138 let mut own_tokens = Vec::with_capacity(mint_authorities.len());
139 for (token, _, _, _, _) in mint_authorities {
140 own_tokens.push(token);
141 }
142 let (dao_daos_tree, dao_proposals_tree) = self.get_dao_trees().await?;
143 let mut own_daos = HashMap::new();
144 for dao in self.get_daos().await? {
145 own_daos.insert(
146 dao.bulla(),
147 (dao.params.proposals_secret_key, dao.params.votes_secret_key),
148 );
149 if let Some(secret_key) = dao.params.notes_secret_key {
150 notes_secrets.push(secret_key);
151 }
152 }
153 let mut own_proposals = HashMap::new();
154 for proposal in self.get_proposals().await? {
155 own_proposals.insert(proposal.bulla(), proposal.proposal.dao_bulla);
156 }
157 let own_deploy_auths = self.get_deploy_auths_keys_map().await?;
158
159 Ok(ScanCache {
160 money_tree,
161 money_smt,
162 notes_secrets,
163 owncoins_nullifiers,
164 own_tokens,
165 dao_daos_tree,
166 dao_proposals_tree,
167 own_daos,
168 own_proposals,
169 own_deploy_auths,
170 messages_buffer: vec![],
171 })
172 }
173
174 async fn scan_block(&self, scan_cache: &mut ScanCache, block: &BlockInfo) -> Result<()> {
177 let mut wallet_txs = vec![];
179
180 scan_cache.money_tree.checkpoint(block.header.height as usize);
182 scan_cache.dao_daos_tree.checkpoint(block.header.height as usize);
183 scan_cache.dao_proposals_tree.checkpoint(block.header.height as usize);
184
185 scan_cache.log(String::from("======================================="));
187 scan_cache.log(format!("{}", block.header));
188 scan_cache.log(String::from("======================================="));
189 scan_cache.log(format!("[scan_block] Iterating over {} transactions", block.txs.len()));
190 for tx in block.txs.iter() {
191 let tx_hash = tx.hash();
192 let tx_hash_string = tx_hash.to_string();
193 let mut wallet_tx = false;
194 scan_cache.log(format!("[scan_block] Processing transaction: {tx_hash_string}"));
195 for (i, call) in tx.calls.iter().enumerate() {
196 if call.data.contract_id == *MONEY_CONTRACT_ID {
197 scan_cache.log(format!("[scan_block] Found Money contract in call {i}"));
198 if self
199 .apply_tx_money_data(
200 scan_cache,
201 &i,
202 &tx.calls,
203 &tx_hash_string,
204 &block.header.height,
205 )
206 .await?
207 {
208 wallet_tx = true;
209 }
210 continue
211 }
212
213 if call.data.contract_id == *DAO_CONTRACT_ID {
214 scan_cache.log(format!("[scan_block] Found DAO contract in call {i}"));
215 if self
216 .apply_tx_dao_data(
217 scan_cache,
218 &call.data.data,
219 &tx_hash,
220 &(i as u8),
221 &block.header.height,
222 )
223 .await?
224 {
225 wallet_tx = true;
226 }
227 continue
228 }
229
230 if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
231 scan_cache.log(format!("[scan_block] Found DeployoOor contract in call {i}"));
232 if self
233 .apply_tx_deploy_data(
234 scan_cache,
235 &call.data.data,
236 &tx_hash,
237 &block.header.height,
238 )
239 .await?
240 {
241 wallet_tx = true;
242 }
243 continue
244 }
245
246 scan_cache
248 .log(format!("[scan_block] Found non-native contract in call {i}, skipping."));
249 }
250
251 if wallet_tx {
253 wallet_txs.push(tx);
254 }
255 }
256
257 scan_cache
259 .money_smt
260 .store
261 .overlay
262 .insert_scanned_block(&block.header.height, &block.header.hash())?;
263
264 let diff = scan_cache.money_smt.store.overlay.0.diff(&[])?;
266
267 scan_cache.money_smt.store.overlay.0.apply_diff(&diff)?;
269
270 self.cache.insert_state_inverse_diff(&block.header.height, &diff.inverse())?;
272
273 self.cache.insert_merkle_trees(&[
275 (SLED_MERKLE_TREES_MONEY, &scan_cache.money_tree),
276 (SLED_MERKLE_TREES_DAO_DAOS, &scan_cache.dao_daos_tree),
277 (SLED_MERKLE_TREES_DAO_PROPOSALS, &scan_cache.dao_proposals_tree),
278 ])?;
279
280 self.cache.sled_db.flush()?;
282
283 if let Err(e) =
285 self.put_tx_history_records(&wallet_txs, "Confirmed", Some(block.header.height)).await
286 {
287 return Err(Error::DatabaseError(format!(
288 "[scan_block] Inserting transaction history records failed: {e}"
289 )))
290 }
291
292 Ok(())
293 }
294
295 pub async fn scan_blocks(
299 &self,
300 output: &mut Vec<String>,
301 sender: Option<&Sender<Vec<String>>>,
302 print: &bool,
303 ) -> WalletDbResult<()> {
304 let (mut height, hash) = self.get_last_scanned_block()?;
306
307 let block = match self.get_block_by_height(height).await {
309 Ok(b) => Some(b),
310 Err(Error::JsonRpcError((-32121, _))) => None,
312 Err(e) => {
313 append_or_print(
314 output,
315 sender,
316 print,
317 vec![format!("[scan_blocks] RPC client request failed: {e}")],
318 )
319 .await;
320 return Err(WalletDbError::GenericError)
321 }
322 };
323
324 if block.is_none() || hash != block.unwrap().hash().to_string() {
326 let mut buf =
328 vec![String::from("A reorg has happened, finding last known common block...")];
329 height = height.saturating_sub(1);
330 while height != 0 {
331 let scanned_block_hash = self.get_scanned_block_hash(&height)?;
333
334 let block = match self.get_block_by_height(height).await {
336 Ok(b) => Some(b),
337 Err(Error::JsonRpcError((-32121, _))) => None,
339 Err(e) => {
340 buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
341 append_or_print(output, sender, print, buf).await;
342 return Err(WalletDbError::GenericError)
343 }
344 };
345
346 if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
348 height = height.saturating_sub(1);
349 continue
350 }
351
352 buf.push(format!("Last common block found: {height} - {scanned_block_hash}"));
354 self.reset_to_height(height, &mut buf).await?;
355 append_or_print(output, sender, print, buf).await;
356 break
357 }
358 }
359
360 if height == 0 {
363 let mut buf = vec![];
364 self.reset(&mut buf)?;
365 append_or_print(output, sender, print, buf).await;
366 } else {
367 height += 1;
368 }
369
370 let mut scan_cache = match self.scan_cache().await {
372 Ok(c) => c,
373 Err(e) => {
374 append_or_print(
375 output,
376 sender,
377 print,
378 vec![format!("[scan_blocks] Generating scan cache failed: {e}")],
379 )
380 .await;
381 return Err(WalletDbError::GenericError)
382 }
383 };
384
385 loop {
386 let mut buf = vec![format!("Requested to scan from block number: {height}")];
388 let (last_height, last_hash) = match self.get_last_confirmed_block().await {
389 Ok(last) => last,
390 Err(e) => {
391 buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
392 append_or_print(output, sender, print, buf).await;
393 return Err(WalletDbError::GenericError)
394 }
395 };
396 buf.push(format!(
397 "Last confirmed block reported by darkfid: {last_height} - {last_hash}"
398 ));
399 append_or_print(output, sender, print, buf).await;
400
401 if height > last_height {
403 return Ok(())
404 }
405
406 while height <= last_height {
407 let mut buf = vec![format!("Requesting block {height}...")];
408 let block = match self.get_block_by_height(height).await {
409 Ok(b) => b,
410 Err(e) => {
411 buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
412 append_or_print(output, sender, print, buf).await;
413 return Err(WalletDbError::GenericError)
414 }
415 };
416 buf.push(format!("Block {height} received! Scanning block..."));
417 if let Err(e) = self.scan_block(&mut scan_cache, &block).await {
418 buf.push(format!("[scan_blocks] Scan block failed: {e}"));
419 append_or_print(output, sender, print, buf).await;
420 return Err(WalletDbError::GenericError)
421 };
422 for msg in scan_cache.flush_messages() {
423 buf.push(msg);
424 }
425 append_or_print(output, sender, print, buf).await;
426 height += 1;
427 }
428 }
429 }
430
431 async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
433 let rep = self
434 .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
435 .await?;
436 let params = rep.get::<Vec<JsonValue>>().unwrap();
437 let height = *params[0].get::<f64>().unwrap() as u32;
438 let hash = params[1].get::<String>().unwrap().clone();
439
440 Ok((height, hash))
441 }
442
443 async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
445 let params = self
446 .darkfid_daemon_request(
447 "blockchain.get_block",
448 &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
449 )
450 .await?;
451 let param = params.get::<String>().unwrap();
452 let bytes = base64::decode(param).unwrap();
453 let block = deserialize_async(&bytes).await?;
454 Ok(block)
455 }
456
457 pub async fn broadcast_tx(&self, tx: &Transaction, output: &mut Vec<String>) -> Result<String> {
460 output.push(String::from("Broadcasting transaction..."));
461
462 let params =
463 JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
464 let rep = self.darkfid_daemon_request("tx.broadcast", ¶ms).await?;
465
466 let txid = rep.get::<String>().unwrap().clone();
467
468 if let Err(e) = self.put_tx_history_record(tx, "Broadcasted", None).await {
470 return Err(Error::DatabaseError(format!(
471 "[broadcast_tx] Inserting transaction history record failed: {e}"
472 )))
473 }
474
475 Ok(txid)
476 }
477
478 pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
480 let tx_hash_str = tx_hash.to_string();
481 match self
482 .darkfid_daemon_request(
483 "blockchain.get_tx",
484 &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
485 )
486 .await
487 {
488 Ok(param) => {
489 let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
490 let tx = deserialize_async(&tx_bytes).await?;
491 Ok(Some(tx))
492 }
493
494 Err(_) => Ok(None),
495 }
496 }
497
498 pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
500 let tx_str = base64::encode(&serialize_async(tx).await);
501 let rep = self
502 .darkfid_daemon_request(
503 "tx.simulate",
504 &JsonValue::Array(vec![JsonValue::String(tx_str)]),
505 )
506 .await?;
507
508 let is_valid = *rep.get::<bool>().unwrap();
509 Ok(is_valid)
510 }
511
512 pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
514 let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
515 let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", ¶ms).await?;
516 let params = rep.get::<Vec<JsonValue>>().unwrap();
517
518 let mut ret = Vec::with_capacity(params.len());
519 for param in params {
520 let zkas_ns = param[0].get::<String>().unwrap().clone();
521 let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
522 ret.push((zkas_ns, zkas_bincode_bytes));
523 }
524
525 Ok(ret)
526 }
527
528 pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
530 let params = JsonValue::Array(vec![
531 JsonValue::String(base64::encode(&serialize_async(tx).await)),
532 JsonValue::Boolean(include_fee),
533 ]);
534 let rep = self.darkfid_daemon_request("tx.calculate_fee", ¶ms).await?;
535
536 let fee = *rep.get::<f64>().unwrap() as u64;
537
538 Ok(fee)
539 }
540
541 pub async fn get_next_block_height(&self) -> Result<u32> {
543 let rep = self
544 .darkfid_daemon_request(
545 "blockchain.best_fork_next_block_height",
546 &JsonValue::Array(vec![]),
547 )
548 .await?;
549
550 let next_height = *rep.get::<f64>().unwrap() as u32;
551
552 Ok(next_height)
553 }
554
555 pub async fn get_block_target(&self) -> Result<u32> {
557 let rep = self
558 .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
559 .await?;
560
561 let next_height = *rep.get::<f64>().unwrap() as u32;
562
563 Ok(next_height)
564 }
565
566 pub async fn ping(&self, output: &mut Vec<String>) -> Result<()> {
568 output.push(String::from("Executing ping request to darkfid..."));
569 let latency = Instant::now();
570 let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
571 let latency = latency.elapsed();
572 output.push(format!("Got reply: {rep:?}"));
573 output.push(format!("Latency: {latency:?}"));
574 Ok(())
575 }
576
577 pub async fn darkfid_daemon_request(
579 &self,
580 method: &str,
581 params: &JsonValue,
582 ) -> Result<JsonValue> {
583 let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
584 let mut lock = rpc_client.write().await;
585 let req = JsonRequest::new(method, params.clone());
586
587 if let Some(ref client) = lock.client {
589 if let Ok(rep) = client.request(req.clone()).await {
591 drop(lock);
592 return Ok(rep);
593 }
594 }
595
596 let client = RpcClient::new(lock.endpoint.clone(), lock.ex.clone()).await?;
598 let rep = client.request(req).await?;
599 lock.client = Some(client);
600 drop(lock);
601 Ok(rep)
602 }
603
604 pub async fn stop_rpc_client(&self) -> Result<()> {
606 if let Some(ref rpc_client) = self.rpc_client {
607 rpc_client.read().await.stop().await;
608 };
609 Ok(())
610 }
611}
612
613pub async fn subscribe_blocks(
622 drk: &DrkPtr,
623 rpc_task: StoppableTaskPtr,
624 shell_sender: Sender<Vec<String>>,
625 endpoint: Url,
626 ex: &ExecutorPtr,
627) -> Result<()> {
628 let lock = drk.read().await;
630 if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
631 let err_msg = format!("Failed during scanning: {e}");
632 shell_sender.send(vec![err_msg.clone()]).await?;
633 return Err(Error::Custom(err_msg))
634 }
635 shell_sender.send(vec![String::from("Finished scanning blockchain")]).await?;
636
637 let (last_confirmed_height, _) = lock.get_last_confirmed_block().await?;
639
640 if last_confirmed_height == 0 {
642 if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
643 let err_msg = format!("[subscribe_blocks] Scanning from genesis block failed: {e}");
644 shell_sender.send(vec![err_msg.clone()]).await?;
645 return Err(Error::Custom(err_msg))
646 }
647 }
648
649 let (last_confirmed_height, last_confirmed_hash) = lock.get_last_confirmed_block().await?;
651
652 let (mut last_scanned_height, last_scanned_hash) = match lock.get_last_scanned_block() {
654 Ok(last) => last,
655 Err(e) => {
656 let err_msg = format!("[subscribe_blocks] Retrieving last scanned block failed: {e}");
657 shell_sender.send(vec![err_msg.clone()]).await?;
658 return Err(Error::Custom(err_msg))
659 }
660 };
661 drop(lock);
662
663 if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash {
665 let err_msg = String::from("[subscribe_blocks] Blockchain not fully scanned");
666 shell_sender
667 .send(vec![
668 String::from("Warning: Last scanned block is not the last confirmed block."),
669 String::from("You should first fully scan the blockchain, and then subscribe"),
670 err_msg.clone(),
671 ])
672 .await?;
673 return Err(Error::Custom(err_msg))
674 }
675
676 let mut shell_message =
677 vec![String::from("Subscribing to receive notifications of incoming blocks")];
678 let publisher = Publisher::new();
679 let subscription = publisher.clone().subscribe().await;
680 let _publisher = publisher.clone();
681 let rpc_client = Arc::new(RpcClient::new(endpoint, ex.clone()).await?);
682 let rpc_client_ = rpc_client.clone();
683 rpc_task.start(
684 async move {
686 let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
687 rpc_client_.subscribe(req, _publisher).await
688 },
689 |res| async move {
690 rpc_client.stop().await;
691 match res {
692 Ok(()) | Err(Error::DetachedTaskStopped) | Err(Error::RpcServerStopped) => { }
693 Err(e) => {
694 eprintln!("[subscribe_blocks] JSON-RPC server error: {e}");
695 publisher
696 .notify(JsonResult::Error(JsonError::new(
697 ErrorCode::InternalError,
698 None,
699 0,
700 )))
701 .await;
702 }
703 }
704 },
705 Error::RpcServerStopped,
706 ex.clone(),
707 );
708 shell_message.push(String::from("Detached subscription to background"));
709 shell_message.push(String::from("All is good. Waiting for block notifications..."));
710 shell_sender.send(shell_message).await?;
711
712 let e = 'outer: loop {
713 match subscription.receive().await {
714 JsonResult::Notification(n) => {
715 let mut shell_message =
716 vec![String::from("Got Block notification from darkfid subscription")];
717 if n.method != "blockchain.subscribe_blocks" {
718 shell_sender.send(shell_message).await?;
719 break Error::UnexpectedJsonRpc(format!(
720 "Got foreign notification from darkfid: {}",
721 n.method
722 ))
723 }
724
725 if !n.params.is_array() {
727 shell_sender.send(shell_message).await?;
728 break Error::UnexpectedJsonRpc(
729 "Received notification params are not an array".to_string(),
730 )
731 }
732 let params = n.params.get::<Vec<JsonValue>>().unwrap();
733 if params.is_empty() {
734 shell_sender.send(shell_message).await?;
735 break Error::UnexpectedJsonRpc("Notification parameters are empty".to_string())
736 }
737
738 for param in params {
739 let param = param.get::<String>().unwrap();
740 let bytes = base64::decode(param).unwrap();
741
742 let block: BlockInfo = deserialize_async(&bytes).await?;
743 shell_message
744 .push(String::from("Deserialized successfully. Scanning block..."));
745
746 let lock = drk.read().await;
748 if block.header.height <= last_scanned_height {
749 let reset_height = block.header.height.saturating_sub(1);
750 if let Err(e) = lock.reset_to_height(reset_height, &mut shell_message).await
751 {
752 shell_sender.send(shell_message).await?;
753 break 'outer Error::Custom(format!(
754 "[subscribe_blocks] Wallet state reset failed: {e}"
755 ))
756 }
757
758 if reset_height == 0 {
760 let genesis = match lock.get_block_by_height(reset_height).await {
761 Ok(b) => b,
762 Err(e) => {
763 shell_sender.send(shell_message).await?;
764 break 'outer Error::Custom(format!(
765 "[subscribe_blocks] RPC client request failed: {e}"
766 ))
767 }
768 };
769 let mut scan_cache = lock.scan_cache().await?;
770 if let Err(e) = lock.scan_block(&mut scan_cache, &genesis).await {
771 shell_sender.send(shell_message).await?;
772 break 'outer Error::Custom(format!(
773 "[subscribe_blocks] Scanning block failed: {e}"
774 ))
775 };
776 for msg in scan_cache.flush_messages() {
777 shell_message.push(msg);
778 }
779 }
780 }
781
782 let mut scan_cache = lock.scan_cache().await?;
783 if let Err(e) = lock.scan_block(&mut scan_cache, &block).await {
784 shell_sender.send(shell_message).await?;
785 break 'outer Error::Custom(format!(
786 "[subscribe_blocks] Scanning block failed: {e}"
787 ))
788 }
789 for msg in scan_cache.flush_messages() {
790 shell_message.push(msg);
791 }
792 shell_sender.send(shell_message.clone()).await?;
793
794 last_scanned_height = block.header.height;
796 }
797 }
798
799 JsonResult::Error(e) => {
800 break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
802 }
803
804 x => {
805 break Error::UnexpectedJsonRpc(format!("Got unexpected data from JSON-RPC: {x:?}"))
807 }
808 }
809 };
810
811 shell_sender.send(vec![format!("[subscribe_blocks] Subscription loop break: {e}")]).await?;
812 Err(e)
813}