darkfid/proto/
protocol_sync.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use log::{debug, error};
23
24use darkfi::{
25    blockchain::{BlockInfo, Header, HeaderHash},
26    impl_p2p_message,
27    net::{
28        metering::MeteringConfiguration,
29        protocol::protocol_generic::{
30            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
31        },
32        session::SESSION_DEFAULT,
33        Message, P2pPtr,
34    },
35    system::ExecutorPtr,
36    util::time::NanoTimestamp,
37    validator::{consensus::Proposal, ValidatorPtr},
38    Error, Result,
39};
40use darkfi_serial::{SerialDecodable, SerialEncodable};
41
// Constant defining the max number of elements we send in vectors during
// syncing. The request handlers below also use it to reject requests that
// ask for more items than this.
pub const BATCH: usize = 20;
44
// TODO: Fine tune
// Protocol metering configuration, shared by every message declared in
// this file.
// Since all messages are synchronous (request -> response) we define
// strict rules to prevent spamming.
// Each message score will be 1, with a threshold of 20 and an expiry time
// of 5 seconds.
// Check ../tests/metering.rs for each message max bytes definition.
const PROTOCOL_SYNC_METERING_CONFIGURATION: MeteringConfiguration = MeteringConfiguration {
    threshold: 20,
    // NOTE(review): the unit of `sleep_step` is not visible from this file
    // (presumably milliseconds) — confirm against `MeteringConfiguration`.
    sleep_step: 500,
    expiry_time: NanoTimestamp::from_secs(5),
};
56
/// Structure representing a request to ask a node for their current
/// canonical(confirmed) tip block hash, if they are synced. We also
/// include our own tip, so they can verify we follow the same sequence.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipRequest {
    /// Our own canonical(confirmed) tip block hash
    pub tip: HeaderHash,
}

// NOTE(review): `32` is presumably the max serialized size in bytes (a single
// header hash) and `1` the metering score — see ../tests/metering.rs.
impl_p2p_message!(TipRequest, "tiprequest", 32, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
67
/// Structure representing the response to `TipRequest`,
/// containing a boolean flag to indicate if we are synced,
/// and our canonical(confirmed) tip block height and hash.
///
/// The tip handler in this file leaves `height` and `hash` empty when the
/// node is still syncing, or when it doesn't know the requester's tip.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipResponse {
    /// Flag indicating the node is synced
    pub synced: bool,
    /// Canonical(confirmed) tip block height
    pub height: Option<u32>,
    /// Canonical(confirmed) tip block hash
    pub hash: Option<HeaderHash>,
}

// NOTE(review): `39` is presumably the max serialized size in bytes
// (flag + optional height + optional hash) — see ../tests/metering.rs.
impl_p2p_message!(TipResponse, "tipresponse", 39, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
82
/// Structure representing a request to ask a node for up to `BATCH` headers
/// before the provided header height.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncRequest {
    /// Header height; the headers returned are the ones preceding it
    pub height: u32,
}

// NOTE(review): `4` is presumably the max serialized size in bytes (one `u32`)
// — see ../tests/metering.rs.
impl_p2p_message!(
    HeaderSyncRequest,
    "headersyncrequest",
    4,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
98
/// Structure representing the response to `HeaderSyncRequest`,
/// containing up to `BATCH` headers before the requested block height.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncResponse {
    /// Response headers
    pub headers: Vec<Header>,
}

// NOTE(review): the third argument is presumably the max serialized size in
// bytes — see ../tests/metering.rs.
impl_p2p_message!(
    HeaderSyncResponse,
    "headersyncresponse",
    8192, // We leave some headroom for merge mining data
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
114
/// Structure representing a request to ask a node for up to `BATCH` blocks
/// of provided headers.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
    /// Header hashes of the requested blocks; the sync handler in this
    /// file skips requests carrying more than `BATCH` hashes
    pub headers: Vec<HeaderHash>,
}

// NOTE(review): `641` is presumably the max serialized size in bytes
// (`BATCH` hashes plus the vector length prefix) — see ../tests/metering.rs.
impl_p2p_message!(SyncRequest, "syncrequest", 641, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
124
/// Structure representing the response to `SyncRequest`,
/// containing the blocks of the requested header hashes, up to `BATCH`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncResponse {
    /// Response blocks
    pub blocks: Vec<BlockInfo>,
}

