drk/
rpc.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{BTreeMap, HashMap},
21    sync::Arc,
22    time::Instant,
23};
24
25use smol::channel::Sender;
26use url::Url;
27
28use darkfi::{
29    blockchain::BlockInfo,
30    rpc::{
31        client::RpcClient,
32        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
33        util::JsonValue,
34    },
35    system::{ExecutorPtr, Publisher, StoppableTaskPtr},
36    tx::Transaction,
37    util::encoding::base64,
38    Error, Result,
39};
40use darkfi_dao_contract::model::{DaoBulla, DaoProposalBulla};
41use darkfi_money_contract::model::TokenId;
42use darkfi_sdk::{
43    bridgetree::Position,
44    crypto::{
45        smt::{PoseidonFp, EMPTY_NODES_FP},
46        ContractId, MerkleTree, SecretKey, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID,
47        MONEY_CONTRACT_ID,
48    },
49    tx::TransactionHash,
50};
51use darkfi_serial::{deserialize_async, serialize_async};
52
53use crate::{
54    cache::{CacheOverlay, CacheSmt, CacheSmtStorage, SLED_MONEY_SMT_TREE},
55    cli_util::append_or_print,
56    dao::{SLED_MERKLE_TREES_DAO_DAOS, SLED_MERKLE_TREES_DAO_PROPOSALS},
57    error::{WalletDbError, WalletDbResult},
58    money::SLED_MERKLE_TREES_MONEY,
59    Drk, DrkPtr,
60};
61
62/// Structure to hold a JSON-RPC client and its config,
63/// so we can recreate it in case of an error.
64pub struct DarkfidRpcClient {
65    endpoint: Url,
66    ex: ExecutorPtr,
67    client: Option<RpcClient>,
68}
69
70impl DarkfidRpcClient {
71    pub async fn new(endpoint: Url, ex: ExecutorPtr) -> Self {
72        let client = RpcClient::new(endpoint.clone(), ex.clone()).await.ok();
73        Self { endpoint, ex, client }
74    }
75
76    /// Stop the client.
77    pub async fn stop(&self) {
78        if let Some(ref client) = self.client {
79            client.stop().await
80        }
81    }
82}
83
84/// Auxiliary structure holding various in memory caches to use during scan
85pub struct ScanCache {
86    /// The Money Merkle tree containing coins
87    pub money_tree: MerkleTree,
88    /// The Money Sparse Merkle tree containing coins nullifiers
89    pub money_smt: CacheSmt,
90    /// All our known secrets to decrypt coin notes
91    pub notes_secrets: Vec<SecretKey>,
92    /// Our own coins nullifiers and their leaf positions
93    pub owncoins_nullifiers: BTreeMap<[u8; 32], ([u8; 32], Position)>,
94    /// Our own tokens to track freezes
95    pub own_tokens: Vec<TokenId>,
96    /// The DAO Merkle tree containing DAO bullas
97    pub dao_daos_tree: MerkleTree,
98    /// The DAO Merkle tree containing proposals bullas
99    pub dao_proposals_tree: MerkleTree,
100    /// Our own DAOs with their proposals and votes keys
101    pub own_daos: HashMap<DaoBulla, (Option<SecretKey>, Option<SecretKey>)>,
102    /// Our own DAOs proposals with their corresponding DAO reference
103    pub own_proposals: HashMap<DaoProposalBulla, DaoBulla>,
104    /// Our own deploy authorities
105    pub own_deploy_auths: HashMap<[u8; 32], SecretKey>,
106    /// Messages buffer for better downstream prints handling
107    pub messages_buffer: Vec<String>,
108}
109
110impl ScanCache {
111    /// Auxiliary function to append messages to the buffer.
112    pub fn log(&mut self, msg: String) {
113        self.messages_buffer.push(msg);
114    }
115
116    /// Auxiliary function to consume the messages buffer.
117    pub fn flush_messages(&mut self) -> Vec<String> {
118        self.messages_buffer.drain(..).collect()
119    }
120}
121
122impl Drk {
123    /// Auxiliary function to generate a new [`ScanCache`] for the
124    /// wallet.
125    pub async fn scan_cache(&self) -> Result<ScanCache> {
126        let money_tree = self.get_money_tree().await?;
127        let smt_store = CacheSmtStorage::new(CacheOverlay::new(&self.cache)?, SLED_MONEY_SMT_TREE);
128        let money_smt = CacheSmt::new(smt_store, PoseidonFp::new(), &EMPTY_NODES_FP);
129        let mut notes_secrets = self.get_money_secrets().await?;
130        let mut owncoins_nullifiers = BTreeMap::new();
131        for coin in self.get_coins(true).await? {
132            owncoins_nullifiers.insert(
133                coin.0.nullifier().to_bytes(),
134                (coin.0.coin.to_bytes(), coin.0.leaf_position),
135            );
136        }
137        let mint_authorities = self.get_mint_authorities().await?;
138        let mut own_tokens = Vec::with_capacity(mint_authorities.len());
139        for (token, _, _, _, _) in mint_authorities {
140            own_tokens.push(token);
141        }
142        let (dao_daos_tree, dao_proposals_tree) = self.get_dao_trees().await?;
143        let mut own_daos = HashMap::new();
144        for dao in self.get_daos().await? {
145            own_daos.insert(
146                dao.bulla(),
147                (dao.params.proposals_secret_key, dao.params.votes_secret_key),
148            );
149            if let Some(secret_key) = dao.params.notes_secret_key {
150                notes_secrets.push(secret_key);
151            }
152        }
153        let mut own_proposals = HashMap::new();
154        for proposal in self.get_proposals().await? {
155            own_proposals.insert(proposal.bulla(), proposal.proposal.dao_bulla);
156        }
157        let own_deploy_auths = self.get_deploy_auths_keys_map().await?;
158
159        Ok(ScanCache {
160            money_tree,
161            money_smt,
162            notes_secrets,
163            owncoins_nullifiers,
164            own_tokens,
165            dao_daos_tree,
166            dao_proposals_tree,
167            own_daos,
168            own_proposals,
169            own_deploy_auths,
170            messages_buffer: vec![],
171        })
172    }
173
174    /// `scan_block` will go over over transactions in a block and handle their calls
175    /// based on the called contract.
176    async fn scan_block(&self, scan_cache: &mut ScanCache, block: &BlockInfo) -> Result<()> {
177        // Keep track of our wallet transactions.
178        let mut wallet_txs = vec![];
179
180        // Checkpoint the merkle trees
181        scan_cache.money_tree.checkpoint(block.header.height as usize);
182        scan_cache.dao_daos_tree.checkpoint(block.header.height as usize);
183        scan_cache.dao_proposals_tree.checkpoint(block.header.height as usize);
184
185        // Scan the block
186        scan_cache.log(String::from("======================================="));
187        scan_cache.log(format!("{}", block.header));
188        scan_cache.log(String::from("======================================="));
189        scan_cache.log(format!("[scan_block] Iterating over {} transactions", block.txs.len()));
190        for tx in block.txs.iter() {
191            let tx_hash = tx.hash();
192            let tx_hash_string = tx_hash.to_string();
193            let mut wallet_tx = false;
194            scan_cache.log(format!("[scan_block] Processing transaction: {tx_hash_string}"));
195            for (i, call) in tx.calls.iter().enumerate() {
196                if call.data.contract_id == *MONEY_CONTRACT_ID {
197                    scan_cache.log(format!("[scan_block] Found Money contract in call {i}"));
198                    if self
199                        .apply_tx_money_data(
200                            scan_cache,
201                            &i,
202                            &tx.calls,
203                            &tx_hash_string,
204                            &block.header.height,
205                        )
206                        .await?
207                    {
208                        wallet_tx = true;
209                    }
210                    continue
211                }
212
213                if call.data.contract_id == *DAO_CONTRACT_ID {
214                    scan_cache.log(format!("[scan_block] Found DAO contract in call {i}"));
215                    if self
216                        .apply_tx_dao_data(
217                            scan_cache,
218                            &call.data.data,
219                            &tx_hash,
220                            &(i as u8),
221                            &block.header.height,
222                        )
223                        .await?
224                    {
225                        wallet_tx = true;
226                    }
227                    continue
228                }
229
230                if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
231                    scan_cache.log(format!("[scan_block] Found DeployoOor contract in call {i}"));
232                    if self
233                        .apply_tx_deploy_data(
234                            scan_cache,
235                            &call.data.data,
236                            &tx_hash,
237                            &block.header.height,
238                        )
239                        .await?
240                    {
241                        wallet_tx = true;
242                    }
243                    continue
244                }
245
246                // TODO: For now we skip non-native contract calls
247                scan_cache
248                    .log(format!("[scan_block] Found non-native contract in call {i}, skipping."));
249            }
250
251            // If this is our wallet tx we mark it for update
252            if wallet_tx {
253                wallet_txs.push(tx);
254            }
255        }
256
257        // Insert the block record
258        scan_cache
259            .money_smt
260            .store
261            .overlay
262            .insert_scanned_block(&block.header.height, &block.header.hash())?;
263
264        // Grab the overlay current diff
265        let diff = scan_cache.money_smt.store.overlay.0.diff(&[])?;
266
267        // Apply the overlay current changes
268        scan_cache.money_smt.store.overlay.0.apply_diff(&diff)?;
269
270        // Insert the state inverse diff record
271        self.cache.insert_state_inverse_diff(&block.header.height, &diff.inverse())?;
272
273        // Update the merkle trees
274        self.cache.insert_merkle_trees(&[
275            (SLED_MERKLE_TREES_MONEY, &scan_cache.money_tree),
276            (SLED_MERKLE_TREES_DAO_DAOS, &scan_cache.dao_daos_tree),
277            (SLED_MERKLE_TREES_DAO_PROPOSALS, &scan_cache.dao_proposals_tree),
278        ])?;
279
280        // Flush sled
281        self.cache.sled_db.flush()?;
282
283        // Update wallet transactions records
284        if let Err(e) =
285            self.put_tx_history_records(&wallet_txs, "Confirmed", Some(block.header.height)).await
286        {
287            return Err(Error::DatabaseError(format!(
288                "[scan_block] Inserting transaction history records failed: {e}"
289            )))
290        }
291
292        Ok(())
293    }
294
295    /// Scans the blockchain for wallet relevant transactions,
296    /// starting from the last scanned block. If a reorg has happened,
297    /// we revert to its previous height and then scan from there.
298    pub async fn scan_blocks(
299        &self,
300        output: &mut Vec<String>,
301        sender: Option<&Sender<Vec<String>>>,
302        print: &bool,
303    ) -> WalletDbResult<()> {
304        // Grab last scanned block height
305        let (mut height, hash) = self.get_last_scanned_block()?;
306
307        // Grab our last scanned block from darkfid
308        let block = match self.get_block_by_height(height).await {
309            Ok(b) => Some(b),
310            // Check if block was found
311            Err(Error::JsonRpcError((-32121, _))) => None,
312            Err(e) => {
313                append_or_print(
314                    output,
315                    sender,
316                    print,
317                    vec![format!("[scan_blocks] RPC client request failed: {e}")],
318                )
319                .await;
320                return Err(WalletDbError::GenericError)
321            }
322        };
323
324        // Check if a reorg has happened
325        if block.is_none() || hash != block.unwrap().hash().to_string() {
326            // Find the exact block height the reorg happened
327            let mut buf =
328                vec![String::from("A reorg has happened, finding last known common block...")];
329            height = height.saturating_sub(1);
330            while height != 0 {
331                // Grab our scanned block hash for that height
332                let scanned_block_hash = self.get_scanned_block_hash(&height)?;
333
334                // Grab the block from darkfid for that height
335                let block = match self.get_block_by_height(height).await {
336                    Ok(b) => Some(b),
337                    // Check if block was found
338                    Err(Error::JsonRpcError((-32121, _))) => None,
339                    Err(e) => {
340                        buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
341                        append_or_print(output, sender, print, buf).await;
342                        return Err(WalletDbError::GenericError)
343                    }
344                };
345
346                // Continue to previous one if they don't match
347                if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
348                    height = height.saturating_sub(1);
349                    continue
350                }
351
352                // Reset to its height
353                buf.push(format!("Last common block found: {height} - {scanned_block_hash}"));
354                self.reset_to_height(height, &mut buf).await?;
355                append_or_print(output, sender, print, buf).await;
356                break
357            }
358        }
359
360        // If last scanned block is genesis(0) we reset,
361        // otherwise continue with the next block height.
362        if height == 0 {
363            let mut buf = vec![];
364            self.reset(&mut buf)?;
365            append_or_print(output, sender, print, buf).await;
366        } else {
367            height += 1;
368        }
369
370        // Generate a new scan cache
371        let mut scan_cache = match self.scan_cache().await {
372            Ok(c) => c,
373            Err(e) => {
374                append_or_print(
375                    output,
376                    sender,
377                    print,
378                    vec![format!("[scan_blocks] Generating scan cache failed: {e}")],
379                )
380                .await;
381                return Err(WalletDbError::GenericError)
382            }
383        };
384
385        loop {
386            // Grab last confirmed block
387            let mut buf = vec![format!("Requested to scan from block number: {height}")];
388            let (last_height, last_hash) = match self.get_last_confirmed_block().await {
389                Ok(last) => last,
390                Err(e) => {
391                    buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
392                    append_or_print(output, sender, print, buf).await;
393                    return Err(WalletDbError::GenericError)
394                }
395            };
396            buf.push(format!(
397                "Last confirmed block reported by darkfid: {last_height} - {last_hash}"
398            ));
399            append_or_print(output, sender, print, buf).await;
400
401            // Already scanned last confirmed block
402            if height > last_height {
403                return Ok(())
404            }
405
406            while height <= last_height {
407                let mut buf = vec![format!("Requesting block {height}...")];
408                let block = match self.get_block_by_height(height).await {
409                    Ok(b) => b,
410                    Err(e) => {
411                        buf.push(format!("[scan_blocks] RPC client request failed: {e}"));
412                        append_or_print(output, sender, print, buf).await;
413                        return Err(WalletDbError::GenericError)
414                    }
415                };
416                buf.push(format!("Block {height} received! Scanning block..."));
417                if let Err(e) = self.scan_block(&mut scan_cache, &block).await {
418                    buf.push(format!("[scan_blocks] Scan block failed: {e}"));
419                    append_or_print(output, sender, print, buf).await;
420                    return Err(WalletDbError::GenericError)
421                };
422                for msg in scan_cache.flush_messages() {
423                    buf.push(msg);
424                }
425                append_or_print(output, sender, print, buf).await;
426                height += 1;
427            }
428        }
429    }
430
431    // Queries darkfid for last confirmed block.
432    async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
433        let rep = self
434            .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
435            .await?;
436        let params = rep.get::<Vec<JsonValue>>().unwrap();
437        let height = *params[0].get::<f64>().unwrap() as u32;
438        let hash = params[1].get::<String>().unwrap().clone();
439
440        Ok((height, hash))
441    }
442
443    // Queries darkfid for a block with given height.
444    async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
445        let params = self
446            .darkfid_daemon_request(
447                "blockchain.get_block",
448                &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
449            )
450            .await?;
451        let param = params.get::<String>().unwrap();
452        let bytes = base64::decode(param).unwrap();
453        let block = deserialize_async(&bytes).await?;
454        Ok(block)
455    }
456
457    /// Broadcast a given transaction to darkfid and forward onto the network.
458    /// Returns the transaction ID upon success.
459    pub async fn broadcast_tx(&self, tx: &Transaction, output: &mut Vec<String>) -> Result<String> {
460        output.push(String::from("Broadcasting transaction..."));
461
462        let params =
463            JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
464        let rep = self.darkfid_daemon_request("tx.broadcast", &params).await?;
465
466        let txid = rep.get::<String>().unwrap().clone();
467
468        // Store transactions history record
469        if let Err(e) = self.put_tx_history_record(tx, "Broadcasted", None).await {
470            return Err(Error::DatabaseError(format!(
471                "[broadcast_tx] Inserting transaction history record failed: {e}"
472            )))
473        }
474
475        Ok(txid)
476    }
477
478    /// Queries darkfid for a tx with given hash.
479    pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
480        let tx_hash_str = tx_hash.to_string();
481        match self
482            .darkfid_daemon_request(
483                "blockchain.get_tx",
484                &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
485            )
486            .await
487        {
488            Ok(param) => {
489                let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
490                let tx = deserialize_async(&tx_bytes).await?;
491                Ok(Some(tx))
492            }
493
494            Err(_) => Ok(None),
495        }
496    }
497
498    /// Simulate the transaction with the state machine.
499    pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
500        let tx_str = base64::encode(&serialize_async(tx).await);
501        let rep = self
502            .darkfid_daemon_request(
503                "tx.simulate",
504                &JsonValue::Array(vec![JsonValue::String(tx_str)]),
505            )
506            .await?;
507
508        let is_valid = *rep.get::<bool>().unwrap();
509        Ok(is_valid)
510    }
511
512    /// Try to fetch zkas bincodes for the given `ContractId`.
513    pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
514        let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
515        let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", &params).await?;
516        let params = rep.get::<Vec<JsonValue>>().unwrap();
517
518        let mut ret = Vec::with_capacity(params.len());
519        for param in params {
520            let zkas_ns = param[0].get::<String>().unwrap().clone();
521            let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
522            ret.push((zkas_ns, zkas_bincode_bytes));
523        }
524
525        Ok(ret)
526    }
527
528    /// Queries darkfid for given transaction's required fee.
529    pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
530        let params = JsonValue::Array(vec![
531            JsonValue::String(base64::encode(&serialize_async(tx).await)),
532            JsonValue::Boolean(include_fee),
533        ]);
534        let rep = self.darkfid_daemon_request("tx.calculate_fee", &params).await?;
535
536        let fee = *rep.get::<f64>().unwrap() as u64;
537
538        Ok(fee)
539    }
540
541    /// Queries darkfid for current best fork next height.
542    pub async fn get_next_block_height(&self) -> Result<u32> {
543        let rep = self
544            .darkfid_daemon_request(
545                "blockchain.best_fork_next_block_height",
546                &JsonValue::Array(vec![]),
547            )
548            .await?;
549
550        let next_height = *rep.get::<f64>().unwrap() as u32;
551
552        Ok(next_height)
553    }
554
555    /// Queries darkfid for currently configured block target time.
556    pub async fn get_block_target(&self) -> Result<u32> {
557        let rep = self
558            .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
559            .await?;
560
561        let next_height = *rep.get::<f64>().unwrap() as u32;
562
563        Ok(next_height)
564    }
565
566    /// Auxiliary function to ping configured darkfid daemon for liveness.
567    pub async fn ping(&self, output: &mut Vec<String>) -> Result<()> {
568        output.push(String::from("Executing ping request to darkfid..."));
569        let latency = Instant::now();
570        let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
571        let latency = latency.elapsed();
572        output.push(format!("Got reply: {rep:?}"));
573        output.push(format!("Latency: {latency:?}"));
574        Ok(())
575    }
576
577    /// Auxiliary function to execute a request towards the configured darkfid daemon JSON-RPC endpoint.
578    pub async fn darkfid_daemon_request(
579        &self,
580        method: &str,
581        params: &JsonValue,
582    ) -> Result<JsonValue> {
583        let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
584        let mut lock = rpc_client.write().await;
585        let req = JsonRequest::new(method, params.clone());
586
587        // Check the client is initialized
588        if let Some(ref client) = lock.client {
589            // Execute request
590            if let Ok(rep) = client.request(req.clone()).await {
591                drop(lock);
592                return Ok(rep);
593            }
594        }
595
596        // Reset the rpc client in case of an error and try again
597        let client = RpcClient::new(lock.endpoint.clone(), lock.ex.clone()).await?;
598        let rep = client.request(req).await?;
599        lock.client = Some(client);
600        drop(lock);
601        Ok(rep)
602    }
603
604    /// Auxiliary function to stop current JSON-RPC client, if its initialized.
605    pub async fn stop_rpc_client(&self) -> Result<()> {
606        if let Some(ref rpc_client) = self.rpc_client {
607            rpc_client.read().await.stop().await;
608        };
609        Ok(())
610    }
611}
612
613/// Subscribes to darkfid's JSON-RPC notification endpoint that serves
614/// new confirmed blocks. Upon receiving them, all the transactions are
615/// scanned and we check if any of them call the money contract, and if
616/// the payments are intended for us. If so, we decrypt them and append
617/// the metadata to our wallet. If a reorg block is received, we revert
618/// to its previous height and then scan it. We assume that the blocks
619/// up to that point are unchanged, since darkfid will just broadcast
620/// the sequence after the reorg.
621pub async fn subscribe_blocks(
622    drk: &DrkPtr,
623    rpc_task: StoppableTaskPtr,
624    shell_sender: Sender<Vec<String>>,
625    endpoint: Url,
626    ex: &ExecutorPtr,
627) -> Result<()> {
628    // First we do a clean scan
629    let lock = drk.read().await;
630    if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
631        let err_msg = format!("Failed during scanning: {e}");
632        shell_sender.send(vec![err_msg.clone()]).await?;
633        return Err(Error::Custom(err_msg))
634    }
635    shell_sender.send(vec![String::from("Finished scanning blockchain")]).await?;
636
637    // Grab last confirmed block height
638    let (last_confirmed_height, _) = lock.get_last_confirmed_block().await?;
639
640    // Handle genesis(0) block
641    if last_confirmed_height == 0 {
642        if let Err(e) = lock.scan_blocks(&mut vec![], Some(&shell_sender), &false).await {
643            let err_msg = format!("[subscribe_blocks] Scanning from genesis block failed: {e}");
644            shell_sender.send(vec![err_msg.clone()]).await?;
645            return Err(Error::Custom(err_msg))
646        }
647    }
648
649    // Grab last confirmed block again
650    let (last_confirmed_height, last_confirmed_hash) = lock.get_last_confirmed_block().await?;
651
652    // Grab last scanned block
653    let (mut last_scanned_height, last_scanned_hash) = match lock.get_last_scanned_block() {
654        Ok(last) => last,
655        Err(e) => {
656            let err_msg = format!("[subscribe_blocks] Retrieving last scanned block failed: {e}");
657            shell_sender.send(vec![err_msg.clone()]).await?;
658            return Err(Error::Custom(err_msg))
659        }
660    };
661    drop(lock);
662
663    // Check if other blocks have been created
664    if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash {
665        let err_msg = String::from("[subscribe_blocks] Blockchain not fully scanned");
666        shell_sender
667            .send(vec![
668                String::from("Warning: Last scanned block is not the last confirmed block."),
669                String::from("You should first fully scan the blockchain, and then subscribe"),
670                err_msg.clone(),
671            ])
672            .await?;
673        return Err(Error::Custom(err_msg))
674    }
675
676    let mut shell_message =
677        vec![String::from("Subscribing to receive notifications of incoming blocks")];
678    let publisher = Publisher::new();
679    let subscription = publisher.clone().subscribe().await;
680    let _publisher = publisher.clone();
681    let rpc_client = Arc::new(RpcClient::new(endpoint, ex.clone()).await?);
682    let rpc_client_ = rpc_client.clone();
683    rpc_task.start(
684        // Weird hack to prevent lifetimes hell
685        async move {
686            let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
687            rpc_client_.subscribe(req, _publisher).await
688        },
689        |res| async move {
690            rpc_client.stop().await;
691            match res {
692                Ok(()) | Err(Error::DetachedTaskStopped) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
693                Err(e) => {
694                    eprintln!("[subscribe_blocks] JSON-RPC server error: {e}");
695                    publisher
696                        .notify(JsonResult::Error(JsonError::new(
697                            ErrorCode::InternalError,
698                            None,
699                            0,
700                        )))
701                        .await;
702                }
703            }
704        },
705        Error::RpcServerStopped,
706        ex.clone(),
707    );
708    shell_message.push(String::from("Detached subscription to background"));
709    shell_message.push(String::from("All is good. Waiting for block notifications..."));
710    shell_sender.send(shell_message).await?;
711
712    let e = 'outer: loop {
713        match subscription.receive().await {
714            JsonResult::Notification(n) => {
715                let mut shell_message =
716                    vec![String::from("Got Block notification from darkfid subscription")];
717                if n.method != "blockchain.subscribe_blocks" {
718                    shell_sender.send(shell_message).await?;
719                    break Error::UnexpectedJsonRpc(format!(
720                        "Got foreign notification from darkfid: {}",
721                        n.method
722                    ))
723                }
724
725                // Verify parameters
726                if !n.params.is_array() {
727                    shell_sender.send(shell_message).await?;
728                    break Error::UnexpectedJsonRpc(
729                        "Received notification params are not an array".to_string(),
730                    )
731                }
732                let params = n.params.get::<Vec<JsonValue>>().unwrap();
733                if params.is_empty() {
734                    shell_sender.send(shell_message).await?;
735                    break Error::UnexpectedJsonRpc("Notification parameters are empty".to_string())
736                }
737
738                for param in params {
739                    let param = param.get::<String>().unwrap();
740                    let bytes = base64::decode(param).unwrap();
741
742                    let block: BlockInfo = deserialize_async(&bytes).await?;
743                    shell_message
744                        .push(String::from("Deserialized successfully. Scanning block..."));
745
746                    // Check if a reorg block was received, to reset to its previous
747                    let lock = drk.read().await;
748                    if block.header.height <= last_scanned_height {
749                        let reset_height = block.header.height.saturating_sub(1);
750                        if let Err(e) = lock.reset_to_height(reset_height, &mut shell_message).await
751                        {
752                            shell_sender.send(shell_message).await?;
753                            break 'outer Error::Custom(format!(
754                                "[subscribe_blocks] Wallet state reset failed: {e}"
755                            ))
756                        }
757
758                        // Scan genesis again if needed
759                        if reset_height == 0 {
760                            let genesis = match lock.get_block_by_height(reset_height).await {
761                                Ok(b) => b,
762                                Err(e) => {
763                                    shell_sender.send(shell_message).await?;
764                                    break 'outer Error::Custom(format!(
765                                        "[subscribe_blocks] RPC client request failed: {e}"
766                                    ))
767                                }
768                            };
769                            let mut scan_cache = lock.scan_cache().await?;
770                            if let Err(e) = lock.scan_block(&mut scan_cache, &genesis).await {
771                                shell_sender.send(shell_message).await?;
772                                break 'outer Error::Custom(format!(
773                                    "[subscribe_blocks] Scanning block failed: {e}"
774                                ))
775                            };
776                            for msg in scan_cache.flush_messages() {
777                                shell_message.push(msg);
778                            }
779                        }
780                    }
781
782                    let mut scan_cache = lock.scan_cache().await?;
783                    if let Err(e) = lock.scan_block(&mut scan_cache, &block).await {
784                        shell_sender.send(shell_message).await?;
785                        break 'outer Error::Custom(format!(
786                            "[subscribe_blocks] Scanning block failed: {e}"
787                        ))
788                    }
789                    for msg in scan_cache.flush_messages() {
790                        shell_message.push(msg);
791                    }
792                    shell_sender.send(shell_message.clone()).await?;
793
794                    // Set new last scanned block height
795                    last_scanned_height = block.header.height;
796                }
797            }
798
799            JsonResult::Error(e) => {
800                // Some error happened in the transmission
801                break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
802            }
803
804            x => {
805                // And this is weird
806                break Error::UnexpectedJsonRpc(format!("Got unexpected data from JSON-RPC: {x:?}"))
807            }
808        }
809    };
810
811    shell_sender.send(vec![format!("[subscribe_blocks] Subscription loop break: {e}")]).await?;
812    Err(e)
813}