darkfid/task/sync.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::collections::HashMap;

use darkfi::{
    blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
    util::encoding::base64, validator::consensus::Proposal, Error, Result,
};
use darkfi_serial::serialize_async;
use log::{debug, info, warn};
use rand::{prelude::SliceRandom, rngs::OsRng};
use tinyjson::JsonValue;

use crate::{
    proto::{
        ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
        SyncResponse, TipRequest, TipResponse, BATCH,
    },
    DarkfiNodePtr,
};

// TODO: Parallelize independent requests.
//       We can also make them be like torrents, where we retrieve chunks not in order.
/// Async task used for block syncing.
/// A checkpoint can be provided to ensure the node syncs the correct sequence.
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
    info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");

    // Grab blocks subscriber
    let block_sub = node.subscribers.get("blocks").unwrap();

    // Grab last known block header, including existing pending sync ones
    let mut last = node.validator.blockchain.last()?;

    // If the checkpoint has not been reached yet, purge sync headers and start syncing from scratch
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }

    // Check if the first sync header record is the next one after our last known block
    if let Some(next) = node.validator.blockchain.headers.get_first_sync()? {
        if next.height == last.0 + 1 {
            // Grab last sync header to continue syncing from
            if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? {
                last = (last_sync.height, last_sync.hash());
            }
        } else {
            // Purge headers and start syncing from scratch
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }
    info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);

    // Grab the most common tip and the corresponding peers
    let (mut common_tip_height, mut common_tip_peers) =
        most_common_tip(node, &last.1, checkpoint).await;

    // If the last known block header is before the checkpoint, we sync up to the checkpoint first.
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
            // Retrieve all the headers backwards until our last known one and verify them.
            // We use the next height, in order to also retrieve the checkpoint header.
            retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;

            // Retrieve all the blocks for those headers and apply them to canonical
            last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
            info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);

            // Grab synced peers most common tip again
            (common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await;
        }
    }

    // Sync headers and blocks
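    // Repeat until a pass retrieves no new blocks, meaning we have caught up
    // with the most common tip our peers reported.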
    loop {
        // Retrieve all the headers backwards until our last known one and verify them.
        // We use the next height, in order to also retrieve the peers' tip header.
        retrieve_headers(node, &common_tip_peers, last.0, common_tip_height + 1).await?;

        // Retrieve all the blocks for those headers and apply them to canonical
        let last_received =
            retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
        info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);

        if last == last_received {
            break
        }

        last = last_received;

        // Grab synced peers most common tip again
        (common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await;
    }

    // Sync best fork
    sync_best_fork(node, &common_tip_peers, &last.1).await;

    // Perform confirmation
    let confirmed = node.validator.confirmation().await?;
    if !confirmed.is_empty() {
        // Notify subscriber
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        block_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    *node.validator.synced.write().await = true;
    info!(target: "darkfid::task::sync_task", "Blockchain synced!");
    Ok(())
}

/// Auxiliary function to block until the node is connected to at least one synced peer,
/// and retrieve the synced peers' tips.
async fn synced_peers(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
    info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
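    // Map each advertised tip (height, header hash) to the peers that reported it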
    let mut tips = HashMap::new();
    loop {
        // Grab channels
        let peers = node.p2p_handler.p2p.hosts().channels();

        // Ask each peer (if we got any) if they are synced
        for peer in peers {
            // If a checkpoint was provided, we check that the peer follows that sequence
            if let Some(c) = checkpoint {
                // Communication setup
                let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
                    continue
                };

                // Node creates a `HeaderSyncRequest` and sends it
                let request = HeaderSyncRequest { height: c.0 + 1 };
                if let Err(e) = peer.send(&request).await {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                    continue
                };

                // Node waits for response
                let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                };

                // Handle response
                if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
                    debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                }
            }

            // Communication setup
            let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
                continue
            };

            // Node creates a `TipRequest` and sends it
            let request = TipRequest { tip: *last_tip };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
                continue
            };

            // Node waits for response
            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
                continue
            };

            // Handle response
            if response.synced && response.height.is_some() && response.hash.is_some() {
                let tip = (response.height.unwrap(), *response.hash.unwrap().inner());
                let Some(tip_peers) = tips.get_mut(&tip) else {
                    tips.insert(tip, vec![peer.clone()]);
                    continue
                };
                tip_peers.push(peer.clone());
            }
        }

        // Check if we got any tips
        if !tips.is_empty() {
            break
        }

        warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
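        // Wait until at least one new channel connects before retrying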
        let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
        let _ = subscription.receive().await;
        subscription.unsubscribe().await;

        info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} to allow for more nodes to connect...");
        sleep(comms_timeout).await;
    }

    tips
}

/// Auxiliary function to ask all peers for their current tip and find the most common one.
async fn most_common_tip(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> (u32, Vec<ChannelPtr>) {
    // Grab synced peers' tips
    let tips = synced_peers(node, last_tip, checkpoint).await;

    // Grab the most common highest tip peers
    info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
    let mut common_tip = (0, [0u8; 32], vec![]);
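    // Pick the tip reported by the most peers; ties are broken in favor of the greater height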
    for (tip, peers) in tips {
        // Check if this tip's peers are fewer than the most common tip's peers
        if peers.len() < common_tip.2.len() {
            continue;
        }
        // If peers are the same length, skip if tip height is less than
        // the most common tip height.
        if peers.len() == common_tip.2.len() && tip.0 < common_tip.0 {
            continue;
        }
        // Keep the highest tip with the most peers
        common_tip = (tip.0, tip.1, peers);
    }

    info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
    (common_tip.0, common_tip.2)
}

/// Auxiliary function to retrieve headers backwards until our last known one and verify them.
async fn retrieve_headers(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: u32,
    tip_height: u32,
) -> Result<()> {
    info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
    // Communication setup
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<HeaderSyncResponse>().await {
            Ok(response_sub) => peer_subs.push(Some(response_sub)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push(None)
            }
        }
    }
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;

    // We subtract 1 since tip_height is increased by one
    let total = tip_height - last_known - 1;
    let mut last_tip_height = tip_height;
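    // Headers are requested backwards from the tip: each response's first header height
    // becomes the next request height, until a response contains no headers newer than
    // our last known one.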
    'headers_loop: loop {
        for (index, peer) in peers.iter().enumerate() {
            // Grab the response sub reference
            let Some(ref response_sub) = peer_subs[index] else {
                continue;
            };

            // Node creates a `HeaderSyncRequest` and sends it
            let request = HeaderSyncRequest { height: last_tip_height };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                continue
            };

            // Node waits for response
            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                continue
            };

            // Retain only the headers after our last known
            let mut response_headers = response.headers.to_vec();
            response_headers.retain(|h| h.height > last_known);

            if response_headers.is_empty() {
                break 'headers_loop
            }

            // Store the headers
            node.validator.blockchain.headers.insert_sync(&response_headers)?;
            last_tip_height = response_headers[0].height;
            info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{}", node.validator.blockchain.headers.len_sync(), total);
        }
    }

    // Check if we retrieved any new headers
    if node.validator.blockchain.headers.is_empty_sync() {
        return Ok(());
    }

    // Verify the headers sequence. Here we do a quick and dirty verification
    // of just the hashes and heights sequence. We will formally verify
    // the blocks when we retrieve them. We verify them in batches,
    // so we don't load them all into memory.
    info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
    let mut verified_headers = 0;
    let total = node.validator.blockchain.headers.len_sync();
    // First we verify the first `BATCH` sequence, using the last known header
    // as the previous of the first sync header.
    let last_known = node.validator.consensus.best_fork_last_header().await?;
    let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
    if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
        node.validator.blockchain.headers.remove_all_sync()?;
        return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
    }
    verified_headers += 1;
    for (index, header) in headers[1..].iter().enumerate() {
        if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(header.hash().as_string()))
        }
        verified_headers += 1;
    }
    info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total);

    // Now we verify the rest of the sequences
    let mut last_checked = headers.last().unwrap().clone();
    headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
    while !headers.is_empty() {
        if headers[0].previous != last_checked.hash() ||
            headers[0].height != last_checked.height + 1
        {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
        }
        verified_headers += 1;
        for (index, header) in headers[1..].iter().enumerate() {
            if header.previous != headers[index].hash() ||
                header.height != headers[index].height + 1
            {
                node.validator.blockchain.headers.remove_all_sync()?;
                return Err(Error::BlockIsInvalid(header.hash().as_string()))
            }
            verified_headers += 1;
        }
        last_checked = headers.last().unwrap().clone();
        headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
        info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total);
    }

    info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
    Ok(())
}

/// Auxiliary function to retrieve the blocks of the provided headers and apply them to canonical.
async fn retrieve_blocks(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: (u32, HeaderHash),
    block_sub: &JsonSubscriber,
    checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
    info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
    let mut last_received = last_known;
    // Communication setup
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<SyncResponse>().await {
            Ok(response_sub) => peer_subs.push(Some(response_sub)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push(None)
            }
        }
    }
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;

    let mut received_blocks = 0;
    let total = node.validator.blockchain.headers.len_sync();
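    // Request blocks in `BATCH`-sized chunks, cycling through the peers,
    // until no pending sync headers remain.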
    'blocks_loop: loop {
        'peers_loop: for (index, peer) in peers.iter().enumerate() {
            // Grab the response sub reference
            let Some(ref response_sub) = peer_subs[index] else {
                continue;
            };

            // Grab first `BATCH` headers
            let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
            if headers.is_empty() {
                break 'blocks_loop
            }
            let mut headers_hashes = Vec::with_capacity(headers.len());
            let mut synced_headers = Vec::with_capacity(headers.len());
            for header in &headers {
                headers_hashes.push(header.hash());
                synced_headers.push(header.height);
            }

            // Node creates a `SyncRequest` and sends it
            let request = SyncRequest { headers: headers_hashes.clone() };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
                continue
            };

            // Node waits for response
            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
                continue
            };

            // Verify and store retrieved blocks
            debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
            received_blocks += response.blocks.len();
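            // Blocks under a checkpoint are added via `add_checkpoint_blocks`,
            // otherwise each block is appended as a `Proposal` and confirmed below.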
            if checkpoint_blocks {
                if let Err(e) =
                    node.validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
                {
                    debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
                    continue
                };
            } else {
                for block in &response.blocks {
                    if let Err(e) =
                        node.validator.append_proposal(&Proposal::new(block.clone())).await
                    {
                        debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
                        continue 'peers_loop
                    };
                }
            }
            last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());

            // Remove synced headers
            node.validator.blockchain.headers.remove_sync(&synced_headers)?;

            if checkpoint_blocks {
                // Notify subscriber
                let mut notif_blocks = Vec::with_capacity(response.blocks.len());
                info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
                for (index, block) in response.blocks.iter().enumerate() {
                    info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
                    notif_blocks
                        .push(JsonValue::String(base64::encode(&serialize_async(block).await)));
                }
                block_sub.notify(JsonValue::Array(notif_blocks)).await;
            } else {
                // Perform confirmation for received blocks
                let confirmed = node.validator.confirmation().await?;
                if !confirmed.is_empty() {
                    // Notify subscriber
                    let mut notif_blocks = Vec::with_capacity(confirmed.len());
                    for block in confirmed {
                        notif_blocks.push(JsonValue::String(base64::encode(
                            &serialize_async(&block).await,
                        )));
                    }
                    block_sub.notify(JsonValue::Array(notif_blocks)).await;
                }
            }

            info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {}/{}", received_blocks, total);
        }
    }

    Ok(last_received)
}

/// Auxiliary function to retrieve the best fork state from a random peer.
async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
    info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
    // Grab a random peer to ask for its best fork proposals
    let peer = &peers.choose(&mut OsRng).unwrap();

    // Communication setup
    let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
        return
    };
    let notif_sub = node.subscribers.get("proposals").unwrap();

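    // With `fork_tip` set to `None`, the peer is expected to respond with the
    // proposals of its best fork on top of our last known tip.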
    // Node creates a `ForkSyncRequest` and sends it
    let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
    if let Err(e) = peer.send(&request).await {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
        return
    };

    // Node waits for response
    let Ok(response) = response_sub
        .receive_with_timeout(node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout)
        .await
    else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
        return
    };

    // Verify and store retrieved proposals
    debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
    for proposal in &response.proposals {
        if let Err(e) = node.validator.append_proposal(proposal).await {
            debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
            return
        };
        // Notify subscriber
        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
        notif_sub.notify(vec![enc_prop].into()).await;
    }
}