explorerd/service/sync.rs
1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! # Sync Module
20//!
21//! The `sync` module is responsible for synchronizing the explorer's database with the Darkfi
22//! blockchain network. It ensures consistency between the explorer and the blockchain by
23//! fetching missing blocks, handling reorganizations (reorgs), and subscribing to live updates
24//! through Darkfi's JSON-RPC service.
25//!
26//! ## Responsibilities
27//!
28//! - **Block Synchronization**: Handles fetching and storing blocks from a Darkfi
29//! blockchain node during startup or when syncing, ensuring the explorer stays synchronized
30//! with the latest confirmed blocks.
31//! - **Real-Time Updates**: Subscribes to Darkfi's JSON-RPC notification service,
32//! allowing the explorer to process and sync new blocks as they are confirmed.
33//! - **Reorg Handling**: Detects and resolves blockchain reorganizations by identifying
34//! the last common block (in case of divergence) and re-aligning the explorer's state with the
//! latest blockchain state. Reorgs are an important part of synchronization because they prevent
36//! syncing invalid or outdated states, ensuring the explorer maintains an accurate view of a
37//! Darkfi blockchain network.
38
39use std::{sync::Arc, time::Instant};
40
41use log::{debug, error, info, warn};
42use tinyjson::JsonValue;
43use url::Url;
44
45use darkfi::{
46 blockchain::BlockInfo,
47 rpc::{
48 client::RpcClient,
49 jsonrpc::{JsonRequest, JsonResult},
50 },
51 system::{Publisher, StoppableTask, StoppableTaskPtr},
52 util::{encoding::base64, time::fmt_duration},
53 Error,
54};
55use darkfi_serial::deserialize_async;
56
57use crate::{error::handle_database_error, service::ExplorerService, Explorerd};
58
59impl ExplorerService {
60 /// Synchronizes blocks between the explorer and a Darkfi blockchain node, ensuring
61 /// the database remains consistent by syncing any missing or outdated blocks.
62 ///
63 /// If provided `reset` is true, the explorer's blockchain-related and metric sled trees are purged
64 /// and syncing starts from the genesis block. The function also handles reorgs by re-aligning the
65 /// explorer state to the correct height when blocks are outdated. Returns a result indicating
66 /// success or failure.
67 ///
68 /// Reorg handling is delegated to the [`Self::reorg_blocks`] function, whose
69 /// documentation provides more details on the reorg process during block syncing.
70 pub async fn sync_blocks(&self, reset: bool) -> darkfi::Result<()> {
71 // Grab last synced block height from the explorer's database.
72 let last_synced_block = self.last_block().map_err(|e| {
73 handle_database_error(
74 "rpc_blocks::sync_blocks",
75 "[sync_blocks] Retrieving last synced block failed",
76 e,
77 )
78 })?;
79
80 // Grab the last confirmed block height and hash from the darkfi node
81 let (last_darkfid_height, last_darkfid_hash) =
82 self.darkfid_client.get_last_confirmed_block().await?;
83
84 // Initialize the current height to sync from, starting from genesis block if last sync block does not exist
85 let (last_synced_height, last_synced_hash) = last_synced_block
86 .map_or((0, "".to_string()), |(height, header_hash)| (height, header_hash));
87
88 // Declare a mutable variable to track the current sync height while processing blocks
89 let mut current_height = last_synced_height;
90
91 info!(target: "explorerd::rpc_blocks::sync_blocks", "Syncing from block number: {current_height}");
92 info!(target: "explorerd::rpc_blocks::sync_blocks", "Last confirmed darkfid block: {last_darkfid_height} - {last_darkfid_hash}");
93
94 // A reorg is detected if the hash of the last synced block differs from the hash of the last confirmed block,
95 // unless the reset flag is set or the current height is 0
96 let reorg_detected = last_synced_hash != last_darkfid_hash && !reset && current_height != 0;
97
98 // If the reset flag is set, reset the explorer state and start syncing from the genesis block height.
99 // Otherwise, handle reorgs if detected, or proceed to the next block if not at the genesis height.
100 if reset {
101 self.reset_explorer_state(0)?;
102 current_height = 0;
103 info!(target: "explorerd::rpc_blocks::sync_blocks", "Reset explorer database based on set reset parameter");
104 } else if reorg_detected {
105 // Record the start time to measure the duration of potential reorg
106 let start_reorg_time = Instant::now();
107
108 // Process reorg
109 current_height = self.reorg_blocks(last_synced_height, last_darkfid_height).await?;
110
111 // Log only if a reorg occurred (i.e., the explorer wasn't merely catching up to Darkfi node blocks)
112 if current_height != last_synced_height {
113 info!(target: "explorerd::rpc_blocks::sync_blocks", "Completed reorg to height: {current_height} [{}]", fmt_duration(start_reorg_time.elapsed()));
114 }
115
116 // Prepare to sync the next block after reorg if not from genesis height
117 if current_height != 0 {
118 current_height += 1;
119 }
120 } else if current_height != 0 {
121 // Resume syncing from the block after the last synced height
122 current_height += 1;
123 }
124
125 // Record the sync start time to measure the total block sync duration
126 let sync_start_time = Instant::now();
127 // Track the number of blocks synced for reporting
128 let mut blocks_synced = 0;
129
130 // Sync blocks until the explorer is up to date with the last confirmed block
131 while current_height <= last_darkfid_height {
132 // Record the start time to measure the duration it took to sync the block
133 let block_sync_start = Instant::now();
134
135 // Retrieve the block from darkfi node by height
136 let block = match self.darkfid_client.get_block_by_height(current_height).await {
137 Ok(r) => r,
138 Err(e) => {
139 return Err(handle_database_error(
140 "rpc_blocks::sync_blocks",
141 "[sync_blocks] RPC client request failed",
142 e,
143 ))
144 }
145 };
146
147 // Store the retrieved block in the explorer's database
148 if let Err(e) = self.put_block(&block).await {
149 return Err(handle_database_error(
150 "rpc_blocks::sync_blocks",
151 "[sync_blocks] Put block failed",
152 e,
153 ));
154 };
155
156 debug!(
157 target: "explorerd::rpc_blocks::sync_blocks",
158 "Synced block {current_height} [{}]",
159 fmt_duration(block_sync_start.elapsed())
160 );
161
162 // Increment the current height to sync the next block
163 current_height += 1;
164 // Increment the count of successfully synced blocks
165 blocks_synced += 1;
166 }
167
168 info!(
169 target: "explorerd::rpc_blocks::sync_blocks",
170 "Synced {blocks_synced} blocks: explorer blocks total {} [{}]",
171 self.db.blockchain.blocks.len(),
172 fmt_duration(sync_start_time.elapsed()),
173 );
174
175 Ok(())
176 }
177
178 /// Handles blockchain reorganizations (reorgs) during the explorer node's startup synchronization
179 /// with Darkfi nodes, ensuring the explorer provides a consistent and accurate view of the blockchain.
180 ///
181 /// A reorg occurs when the blocks stored by the blockchain nodes diverge from those stored by the explorer.
182 /// This function resolves inconsistencies by identifying the point of divergence, searching backward through
183 /// block heights, and comparing block hashes between the explorer database and the blockchain node. Once a
184 /// common block height is found, the explorer is re-aligned to that height.
185 ///
186 /// If no common block can be found, the explorer resets to the "genesis height," removing all blocks,
187 /// transactions, and metrics from its database to resynchronize with the canonical chain from the nodes.
188 ///
189 /// Returns the last height at which the explorer's state was successfully re-aligned with the blockchain.
190 async fn reorg_blocks(
191 &self,
192 last_synced_height: u32,
193 last_darkfid_height: u32,
194 ) -> darkfi::Result<u32> {
195 // Log reorg detection in the case that explorer height is greater or equal to height of darkfi node
196 if last_synced_height >= last_darkfid_height {
197 info!(target: "explorerd::rpc_blocks::process_sync_blocks_reorg",
198 "Reorg detected with heights: explorer.{last_synced_height} >= darkfid.{last_darkfid_height}");
199 }
200
201 // Declare a mutable variable to track the current height while searching for a common block
202 let mut cur_height = last_synced_height;
203 // Search for an explorer block that matches a darkfi node block
204 while cur_height > 0 {
205 let synced_block = self.get_block_by_height(cur_height)?;
206 debug!(target: "explorerd::rpc_blocks::process_sync_blocks_reorg", "Searching for common block: {cur_height}");
207
208 // Check if we found a synced block for current height being searched
209 if let Some(synced_block) = synced_block {
210 // Fetch the block from darkfi node to check for a match
211 match self.darkfid_client.get_block_by_height(cur_height).await {
212 Ok(darkfid_block) => {
213 // If hashes match, we've found the point of divergence
214 if synced_block.header_hash == darkfid_block.hash().to_string() {
215 // If hashes match but the cur_height differs from the last synced height, reset the explorer state
216 if cur_height != last_synced_height {
217 self.reset_explorer_state(cur_height)?;
218 debug!(target: "explorerd::rpc_blocks::process_sync_blocks_reorg", "Completed reorg to height: {cur_height}");
219 }
220 break;
221 } else {
222 // Log reorg detection with height and header hash mismatch details
223 if cur_height == last_synced_height {
224 info!(
225 target: "explorerd::rpc_blocks::process_sync_blocks_reorg",
226 "Reorg detected at height {cur_height}: explorer.{} != darkfid.{}",
227 synced_block.header_hash,
228 darkfid_block.hash()
229 );
230 }
231 }
232 }
233 // Continue searching for blocks that do not exist on darkfi nodes
234 Err(Error::JsonRpcError((-32121, _))) => (),
235 Err(e) => {
236 return Err(handle_database_error(
237 "rpc_blocks::process_sync_blocks_reorg",
238 "[process_sync_blocks_reorg] RPC client request failed",
239 e,
240 ))
241 }
242 }
243 }
244
245 // Move to previous block to search for a match
246 cur_height = cur_height.saturating_sub(1);
247 }
248
249 // Check if genesis block reorg is needed
250 if cur_height == 0 {
251 self.reset_explorer_state(0)?;
252 }
253
254 // Return the last height we reorged to
255 Ok(cur_height)
256 }
257}
258/// Subscribes to darkfid's JSON-RPC notification endpoint that serves
259/// new confirmed blocks. Upon receiving them, store them to the database.
260pub async fn subscribe_sync_blocks(
261 explorer: Arc<Explorerd>,
262 endpoint: Url,
263 ex: Arc<smol::Executor<'static>>,
264) -> darkfi::Result<(StoppableTaskPtr, StoppableTaskPtr)> {
265 // Grab last confirmed block
266 let (last_darkfid_height, last_darkfid_hash) =
267 explorer.darkfid_client.get_last_confirmed_block().await?;
268
269 // Grab last synced block
270 let (mut height, hash) = match explorer.service.last_block() {
271 Ok(Some((height, hash))) => (height, hash),
272 Ok(None) => (0, "".to_string()),
273 Err(e) => {
274 return Err(Error::DatabaseError(format!(
275 "[subscribe_blocks] Retrieving last synced block failed: {e:?}"
276 )))
277 }
278 };
279
280 // Evaluates whether there is a mismatch between the last confirmed block and the last synced block
281 let blocks_mismatch = (last_darkfid_height != height || last_darkfid_hash != hash) &&
282 last_darkfid_height != 0 &&
283 height != 0;
284
285 // Check if there is a mismatch, throwing an error to prevent operating in a potentially inconsistent state
286 if blocks_mismatch {
287 warn!(target: "explorerd::rpc_blocks::subscribe_blocks",
288 "Warning: Last synced block is not the last confirmed block: \
289 last_darkfid_height={last_darkfid_height}, last_synced_height={height}, last_darkfid_hash={last_darkfid_hash}, last_synced_hash={hash}");
290 warn!(target: "explorerd::rpc_blocks::subscribe_blocks", "You should first fully sync the blockchain, and then subscribe");
291 return Err(Error::DatabaseError(
292 "[subscribe_blocks] Blockchain not fully synced".to_string(),
293 ));
294 }
295
296 info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Subscribing to receive notifications of incoming blocks");
297 let publisher = Publisher::new();
298 let subscription = publisher.clone().subscribe().await;
299 let _ex = ex.clone();
300 let subscriber_task = StoppableTask::new();
301 subscriber_task.clone().start(
302 // Weird hack to prevent lifetimes hell
303 async move {
304 let ex = _ex.clone();
305 let rpc_client = RpcClient::new(endpoint, ex).await?;
306 let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
307 rpc_client.subscribe(req, publisher).await
308 },
309 |res| async move {
310 match res {
311 Ok(()) => { /* Do nothing */ }
312 Err(e) => error!(target: "explorerd::rpc_blocks::subscribe_blocks", "[subscribe_blocks] JSON-RPC server error: {e:?}"),
313 }
314 },
315 Error::RpcServerStopped,
316 ex.clone(),
317 );
318 info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Detached subscription to background");
319
320 let listener_task = StoppableTask::new();
321 listener_task.clone().start(
322 // Weird hack to prevent lifetimes hell
323 async move {
324 loop {
325 match subscription.receive().await {
326 JsonResult::Notification(n) => {
327 debug!(target: "explorerd::rpc_blocks::subscribe_blocks", "Got Block notification from darkfid subscription");
328 if n.method != "blockchain.subscribe_blocks" {
329 return Err(Error::UnexpectedJsonRpc(format!(
330 "Got foreign notification from darkfid: {}",
331 n.method
332 )))
333 }
334
335 // Verify parameters
336 if !n.params.is_array() {
337 return Err(Error::UnexpectedJsonRpc(
338 "Received notification params are not an array".to_string(),
339 ))
340 }
341 let params = n.params.get::<Vec<JsonValue>>().unwrap();
342 if params.is_empty() {
343 return Err(Error::UnexpectedJsonRpc(
344 "Notification parameters are empty".to_string(),
345 ))
346 }
347
348 for param in params {
349 let param = param.get::<String>().unwrap();
350 let bytes = base64::decode(param).unwrap();
351
352 let darkfid_block: BlockInfo = match deserialize_async(&bytes).await {
353 Ok(b) => b,
354 Err(e) => {
355 return Err(Error::UnexpectedJsonRpc(format!(
356 "[subscribe_blocks] Deserializing block failed: {e:?}"
357 )))
358 },
359 };
360 info!(target: "explorerd::rpc_blocks::subscribe_blocks", "========================================================================================");
361 info!(target: "explorerd::rpc_blocks::subscribe_blocks", "| Block Notification: {} |", darkfid_block.hash());
362 info!(target: "explorerd::rpc_blocks::subscribe_blocks", "========================================================================================");
363
364 // Store darkfi node block height for later use
365 let darkfid_block_height = darkfid_block.header.height;
366
367 // Check if we need to perform a reorg due to mismatch in block heights
368 if darkfid_block_height <= height {
369 info!(target: "explorerd::rpc_blocks::subscribe_blocks",
370 "Reorg detected with heights: darkfid.{darkfid_block_height} <= explorer.{height}");
371
372 // Calculate the reset height
373 let reset_height = darkfid_block_height.saturating_sub(1);
374
375 // Record the start time to measure the duration of the reorg
376 let start_reorg_time = Instant::now();
377
378 // Execute the reorg by resetting the explorer state to reset height
379 explorer.service.reset_explorer_state(reset_height)?;
380 info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Completed reorg to height: {reset_height} [{}]", fmt_duration(start_reorg_time.elapsed()));
381 }
382
383
384 // Record the start time to measure the duration to store the block
385 let start_reorg_time = Instant::now();
386
387 if let Err(e) = explorer.service.put_block(&darkfid_block).await {
388 return Err(Error::DatabaseError(format!(
389 "[subscribe_blocks] Put block failed: {e:?}"
390 )))
391 }
392
393 info!(target: "explorerd::rpc_blocks::subscribe_blocks", "Stored new block at height: {} [{}]", darkfid_block.header.height, fmt_duration(start_reorg_time.elapsed()));
394
395 // Process the next block
396 height = darkfid_block.header.height;
397 }
398 }
399
400 JsonResult::Error(e) => {
401 // Some error happened in the transmission
402 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
403 }
404
405 x => {
406 // And this is weird
407 return Err(Error::UnexpectedJsonRpc(format!(
408 "Got unexpected data from JSON-RPC: {x:?}"
409 )))
410 }
411 }
412 };
413 },
414 |res| async move {
415 match res {
416 Ok(()) => { /* Do nothing */ }
417 Err(e) => error!(target: "explorerd::rpc_blocks::subscribe_blocks", "[subscribe_blocks] JSON-RPC server error: {e:?}"),
418 }
419 },
420 Error::RpcServerStopped,
421 ex,
422 );
423
424 Ok((subscriber_task, listener_task))
425}