drk/
rpc.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{sync::Arc, time::Instant};
20
21use url::Url;
22
23use darkfi::{
24    blockchain::BlockInfo,
25    rpc::{
26        client::RpcClient,
27        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
28        util::JsonValue,
29    },
30    system::{Publisher, StoppableTask},
31    tx::Transaction,
32    util::encoding::base64,
33    Error, Result,
34};
35use darkfi_sdk::{
36    crypto::{ContractId, DAO_CONTRACT_ID, DEPLOYOOOR_CONTRACT_ID, MONEY_CONTRACT_ID},
37    tx::TransactionHash,
38};
39use darkfi_serial::{deserialize_async, serialize_async};
40
41use crate::{
42    error::{WalletDbError, WalletDbResult},
43    Drk,
44};
45
46impl Drk {
47    /// Subscribes to darkfid's JSON-RPC notification endpoint that serves
48    /// new confirmed blocks. Upon receiving them, all the transactions are
49    /// scanned and we check if any of them call the money contract, and if
50    /// the payments are intended for us. If so, we decrypt them and append
51    /// the metadata to our wallet. If a reorg block is received, we revert
52    /// to its previous height and then scan it. We assume that the blocks
53    /// up to that point are unchanged, since darkfid will just broadcast
54    /// the sequence after the reorg.
55    pub async fn subscribe_blocks(
56        &self,
57        endpoint: Url,
58        ex: Arc<smol::Executor<'static>>,
59    ) -> Result<()> {
60        // Grab last confirmed block height
61        let (last_confirmed_height, _) = self.get_last_confirmed_block().await?;
62
63        // Handle genesis(0) block
64        if last_confirmed_height == 0 {
65            if let Err(e) = self.scan_blocks().await {
66                return Err(Error::DatabaseError(format!(
67                    "[subscribe_blocks] Scanning from genesis block failed: {e:?}"
68                )))
69            }
70        }
71
72        // Grab last confirmed block again
73        let (last_confirmed_height, last_confirmed_hash) = self.get_last_confirmed_block().await?;
74
75        // Grab last scanned block
76        let (mut last_scanned_height, last_scanned_hash) = match self.get_last_scanned_block() {
77            Ok(last) => last,
78            Err(e) => {
79                return Err(Error::DatabaseError(format!(
80                    "[subscribe_blocks] Retrieving last scanned block failed: {e:?}"
81                )))
82            }
83        };
84
85        // Check if other blocks have been created
86        if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash
87        {
88            eprintln!("Warning: Last scanned block is not the last confirmed block.");
89            eprintln!("You should first fully scan the blockchain, and then subscribe");
90            return Err(Error::DatabaseError(
91                "[subscribe_blocks] Blockchain not fully scanned".to_string(),
92            ))
93        }
94
95        println!("Subscribing to receive notifications of incoming blocks");
96        let publisher = Publisher::new();
97        let subscription = publisher.clone().subscribe().await;
98        let _publisher = publisher.clone();
99        let _ex = ex.clone();
100        StoppableTask::new().start(
101            // Weird hack to prevent lifetimes hell
102            async move {
103                let rpc_client = RpcClient::new(endpoint, _ex).await?;
104                let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
105                rpc_client.subscribe(req, _publisher).await
106            },
107            |res| async move {
108                match res {
109                    Ok(()) => { /* Do nothing */ }
110                    Err(e) => {
111                        eprintln!("[subscribe_blocks] JSON-RPC server error: {e:?}");
112                        publisher
113                            .notify(JsonResult::Error(JsonError::new(
114                                ErrorCode::InternalError,
115                                None,
116                                0,
117                            )))
118                            .await;
119                    }
120                }
121            },
122            Error::RpcServerStopped,
123            ex,
124        );
125        println!("Detached subscription to background");
126        println!("All is good. Waiting for block notifications...");
127
128        let e = loop {
129            match subscription.receive().await {
130                JsonResult::Notification(n) => {
131                    println!("Got Block notification from darkfid subscription");
132                    if n.method != "blockchain.subscribe_blocks" {
133                        break Error::UnexpectedJsonRpc(format!(
134                            "Got foreign notification from darkfid: {}",
135                            n.method
136                        ))
137                    }
138
139                    // Verify parameters
140                    if !n.params.is_array() {
141                        break Error::UnexpectedJsonRpc(
142                            "Received notification params are not an array".to_string(),
143                        )
144                    }
145                    let params = n.params.get::<Vec<JsonValue>>().unwrap();
146                    if params.is_empty() {
147                        break Error::UnexpectedJsonRpc(
148                            "Notification parameters are empty".to_string(),
149                        )
150                    }
151
152                    for param in params {
153                        let param = param.get::<String>().unwrap();
154                        let bytes = base64::decode(param).unwrap();
155
156                        let block: BlockInfo = deserialize_async(&bytes).await?;
157                        println!("Deserialized successfully. Scanning block...");
158
159                        // Check if a reorg block was received, to reset to its previous
160                        if block.header.height <= last_scanned_height {
161                            let reset_height = block.header.height.saturating_sub(1);
162                            if let Err(e) = self.reset_to_height(reset_height).await {
163                                return Err(Error::DatabaseError(format!(
164                                    "[subscribe_blocks] Wallet state reset failed: {e:?}"
165                                )))
166                            }
167
168                            // Scan genesis again if needed
169                            if reset_height == 0 {
170                                let genesis = match self.get_block_by_height(reset_height).await {
171                                    Ok(b) => b,
172                                    Err(e) => {
173                                        return Err(Error::Custom(format!(
174                                            "[subscribe_blocks] RPC client request failed: {e:?}"
175                                        )))
176                                    }
177                                };
178                                if let Err(e) = self.scan_block(&genesis).await {
179                                    return Err(Error::DatabaseError(format!(
180                                        "[subscribe_blocks] Scanning block failed: {e:?}"
181                                    )))
182                                };
183                            }
184                        }
185
186                        if let Err(e) = self.scan_block(&block).await {
187                            return Err(Error::DatabaseError(format!(
188                                "[subscribe_blocks] Scanning block failed: {e:?}"
189                            )))
190                        }
191
192                        // Set new last scanned block height
193                        last_scanned_height = block.header.height;
194                    }
195                }
196
197                JsonResult::Error(e) => {
198                    // Some error happened in the transmission
199                    break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
200                }
201
202                x => {
203                    // And this is weird
204                    break Error::UnexpectedJsonRpc(format!(
205                        "Got unexpected data from JSON-RPC: {x:?}"
206                    ))
207                }
208            }
209        };
210
211        Err(e)
212    }
213
214    /// `scan_block` will go over over transactions in a block and handle their calls
215    /// based on the called contract. Additionally, will update `last_scanned_block` to
216    /// the provided block height and will store its height, hash and inverse query.
217    async fn scan_block(&self, block: &BlockInfo) -> Result<()> {
218        // Reset wallet inverse cache state
219        self.reset_inverse_cache().await?;
220
221        // Keep track of our wallet transactions
222        let mut wallet_txs = vec![];
223        println!("=======================================");
224        println!("{}", block.header);
225        println!("=======================================");
226        println!("[scan_block] Iterating over {} transactions", block.txs.len());
227        for tx in block.txs.iter() {
228            let tx_hash = tx.hash().to_string();
229            let mut wallet_tx = false;
230            println!("[scan_block] Processing transaction: {tx_hash}");
231            for (i, call) in tx.calls.iter().enumerate() {
232                if call.data.contract_id == *MONEY_CONTRACT_ID {
233                    println!("[scan_block] Found Money contract in call {i}");
234                    if self.apply_tx_money_data(i, &tx.calls, &tx_hash).await? {
235                        wallet_tx = true;
236                    };
237                    continue
238                }
239
240                if call.data.contract_id == *DAO_CONTRACT_ID {
241                    println!("[scan_block] Found DAO contract in call {i}");
242                    if self
243                        .apply_tx_dao_data(
244                            &call.data.data,
245                            TransactionHash::new(
246                                *blake3::hash(&serialize_async(tx).await).as_bytes(),
247                            ),
248                            i as u8,
249                        )
250                        .await?
251                    {
252                        wallet_tx = true;
253                    };
254                    continue
255                }
256
257                if call.data.contract_id == *DEPLOYOOOR_CONTRACT_ID {
258                    println!("[scan_block] Found DeployoOor contract in call {i}");
259                    // TODO: implement
260                    continue
261                }
262
263                // TODO: For now we skip non-native contract calls
264                println!("[scan_block] Found non-native contract in call {i}, skipping.");
265            }
266
267            // If this is our wallet tx we mark it for update
268            if wallet_tx {
269                wallet_txs.push(tx);
270            }
271        }
272
273        // Update wallet transactions records
274        if let Err(e) = self.put_tx_history_records(&wallet_txs, "Confirmed").await {
275            return Err(Error::DatabaseError(format!(
276                "[scan_block] Inserting transaction history records failed: {e:?}"
277            )))
278        }
279
280        // Store this block rollback query
281        self.store_inverse_cache(block.header.height, &block.hash().to_string())?;
282
283        Ok(())
284    }
285
286    /// Scans the blockchain for wallet relevant transactions,
287    /// starting from the last scanned block. If a reorg has happened,
288    /// we revert to its previous height and then scan from there.
289    pub async fn scan_blocks(&self) -> WalletDbResult<()> {
290        // Grab last scanned block height
291        let (mut height, hash) = self.get_last_scanned_block()?;
292
293        // Grab our last scanned block from darkfid
294        let block = match self.get_block_by_height(height).await {
295            Ok(b) => Some(b),
296            // Check if block was found
297            Err(Error::JsonRpcError((-32121, _))) => None,
298            Err(e) => {
299                eprintln!("[scan_blocks] RPC client request failed: {e:?}");
300                return Err(WalletDbError::GenericError)
301            }
302        };
303
304        // Check if a reorg has happened
305        if block.is_none() || hash != block.unwrap().hash().to_string() {
306            // Find the exact block height the reorg happened
307            println!("A reorg has happened, finding last known common block...");
308            height = height.saturating_sub(1);
309            while height != 0 {
310                // Grab our scanned block hash for that height
311                let (_, scanned_block_hash, _) = self.get_scanned_block_record(height)?;
312
313                // Grab the block from darkfid for that height
314                let block = match self.get_block_by_height(height).await {
315                    Ok(b) => Some(b),
316                    // Check if block was found
317                    Err(Error::JsonRpcError((-32121, _))) => None,
318                    Err(e) => {
319                        eprintln!("[scan_blocks] RPC client request failed: {e:?}");
320                        return Err(WalletDbError::GenericError)
321                    }
322                };
323
324                // Continue to previous one if they don't match
325                if block.is_none() || scanned_block_hash != block.unwrap().hash().to_string() {
326                    height = height.saturating_sub(1);
327                    continue
328                }
329
330                // Reset to its height
331                println!("Last common block found: {height} - {scanned_block_hash}");
332                self.reset_to_height(height).await?;
333                break
334            }
335        }
336
337        // If last scanned block is genesis(0) we reset,
338        // otherwise continue with the next block height.
339        if height == 0 {
340            self.reset().await?;
341        } else {
342            height += 1;
343        }
344
345        loop {
346            // Grab last confirmed block
347            println!("Requested to scan from block number: {height}");
348            let (last_height, last_hash) = match self.get_last_confirmed_block().await {
349                Ok(last) => last,
350                Err(e) => {
351                    eprintln!("[scan_blocks] RPC client request failed: {e:?}");
352                    return Err(WalletDbError::GenericError)
353                }
354            };
355            println!("Last confirmed block reported by darkfid: {last_height} - {last_hash}");
356
357            // Already scanned last confirmed block
358            if height > last_height {
359                return Ok(())
360            }
361
362            while height <= last_height {
363                println!("Requesting block {height}...");
364                let block = match self.get_block_by_height(height).await {
365                    Ok(b) => b,
366                    Err(e) => {
367                        eprintln!("[scan_blocks] RPC client request failed: {e:?}");
368                        return Err(WalletDbError::GenericError)
369                    }
370                };
371                println!("Block {height} received! Scanning block...");
372                if let Err(e) = self.scan_block(&block).await {
373                    eprintln!("[scan_blocks] Scan block failed: {e:?}");
374                    return Err(WalletDbError::GenericError)
375                };
376                height += 1;
377            }
378        }
379    }
380
381    // Queries darkfid for last confirmed block.
382    async fn get_last_confirmed_block(&self) -> Result<(u32, String)> {
383        let rep = self
384            .darkfid_daemon_request("blockchain.last_confirmed_block", &JsonValue::Array(vec![]))
385            .await?;
386        let params = rep.get::<Vec<JsonValue>>().unwrap();
387        let height = *params[0].get::<f64>().unwrap() as u32;
388        let hash = params[1].get::<String>().unwrap().clone();
389
390        Ok((height, hash))
391    }
392
393    // Queries darkfid for a block with given height.
394    async fn get_block_by_height(&self, height: u32) -> Result<BlockInfo> {
395        let params = self
396            .darkfid_daemon_request(
397                "blockchain.get_block",
398                &JsonValue::Array(vec![JsonValue::String(height.to_string())]),
399            )
400            .await?;
401        let param = params.get::<String>().unwrap();
402        let bytes = base64::decode(param).unwrap();
403        let block = deserialize_async(&bytes).await?;
404        Ok(block)
405    }
406
407    /// Broadcast a given transaction to darkfid and forward onto the network.
408    /// Returns the transaction ID upon success.
409    pub async fn broadcast_tx(&self, tx: &Transaction) -> Result<String> {
410        println!("Broadcasting transaction...");
411
412        let params =
413            JsonValue::Array(vec![JsonValue::String(base64::encode(&serialize_async(tx).await))]);
414        let rep = self.darkfid_daemon_request("tx.broadcast", &params).await?;
415
416        let txid = rep.get::<String>().unwrap().clone();
417
418        // Store transactions history record
419        if let Err(e) = self.put_tx_history_record(tx, "Broadcasted").await {
420            return Err(Error::DatabaseError(format!(
421                "[broadcast_tx] Inserting transaction history record failed: {e:?}"
422            )))
423        }
424
425        Ok(txid)
426    }
427
428    /// Queries darkfid for a tx with given hash.
429    pub async fn get_tx(&self, tx_hash: &TransactionHash) -> Result<Option<Transaction>> {
430        let tx_hash_str = tx_hash.to_string();
431        match self
432            .darkfid_daemon_request(
433                "blockchain.get_tx",
434                &JsonValue::Array(vec![JsonValue::String(tx_hash_str)]),
435            )
436            .await
437        {
438            Ok(param) => {
439                let tx_bytes = base64::decode(param.get::<String>().unwrap()).unwrap();
440                let tx = deserialize_async(&tx_bytes).await?;
441                Ok(Some(tx))
442            }
443
444            Err(_) => Ok(None),
445        }
446    }
447
448    /// Simulate the transaction with the state machine.
449    pub async fn simulate_tx(&self, tx: &Transaction) -> Result<bool> {
450        let tx_str = base64::encode(&serialize_async(tx).await);
451        let rep = self
452            .darkfid_daemon_request(
453                "tx.simulate",
454                &JsonValue::Array(vec![JsonValue::String(tx_str)]),
455            )
456            .await?;
457
458        let is_valid = *rep.get::<bool>().unwrap();
459        Ok(is_valid)
460    }
461
462    /// Try to fetch zkas bincodes for the given `ContractId`.
463    pub async fn lookup_zkas(&self, contract_id: &ContractId) -> Result<Vec<(String, Vec<u8>)>> {
464        let params = JsonValue::Array(vec![JsonValue::String(format!("{contract_id}"))]);
465        let rep = self.darkfid_daemon_request("blockchain.lookup_zkas", &params).await?;
466        let params = rep.get::<Vec<JsonValue>>().unwrap();
467
468        let mut ret = Vec::with_capacity(params.len());
469        for param in params {
470            let zkas_ns = param[0].get::<String>().unwrap().clone();
471            let zkas_bincode_bytes = base64::decode(param[1].get::<String>().unwrap()).unwrap();
472            ret.push((zkas_ns, zkas_bincode_bytes));
473        }
474
475        Ok(ret)
476    }
477
478    /// Queries darkfid for given transaction's required fee.
479    pub async fn get_tx_fee(&self, tx: &Transaction, include_fee: bool) -> Result<u64> {
480        let params = JsonValue::Array(vec![
481            JsonValue::String(base64::encode(&serialize_async(tx).await)),
482            JsonValue::Boolean(include_fee),
483        ]);
484        let rep = self.darkfid_daemon_request("tx.calculate_fee", &params).await?;
485
486        let fee = *rep.get::<f64>().unwrap() as u64;
487
488        Ok(fee)
489    }
490
491    /// Queries darkfid for current best fork next height.
492    pub async fn get_next_block_height(&self) -> Result<u32> {
493        let rep = self
494            .darkfid_daemon_request(
495                "blockchain.best_fork_next_block_height",
496                &JsonValue::Array(vec![]),
497            )
498            .await?;
499
500        let next_height = *rep.get::<f64>().unwrap() as u32;
501
502        Ok(next_height)
503    }
504
505    /// Queries darkfid for currently configured block target time.
506    pub async fn get_block_target(&self) -> Result<u32> {
507        let rep = self
508            .darkfid_daemon_request("blockchain.block_target", &JsonValue::Array(vec![]))
509            .await?;
510
511        let next_height = *rep.get::<f64>().unwrap() as u32;
512
513        Ok(next_height)
514    }
515
516    /// Auxiliary function to ping configured darkfid daemon for liveness.
517    pub async fn ping(&self) -> Result<()> {
518        println!("Executing ping request to darkfid...");
519        let latency = Instant::now();
520        let rep = self.darkfid_daemon_request("ping", &JsonValue::Array(vec![])).await?;
521        let latency = latency.elapsed();
522        println!("Got reply: {rep:?}");
523        println!("Latency: {latency:?}");
524        Ok(())
525    }
526
527    /// Auxiliary function to execute a request towards the configured darkfid daemon JSON-RPC endpoint.
528    pub async fn darkfid_daemon_request(
529        &self,
530        method: &str,
531        params: &JsonValue,
532    ) -> Result<JsonValue> {
533        let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
534        let req = JsonRequest::new(method, params.clone());
535        let rep = rpc_client.request(req).await?;
536        Ok(rep)
537    }
538
539    /// Auxiliary function to stop current JSON-RPC client, if its initialized.
540    pub async fn stop_rpc_client(&self) -> Result<()> {
541        if let Some(ref rpc_client) = self.rpc_client {
542            rpc_client.stop().await;
543        };
544        Ok(())
545    }
546}