drk/
rpc.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{BTreeMap, HashMap},
21    sync::Arc,
22    time::Instant,
23};
24
25use smol::channel::Sender;
26use url::Url;
27
28use darkfi::{
29    blockchain::BlockInfo,
30    rpc::{
31        client::RpcClient,
32        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
33        util::JsonValue,
34    },
35    system::{ExecutorPtr, Publisher, StoppableTaskPtr},
36    tx::Transaction,
37    util::encoding::base64,
38    Error, Result,
39};
40use darkfi_dao_contract::model::{DaoBulla, DaoProposalBulla};
41use darkfi_money_contract::model::TokenId;
42use darkfi_sdk::{
43    bridgetree::Position,
44    crypto::{
45        smt::{PoseidonFp, EMPTY_NODES_FP},
46        ContractId, MerkleTree, SecretKey, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID,
47        MONEY_CONTRACT_ID,
48    },
49    tx::TransactionHash,
50};
51use darkfi_serial::{deserialize_async, serialize_async};
52
53use crate::{
54    cache::{CacheOverlay, CacheSmt, CacheSmtStorage, SLED_MONEY_SMT_TREE},
55    cli_util::append_or_print,
56    dao::{SLED_MERKLE_TREES_DAO_DAOS, SLED_MERKLE_TREES_DAO_PROPOSALS},
57    error::{WalletDbError, WalletDbResult},
58    money::SLED_MERKLE_TREES_MONEY,
59    Drk, DrkPtr,
60};
61
62/// Structure to hold a JSON-RPC client and its config,
63/// so we can recreate it in case of an error.
64pub struct DarkfidRpcClient {
65    endpoint: Url,
66    ex: ExecutorPtr,
67    client: Option<RpcClient>,
68}
69
70impl DarkfidRpcClient {
71    pub async fn new(endpoint: Url, ex: ExecutorPtr) -> Self {
72        let client = RpcClient::new(endpoint.clone(), ex.clone()).await.ok();
73        Self { endpoint, ex, client }
74    }
75
76    /// Stop the client.
77    pub async fn stop(&self) {
78        if let Some(ref client) = self.client {
79            client.stop().await
80        }
81    }
82}
83
84/// Auxiliary structure holding various in memory caches to use during scan
85pub struct ScanCache {
86    /// The Money Merkle tree containing coins
87    pub money_tree: MerkleTree,
88    /// The Money Sparse Merkle tree containing coins nullifiers
89    pub money_smt: CacheSmt,
90    /// All our known secrets to decrypt coin notes
91    pub notes_secrets: Vec<SecretKey>,
92    /// Our own coins nullifiers and their leaf positions
93    pub owncoins_nullifiers: BTreeMap<[u8; 32], ([u8; 32], Position)>,
94    /// Our own tokens to track freezes
95    pub own_tokens: Vec<TokenId>,
96    /// The DAO Merkle tree containing DAO bullas
97    pub dao_daos_tree: MerkleTree,
98    /// The DAO Merkle tree containing proposals bullas
99    pub dao_proposals_tree: MerkleTree,
100    /// Our own DAOs with their proposals and votes keys
101    pub own_daos: HashMap<DaoBulla, (Option<SecretKey>, Option<SecretKey>)>,
102    /// Our own DAOs proposals with their corresponding DAO reference
103    pub own_proposals: HashMap<DaoProposalBulla, DaoBulla>,
104    /// Our own deploy authorities
105    pub own_deploy_auths: HashMap<[u8; 32], SecretKey>,
106    /// Messages buffer for better downstream prints handling
107    pub messages_buffer: Vec<String>,
108}
109
110impl ScanCache {
111    /// Auxiliary function to append messages to the buffer.
112    pub fn log(&mut self, msg: String) {
113        self.messages_buffer.push(msg);
114    }
115
116    /// Auxiliary function to consume the messages buffer.
117    pub fn flush_messages(&mut self) -> Vec<String> {
118        self.messages_buffer.drain(..).collect()
119    }
120}
121
122impl Drk {
123    /// Auxiliary function to generate a new [`ScanCache`] for the
124    /// wallet.
125    pub async fn scan_cache(&self) -> Result<ScanCache> {
126        let money_tree = self.get_money_tree().await?;
127        let smt_store = CacheSmtStorage::new(CacheOverlay::new(&self.cache)?, SLED_MONEY_SMT_TREE);
128        let money_smt = CacheSmt::new(smt_store, PoseidonFp::new(), &EMPTY_NODES_FP);
129        let mut notes_secrets = self.get_money_secrets().await?;
130        let mut owncoins_nullifiers = BTreeMap::new();
131        for coin in self.get_coins(true).await? {
132            owncoins_nullifiers.insert(
133                coin.0.nullifier().to_bytes(),
134                (coin.0.coin.to_bytes(), coin.0.leaf_position),
135            );
136        }
137        let mint_authorities = self.get_mint_authorities().await?;
138        let mut own_tokens = Vec::with_capacity(mint_authorities.len());
139        for (token, _, _, _, _) in mint_authorities {
140            own_tokens.push(token);
141        }
142        let (dao_daos_tree, dao_proposals_tree) = self.get_dao_trees().await?;
143        let mut own_daos = HashMap::new();
144        for dao in self.get_daos().await? {
145            own_daos.insert(
146                dao.bulla(),
147                (dao.params.proposals_secret_key, dao.params.votes_secret_key),
148            );
149            if let Some(secret_key) = dao.params.notes_secret_key {
150                notes_secrets.push(secret_key);
151            }
152        }
153        let mut own_proposals = HashMap::new();
154        for proposal in self.get_proposals().await? {
155            own_proposals.insert(proposal.bulla(), proposal.proposal.dao_bulla);
156        }
157        let own_deploy_auths = self.get_deploy_auths_keys_map().await?;
158
159        Ok(ScanCache {
160            money_tree,
161            money_smt,
162            notes_secrets,
163            owncoins_nullifiers,
164            own_tokens,
165            dao_daos_tree,
166            dao_proposals_tree,
167            own_daos,
168            own_proposals,
169            own_deploy_auths,
170            messages_buffer: vec![],
171        })
172    }
173
174    /// `scan_block` will go over over transactions in a block and handle their calls
175    /// based on the called contract.
176    async fn scan_block(&self, scan_cache: &mut ScanCache, block: &BlockInfo) -> Result<()> {
177        // Keep track of our wallet transactions.
178        let mut wallet_txs = vec![];
179
180        // Checkpoint the merkle trees
181        scan_cache.money_tree.checkpoint(block.header.height as usize);
182        scan_cache.dao_daos_tree.checkpoint(block.header.height as usize);
183        scan_cache.dao_proposals_tree.checkpoint(block.header.height as usize);
184
185        // Scan the block
186        scan_cache.log(String::from("======================================="));
187        scan_cache.log(format!("{}", block.header));
188        scan_cache.log(String::from("======================================="));
189        scan_cache.log(format!("[scan_block] Iterating over {} transactions", block.txs.len()));
190        let mut block_signing_key = None;
191        for tx in block.txs.iter() {
192            let tx_hash = tx.hash();
193            let tx_hash_string = tx_hash.to_string();
194            let mut wallet_tx = false;
195            scan_cache.log(format!("[scan_block] Processing transaction: {tx_hash_string}"));
196            for (i, call) in tx.calls.iter().enumerate() {
197                if call.data.contract_id == *MONEY_CONTRACT_ID {
198                    scan_cache.log(format!("[scan_block] Found Money contract in call {i}"));
199                    let (is_wallet_tx, signing_key) = self
200                        .apply_tx_money_data(
201                            scan_cache,
202                            &i,
203                            &tx.calls,
204                            &tx_hash_string,
205                            &block.header.height,
206                        )
207                        .await?;
208                    if is_wallet_tx {
209                        wallet_tx = true;
210                        // Only one block signing key exists per block
211                        if signing_key.is_some() {
212                            block_signing_key = signing_key;
213                        }
214                    }
215                    continue
216                }
217
218                if call.data.contract_id == *DAO_CONTRACT_ID {
219                    scan_cache.log(format!("[scan_block] Found DAO contract in call {i}"));
220                    if self
221                        .apply_tx_dao_data(
222                            scan_cache,
223                            &call.data.data,
224                            &tx_hash,
225                            &(i as u8),
226                            &block.header.height,
227                        )
228                        .await?
229                    {
230                        wallet_tx = true;
231                    }
232                    continue
233                }
234
235                if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
236                    scan_cache.log(format!("[scan_block] Found DeployoOor contract in call {i}"));
237                    if self
238                        .apply_tx_deploy_data(
239                            scan_cache,
240                            &call.data.data,
241                            &tx_hash,
242                            &block.header.height,
243                        )
244                        .await?
245                    {
246                        wallet_tx = true;
247                    }
248                    continue
249                }
250
251                // TODO: For now we skip non-native contract calls
252                scan_cache
253                    .log(format!("[scan_block] Found non-native contract in call {i}, skipping."));
254            }
255
256            // If this is our wallet tx we mark it for update
257            if wallet_tx {
258                wallet_txs.push(tx);
259            }
260        }
261
262        // Insert the block record
263        scan_cache.money_smt.store.overlay.insert_scanned_block(
264            &block.header.height,
265            &block.header.hash(),
266            &block_signing_key,
267        )?;
268
269        // Grab the overlay current diff
270        let diff = scan_cache.money_smt.store.overlay.0.diff(&[])?;
271
272        // Apply the overlay current changes
273        scan_cache.money_smt.store.overlay.0.apply_diff(&diff)?;
274
275        // Insert the state inverse diff record
276        self.cache.insert_state_inverse_diff(&block.header.height, &diff.inverse())?;
277
278        // Update the merkle trees
279        self.cache.insert_merkle_trees(&[
280            (SLED_MERKLE_TREES_MONEY, &scan_cache.money_tree),
281            (SLED_MERKLE_TREES_DAO_DAOS, &scan_cache.dao_daos_tree),
282            (SLED_MERKLE_TREES_DAO_PROPOSALS, &scan_cache.dao_proposals_tree),
283        ])?;
284
285        // Flush sled
286        self.cache.sled_db.flush()?;
287
288        // Update wallet transactions records
289        if let Err(e) =
290            self.put_tx_history_records(&wallet_txs, "Confirmed", Some(block.header.height)).await
291        {
292            return Err(Error::DatabaseError(format!(
293                "[scan_block] Inserting transaction history records failed: {e}"
294            )))
295        }
296
297        Ok(())
298    }
299
300    /// Scans the blockchain for wallet relevant transactions,
301    /// starting from the last scanned block. If a reorg has happened,
302    /// we revert to its previous height and then scan from there.
303    pub async fn scan_blocks(
304        &self,
305        output: &mut Vec<String>,
306        sender: Option<&Sender<Vec<String>>>,
307        print: &bool,
308    ) -> WalletDbResult<()> {
309        // Grab last scanned block height
310        let (mut height, hash) = self.get_last_scanned_block()?;
311
312        // Grab our last scanned block from darkfid
313        let block = match self.get_block_by_height(height).await {
314            Ok(b) => Some(b),
315            // Check if block was found
316            Err(Error::JsonRpcError((-32121, _))) => None,
317            Err(e) => {
318                append_or_print(
319                    output,
320                    sender,
321                    print,
322                    vec![format!("[scan_blocks] RPC client request failed: {e}")],
323                )
324                .await;
325                return Err(WalletDbError::GenericError)
326            }
327        };
328
329        // Check if a reorg has happened
330        if block.is_none() || hash != block.unwrap().hash().to_string() {
331            // Find the exact block height the reorg happened
332            let mut buf =
333                vec![String::from("A reorg has happened, finding last known common block...")];
334            height = height.saturating_sub(1);
335            while height != 0 {
336                // Grab our scanned block hash for that height
337                let (scanned_block_hash, _) = self.get_scanned_block(&height)?;
338
339                // Grab the block from darkfid for that height
340                let block = match self.get_block_by_height(height).await {
341                    Ok(b) => Some(b),
342                    // Check if block was found
343                    Err(Error::JsonRpcError((-32121, _))) => None,
344                    Err(e) => {
345                        buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
346                        append_or_print(output, sender, print, buf).await;
347                        return Err(WalletDbError::GenericError)
348                    }
349                };
350
351                // Continue to previous one if they don't match
352                if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
353                    height = height.saturating_sub(1);
354                    continue
355                }
356
357                // Reset to its height
358                buf.push(format!("Last common block found: {height} - {scanned_block_hash}"));
359                self.reset_to_height(height, &mut buf).await?;
360                append_or_print(output, sender, print, buf).await;
361                break
362            }
363        }
364
365        // If last scanned block is genesis(0) we reset,
366        // otherwise continue with the next block height.
367        if height == 0 {
368            let mut buf = vec![];
369            self.reset(&mut buf)?;
370            append_or_print(output, sender, print, buf).await;
371        } else {
372            height += 1;
373        }
374
375        // Generate a new scan cache
376        let mut scan_cache = match self.scan_cache().await {
377            Ok(c) => c,
378            Err(e) => {
379                append_or_print(
380                    output,
381                    sender,
382                    print,
383                    vec![format!("[scan_blocks] Generating scan cache failed: {e}")],
384                )
385                .await;
386                return Err(WalletDbError::GenericError)
387            }
388        };
389
390        loop {
391            // Grab last confirmed block
392            let mut buf = vec![format!("Requested to scan from block number: {height}")];
393            let (last_height, last_hash) = match self.get_last_confirmed_block().await {
394                Ok(last) => last,
395                Err(e) => {
396                    buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
397                    append_or_print(output, sender, print, buf).await;
398                    return Err(WalletDbError::GenericError)
399                }
400            };
401            buf.push(format!(
402                "Last confirmed block reported by darkfid: {last_height} - {last_hash}"
403            ));
404            append_or_print(output, sender, print, buf).await;
405
406            // Already scanned last confirmed block
407            if height > last_height {
408                return Ok(())
409            }
410
411            while height <= last_height {
412                let mut buf = vec![format!("Requesting block {height}...")];
413                let block = match self.get_block_by_height(height).await {
414                    Ok(b) => b,
415                    Err(e) => {
416                        buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
417                        append_or_print(output, sender, print, buf).await;
418                        return Err(WalletDbError::GenericError)
419                    }
420                };
421                buf.push(format!("Block {height} received! Scanning block..."));
422                if let Err(e) = self.scan_block(&mut scan_cache, &block).await {
423                    buf.push(format!("[scan_blocks] Scan block failed: {e}"));
424                    append_or_print(output, sender, print, buf).await;
425                    return Err(WalletDbError::GenericError)
426                };
427                for msg in scan_cache.flush_messages() {
428                    buf.push(msg);
429                }
430                append_or_print(output, sender, print, buf).await;
431                height += 1;
432            }
433        }
434    }
435
436    // Queries darkfid for last confirmed block.
437    async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
438        let rep = self
439            .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
440            .await?;
441        let params = rep.get::<Vec<JsonValue>>().unwrap();
442        let height = *params[0].get::<f64>().unwrap() as u32;
443        let hash = params[1].get::<String>().unwrap().clone();
444
445        Ok((height, hash))
446    }
447
448    // Queries darkfid for a block with given height.
449    async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
450        let params = self
451            .darkfid_daemon_request(
452                "blockchain.get_block",
453                &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
454            )
455            .await?;
456        let param = params.get::<String>().unwrap();
457        let bytes = base64::decode(param).unwrap();
458        let block = deserialize_async(&bytes).await?;
459        Ok(block)
460    }
461
462    /// Broadcast a given transaction to darkfid and forward onto the network.
463    /// Returns the transaction ID upon success.
464    pub async fn broadcast_tx(&self, tx: &Transaction, output: &mut Vec<String>) -> Result<String> {
465        output.push(String::from("Broadcasting transaction..."));
466
467        let params =
468            JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
469        let rep = self.darkfid_daemon_request("tx.broadcast", &params).await?;
470
471        let txid = rep.get::<String>().unwrap().clone();
472
473        // Store transactions history record
474        if let Err(e) = self.put_tx_history_record(tx, "Broadcasted", None).await {
475            return Err(Error::DatabaseError(format!(
476                "[broadcast_tx] Inserting transaction history record failed: {e}"
477            )))
478        }
479
480        Ok(txid)
481    }
482
483    /// Queries darkfid for a tx with given hash.
484    pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
485        let tx_hash_str = tx_hash.to_string();
486        match self
487            .darkfid_daemon_request(
488                "blockchain.get_tx",
489                &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
490            )
491            .await
492        {
493            Ok(param) => {
494                let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
495                let tx = deserialize_async(&tx_bytes).await?;
496                Ok(Some(tx))
497            }
498
499            Err(_) => Ok(None),
500        }
501    }
502
503    /// Simulate the transaction with the state machine.
504    pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
505        let tx_str = base64::encode(&serialize_async(tx).await);
506        let rep = self
507            .darkfid_daemon_request(
508                "tx.simulate",
509                &JsonValue::Array(vec![JsonValue::String(tx_str)]),
510            )
511            .await?;
512
513        let is_valid = *rep.get::<bool>().unwrap();
514        Ok(is_valid)
515    }
516
517    /// Try to fetch zkas bincodes for the given `ContractId`.
518    pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
519        let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
520        let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", &params).await?;
521        let params = rep.get::<Vec<JsonValue>>().unwrap();
522
523        let mut ret = Vec::with_capacity(params.len());
524        for param in params {
525            let zkas_ns = param[0].get::<String>().unwrap().clone();
526            let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
527            ret.push((zkas_ns, zkas_bincode_bytes));
528        }
529
530        Ok(ret)
531    }
532
533    /// Queries darkfid for given transaction's required fee.
534    pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
535        let params = JsonValue::Array(vec![
536            JsonValue::String(base64::encode(&serialize_async(tx).await)),
537            JsonValue::Boolean(include_fee),
538        ]);
539        let rep = self.darkfid_daemon_request("tx.calculate_fee", &params).await?;
540
541        let fee = *rep.get::<f64>().unwrap() as u64;
542
543        Ok(fee)
544    }
545
546    /// Queries darkfid for current best fork next height.
547    pub async fn get_next_block_height(&self) -> Result<u32> {
548        let rep = self
549            .darkfid_daemon_request(
550                "blockchain.best_fork_next_block_height",
551                &JsonValue::Array(vec![]),
552            )
553            .await?;
554
555        let next_height = *rep.get::<f64>().unwrap() as u32;
556
557        Ok(next_height)
558    }
559
560    /// Queries darkfid for currently configured block target time.
561    pub async fn get_block_target(&self) -> Result<u32> {
562        let rep = self
563            .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
564            .await?;
565
566        let next_height = *rep.get::<f64>().unwrap() as u32;
567
568        Ok(next_height)
569    }
570
571    /// Auxiliary function to ping configured darkfid daemon for liveness.
572    pub async fn ping(&self, output: &mut Vec<String>) -> Result<()> {
573        output.push(String::from("Executing ping request to darkfid..."));
574        let latency = Instant::now();
575        let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
576        let latency = latency.elapsed();
577        output.push(format!("Got reply: {rep:?}"));
578        output.push(format!("Latency: {latency:?}"));
579        Ok(())
580    }
581
582    /// Auxiliary function to execute a request towards the configured darkfid daemon JSON-RPC endpoint.
583    pub async fn darkfid_daemon_request(
584        &self,
585        method: &str,
586        params: &JsonValue,
587    ) -> Result<JsonValue> {
588        let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
589        let mut lock = rpc_client.write().await;
590        let req = JsonRequest::new(method, params.clone());
591
592        // Check the client is initialized
593        if let Some(ref client) = lock.client {
594            // Execute request
595            if let Ok(rep) = client.request(req.clone()).await {
596                drop(lock);
597                return Ok(rep);
598            }
599        }
600
601        // Reset the rpc client in case of an error and try again
602        let client = RpcClient::new(lock.endpoint.clone(), lock.ex.clone()).await?;
603        let rep = client.request(req).await?;
604        lock.client = Some(client);
605        drop(lock);
606        Ok(rep)
607    }
608
609    /// Auxiliary function to stop current JSON-RPC client, if its initialized.
610    pub async fn stop_rpc_client(&self) -> Result<()> {
611        if let Some(ref rpc_client) = self.rpc_client {
612            rpc_client.read().await.stop().await;
613        };
614        Ok(())
615    }
616}
617
618/// Subscribes to darkfid's JSON-RPC notification endpoint that serves
619/// new confirmed blocks. Upon receiving them, all the transactions are
620/// scanned and we check if any of them call the money contract, and if
621/// the payments are intended for us. If so, we decrypt them and append
622/// the metadata to our wallet. If a reorg block is received, we revert
623/// to its previous height and then scan it. We assume that the blocks
624/// up to that point are unchanged, since darkfid will just broadcast
625/// the sequence after the reorg.
626pub async fn subscribe_blocks(
627    drk: &DrkPtr,
628    rpc_task: StoppableTaskPtr,
629    shell_sender: Sender<Vec<String>>,
630    endpoint: Url,
631    ex: &ExecutorPtr,
632) -> Result<()> {
633    // First we do a clean scan
634    let lock = drk.read().await;
635    if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
636        let err_msg = format!("Failed during scanning: {e}");
637        shell_sender.send(vec![err_msg.clone()]).await?;
638        return Err(Error::Custom(err_msg))
639    }
640    shell_sender.send(vec![String::from("Finished scanning blockchain")]).await?;
641
642    // Grab last confirmed block height
643    let (last_confirmed_height, _) = lock.get_last_confirmed_block().await?;
644
645    // Handle genesis(0) block
646    if last_confirmed_height == 0 {
647        if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
648            let err_msg = format!("[subscribe_blocks] Scanning from genesis block failed: {e}");
649            shell_sender.send(vec![err_msg.clone()]).await?;
650            return Err(Error::Custom(err_msg))
651        }
652    }
653
654    // Grab last confirmed block again
655    let (last_confirmed_height, last_confirmed_hash) = lock.get_last_confirmed_block().await?;
656
657    // Grab last scanned block
658    let (mut last_scanned_height, last_scanned_hash) = match lock.get_last_scanned_block() {
659        Ok(last) => last,
660        Err(e) => {
661            let err_msg = format!("[subscribe_blocks] Retrieving last scanned block failed: {e}");
662            shell_sender.send(vec![err_msg.clone()]).await?;
663            return Err(Error::Custom(err_msg))
664        }
665    };
666    drop(lock);
667
668    // Check if other blocks have been created
669    if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash {
670        let err_msg = String::from("[subscribe_blocks] Blockchain not fully scanned");
671        shell_sender
672            .send(vec![
673                String::from("Warning: Last scanned block is not the last confirmed block."),
674                String::from("You should first fully scan the blockchain, and then subscribe"),
675                err_msg.clone(),
676            ])
677            .await?;
678        return Err(Error::Custom(err_msg))
679    }
680
681    let mut shell_message =
682        vec![String::from("Subscribing to receive notifications of incoming blocks")];
683    let publisher = Publisher::new();
684    let subscription = publisher.clone().subscribe().await;
685    let _publisher = publisher.clone();
686    let rpc_client = Arc::new(RpcClient::new(endpoint, ex.clone()).await?);
687    let rpc_client_ = rpc_client.clone();
688    rpc_task.start(
689        // Weird hack to prevent lifetimes hell
690        async move {
691            let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
692            rpc_client_.subscribe(req, _publisher).await
693        },
694        |res| async move {
695            rpc_client.stop().await;
696            match res {
697                Ok(()) | Err(Error::DetachedTaskStopped) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
698                Err(e) => {
699                    eprintln!("[subscribe_blocks] JSON-RPC server error: {e}");
700                    publisher
701                        .notify(JsonResult::Error(JsonError::new(
702                            ErrorCode::InternalError,
703                            None,
704                            0,
705                        )))
706                        .await;
707                }
708            }
709        },
710        Error::RpcServerStopped,
711        ex.clone(),
712    );
713    shell_message.push(String::from("Detached subscription to background"));
714    shell_message.push(String::from("All is good. Waiting for block notifications..."));
715    shell_sender.send(shell_message).await?;
716
717    let e = 'outer: loop {
718        match subscription.receive().await {
719            JsonResult::Notification(n) => {
720                let mut shell_message =
721                    vec![String::from("Got Block notification from darkfid subscription")];
722                if n.method != "blockchain.subscribe_blocks" {
723                    shell_sender.send(shell_message).await?;
724                    break Error::UnexpectedJsonRpc(format!(
725                        "Got foreign notification from darkfid: {}",
726                        n.method
727                    ))
728                }
729
730                // Verify parameters
731                if !n.params.is_array() {
732                    shell_sender.send(shell_message).await?;
733                    break Error::UnexpectedJsonRpc(
734                        "Received notification params are not an array".to_string(),
735                    )
736                }
737                let params = n.params.get::<Vec<JsonValue>>().unwrap();
738                if params.is_empty() {
739                    shell_sender.send(shell_message).await?;
740                    break Error::UnexpectedJsonRpc("Notification parameters are empty".to_string())
741                }
742
743                for param in params {
744                    let param = param.get::<String>().unwrap();
745                    let bytes = base64::decode(param).unwrap();
746
747                    let block: BlockInfo = deserialize_async(&bytes).await?;
748                    shell_message
749                        .push(String::from("Deserialized successfully. Scanning block..."));
750
751                    // Check if a reorg block was received, to reset to its previous
752                    let lock = drk.read().await;
753                    if block.header.height <= last_scanned_height {
754                        let reset_height = block.header.height.saturating_sub(1);
755                        if let Err(e) = lock.reset_to_height(reset_height, &mut shell_message).await
756                        {
757                            shell_sender.send(shell_message).await?;
758                            break 'outer Error::Custom(format!(
759                                "[subscribe_blocks] Wallet state reset failed: {e}"
760                            ))
761                        }
762
763                        // Scan genesis again if needed
764                        if reset_height == 0 {
765                            let genesis = match lock.get_block_by_height(reset_height).await {
766                                Ok(b) => b,
767                                Err(e) => {
768                                    shell_sender.send(shell_message).await?;
769                                    break 'outer Error::Custom(format!(
770                                        "[subscribe_blocks] RPC client request failed: {e}"
771                                    ))
772                                }
773                            };
774                            let mut scan_cache = lock.scan_cache().await?;
775                            if let Err(e) = lock.scan_block(&mut scan_cache, &genesis).await {
776                                shell_sender.send(shell_message).await?;
777                                break 'outer Error::Custom(format!(
778                                    "[subscribe_blocks] Scanning block failed: {e}"
779                                ))
780                            };
781                            for msg in scan_cache.flush_messages() {
782                                shell_message.push(msg);
783                            }
784                        }
785                    }
786
787                    let mut scan_cache = lock.scan_cache().await?;
788                    if let Err(e) = lock.scan_block(&mut scan_cache, &block).await {
789                        shell_sender.send(shell_message).await?;
790                        break 'outer Error::Custom(format!(
791                            "[subscribe_blocks] Scanning block failed: {e}"
792                        ))
793                    }
794                    for msg in scan_cache.flush_messages() {
795                        shell_message.push(msg);
796                    }
797                    shell_sender.send(shell_message.clone()).await?;
798
799                    // Set new last scanned block height
800                    last_scanned_height = block.header.height;
801                }
802            }
803
804            JsonResult::Error(e) => {
805                // Some error happened in the transmission
806                break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
807            }
808
809            x => {
810                // And this is weird
811                break Error::UnexpectedJsonRpc(format!("Got unexpected data from JSON-RPC: {x:?}"))
812            }
813        }
814    };
815
816    shell_sender.send(vec![format!("[subscribe_blocks] Subscription loop break: {e}")]).await?;
817    Err(e)
818}