darkfid/proto/
protocol_sync.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use log::{debug, error};
23
24use darkfi::{
25    blockchain::{BlockInfo, Header, HeaderHash},
26    impl_p2p_message,
27    net::{
28        metering::MeteringConfiguration,
29        protocol::protocol_generic::{
30            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
31        },
32        session::SESSION_DEFAULT,
33        Message, P2pPtr,
34    },
35    system::ExecutorPtr,
36    util::time::NanoTimestamp,
37    validator::{consensus::Proposal, ValidatorPtr},
38    Error, Result,
39};
40use darkfi_serial::{SerialDecodable, SerialEncodable};
41
42// Constant defining max elements we send in vectors during syncing.
43pub const BATCH: usize = 20;
44
45// TODO: Fine tune
46// Protocol metering configuration.
47// Since all messages are synchronous(request -> response) we will define
48// strict rules to prevent spamming.
49// Each message score will be 1, with a threshold of 20 and expiry time of 5.
50// Check ../tests/metering.rs for each message max bytes definition.
51const PROTOCOL_SYNC_METERING_CONFIGURATION: MeteringConfiguration = MeteringConfiguration {
52    threshold: 20,
53    sleep_step: 500,
54    expiry_time: NanoTimestamp::from_secs(5),
55};
56
57/// Structure represening a request to ask a node for their current
58/// canonical(confirmed) tip block hash, if they are synced. We also
59/// include our own tip, so they can verify we follow the same sequence.
60#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
61pub struct TipRequest {
62    /// Canonical(confirmed) tip block hash
63    pub tip: HeaderHash,
64}
65
66impl_p2p_message!(TipRequest, "tiprequest", 32, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
67
68/// Structure representing the response to `TipRequest`,
69/// containing a boolean flag to indicate if we are synced,
70/// and our canonical(confirmed) tip block height and hash.
71#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
72pub struct TipResponse {
73    /// Flag indicating the node is synced
74    pub synced: bool,
75    /// Canonical(confirmed) tip block height
76    pub height: Option<u32>,
77    /// Canonical(confirmed) tip block hash
78    pub hash: Option<HeaderHash>,
79}
80
81impl_p2p_message!(TipResponse, "tipresponse", 39, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
82
83/// Structure represening a request to ask a node for up to `BATCH` headers before
84/// the provided header height.
85#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
86pub struct HeaderSyncRequest {
87    /// Header height
88    pub height: u32,
89}
90
91impl_p2p_message!(
92    HeaderSyncRequest,
93    "headersyncrequest",
94    4,
95    1,
96    PROTOCOL_SYNC_METERING_CONFIGURATION
97);
98
99/// Structure representing the response to `HeaderSyncRequest`,
100/// containing up to `BATCH` headers before the requested block height.
101#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
102pub struct HeaderSyncResponse {
103    /// Response headers
104    pub headers: Vec<Header>,
105}
106
107impl_p2p_message!(
108    HeaderSyncResponse,
109    "headersyncresponse",
110    1701,
111    1,
112    PROTOCOL_SYNC_METERING_CONFIGURATION
113);
114
115/// Structure represening a request to ask a node for up to`BATCH` blocks
116/// of provided headers.
117#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
118pub struct SyncRequest {
119    /// Header hashes
120    pub headers: Vec<HeaderHash>,
121}
122
123impl_p2p_message!(SyncRequest, "syncrequest", 641, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
124
125/// Structure representing the response to `SyncRequest`,
126/// containing up to `BATCH` blocks after the requested block height.
127#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
128pub struct SyncResponse {
129    /// Response blocks
130    pub blocks: Vec<BlockInfo>,
131}
132
133impl_p2p_message!(SyncResponse, "syncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
134
135/// Structure represening a request to ask a node a fork sequence.
136/// If we include a specific fork tip, they have to return its sequence,
137/// otherwise they respond with their best fork sequence.
138/// We also include our own canonical(confirmed) tip, so they can verify
139/// we follow the same sequence.
140#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
141pub struct ForkSyncRequest {
142    /// Canonical(confirmed) tip block hash
143    pub tip: HeaderHash,
144    /// Optional fork tip block hash
145    pub fork_tip: Option<HeaderHash>,
146}
147
148impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 65, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
149
150/// Structure representing the response to `ForkSyncRequest`,
151/// containing the requested fork sequence, up to `BATCH` proposals.
152#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
153pub struct ForkSyncResponse {
154    /// Response fork proposals
155    pub proposals: Vec<Proposal>,
156}
157
158impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
159
160/// Structure represening a request to ask a node a fork header for the
161/// requested height. The fork is identified by the provided header hash.
162#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
163pub struct ForkHeaderHashRequest {
164    /// Header height
165    pub height: u32,
166    /// Block header hash to identify the fork
167    pub fork_header: HeaderHash,
168}
169
170impl_p2p_message!(
171    ForkHeaderHashRequest,
172    "forkheaderhashrequest",
173    36,
174    1,
175    PROTOCOL_SYNC_METERING_CONFIGURATION
176);
177
178/// Structure representing the response to `ForkHeaderHashRequest`,
179/// containing the requested fork header hash, if it was found.
180#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
181pub struct ForkHeaderHashResponse {
182    /// Response fork block header hash
183    pub fork_header: Option<HeaderHash>,
184}
185
186impl_p2p_message!(
187    ForkHeaderHashResponse,
188    "forkheaderhashresponse",
189    33,
190    1,
191    PROTOCOL_SYNC_METERING_CONFIGURATION
192);
193
194/// Structure represening a request to ask a node for up to `BATCH`
195/// fork headers for provided header hashes.  The fork is identified
196/// by the provided header hash.
197#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
198pub struct ForkHeadersRequest {
199    /// Header hashes
200    pub headers: Vec<HeaderHash>,
201    /// Block header hash to identify the fork
202    pub fork_header: HeaderHash,
203}
204
205impl_p2p_message!(
206    ForkHeadersRequest,
207    "forkheadersrequest",
208    673,
209    1,
210    PROTOCOL_SYNC_METERING_CONFIGURATION
211);
212
213/// Structure representing the response to `ForkHeadersRequest`,
214/// containing up to `BATCH` fork headers.
215#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
216pub struct ForkHeadersResponse {
217    /// Response headers
218    pub headers: Vec<Header>,
219}
220
221impl_p2p_message!(
222    ForkHeadersResponse,
223    "forkheadersresponse",
224    1701,
225    1,
226    PROTOCOL_SYNC_METERING_CONFIGURATION
227);
228
229/// Structure represening a request to ask a node for up to `BATCH`
230/// fork proposals for provided header hashes.  The fork is identified
231/// by the provided header hash.
232#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
233pub struct ForkProposalsRequest {
234    /// Header hashes
235    pub headers: Vec<HeaderHash>,
236    /// Block header hash to identify the fork
237    pub fork_header: HeaderHash,
238}
239
240impl_p2p_message!(
241    ForkProposalsRequest,
242    "forkproposalsrequest",
243    673,
244    1,
245    PROTOCOL_SYNC_METERING_CONFIGURATION
246);
247
248/// Structure representing the response to `ForkProposalsRequest`,
249/// containing up to `BATCH` fork headers.
250#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
251pub struct ForkProposalsResponse {
252    /// Response proposals
253    pub proposals: Vec<Proposal>,
254}
255
256impl_p2p_message!(
257    ForkProposalsResponse,
258    "forkproposalsresponse",
259    0,
260    1,
261    PROTOCOL_SYNC_METERING_CONFIGURATION
262);
263
264/// Atomic pointer to the `ProtocolSync` handler.
265pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
266
267/// Handler managing all `ProtocolSync` messages, over generic P2P protocols.
268pub struct ProtocolSyncHandler {
269    /// The generic handler for `TipRequest` messages.
270    tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
271    /// The generic handler for `HeaderSyncRequest` messages.
272    header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
273    /// The generic handler for `SyncRequest` messages.
274    sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
275    /// The generic handler for `ForkSyncRequest` messages.
276    fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
277    /// The generic handler for `ForkHeaderHashRequest` messages.
278    fork_header_hash_handler:
279        ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
280    /// The generic handler for `ForkHeadersRequest` messages.
281    fork_headers_handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
282    /// The generic handler for `ForkProposalsRequest` messages.
283    fork_proposals_handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
284}
285
286impl ProtocolSyncHandler {
287    /// Initialize the generic prototocol handlers for all `ProtocolSync` messages
288    /// and register them to the provided P2P network, using the default session flag.
289    pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
290        debug!(
291            target: "darkfid::proto::protocol_sync::init",
292            "Adding all sync protocols to the protocol registry"
293        );
294
295        let tip_handler =
296            ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
297        let header_handler =
298            ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
299        let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
300        let fork_sync_handler =
301            ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
302        let fork_header_hash_handler =
303            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaderHash", SESSION_DEFAULT).await;
304        let fork_headers_handler =
305            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaders", SESSION_DEFAULT).await;
306        let fork_proposals_handler =
307            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkProposals", SESSION_DEFAULT).await;
308
309        Arc::new(Self {
310            tip_handler,
311            header_handler,
312            sync_handler,
313            fork_sync_handler,
314            fork_header_hash_handler,
315            fork_headers_handler,
316            fork_proposals_handler,
317        })
318    }
319
320    /// Start all `ProtocolSync` background tasks.
321    pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
322        debug!(
323            target: "darkfid::proto::protocol_sync::start",
324            "Starting sync protocols handlers tasks..."
325        );
326
327        self.tip_handler.task.clone().start(
328            handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
329            |res| async move {
330                match res {
331                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
332                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
333                }
334            },
335            Error::DetachedTaskStopped,
336            executor.clone(),
337        );
338
339        self.header_handler.task.clone().start(
340            handle_receive_header_request(self.header_handler.clone(), validator.clone()),
341            |res| async move {
342                match res {
343                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
344                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
345                }
346            },
347            Error::DetachedTaskStopped,
348            executor.clone(),
349        );
350
351        self.sync_handler.task.clone().start(
352            handle_receive_request(self.sync_handler.clone(), validator.clone()),
353            |res| async move {
354                match res {
355                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
356                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
357                }
358            },
359            Error::DetachedTaskStopped,
360            executor.clone(),
361        );
362
363        self.fork_sync_handler.task.clone().start(
364            handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
365            |res| async move {
366                match res {
367                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
368                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
369                }
370            },
371            Error::DetachedTaskStopped,
372            executor.clone(),
373        );
374
375        self.fork_header_hash_handler.task.clone().start(
376            handle_receive_fork_header_hash_request(self.fork_header_hash_handler.clone(), validator.clone()),
377            |res| async move {
378                match res {
379                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
380                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaderHash handler task: {e}"),
381                }
382            },
383            Error::DetachedTaskStopped,
384            executor.clone(),
385        );
386
387        self.fork_headers_handler.task.clone().start(
388            handle_receive_fork_headers_request(self.fork_headers_handler.clone(), validator.clone()),
389            |res| async move {
390                match res {
391                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
392                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaders handler task: {e}"),
393                }
394            },
395            Error::DetachedTaskStopped,
396            executor.clone(),
397        );
398
399        self.fork_proposals_handler.task.clone().start(
400            handle_receive_fork_proposals_request(self.fork_proposals_handler.clone(), validator.clone()),
401            |res| async move {
402                match res {
403                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
404                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkProposals handler task: {e}"),
405                }
406            },
407            Error::DetachedTaskStopped,
408            executor.clone(),
409        );
410
411        debug!(
412            target: "darkfid::proto::protocol_sync::start",
413            "Sync protocols handlers tasks started!"
414        );
415
416        Ok(())
417    }
418
419    /// Stop all `ProtocolSync` background tasks.
420    pub async fn stop(&self) {
421        debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
422        self.tip_handler.task.stop().await;
423        self.header_handler.task.stop().await;
424        self.sync_handler.task.stop().await;
425        self.fork_sync_handler.task.stop().await;
426        self.fork_header_hash_handler.task.stop().await;
427        self.fork_headers_handler.task.stop().await;
428        self.fork_proposals_handler.task.stop().await;
429        debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
430    }
431}
432
433/// Background handler function for ProtocolSyncTip.
434async fn handle_receive_tip_request(
435    handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
436    validator: ValidatorPtr,
437) -> Result<()> {
438    debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
439    loop {
440        // Wait for a new tip request message
441        let (channel, request) = match handler.receiver.recv().await {
442            Ok(r) => r,
443            Err(e) => {
444                debug!(
445                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
446                    "recv fail: {e}"
447                );
448                continue
449            }
450        };
451
452        debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "Received request: {request:?}");
453
454        // Check if node has finished syncing its blockchain
455        if !*validator.synced.read().await {
456            debug!(
457                target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
458                "Node still syncing blockchain"
459            );
460            handler
461                .send_action(
462                    channel,
463                    ProtocolGenericAction::Response(TipResponse {
464                        synced: false,
465                        height: None,
466                        hash: None,
467                    }),
468                )
469                .await;
470            continue
471        }
472
473        // Check we follow the same sequence
474        match validator.blockchain.blocks.contains(&request.tip) {
475            Ok(contains) => {
476                if !contains {
477                    debug!(
478                        target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
479                        "Node doesn't follow request sequence"
480                    );
481                    handler.send_action(channel, ProtocolGenericAction::Skip).await;
482                    continue
483                }
484            }
485            Err(e) => {
486                error!(
487                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
488                    "block_store.contains fail: {e}"
489                );
490                handler.send_action(channel, ProtocolGenericAction::Skip).await;
491                continue
492            }
493        }
494
495        // Grab our current tip and return it
496        let tip = match validator.blockchain.last() {
497            Ok(v) => v,
498            Err(e) => {
499                error!(
500                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
501                    "blockchain.last fail: {e}"
502                );
503                handler.send_action(channel, ProtocolGenericAction::Skip).await;
504                continue
505            }
506        };
507
508        // Send response
509        handler
510            .send_action(
511                channel,
512                ProtocolGenericAction::Response(TipResponse {
513                    synced: true,
514                    height: Some(tip.0),
515                    hash: Some(tip.1),
516                }),
517            )
518            .await;
519    }
520}
521
522/// Background handler function for ProtocolSyncHeader.
523async fn handle_receive_header_request(
524    handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
525    validator: ValidatorPtr,
526) -> Result<()> {
527    debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
528    loop {
529        // Wait for a new header request message
530        let (channel, request) = match handler.receiver.recv().await {
531            Ok(r) => r,
532            Err(e) => {
533                debug!(
534                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
535                    "recv fail: {e}"
536                );
537                continue
538            }
539        };
540
541        // Check if node has finished syncing its blockchain
542        if !*validator.synced.read().await {
543            debug!(
544                target: "darkfid::proto::protocol_sync::handle_receive_header_request",
545                "Node still syncing blockchain, skipping..."
546            );
547            handler.send_action(channel, ProtocolGenericAction::Skip).await;
548            continue
549        }
550
551        debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "Received request: {request:?}");
552
553        // Grab the corresponding headers
554        let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
555            Ok(v) => v,
556            Err(e) => {
557                error!(
558                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
559                    "get_headers_before fail: {}",
560                    e
561                );
562                handler.send_action(channel, ProtocolGenericAction::Skip).await;
563                continue
564            }
565        };
566
567        // Send response
568        handler
569            .send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
570            .await;
571    }
572}
573
574/// Background handler function for ProtocolSync.
575async fn handle_receive_request(
576    handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
577    validator: ValidatorPtr,
578) -> Result<()> {
579    debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
580    loop {
581        // Wait for a new sync request message
582        let (channel, request) = match handler.receiver.recv().await {
583            Ok(r) => r,
584            Err(e) => {
585                debug!(
586                    target: "darkfid::proto::protocol_sync::handle_receive_request",
587                    "recv fail: {e}"
588                );
589                continue
590            }
591        };
592
593        // Check if node has finished syncing its blockchain
594        if !*validator.synced.read().await {
595            debug!(
596                target: "darkfid::proto::protocol_sync::handle_receive_request",
597                "Node still syncing blockchain, skipping..."
598            );
599            handler.send_action(channel, ProtocolGenericAction::Skip).await;
600            continue
601        }
602
603        // Check if request exists the configured limit
604        if request.headers.len() > BATCH {
605            debug!(
606                target: "darkfid::proto::protocol_sync::handle_receive_request",
607                "Node requested more blocks than allowed."
608            );
609            handler.send_action(channel, ProtocolGenericAction::Skip).await;
610            continue
611        }
612
613        debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");
614
615        // Grab the corresponding blocks
616        let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
617            Ok(v) => v,
618            Err(e) => {
619                error!(
620                    target: "darkfid::proto::protocol_sync::handle_receive_request",
621                    "get_blocks_after fail: {}",
622                    e
623                );
624                handler.send_action(channel, ProtocolGenericAction::Skip).await;
625                continue
626            }
627        };
628
629        // Send response
630        handler
631            .send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
632            .await;
633    }
634}
635
636/// Background handler function for ProtocolSyncFork.
637async fn handle_receive_fork_request(
638    handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
639    validator: ValidatorPtr,
640) -> Result<()> {
641    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
642    loop {
643        // Wait for a new fork sync request message
644        let (channel, request) = match handler.receiver.recv().await {
645            Ok(r) => r,
646            Err(e) => {
647                debug!(
648                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
649                    "recv fail: {e}"
650                );
651                continue
652            }
653        };
654
655        // Check if node has finished syncing its blockchain
656        if !*validator.synced.read().await {
657            debug!(
658                target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
659                "Node still syncing blockchain, skipping..."
660            );
661            handler.send_action(channel, ProtocolGenericAction::Skip).await;
662            continue
663        }
664
665        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");
666
667        // Retrieve proposals sequence
668        let proposals = match validator
669            .consensus
670            .get_fork_proposals_after(request.tip, request.fork_tip, BATCH as u32)
671            .await
672        {
673            Ok(p) => p,
674            Err(e) => {
675                debug!(
676                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
677                    "Getting fork proposals failed: {}",
678                    e
679                );
680                handler.send_action(channel, ProtocolGenericAction::Skip).await;
681                continue
682            }
683        };
684
685        // Send response
686        handler
687            .send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
688            .await;
689    }
690}
691
692/// Background handler function for ProtocolSyncForkHeaderHash.
693async fn handle_receive_fork_header_hash_request(
694    handler: ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
695    validator: ValidatorPtr,
696) -> Result<()> {
697    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "START");
698    loop {
699        // Wait for a new fork header hash request message
700        let (channel, request) = match handler.receiver.recv().await {
701            Ok(r) => r,
702            Err(e) => {
703                debug!(
704                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
705                    "recv fail: {e}"
706                );
707                continue
708            }
709        };
710
711        // Check if node has finished syncing its blockchain
712        if !*validator.synced.read().await {
713            debug!(
714                target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
715                "Node still syncing blockchain, skipping..."
716            );
717            handler.send_action(channel, ProtocolGenericAction::Skip).await;
718            continue
719        }
720
721        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "Received request: {request:?}");
722
723        // Retrieve fork header
724        let fork_header = match validator
725            .consensus
726            .get_fork_header_hash(request.height, &request.fork_header)
727            .await
728        {
729            Ok(h) => h,
730            Err(e) => {
731                debug!(
732                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
733                    "Getting fork header hash failed: {}",
734                    e
735                );
736                handler.send_action(channel, ProtocolGenericAction::Skip).await;
737                continue
738            }
739        };
740
741        // Send response
742        handler
743            .send_action(
744                channel,
745                ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header }),
746            )
747            .await;
748    }
749}
750
751/// Background handler function for ProtocolSyncForkHeaders.
752async fn handle_receive_fork_headers_request(
753    handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
754    validator: ValidatorPtr,
755) -> Result<()> {
756    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "START");
757    loop {
758        // Wait for a new fork header hash request message
759        let (channel, request) = match handler.receiver.recv().await {
760            Ok(r) => r,
761            Err(e) => {
762                debug!(
763                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
764                    "recv fail: {e}"
765                );
766                continue
767            }
768        };
769
770        // Check if node has finished syncing its blockchain
771        if !*validator.synced.read().await {
772            debug!(
773                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
774                "Node still syncing blockchain, skipping..."
775            );
776            handler.send_action(channel, ProtocolGenericAction::Skip).await;
777            continue
778        }
779
780        // Check if request exists the configured limit
781        if request.headers.len() > BATCH {
782            debug!(
783                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
784                "Node requested more headers than allowed."
785            );
786            handler.send_action(channel, ProtocolGenericAction::Skip).await;
787            continue
788        }
789
790        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "Received request: {request:?}");
791
792        // Retrieve fork headers
793        let headers = match validator
794            .consensus
795            .get_fork_headers(&request.headers, &request.fork_header)
796            .await
797        {
798            Ok(h) => h,
799            Err(e) => {
800                debug!(
801                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
802                    "Getting fork headers failed: {}",
803                    e
804                );
805                handler.send_action(channel, ProtocolGenericAction::Skip).await;
806                continue
807            }
808        };
809
810        // Send response
811        handler
812            .send_action(channel, ProtocolGenericAction::Response(ForkHeadersResponse { headers }))
813            .await;
814    }
815}
816
817/// Background handler function for ProtocolSyncForkProposals.
818async fn handle_receive_fork_proposals_request(
819    handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
820    validator: ValidatorPtr,
821) -> Result<()> {
822    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "START");
823    loop {
824        // Wait for a new fork header hash request message
825        let (channel, request) = match handler.receiver.recv().await {
826            Ok(r) => r,
827            Err(e) => {
828                debug!(
829                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
830                    "recv fail: {e}"
831                );
832                continue
833            }
834        };
835
836        // Check if node has finished syncing its blockchain
837        if !*validator.synced.read().await {
838            debug!(
839                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
840                "Node still syncing blockchain, skipping..."
841            );
842            handler.send_action(channel, ProtocolGenericAction::Skip).await;
843            continue
844        }
845
846        // Check if request exists the configured limit
847        if request.headers.len() > BATCH {
848            debug!(
849                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
850                "Node requested more proposals than allowed."
851            );
852            handler.send_action(channel, ProtocolGenericAction::Skip).await;
853            continue
854        }
855
856        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "Received request: {request:?}");
857
858        // Retrieve fork headers
859        let proposals = match validator
860            .consensus
861            .get_fork_proposals(&request.headers, &request.fork_header)
862            .await
863        {
864            Ok(p) => p,
865            Err(e) => {
866                debug!(
867                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
868                    "Getting fork proposals failed: {}",
869                    e
870                );
871                handler.send_action(channel, ProtocolGenericAction::Skip).await;
872                continue
873            }
874        };
875
876        // Send response
877        handler
878            .send_action(
879                channel,
880                ProtocolGenericAction::Response(ForkProposalsResponse { proposals }),
881            )
882            .await;
883    }
884}