drk/
rpc.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{sync::Arc, time::Instant};
20
21use url::Url;
22
23use darkfi::{
24    blockchain::BlockInfo,
25    rpc::{
26        client::RpcClient,
27        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
28        util::JsonValue,
29    },
30    system::{Publisher, StoppableTask},
31    tx::Transaction,
32    util::encoding::base64,
33    Error, Result,
34};
35use darkfi_sdk::{
36    crypto::{ContractId, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID, MONEY_CONTRACT_ID},
37    tx::TransactionHash,
38};
39use darkfi_serial::{deserialize_async, serialize_async};
40
41use crate::{
42    error::{WalletDbError, WalletDbResult},
43    Drk,
44};
45
46impl Drk {
47    /// Subscribes to darkfid's JSON-RPC notification endpoint that serves
48    /// new confirmed blocks. Upon receiving them, all the transactions are
49    /// scanned and we check if any of them call the money contract, and if
50    /// the payments are intended for us. If so, we decrypt them and append
51    /// the metadata to our wallet. If a reorg block is received, we revert
52    /// to its previous height and then scan it. We assume that the blocks
53    /// up to that point are unchanged, since darkfid will just broadcast
54    /// the sequence after the reorg.
55    pub async fn subscribe_blocks(
56        &self,
57        endpoint: Url,
58        ex: Arc<smol::Executor<'static>>,
59    ) -> Result<()> {
60        // Grab last confirmed block height
61        let (last_confirmed_height, _) = self.get_last_confirmed_block().await?;
62
63        // Handle genesis(0) block
64        if last_confirmed_height == 0 {
65            if let Err(e) = self.scan_blocks().await {
66                return Err(Error::DatabaseError(format!(
67                    "[subscribe_blocks] Scanning from genesis block failed: {e:?}"
68                )))
69            }
70        }
71
72        // Grab last confirmed block again
73        let (last_confirmed_height, last_confirmed_hash) = self.get_last_confirmed_block().await?;
74
75        // Grab last scanned block
76        let (mut last_scanned_height, last_scanned_hash) = match self.get_last_scanned_block() {
77            Ok(last) => last,
78            Err(e) => {
79                return Err(Error::DatabaseError(format!(
80                    "[subscribe_blocks] Retrieving last scanned block failed: {e:?}"
81                )))
82            }
83        };
84
85        // Check if other blocks have been created
86        if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash
87        {
88            eprintln!("Warning: Last scanned block is not the last confirmed block.");
89            eprintln!("You should first fully scan the blockchain, and then subscribe");
90            return Err(Error::DatabaseError(
91                "[subscribe_blocks] Blockchain not fully scanned".to_string(),
92            ))
93        }
94
95        println!("Subscribing to receive notifications of incoming blocks");
96        let publisher = Publisher::new();
97        let subscription = publisher.clone().subscribe().await;
98        let _publisher = publisher.clone();
99        let _ex = ex.clone();
100        StoppableTask::new().start(
101            // Weird hack to prevent lifetimes hell
102            async move {
103                let rpc_client = RpcClient::new(endpoint, _ex).await?;
104                let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
105                rpc_client.subscribe(req, _publisher).await
106            },
107            |res| async move {
108                match res {
109                    Ok(()) => { /* Do nothing */ }
110                    Err(e) => {
111                        eprintln!("[subscribe_blocks] JSON-RPC server error: {e:?}");
112                        publisher
113                            .notify(JsonResult::Error(JsonError::new(
114                                ErrorCode::InternalError,
115                                None,
116                                0,
117                            )))
118                            .await;
119                    }
120                }
121            },
122            Error::RpcServerStopped,
123            ex,
124        );
125        println!("Detached subscription to background");
126        println!("All is good. Waiting for block notifications...");
127
128        let e = loop {
129            match subscription.receive().await {
130                JsonResult::Notification(n) => {
131                    println!("Got Block notification from darkfid subscription");
132                    if n.method != "blockchain.subscribe_blocks" {
133                        break Error::UnexpectedJsonRpc(format!(
134                            "Got foreign notification from darkfid: {}",
135                            n.method
136                        ))
137                    }
138
139                    // Verify parameters
140                    if !n.params.is_array() {
141                        break Error::UnexpectedJsonRpc(
142                            "Received notification params are not an array".to_string(),
143                        )
144                    }
145                    let params = n.params.get::<Vec<JsonValue>>().unwrap();
146                    if params.is_empty() {
147                        break Error::UnexpectedJsonRpc(
148                            "Notification parameters are empty".to_string(),
149                        )
150                    }
151
152                    for param in params {
153                        let param = param.get::<String>().unwrap();
154                        let bytes = base64::decode(param).unwrap();
155
156                        let block: BlockInfo = deserialize_async(&bytes).await?;
157                        println!("Deserialized successfully. Scanning block...");
158
159                        // Check if a reorg block was received, to reset to its previous
160                        if block.header.height <= last_scanned_height {
161                            let reset_height = block.header.height.saturating_sub(1);
162                            if let Err(e) = self.reset_to_height(reset_height).await {
163                                return Err(Error::DatabaseError(format!(
164                                    "[subscribe_blocks] Wallet state reset failed: {e:?}"
165                                )))
166                            }
167
168                            // Scan genesis again if needed
169                            if reset_height == 0 {
170                                let genesis = match self.get_block_by_height(reset_height).await {
171                                    Ok(b) => b,
172                                    Err(e) => {
173                                        return Err(Error::Custom(format!(
174                                            "[subscribe_blocks] RPC client request failed: {e:?}"
175                                        )))
176                                    }
177                                };
178                                if let Err(e) = self.scan_block(&genesis).await {
179                                    return Err(Error::DatabaseError(format!(
180                                        "[subscribe_blocks] Scanning block failed: {e:?}"
181                                    )))
182                                };
183                            }
184                        }
185
186                        if let Err(e) = self.scan_block(&block).await {
187                            return Err(Error::DatabaseError(format!(
188                                "[subscribe_blocks] Scanning block failed: {e:?}"
189                            )))
190                        }
191
192                        // Set new last scanned block height
193                        last_scanned_height = block.header.height;
194                    }
195                }
196
197                JsonResult::Error(e) => {
198                    // Some error happened in the transmission
199                    break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
200                }
201
202                x => {
203                    // And this is weird
204                    break Error::UnexpectedJsonRpc(format!(
205                        "Got unexpected data from JSON-RPC: {x:?}"
206                    ))
207                }
208            }
209        };
210
211        Err(e)
212    }
213
214    /// `scan_block` will go over over transactions in a block and handle their calls
215    /// based on the called contract. Additionally, will update `last_scanned_block` to
216    /// the provided block height and will store its height, hash and inverse query.
217    async fn scan_block(&self, block: &BlockInfo) -> Result<()> {
218        // Reset wallet inverse cache state
219        self.reset_inverse_cache().await?;
220
221        // Keep track of our wallet transactions
222        let mut wallet_txs = vec![];
223        println!("=======================================");
224        println!("{}", block.header);
225        println!("=======================================");
226        println!("[scan_block] Iterating over {} transactions", block.txs.len());
227        for tx in block.txs.iter() {
228            let tx_hash = tx.hash();
229            let tx_hash_string = tx_hash.to_string();
230            let mut wallet_tx = false;
231            println!("[scan_block] Processing transaction: {tx_hash_string}");
232            for (i, call) in tx.calls.iter().enumerate() {
233                if call.data.contract_id == *MONEY_CONTRACT_ID {
234                    println!("[scan_block] Found Money contract in call {i}");
235                    if self.apply_tx_money_data(i, &tx.calls, &tx_hash_string).await? {
236                        wallet_tx = true;
237                    };
238                    continue
239                }
240
241                if call.data.contract_id == *DAO_CONTRACT_ID {
242                    println!("[scan_block] Found DAO contract in call {i}");
243                    if self.apply_tx_dao_data(&call.data.data, tx_hash, i as u8).await? {
244                        wallet_tx = true;
245                    };
246                    continue
247                }
248
249                if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
250                    println!("[scan_block] Found DeployoOor contract in call {i}");
251                    // TODO: implement
252                    continue
253                }
254
255                // TODO: For now we skip non-native contract calls
256                println!("[scan_block] Found non-native contract in call {i}, skipping.");
257            }
258
259            // If this is our wallet tx we mark it for update
260            if wallet_tx {
261                wallet_txs.push(tx);
262            }
263        }
264
265        // Update wallet transactions records
266        if let Err(e) = self.put_tx_history_records(&wallet_txs, "Confirmed").await {
267            return Err(Error::DatabaseError(format!(
268                "[scan_block] Inserting transaction history records failed: {e:?}"
269            )))
270        }
271
272        // Store this block rollback query
273        self.store_inverse_cache(block.header.height, &block.hash().to_string())?;
274
275        Ok(())
276    }
277
278    /// Scans the blockchain for wallet relevant transactions,
279    /// starting from the last scanned block. If a reorg has happened,
280    /// we revert to its previous height and then scan from there.
281    pub async fn scan_blocks(&self) -> WalletDbResult<()> {
282        // Grab last scanned block height
283        let (mut height, hash) = self.get_last_scanned_block()?;
284
285        // Grab our last scanned block from darkfid
286        let block = match self.get_block_by_height(height).await {
287            Ok(b) => Some(b),
288            // Check if block was found
289            Err(Error::JsonRpcError((-32121, _))) => None,
290            Err(e) => {
291                eprintln!("[scan_blocks] RPC client request failed: {e:?}");
292                return Err(WalletDbError::GenericError)
293            }
294        };
295
296        // Check if a reorg has happened
297        if block.is_none() || hash != block.unwrap().hash().to_string() {
298            // Find the exact block height the reorg happened
299            println!("A reorg has happened, finding last known common block...");
300            height = height.saturating_sub(1);
301            while height != 0 {
302                // Grab our scanned block hash for that height
303                let (_, scanned_block_hash, _) = self.get_scanned_block_record(height)?;
304
305                // Grab the block from darkfid for that height
306                let block = match self.get_block_by_height(height).await {
307                    Ok(b) => Some(b),
308                    // Check if block was found
309                    Err(Error::JsonRpcError((-32121, _))) => None,
310                    Err(e) => {
311                        eprintln!("[scan_blocks] RPC client request failed: {e:?}");
312                        return Err(WalletDbError::GenericError)
313                    }
314                };
315
316                // Continue to previous one if they don't match
317                if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
318                    height = height.saturating_sub(1);
319                    continue
320                }
321
322                // Reset to its height
323                println!("Last common block found: {height} - {scanned_block_hash}");
324                self.reset_to_height(height).await?;
325                break
326            }
327        }
328
329        // If last scanned block is genesis(0) we reset,
330        // otherwise continue with the next block height.
331        if height == 0 {
332            self.reset().await?;
333        } else {
334            height += 1;
335        }
336
337        loop {
338            // Grab last confirmed block
339            println!("Requested to scan from block number: {height}");
340            let (last_height, last_hash) = match self.get_last_confirmed_block().await {
341                Ok(last) => last,
342                Err(e) => {
343                    eprintln!("[scan_blocks] RPC client request failed: {e:?}");
344                    return Err(WalletDbError::GenericError)
345                }
346            };
347            println!("Last confirmed block reported by darkfid: {last_height} - {last_hash}");
348
349            // Already scanned last confirmed block
350            if height > last_height {
351                return Ok(())
352            }
353
354            while height <= last_height {
355                println!("Requesting block {height}...");
356                let block = match self.get_block_by_height(height).await {
357                    Ok(b) => b,
358                    Err(e) => {
359                        eprintln!("[scan_blocks] RPC client request failed: {e:?}");
360                        return Err(WalletDbError::GenericError)
361                    }
362                };
363                println!("Block {height} received! Scanning block...");
364                if let Err(e) = self.scan_block(&block).await {
365                    eprintln!("[scan_blocks] Scan block failed: {e:?}");
366                    return Err(WalletDbError::GenericError)
367                };
368                height += 1;
369            }
370        }
371    }
372
373    // Queries darkfid for last confirmed block.
374    async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
375        let rep = self
376            .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
377            .await?;
378        let params = rep.get::<Vec<JsonValue>>().unwrap();
379        let height = *params[0].get::<f64>().unwrap() as u32;
380        let hash = params[1].get::<String>().unwrap().clone();
381
382        Ok((height, hash))
383    }
384
385    // Queries darkfid for a block with given height.
386    async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
387        let params = self
388            .darkfid_daemon_request(
389                "blockchain.get_block",
390                &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
391            )
392            .await?;
393        let param = params.get::<String>().unwrap();
394        let bytes = base64::decode(param).unwrap();
395        let block = deserialize_async(&bytes).await?;
396        Ok(block)
397    }
398
399    /// Broadcast a given transaction to darkfid and forward onto the network.
400    /// Returns the transaction ID upon success.
401    pub async fn broadcast_tx(&self, tx: &Transaction) -> Result<String> {
402        println!("Broadcasting transaction...");
403
404        let params =
405            JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
406        let rep = self.darkfid_daemon_request("tx.broadcast", &params).await?;
407
408        let txid = rep.get::<String>().unwrap().clone();
409
410        // Store transactions history record
411        if let Err(e) = self.put_tx_history_record(tx, "Broadcasted").await {
412            return Err(Error::DatabaseError(format!(
413                "[broadcast_tx] Inserting transaction history record failed: {e:?}"
414            )))
415        }
416
417        Ok(txid)
418    }
419
420    /// Queries darkfid for a tx with given hash.
421    pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
422        let tx_hash_str = tx_hash.to_string();
423        match self
424            .darkfid_daemon_request(
425                "blockchain.get_tx",
426                &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
427            )
428            .await
429        {
430            Ok(param) => {
431                let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
432                let tx = deserialize_async(&tx_bytes).await?;
433                Ok(Some(tx))
434            }
435
436            Err(_) => Ok(None),
437        }
438    }
439
440    /// Simulate the transaction with the state machine.
441    pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
442        let tx_str = base64::encode(&serialize_async(tx).await);
443        let rep = self
444            .darkfid_daemon_request(
445                "tx.simulate",
446                &JsonValue::Array(vec![JsonValue::String(tx_str)]),
447            )
448            .await?;
449
450        let is_valid = *rep.get::<bool>().unwrap();
451        Ok(is_valid)
452    }
453
454    /// Try to fetch zkas bincodes for the given `ContractId`.
455    pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
456        let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
457        let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", &params).await?;
458        let params = rep.get::<Vec<JsonValue>>().unwrap();
459
460        let mut ret = Vec::with_capacity(params.len());
461        for param in params {
462            let zkas_ns = param[0].get::<String>().unwrap().clone();
463            let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
464            ret.push((zkas_ns, zkas_bincode_bytes));
465        }
466
467        Ok(ret)
468    }
469
470    /// Queries darkfid for given transaction's required fee.
471    pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
472        let params = JsonValue::Array(vec![
473            JsonValue::String(base64::encode(&serialize_async(tx).await)),
474            JsonValue::Boolean(include_fee),
475        ]);
476        let rep = self.darkfid_daemon_request("tx.calculate_fee", &params).await?;
477
478        let fee = *rep.get::<f64>().unwrap() as u64;
479
480        Ok(fee)
481    }
482
483    /// Queries darkfid for current best fork next height.
484    pub async fn get_next_block_height(&self) -> Result<u32> {
485        let rep = self
486            .darkfid_daemon_request(
487                "blockchain.best_fork_next_block_height",
488                &JsonValue::Array(vec![]),
489            )
490            .await?;
491
492        let next_height = *rep.get::<f64>().unwrap() as u32;
493
494        Ok(next_height)
495    }
496
497    /// Queries darkfid for currently configured block target time.
498    pub async fn get_block_target(&self) -> Result<u32> {
499        let rep = self
500            .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
501            .await?;
502
503        let next_height = *rep.get::<f64>().unwrap() as u32;
504
505        Ok(next_height)
506    }
507
508    /// Auxiliary function to ping configured darkfid daemon for liveness.
509    pub async fn ping(&self) -> Result<()> {
510        println!("Executing ping request to darkfid...");
511        let latency = Instant::now();
512        let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
513        let latency = latency.elapsed();
514        println!("Got reply: {rep:?}");
515        println!("Latency: {latency:?}");
516        Ok(())
517    }
518
519    /// Auxiliary function to execute a request towards the configured darkfid daemon JSON-RPC endpoint.
520    pub async fn darkfid_daemon_request(
521        &self,
522        method: &str,
523        params: &JsonValue,
524    ) -> Result<JsonValue> {
525        let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
526        let req = JsonRequest::new(method, params.clone());
527        let rep = rpc_client.request(req).await?;
528        Ok(rep)
529    }
530
531    /// Auxiliary function to stop current JSON-RPC client, if its initialized.
532    pub async fn stop_rpc_client(&self) -> Result<()> {
533        if let Some(ref rpc_client) = self.rpc_client {
534            rpc_client.stop().await;
535        };
536        Ok(())
537    }
538}