// NOTE(review): a max size of `0` presumably means no byte limit is enforced
// for this message — confirm against `impl_p2p_message!`.
impl_p2p_message!(SyncResponse, "syncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
134
/// Structure representing a request to ask a node for a fork sequence.
/// If we include a specific fork tip, they have to return its sequence,
/// otherwise they respond with their best fork sequence.
/// We also include our own canonical(confirmed) tip, so they can verify
/// we follow the same sequence.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
    /// Our own canonical(confirmed) tip block hash
    pub tip: HeaderHash,
    /// Optional fork tip block hash
    pub fork_tip: Option<HeaderHash>,
}

// NOTE(review): `65` is presumably the max serialized size in bytes
// (one hash plus one optional hash) — see ../tests/metering.rs.
impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 65, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
149
/// Structure representing the response to `ForkSyncRequest`,
/// containing the requested fork sequence, up to `BATCH` proposals.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
    /// Response fork proposals
    pub proposals: Vec<Proposal>,
}

// NOTE(review): a max size of `0` presumably means no byte limit is enforced
// for this message — confirm against `impl_p2p_message!`.
impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
159
/// Structure representing a request to ask a node for a fork header hash at
/// the requested height. The fork is identified by the provided header hash.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashRequest {
    /// Header height
    pub height: u32,
    /// Block header hash to identify the fork
    pub fork_header: HeaderHash,
}

// NOTE(review): `36` is presumably the max serialized size in bytes
// (one `u32` plus one hash) — see ../tests/metering.rs.
impl_p2p_message!(
    ForkHeaderHashRequest,
    "forkheaderhashrequest",
    36,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
177
/// Structure representing the response to `ForkHeaderHashRequest`,
/// containing the requested fork header hash, if it was found.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashResponse {
    /// Response fork block header hash, `None` if it was not found
    pub fork_header: Option<HeaderHash>,
}

// NOTE(review): `33` is presumably the max serialized size in bytes
// (one optional hash) — see ../tests/metering.rs.
impl_p2p_message!(
    ForkHeaderHashResponse,
    "forkheaderhashresponse",
    33,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
193
/// Structure representing a request to ask a node for up to `BATCH`
/// fork headers for provided header hashes.  The fork is identified
/// by the provided header hash.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersRequest {
    /// Header hashes of the requested headers
    pub headers: Vec<HeaderHash>,
    /// Block header hash to identify the fork
    pub fork_header: HeaderHash,
}

// NOTE(review): `673` is presumably the max serialized size in bytes
// (`BATCH` hashes with length prefix, plus the fork hash) — see
// ../tests/metering.rs.
impl_p2p_message!(
    ForkHeadersRequest,
    "forkheadersrequest",
    673,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
212
/// Structure representing the response to `ForkHeadersRequest`,
/// containing up to `BATCH` fork headers.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersResponse {
    /// Response headers
    pub headers: Vec<Header>,
}

// NOTE(review): the third argument is presumably the max serialized size in
// bytes — see ../tests/metering.rs.
impl_p2p_message!(
    ForkHeadersResponse,
    "forkheadersresponse",
    8192, // We leave some headroom for merge mining data
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
228
/// Structure representing a request to ask a node for up to `BATCH`
/// fork proposals for provided header hashes.  The fork is identified
/// by the provided header hash.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsRequest {
    /// Header hashes of the requested proposals
    pub headers: Vec<HeaderHash>,
    /// Block header hash to identify the fork
    pub fork_header: HeaderHash,
}

// NOTE(review): `673` is presumably the max serialized size in bytes
// (`BATCH` hashes with length prefix, plus the fork hash) — see
// ../tests/metering.rs.
impl_p2p_message!(
    ForkProposalsRequest,
    "forkproposalsrequest",
    673,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
247
/// Structure representing the response to `ForkProposalsRequest`,
/// containing up to `BATCH` fork proposals.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsResponse {
    /// Response proposals
    pub proposals: Vec<Proposal>,
}

// NOTE(review): a max size of `0` presumably means no byte limit is enforced
// for this message — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    ForkProposalsResponse,
    "forkproposalsresponse",
    0,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
