explorerd/service/
sync.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! # Sync Module
20//!
21//! The `sync` module is responsible for synchronizing the explorer's database with the Darkfi
22//! blockchain network. It ensures consistency between the explorer and the blockchain by
23//! fetching missing blocks, handling reorganizations (reorgs), and subscribing to live updates
24//! through Darkfi's JSON-RPC service.
25//!
26//! ## Responsibilities
27//!
28//! - **Block Synchronization**: Handles fetching and storing blocks from a Darkfi
29//!   blockchain node during startup or when syncing, ensuring the explorer stays synchronized
30//!   with the latest confirmed blocks.
31//! - **Real-Time Updates**: Subscribes to Darkfi's JSON-RPC notification service,
32//!   allowing the explorer to process and sync new blocks as they are confirmed.
33//! - **Reorg Handling**: Detects and resolves blockchain reorganizations by identifying
34//!   the last common block (in case of divergence) and re-aligning the explorer's state with the
//!   latest blockchain state. Reorgs are an important part of synchronization because they prevent
36//!   syncing invalid or outdated states, ensuring the explorer maintains an accurate view of a
37//!   Darkfi blockchain network.
38
39use std::{sync::Arc, time::Instant};
40
41use log::{debug, error, info, warn};
42use tinyjson::JsonValue;
43use url::Url;
44
45use darkfi::{
46    blockchain::BlockInfo,
47    rpc::{
48        client::RpcClient,
49        jsonrpc::{JsonRequest, JsonResult},
50    },
51    system::{Publisher, StoppableTask, StoppableTaskPtr},
52    util::{encoding::base64, time::fmt_duration},
53    Error,
54};
55use darkfi_serial::deserialize_async;
56
57use crate::{error::handle_database_error, service::ExplorerService, Explorerd};
58
59impl ExplorerService {
60    /// Synchronizes blocks between the explorer and a Darkfi blockchain node, ensuring
61    /// the database remains consistent by syncing any missing or outdated blocks.
62    ///
63    /// If provided `reset` is true, the explorer's blockchain-related and metric sled trees are purged
64    /// and syncing starts from the genesis block. The function also handles reorgs by re-aligning the
65    /// explorer state to the correct height when blocks are outdated. Returns a result indicating
66    /// success or failure.
67    ///
68    /// Reorg handling is delegated to the [`Self::reorg_blocks`] function, whose
69    /// documentation provides more details on the reorg process during block syncing.
70    pub async fn sync_blocks(&self, reset: bool) -> darkfi::Result<()> {
71        // Grab last synced block height from the explorer's database.
72        let last_synced_block = self.last_block().map_err(|e| {
73            handle_database_error(
74                "rpc_blocks::sync_blocks",
75                "[sync_blocks] Retrieving last synced block failed",
76                e,
77            )
78        })?;
79
80        // Grab the last confirmed block height and hash from the darkfi node
81        let (last_darkfid_height, last_darkfid_hash) =
82            self.darkfid_client.get_last_confirmed_block().await?;
83
84        // Initialize the current height to sync from, starting from genesis block if last sync block does not exist
85        let (last_synced_height, last_synced_hash) = last_synced_block
86            .map_or((0, "".to_string()), |(height, header_hash)| (height, header_hash));
87
88        // Declare a mutable variable to track the current sync height while processing blocks
89        let mut current_height = last_synced_height;
90
91        info!(target: "explorerd::rpc_blocks::sync_blocks", "Syncing from block number: {current_height}");
92        info!(target: "explorerd::rpc_blocks::sync_blocks", "Last confirmed darkfid block: {last_darkfid_height} - {last_darkfid_hash}");
93
94        // A reorg is detected if the hash of the last synced block differs from the hash of the last confirmed block,
95        // unless the reset flag is set or the current height is 0
96        let reorg_detected = last_synced_hash != last_darkfid_hash && !reset && current_height != 0;
97
98        // If the reset flag is set, reset the explorer state and start syncing from the genesis block height.
99        // Otherwise, handle reorgs if detected, or proceed to the next block if not at the genesis height.
100        if reset {
101            self.reset_explorer_state(0)?;
102            current_height = 0;
103            info!(target: "explorerd::rpc_blocks::sync_blocks", "Reset explorer database based on set reset parameter");
104        } else if reorg_detected {
105            // Record the start time to measure the duration of potential reorg
106            let start_reorg_time = Instant::now();
107
108            // Process reorg
109            current_height = self.reorg_blocks(last_synced_height, last_darkfid_height).await?;
110
111            // Log only if a reorg occurred (i.e., the explorer wasn't merely catching up to Darkfi node blocks)
112            if current_height != last_synced_height {
113                info!(target: "explorerd::rpc_blocks::sync_blocks", "Completed reorg to height: {current_height} [{}]", fmt_duration(start_reorg_time.elapsed()));
114            }
115
116            // Prepare to sync the next block after reorg if not from genesis height
117            if current_height != 0 {
118                current_height += 1;
119            }
120        } else if current_height != 0 {
121            // Resume syncing from the block after the last synced height
122            current_height += 1;
123        }
124
125        // Record the sync start time to measure the total block sync duration
126        let sync_start_time = Instant::now();
127        // Track the number of blocks synced for reporting
128        let mut blocks_synced = 0;
129
130        // Sync blocks until the explorer is up to date with the last confirmed block
131        while current_height <= last_darkfid_height {
132            // Record the start time to measure the duration it took to sync the block
133            let block_sync_start = Instant::now();
134
135            // Retrieve the block from darkfi node by height
136            let block = match self.darkfid_client.get_block_by_height(current_height).await {
137                Ok(r) => r,
138                Err(e) => {
139                    return Err(handle_database_error(
140                        "rpc_blocks::sync_blocks",
141                        "[sync_blocks] RPC client request failed",
142                        e,
143                    ))
144                }
145            };
146
147            // Store the retrieved block in the explorer's database
148            if let Err(e) = self.put_block(&block).await {
149                return Err(handle_database_error(
150                    "rpc_blocks::sync_blocks",
151                    "[sync_blocks] Put block failed",
152                    e,
153                ));
154            };
155
156            debug!(
157                target: "explorerd::rpc_blocks::sync_blocks",
158                "Synced block {current_height} [{}]",
159                fmt_duration(block_sync_start.elapsed())
160            );
161
162            // Increment the current height to sync the next block
163            current_height += 1;
164            // Increment the count of successfully synced blocks
165            blocks_synced += 1;
166        }
167
168        info!(
169            target: "explorerd::rpc_blocks::sync_blocks",
170            "Synced {blocks_synced} blocks: explorer blocks total {} [{}]",
171            self.db.blockchain.blocks.len(),
172            fmt_duration(sync_start_time.elapsed()),
173        );
174
175        Ok(())
176    }
177
178    /// Handles blockchain reorganizations (reorgs) during the explorer node's startup synchronization
179    /// with Darkfi nodes, ensuring the explorer provides a consistent and accurate view of the blockchain.
180    ///
181    /// A reorg occurs when the blocks stored by the blockchain nodes diverge from those stored by the explorer.
182    /// This function resolves inconsistencies by identifying the point of divergence, searching backward through
183    /// block heights, and comparing block hashes between the explorer database and the blockchain node. Once a
184    /// common block height is found, the explorer is re-aligned to that height.
185    ///
186    /// If no common block can be found, the explorer resets to the "genesis height," removing all blocks,
187    /// transactions, and metrics from its database to resynchronize with the canonical chain from the nodes.
188    ///
189    /// Returns the last height at which the explorer's state was successfully re-aligned with the blockchain.
190    async fn reorg_blocks(
191        &self,
192        last_synced_height: u32,
193        last_darkfid_height: u32,
194    ) -> darkfi::Result<u32> {
195        // Log reorg detection in the case that explorer height is greater or equal to height of darkfi node
196        if last_synced_height >= last_darkfid_height {
197            info!(target: "explorerd::rpc_blocks::process_sync_blocks_reorg",
198                "Reorg detected with heights: explorer.{last_synced_height} >= darkfid.{last_darkfid_height}");
199        }
200
201        // Declare a mutable variable to track the current height while searching for a common block
202        let mut cur_height = last_synced_height;
203        // Search for an explorer block that matches a darkfi node block
204        while cur_height > 0 {
205            let synced_block = self.get_block_by_height(cur_height)?;
206            debug!(target: "explorerd::rpc_blocks::process_sync_blocks_reorg", "Searching for common block: {cur_height}");
207
208            // Check if we found a synced block for current height being searched
209            if let Some(synced_block) = synced_block {
210                // Fetch the block from darkfi node to check for a match
211                match self.darkfid_client.get_block_by_height(cur_height).await {
212                    Ok(darkfid_block) => {
213                        // If hashes match, we've found the point of divergence
214                        if synced_block.header_hash == darkfid_block.hash().to_string() {
215                            // If hashes match but the cur_height differs from the last synced height, reset the explorer state
216                            if cur_height != last_synced_height {
217                                self.reset_explorer_state(cur_height)?;
218                                debug!(target: "explorerd::rpc_blocks::process_sync_blocks_reorg", "Completed reorg to height: {cur_height}");
219                            }
220                            break;
221                        } else {
222                            // Log reorg detection with height and header hash mismatch details
223                            if cur_height == last_synced_height {
224                                info!(
225                                    target: "explorerd::rpc_blocks::process_sync_blocks_reorg",
226                                    "Reorg detected at height {cur_height}: explorer.{} != darkfid.{}",
227                                    synced_block.header_hash,
228                                    darkfid_block.hash()
229                                );
230                            }
231                        }
232                    }
233                    // Continue searching for blocks that do not exist on darkfi nodes
234                    Err(Error::JsonRpcError((-32121, _))) => (),
235                    Err(e) => {
236                        return Err(handle_database_error(
237                            "rpc_blocks::process_sync_blocks_reorg",
238                            "[process_sync_blocks_reorg] RPC client request failed",
239                            e,
240                        ))
241                    }
242                }
243            }
244
245            // Move to previous block to search for a match
246            cur_height = cur_height.saturating_sub(1);
247        }
248
249        // Check if genesis block reorg is needed
250        if cur_height == 0 {
251            self.reset_explorer_state(0)?;
252        }
253
254        // Return the last height we reorged to
255        Ok(cur_height)
256    }
257}
258/// Subscribes to darkfid's JSON-RPC notification endpoint that serves
259/// new confirmed blocks. Upon receiving them, store them to the database.
260pub async fn subscribe_sync_blocks(
261    explorer: Arc<Explorerd>,
262    endpoint: Url,
263    ex: Arc<smol::Executor<'static>>,
264) -> darkfi::Result<(StoppableTaskPtr, StoppableTaskPtr)> {
265    // Grab last confirmed block
266    let (last_darkfid_height, last_darkfid_hash) =
267        explorer.darkfid_client.get_last_confirmed_block().await?;
268
269    // Grab last synced block
270    let (mut height, hash) = match explorer.service.last_block() {
271        Ok(Some((height, hash))) => (height, hash),
272        Ok(None) => (0, "".to_string()),
273        Err(e) => {
274            return Err(Error::DatabaseError(format!(
275                "[subscribe_blocks] Retrieving last synced block failed: {e:?}"
276            )))
277        }
278    };
279
280    // Evaluates whether there is a mismatch between the last confirmed block and the last synced block
281    let blocks_mismatch = (last_darkfid_height != height || last_darkfid_hash != hash) &&
282        last_darkfid_height != 0 &&
283        height != 0;
284
285    // Check if there is a mismatch, throwing an error to prevent operating in a potentially inconsistent state
286    if blocks_mismatch {
287        warn!(target: "explorerd::rpc_blocks::subscribe_blocks",
288        "Warning: Last synced block is not the last confirmed block: \
289        last_darkfid_height={last_darkfid_height}, last_synced_height={height}, last_darkfid_hash={last_darkfid_hash}, last_synced_hash={hash}");
290        warn!(target: "explorerd::rpc_blocks::subscribe_blocks", "You should first fully sync the blockchain, and then subscribe");
291        return Err(Error::DatabaseError(
292            "[subscribe_blocks] Blockchain not fully synced".to_string(),
293        ));
294    }
295
296    info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Subscribing to receive notifications of incoming blocks");
297    let publisher = Publisher::new();
298    let subscription = publisher.clone().subscribe().await;
299    let _ex = ex.clone();
300    let subscriber_task = StoppableTask::new();
301    subscriber_task.clone().start(
302        // Weird hack to prevent lifetimes hell
303        async move {
304            let ex = _ex.clone();
305            let rpc_client = RpcClient::new(endpoint, ex).await?;
306            let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
307            rpc_client.subscribe(req, publisher).await
308        },
309        |res| async move {
310            match res {
311                Ok(()) => { /* Do nothing */ }
312                Err(e) => error!(target: "explorerd::rpc_blocks::subscribe_blocks", "[subscribe_blocks] JSON-RPC server error: {e:?}"),
313            }
314        },
315        Error::RpcServerStopped,
316        ex.clone(),
317    );
318    info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Detached subscription to background");
319
320    let listener_task = StoppableTask::new();
321    listener_task.clone().start(
322        // Weird hack to prevent lifetimes hell
323        async move {
324            loop {
325                match subscription.receive().await {
326                    JsonResult::Notification(n) => {
327                        debug!(target: "explorerd::rpc_blocks::subscribe_blocks", "Got Block notification from darkfid subscription");
328                        if n.method != "blockchain.subscribe_blocks" {
329                            return Err(Error::UnexpectedJsonRpc(format!(
330                                "Got foreign notification from darkfid: {}",
331                                n.method
332                            )))
333                        }
334
335                        // Verify parameters
336                        if !n.params.is_array() {
337                            return Err(Error::UnexpectedJsonRpc(
338                                "Received notification params are not an array".to_string(),
339                            ))
340                        }
341                        let params = n.params.get::<Vec<JsonValue>>().unwrap();
342                        if params.is_empty() {
343                            return Err(Error::UnexpectedJsonRpc(
344                                "Notification parameters are empty".to_string(),
345                            ))
346                        }
347
348                        for param in params {
349                            let param = param.get::<String>().unwrap();
350                            let bytes = base64::decode(param).unwrap();
351
352                            let darkfid_block: BlockInfo = match deserialize_async(&bytes).await {
353                                Ok(b) => b,
354                                Err(e) => {
355                                    return Err(Error::UnexpectedJsonRpc(format!(
356                                        "[subscribe_blocks] Deserializing block failed: {e:?}"
357                                    )))
358                                },
359                            };
360                            info!(target: "explorerd::rpc_blocks::subscribe_blocks", "========================================================================================");
361                            info!(target: "explorerd::rpc_blocks::subscribe_blocks", "| Block Notification: {} |", darkfid_block.hash());
362                            info!(target: "explorerd::rpc_blocks::subscribe_blocks", "========================================================================================");
363
364                            // Store darkfi node block height for later use
365                            let darkfid_block_height = darkfid_block.header.height;
366
367                            // Check if we need to perform a reorg due to mismatch in block heights
368                            if darkfid_block_height <= height {
369                                info!(target: "explorerd::rpc_blocks::subscribe_blocks",
370                                    "Reorg detected with heights: darkfid.{darkfid_block_height} <= explorer.{height}");
371
372                                // Calculate the reset height
373                                let reset_height = darkfid_block_height.saturating_sub(1);
374
375                                // Record the start time to measure the duration of the reorg
376                                let start_reorg_time = Instant::now();
377
378                                // Execute the reorg by resetting the explorer state to reset height
379                                explorer.service.reset_explorer_state(reset_height)?;
380                                info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Completed reorg to height: {reset_height} [{}]", fmt_duration(start_reorg_time.elapsed()));
381                            }
382
383
384                            // Record the start time to measure the duration to store the block
385                            let start_reorg_time = Instant::now();
386
387                            if let Err(e) = explorer.service.put_block(&darkfid_block).await {
388                                return Err(Error::DatabaseError(format!(
389                                    "[subscribe_blocks] Put block failed: {e:?}"
390                                )))
391                            }
392
393                            info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Stored new block at height: {} [{}]", darkfid_block.header.height, fmt_duration(start_reorg_time.elapsed()));
394
395                            // Process the next block
396                            height = darkfid_block.header.height;
397                        }
398                    }
399
400                    JsonResult::Error(e) => {
401                        // Some error happened in the transmission
402                        return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
403                    }
404
405                    x => {
406                        // And this is weird
407                        return Err(Error::UnexpectedJsonRpc(format!(
408                            "Got unexpected data from JSON-RPC: {x:?}"
409                        )))
410                    }
411                }
412            };
413        },
414        |res| async move {
415            match res {
416                Ok(()) => { /* Do nothing */ }
417                Err(e) => error!(target: "explorerd::rpc_blocks::subscribe_blocks", "[subscribe_blocks] JSON-RPC server error: {e:?}"),
418            }
419        },
420        Error::RpcServerStopped,
421        ex,
422    );
423
424    Ok((subscriber_task, listener_task))
425}