263
/// Atomically reference-counted shared pointer to the `ProtocolSyncHandler`.
pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
266
/// Handler managing all `ProtocolSync` messages, over generic P2P protocols.
///
/// It holds one generic handler per request/response message pair; each
/// one is served by its own background task, see `start` and `stop`.
pub struct ProtocolSyncHandler {
    /// The generic handler for `TipRequest` messages.
    tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
    /// The generic handler for `HeaderSyncRequest` messages.
    header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
    /// The generic handler for `SyncRequest` messages.
    sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
    /// The generic handler for `ForkSyncRequest` messages.
    fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
    /// The generic handler for `ForkHeaderHashRequest` messages.
    fork_header_hash_handler:
        ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
    /// The generic handler for `ForkHeadersRequest` messages.
    fork_headers_handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
    /// The generic handler for `ForkProposalsRequest` messages.
    fork_proposals_handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
}
285
286impl ProtocolSyncHandler {
287    /// Initialize the generic prototocol handlers for all `ProtocolSync` messages
288    /// and register them to the provided P2P network, using the default session flag.
289    pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
290        debug!(
291            target: "darkfid::proto::protocol_sync::init",
292            "Adding all sync protocols to the protocol registry"
293        );
294
295        let tip_handler =
296            ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
297        let header_handler =
298            ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
299        let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
300        let fork_sync_handler =
301            ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
302        let fork_header_hash_handler =
303            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaderHash", SESSION_DEFAULT).await;
304        let fork_headers_handler =
305            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaders", SESSION_DEFAULT).await;
306        let fork_proposals_handler =
307            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkProposals", SESSION_DEFAULT).await;
308
309        Arc::new(Self {
310            tip_handler,
311            header_handler,
312            sync_handler,
313            fork_sync_handler,
314            fork_header_hash_handler,
315            fork_headers_handler,
316            fork_proposals_handler,
317        })
318    }
319
320    /// Start all `ProtocolSync` background tasks.
321    pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
322        debug!(
323            target: "darkfid::proto::protocol_sync::start",
324            "Starting sync protocols handlers tasks..."
325        );
326
327        self.tip_handler.task.clone().start(
328            handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
329            |res| async move {
330                match res {
331                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
332                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
333                }
334            },
335            Error::DetachedTaskStopped,
336            executor.clone(),
337        );
338
339        self.header_handler.task.clone().start(
340            handle_receive_header_request(self.header_handler.clone(), validator.clone()),
341            |res| async move {
342                match res {
343                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
344                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
345                }
346            },
347            Error::DetachedTaskStopped,
348            executor.clone(),
349        );
350
351        self.sync_handler.task.clone().start(
352            handle_receive_request(self.sync_handler.clone(), validator.clone()),
353            |res| async move {
354                match res {
355                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
356                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
357                }
358            },
359            Error::DetachedTaskStopped,
360            executor.clone(),
361        );
362
363        self.fork_sync_handler.task.clone().start(
364            handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
365            |res| async move {
366                match res {
367                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
368                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
369                }
370            },
371            Error::DetachedTaskStopped,
372            executor.clone(),
373        );
374
375        self.fork_header_hash_handler.task.clone().start(
376            handle_receive_fork_header_hash_request(self.fork_header_hash_handler.clone(), validator.clone()),
377            |res| async move {
378                match res {
379                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
380                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaderHash handler task: {e}"),
381                }
382            },
383            Error::DetachedTaskStopped,
384            executor.clone(),
385        );
386
387        self.fork_headers_handler.task.clone().start(
388            handle_receive_fork_headers_request(self.fork_headers_handler.clone(), validator.clone()),
389            |res| async move {
390                match res {
391                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
392                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaders handler task: {e}"),
393                }
394            },
395            Error::DetachedTaskStopped,
396            executor.clone(),
397        );
398
399        self.fork_proposals_handler.task.clone().start(
400            handle_receive_fork_proposals_request(self.fork_proposals_handler.clone(), validator.clone()),
401            |res| async move {
402                match res {
403                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
404                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkProposals handler task: {e}"),
405                }
406            },
407            Error::DetachedTaskStopped,
408            executor.clone(),
409        );
410
411        debug!(
412            target: "darkfid::proto::protocol_sync::start",
413            "Sync protocols handlers tasks started!"
414        );
415
416        Ok(())
417    }
418
419    /// Stop all `ProtocolSync` background tasks.
420    pub async fn stop(&self) {
421        debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
422        self.tip_handler.task.stop().await;
423        self.header_handler.task.stop().await;
424        self.sync_handler.task.stop().await;
425        self.fork_sync_handler.task.stop().await;
426        self.fork_header_hash_handler.task.stop().await;
427        self.fork_headers_handler.task.stop().await;
428        self.fork_proposals_handler.task.stop().await;
429        debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
430    }
431}
432
433/// Background handler function for ProtocolSyncTip.
434async fn handle_receive_tip_request(
435    handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
436    validator: ValidatorPtr,
437) -> Result<()> {
438    debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
439    loop {
440        // Wait for a new tip request message
441        let (channel, request) = match handler.receiver.recv().await {
442            Ok(r) => r,
443            Err(e) => {
444                debug!(
445                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
446                    "recv fail: {e}"
447                );
448                continue
449            }
450        };
451
452        debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "Received request: {request:?}");
453
454        // Check if node has finished syncing its blockchain
455        if !*validator.synced.read().await {
456            debug!(
457                target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
458                "Node still syncing blockchain"
459            );
460            handler
461                .send_action(
462                    channel,
463                    ProtocolGenericAction::Response(TipResponse {
464                        synced: false,
465                        height: None,
466                        hash: None,
467                    }),
468                )
469                .await;
470            continue
471        }
472
473        // Check we follow the same sequence
474        match validator.blockchain.blocks.contains(&request.tip) {
475            Ok(contains) => {
476                if !contains {
477                    debug!(
478                        target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
479                        "Node doesn't follow request sequence"
480                    );
481                    handler
482                        .send_action(
483                            channel,
484                            ProtocolGenericAction::Response(TipResponse {
485                                synced: true,
486                                height: None,
487                                hash: None,
488                            }),
489                        )
490                        .await;
491                    continue
492                }
493            }
494            Err(e) => {
495                error!(
496                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
497                    "block_store.contains fail: {e}"
498                );
499                handler.send_action(channel, ProtocolGenericAction::Skip).await;
500                continue
501            }
502        }
503
504        // Grab our current tip and return it
505        let tip = match validator.blockchain.last() {
506            Ok(v) => v,
507            Err(e) => {
508                error!(
509                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
510                    "blockchain.last fail: {e}"
511                );
512                handler.send_action(channel, ProtocolGenericAction::Skip).await;
513                continue
514            }
515        };
516
517        // Send response
518        handler
519            .send_action(
520                channel,
521                ProtocolGenericAction::Response(TipResponse {
522                    synced: true,
523                    height: Some(tip.0),
524                    hash: Some(tip.1),
525                }),
526            )
527            .await;
528    }
529}
530
531/// Background handler function for ProtocolSyncHeader.
532async fn handle_receive_header_request(
533    handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
534    validator: ValidatorPtr,
535) -> Result<()> {
536    debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
537    loop {
538        // Wait for a new header request message
539        let (channel, request) = match handler.receiver.recv().await {
540            Ok(r) => r,
541            Err(e) => {
542                debug!(
543                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
544                    "recv fail: {e}"
545                );
546                continue
547            }
548        };
549
550        // Check if node has finished syncing its blockchain
551        if !*validator.synced.read().await {
552            debug!(
553                target: "darkfid::proto::protocol_sync::handle_receive_header_request",
554                "Node still syncing blockchain, skipping..."
555            );
556            handler.send_action(channel, ProtocolGenericAction::Skip).await;
557            continue
558        }
559
560        debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "Received request: {request:?}");
561
562        // Grab the corresponding headers
563        let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
564            Ok(v) => v,
565            Err(e) => {
566                error!(
567                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
568                    "get_headers_before fail: {e}"
569                );
570                handler.send_action(channel, ProtocolGenericAction::Skip).await;
571                continue
572            }
573        };
574
575        // Send response
576        handler
577            .send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
578            .await;
579    }
580}
581
582/// Background handler function for ProtocolSync.
583async fn handle_receive_request(
584    handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
585    validator: ValidatorPtr,
586) -> Result<()> {
587    debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
588    loop {
589        // Wait for a new sync request message
590        let (channel, request) = match handler.receiver.recv().await {
591            Ok(r) => r,
592            Err(e) => {
593                debug!(
594                    target: "darkfid::proto::protocol_sync::handle_receive_request",
595                    "recv fail: {e}"
596                );
597                continue
598            }
599        };
600
601        // Check if node has finished syncing its blockchain
602        if !*validator.synced.read().await {
603            debug!(
604                target: "darkfid::proto::protocol_sync::handle_receive_request",
605                "Node still syncing blockchain, skipping..."
606            );
607            handler.send_action(channel, ProtocolGenericAction::Skip).await;
608            continue
609        }
610
611        // Check if request exists the configured limit
612        if request.headers.len() > BATCH {
613            debug!(
614                target: "darkfid::proto::protocol_sync::handle_receive_request",
615                "Node requested more blocks than allowed."
616            );
617            handler.send_action(channel, ProtocolGenericAction::Skip).await;
618            continue
619        }
620
621        debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");
622
623        // Grab the corresponding blocks
624        let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
625            Ok(v) => v,
626            Err(e) => {
627                error!(
628                    target: "darkfid::proto::protocol_sync::handle_receive_request",
629                    "get_blocks_after fail: {e}"
630                );
631                handler.send_action(channel, ProtocolGenericAction::Skip).await;
632                continue
633            }
634        };
635
636        // Send response
637        handler
638            .send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
639            .await;
640    }
641}
642
643/// Background handler function for ProtocolSyncFork.
644async fn handle_receive_fork_request(
645    handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
646    validator: ValidatorPtr,
647) -> Result<()> {
648    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
649    loop {
650        // Wait for a new fork sync request message
651        let (channel, request) = match handler.receiver.recv().await {
652            Ok(r) => r,
653            Err(e) => {
654                debug!(
655                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
656                    "recv fail: {e}"
657                );
658                continue
659            }
660        };
661
662        // Check if node has finished syncing its blockchain
663        if !*validator.synced.read().await {
664            debug!(
665                target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
666                "Node still syncing blockchain, skipping..."
667            );
668            handler.send_action(channel, ProtocolGenericAction::Skip).await;
669            continue
670        }
671
672        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");
673
674        // Retrieve proposals sequence
675        let proposals = match validator
676            .consensus
677            .get_fork_proposals_after(request.tip, request.fork_tip, BATCH as u32)
678            .await
679        {
680            Ok(p) => p,
681            Err(e) => {
682                debug!(
683                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
684                    "Getting fork proposals failed: {e}"
685                );
686                handler.send_action(channel, ProtocolGenericAction::Skip).await;
687                continue
688            }
689        };
690
691        // Send response
692        handler
693            .send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
694            .await;
695    }
696}
697
698/// Background handler function for ProtocolSyncForkHeaderHash.
699async fn handle_receive_fork_header_hash_request(
700    handler: ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
701    validator: ValidatorPtr,
702) -> Result<()> {
703    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "START");
704    loop {
705        // Wait for a new fork header hash request message
706        let (channel, request) = match handler.receiver.recv().await {
707            Ok(r) => r,
708            Err(e) => {
709                debug!(
710                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
711                    "recv fail: {e}"
712                );
713                continue
714            }
715        };
716
717        // Check if node has finished syncing its blockchain
718        if !*validator.synced.read().await {
719            debug!(
720                target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
721                "Node still syncing blockchain, skipping..."
722            );
723            handler.send_action(channel, ProtocolGenericAction::Skip).await;
724            continue
725        }
726
727        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "Received request: {request:?}");
728
729        // Retrieve fork header
730        let fork_header = match validator
731            .consensus
732            .get_fork_header_hash(request.height, &request.fork_header)
733            .await
734        {
735            Ok(h) => h,
736            Err(e) => {
737                debug!(
738                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
739                    "Getting fork header hash failed: {e}"
740                );
741                handler.send_action(channel, ProtocolGenericAction::Skip).await;
742                continue
743            }
744        };
745
746        // Send response if header was found
747        if fork_header.is_some() {
748            handler
749                .send_action(
750                    channel,
751                    ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header }),
752                )
753                .await;
754            continue
755        }
756
757        // If header wasn't found in a fork, check canonical
758        if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
759            debug!(
760                target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
761                "Getting fork header hash failed: {e}"
762            );
763            handler.send_action(channel, ProtocolGenericAction::Skip).await;
764            continue
765        };
766
767        let response = match validator.blockchain.blocks.get_order(&[request.height], false) {
768            Ok(h) => ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header: h[0] }),
769            Err(e) => {
770                debug!(
771                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
772                    "Getting fork header hash failed: {e}"
773                );
774                ProtocolGenericAction::Skip
775            }
776        };
777
778        // Send response
779        handler.send_action(channel, response).await;
780    }
781}
782
783/// Background handler function for ProtocolSyncForkHeaders.
784async fn handle_receive_fork_headers_request(
785    handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
786    validator: ValidatorPtr,
787) -> Result<()> {
788    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "START");
789    loop {
790        // Wait for a new fork header hash request message
791        let (channel, request) = match handler.receiver.recv().await {
792            Ok(r) => r,
793            Err(e) => {
794                debug!(
795                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
796                    "recv fail: {e}"
797                );
798                continue
799            }
800        };
801
802        // Check if node has finished syncing its blockchain
803        if !*validator.synced.read().await {
804            debug!(
805                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
806                "Node still syncing blockchain, skipping..."
807            );
808            handler.send_action(channel, ProtocolGenericAction::Skip).await;
809            continue
810        }
811
812        // Check if request exists the configured limit
813        if request.headers.len() > BATCH {
814            debug!(
815                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
816                "Node requested more headers than allowed."
817            );
818            handler.send_action(channel, ProtocolGenericAction::Skip).await;
819            continue
820        }
821
822        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "Received request: {request:?}");
823
824        // Retrieve fork headers
825        let headers = match validator
826            .consensus
827            .get_fork_headers(&request.headers, &request.fork_header)
828            .await
829        {
830            Ok(h) => h,
831            Err(e) => {
832                debug!(
833                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
834                    "Getting fork headers failed: {e}"
835                );
836                handler.send_action(channel, ProtocolGenericAction::Skip).await;
837                continue
838            }
839        };
840
841        // Send response if headers were found
842        if !headers.is_empty() {
843            handler
844                .send_action(
845                    channel,
846                    ProtocolGenericAction::Response(ForkHeadersResponse { headers }),
847                )
848                .await;
849            continue
850        }
851
852        // If headers weren't found in a fork, check canonical
853        if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
854            debug!(
855                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
856                "Getting fork header hash failed: {e}"
857            );
858            handler.send_action(channel, ProtocolGenericAction::Skip).await;
859            continue
860        };
861
862        let response = match validator.blockchain.headers.get(&request.headers, true) {
863            Ok(h) => ProtocolGenericAction::Response(ForkHeadersResponse {
864                headers: h.iter().map(|x| x.clone().unwrap()).collect(),
865            }),
866            Err(e) => {
867                debug!(
868                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
869                    "Getting fork headers failed: {e}"
870                );
871                ProtocolGenericAction::Skip
872            }
873        };
874
875        // Send response
876        handler.send_action(channel, response).await;
877    }
878}
879
880/// Background handler function for ProtocolSyncForkProposals.
881async fn handle_receive_fork_proposals_request(
882    handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
883    validator: ValidatorPtr,
884) -> Result<()> {
885    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "START");
886    loop {
887        // Wait for a new fork header hash request message
888        let (channel, request) = match handler.receiver.recv().await {
889            Ok(r) => r,
890            Err(e) => {
891                debug!(
892                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
893                    "recv fail: {e}"
894                );
895                continue
896            }
897        };
898
899        // Check if node has finished syncing its blockchain
900        if !*validator.synced.read().await {
901            debug!(
902                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
903                "Node still syncing blockchain, skipping..."
904            );
905            handler.send_action(channel, ProtocolGenericAction::Skip).await;
906            continue
907        }
908
909        // Check if request exists the configured limit
910        if request.headers.len() > BATCH {
911            debug!(
912                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
913                "Node requested more proposals than allowed."
914            );
915            handler.send_action(channel, ProtocolGenericAction::Skip).await;
916            continue
917        }
918
919        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "Received request: {request:?}");
920
921        // Retrieve fork proposals
922        let proposals = match validator
923            .consensus
924            .get_fork_proposals(&request.headers, &request.fork_header)
925            .await
926        {
927            Ok(p) => p,
928            Err(e) => {
929                debug!(
930                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
931                    "Getting fork proposals failed: {e}"
932                );
933                handler.send_action(channel, ProtocolGenericAction::Skip).await;
934                continue
935            }
936        };
937
938        // Send response if proposals were found
939        if !proposals.is_empty() {
940            handler
941                .send_action(
942                    channel,
943                    ProtocolGenericAction::Response(ForkProposalsResponse { proposals }),
944                )
945                .await;
946            continue
947        }
948
949        // If proposals weren't found in a fork, check canonical
950        if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
951            debug!(
952                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
953                "Getting fork header hash failed: {e}"
954            );
955            handler.send_action(channel, ProtocolGenericAction::Skip).await;
956            continue
957        };
958
959        let response = match validator.blockchain.get_blocks_by_hash(&request.headers) {
960            Ok(blocks) => {
961                let mut proposals = Vec::with_capacity(blocks.len());
962                for block in blocks {
963                    proposals.push(Proposal::new(block));
964                }
965                ProtocolGenericAction::Response(ForkProposalsResponse { proposals })
966            }
967            Err(e) => {
968                debug!(
969                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
970                    "Getting fork proposals failed: {e}"
971                );
972                ProtocolGenericAction::Skip
973            }
974        };
975
976        // Send response
977        handler.send_action(channel, response).await;
978    }
979}