use std::sync::Arc;

use async_trait::async_trait;
use log::{debug, error};

use darkfi::{
    blockchain::{BlockInfo, Header, HeaderHash},
    impl_p2p_message,
    net::{
        metering::MeteringConfiguration,
        protocol::protocol_generic::{
            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
        },
        session::SESSION_DEFAULT,
        Message, P2pPtr,
    },
    system::ExecutorPtr,
    util::time::NanoTimestamp,
    validator::{consensus::Proposal, ValidatorPtr},
    Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};

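/// Maximum number of items (headers, blocks or proposals) handled in a single syncing batch.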
pub const BATCH: usize = 20;

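/// Metering configuration shared by all sync protocol messages, used to meter
/// (rate-limit) how frequently a single peer may issue these requests.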
const PROTOCOL_SYNC_METERING_CONFIGURATION: MeteringConfiguration = MeteringConfiguration {
    threshold: 20,
    sleep_step: 500,
    expiry_time: NanoTimestamp::from_secs(5),
};

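/// Auxiliary structure used to request a node's canonical tip. We include our own
/// tip so the peer can verify we follow the same sequence.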
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipRequest {
    /// Canonical tip block header hash of the requesting node
    pub tip: HeaderHash,
}

impl_p2p_message!(TipRequest, "tiprequest", 32, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);

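/// Auxiliary structure used as the reply to a `TipRequest`, carrying the node's
/// sync status and its canonical tip.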
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipResponse {
    /// Flag indicating whether the node has finished syncing
    pub synced: bool,
    /// Canonical tip block height, if synced
    pub height: Option<u32>,
    /// Canonical tip block header hash, if synced
    pub hash: Option<HeaderHash>,
}

impl_p2p_message!(TipResponse, "tipresponse", 39, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);

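/// Auxiliary structure used to request a batch of headers before a given height.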
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncRequest {
    /// Height to retrieve headers before
    pub height: u32,
}

impl_p2p_message!(
    HeaderSyncRequest,
    "headersyncrequest",
    4,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

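/// Auxiliary structure used as the reply to a `HeaderSyncRequest`, carrying the requested headers.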
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncResponse {
    /// Response headers
    pub headers: Vec<Header>,
}

impl_p2p_message!(
    HeaderSyncResponse,
    "headersyncresponse",
    1701,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

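/// Auxiliary structure used to request full blocks for a set of header hashes.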
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
    /// Header hashes of the blocks to retrieve
    pub headers: Vec<HeaderHash>,
}

impl_p2p_message!(SyncRequest, "syncrequest", 641, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);

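/// Auxiliary structure used as the reply to a `SyncRequest`, carrying the requested blocks.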
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncResponse {
    /// Response blocks
    pub blocks: Vec<BlockInfo>,
}

impl_p2p_message!(SyncResponse, "syncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);

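/// Auxiliary structure used to request fork proposals after a given canonical tip,
/// optionally bounded by a known fork tip.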
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
    /// Canonical tip block header hash
    pub tip: HeaderHash,
    /// Optional fork tip block header hash
    pub fork_tip: Option<HeaderHash>,
}

impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 65, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);

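/// Auxiliary structure used as the reply to a `ForkSyncRequest`, carrying the requested fork proposals.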
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
    /// Response fork proposals
    pub proposals: Vec<Proposal>,
}

impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);

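/// Auxiliary structure used to request the header hash at a given height of a specific fork.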
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashRequest {
    /// Block height to retrieve the header hash for
    pub height: u32,
    /// Fork header hash identifying the fork to query
    pub fork_header: HeaderHash,
}

impl_p2p_message!(
    ForkHeaderHashRequest,
    "forkheaderhashrequest",
    36,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

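/// Auxiliary structure used as the reply to a `ForkHeaderHashRequest`, carrying the fork header hash, if found.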
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashResponse {
    /// Response fork header hash, if it was found
    pub fork_header: Option<HeaderHash>,
}

impl_p2p_message!(
    ForkHeaderHashResponse,
    "forkheaderhashresponse",
    33,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

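/// Auxiliary structure used to request a set of fork headers by their hashes.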
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersRequest {
    /// Header hashes of the fork headers to retrieve
    pub headers: Vec<HeaderHash>,
    /// Fork header hash identifying the fork to query
    pub fork_header: HeaderHash,
}

impl_p2p_message!(
    ForkHeadersRequest,
    "forkheadersrequest",
    673,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

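/// Auxiliary structure used as the reply to a `ForkHeadersRequest`, carrying the requested fork headers.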
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersResponse {
    /// Response fork headers
    pub headers: Vec<Header>,
}

impl_p2p_message!(
    ForkHeadersResponse,
    "forkheadersresponse",
    1701,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

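/// Auxiliary structure used to request a set of fork proposals by their header hashes.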
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsRequest {
    /// Header hashes of the fork proposals to retrieve
    pub headers: Vec<HeaderHash>,
    /// Fork header hash identifying the fork to query
    pub fork_header: HeaderHash,
}

impl_p2p_message!(
    ForkProposalsRequest,
    "forkproposalsrequest",
    673,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

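/// Auxiliary structure used as the reply to a `ForkProposalsRequest`, carrying the requested fork proposals.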
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsResponse {
    /// Response fork proposals
    pub proposals: Vec<Proposal>,
}

impl_p2p_message!(
    ForkProposalsResponse,
    "forkproposalsresponse",
    0,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);

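/// Atomic pointer to the sync protocols handler.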
pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;

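/// Handler bundling all sync protocol instances and their background tasks.
///
/// Typical usage, as a sketch (assumes the surrounding daemon provides a running
/// P2P instance `p2p`, an `executor` and a `validator`):
///
/// ```ignore
/// let sync_handler = ProtocolSyncHandler::init(&p2p).await;
/// sync_handler.start(&executor, &validator).await?;
/// // ... node runs, serving sync requests ...
/// sync_handler.stop().await;
/// ```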
pub struct ProtocolSyncHandler {
    /// Canonical tip request handler
    tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
    /// Header sync request handler
    header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
    /// Block sync request handler
    sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
    /// Fork sync request handler
    fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
    /// Fork header hash request handler
    fork_header_hash_handler:
        ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
    /// Fork headers request handler
    fork_headers_handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
    /// Fork proposals request handler
    fork_proposals_handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
}

impl ProtocolSyncHandler {
    /// Initialize all sync protocol handlers and register them to the P2P protocol registry.
    pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
        debug!(
            target: "darkfid::proto::protocol_sync::init",
            "Adding all sync protocols to the protocol registry"
        );

        let tip_handler =
            ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
        let header_handler =
            ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
        let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
        let fork_sync_handler =
            ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
        let fork_header_hash_handler =
            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaderHash", SESSION_DEFAULT).await;
        let fork_headers_handler =
            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaders", SESSION_DEFAULT).await;
        let fork_proposals_handler =
            ProtocolGenericHandler::new(p2p, "ProtocolSyncForkProposals", SESSION_DEFAULT).await;

        Arc::new(Self {
            tip_handler,
            header_handler,
            sync_handler,
            fork_sync_handler,
            fork_header_hash_handler,
            fork_headers_handler,
            fork_proposals_handler,
        })
    }

    /// Start all sync protocol handlers background tasks.
    pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
        debug!(
            target: "darkfid::proto::protocol_sync::start",
            "Starting sync protocols handlers tasks..."
        );

        self.tip_handler.task.clone().start(
            handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
                }
            },
            Error::DetachedTaskStopped,
            executor.clone(),
        );

        self.header_handler.task.clone().start(
            handle_receive_header_request(self.header_handler.clone(), validator.clone()),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
                }
            },
            Error::DetachedTaskStopped,
            executor.clone(),
        );

        self.sync_handler.task.clone().start(
            handle_receive_request(self.sync_handler.clone(), validator.clone()),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
                }
            },
            Error::DetachedTaskStopped,
            executor.clone(),
        );

        self.fork_sync_handler.task.clone().start(
            handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
                }
            },
            Error::DetachedTaskStopped,
            executor.clone(),
        );

        self.fork_header_hash_handler.task.clone().start(
            handle_receive_fork_header_hash_request(
                self.fork_header_hash_handler.clone(),
                validator.clone(),
            ),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaderHash handler task: {e}"),
                }
            },
            Error::DetachedTaskStopped,
            executor.clone(),
        );

        self.fork_headers_handler.task.clone().start(
            handle_receive_fork_headers_request(
                self.fork_headers_handler.clone(),
                validator.clone(),
            ),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaders handler task: {e}"),
                }
            },
            Error::DetachedTaskStopped,
            executor.clone(),
        );

        self.fork_proposals_handler.task.clone().start(
            handle_receive_fork_proposals_request(
                self.fork_proposals_handler.clone(),
                validator.clone(),
            ),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkProposals handler task: {e}"),
                }
            },
            Error::DetachedTaskStopped,
            executor.clone(),
        );

        debug!(
            target: "darkfid::proto::protocol_sync::start",
            "Sync protocols handlers tasks started!"
        );

        Ok(())
    }

    /// Stop all sync protocol handlers background tasks.
    pub async fn stop(&self) {
        debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
        self.tip_handler.task.stop().await;
        self.header_handler.task.stop().await;
        self.sync_handler.task.stop().await;
        self.fork_sync_handler.task.stop().await;
        self.fork_header_hash_handler.task.stop().await;
        self.fork_headers_handler.task.stop().await;
        self.fork_proposals_handler.task.stop().await;
        debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
    }
}

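/// Background handler task listening for `TipRequest` messages and replying with
/// our canonical tip, if we are synced and the peer's tip is in our sequence.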
async fn handle_receive_tip_request(
    handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
    loop {
        // Wait for the next request
        let (channel, request) = match handler.receiver.recv().await {
            Ok(r) => r,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                    "recv fail: {e}"
                );
                continue
            }
        };

        debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "Received request: {request:?}");

        // If we are still syncing, reply that we are not synced yet
        if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                "Node still syncing blockchain"
            );
            handler
                .send_action(
                    channel,
                    ProtocolGenericAction::Response(TipResponse {
                        synced: false,
                        height: None,
                        hash: None,
                    }),
                )
                .await;
            continue
        }

        // Skip the request if the peer's tip is not part of our canonical chain
        match validator.blockchain.blocks.contains(&request.tip) {
            Ok(contains) => {
                if !contains {
                    debug!(
                        target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                        "Node doesn't follow request sequence"
                    );
                    handler.send_action(channel, ProtocolGenericAction::Skip).await;
                    continue
                }
            }
            Err(e) => {
                error!(
                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                    "block_store.contains fail: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        }

        // Grab our current canonical tip
        let tip = match validator.blockchain.last() {
            Ok(v) => v,
            Err(e) => {
                error!(
                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                    "blockchain.last fail: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        };

        // Send the response
        handler
            .send_action(
                channel,
                ProtocolGenericAction::Response(TipResponse {
                    synced: true,
                    height: Some(tip.0),
                    hash: Some(tip.1),
                }),
            )
            .await;
    }
}

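/// Background handler task listening for `HeaderSyncRequest` messages and replying
/// with up to `BATCH` headers preceding the requested height.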
async fn handle_receive_header_request(
    handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
    loop {
        // Wait for the next request
        let (channel, request) = match handler.receiver.recv().await {
            Ok(r) => r,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
                    "recv fail: {e}"
                );
                continue
            }
        };

        // Skip the request if we are still syncing
        if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_header_request",
                "Node still syncing blockchain, skipping..."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "Received request: {request:?}");

        // Grab the corresponding headers
        let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
            Ok(v) => v,
            Err(e) => {
                error!(
                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
                    "get_headers_before fail: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        };

        // Send the response
        handler
            .send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
            .await;
    }
}

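/// Background handler task listening for `SyncRequest` messages and replying with
/// the full blocks matching the requested header hashes, up to `BATCH` at a time.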
async fn handle_receive_request(
    handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
    loop {
        // Wait for the next request
        let (channel, request) = match handler.receiver.recv().await {
            Ok(r) => r,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_request",
                    "recv fail: {e}"
                );
                continue
            }
        };

        // Skip the request if we are still syncing
        if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_request",
                "Node still syncing blockchain, skipping..."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        // Skip the request if it exceeds the allowed batch size
        if request.headers.len() > BATCH {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_request",
                "Node requested more blocks than allowed."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");

        // Grab the corresponding blocks
        let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
            Ok(v) => v,
            Err(e) => {
                error!(
                    target: "darkfid::proto::protocol_sync::handle_receive_request",
                    "get_blocks_by_hash fail: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        };

        // Send the response
        handler
            .send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
            .await;
    }
}

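/// Background handler task listening for `ForkSyncRequest` messages and replying
/// with fork proposals extending the requested tip.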
async fn handle_receive_fork_request(
    handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
    loop {
        // Wait for the next request
        let (channel, request) = match handler.receiver.recv().await {
            Ok(r) => r,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
                    "recv fail: {e}"
                );
                continue
            }
        };

        // Skip the request if we are still syncing
        if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
                "Node still syncing blockchain, skipping..."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");

        // Grab the fork proposals after the requested tip
        let proposals = match validator
            .consensus
            .get_fork_proposals_after(request.tip, request.fork_tip, BATCH as u32)
            .await
        {
            Ok(p) => p,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
                    "Getting fork proposals failed: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        };

        // Send the response
        handler
            .send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
            .await;
    }
}

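/// Background handler task listening for `ForkHeaderHashRequest` messages and replying
/// with the fork header hash at the requested height, if it exists.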
async fn handle_receive_fork_header_hash_request(
    handler: ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "START");
    loop {
        // Wait for the next request
        let (channel, request) = match handler.receiver.recv().await {
            Ok(r) => r,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
                    "recv fail: {e}"
                );
                continue
            }
        };

        // Skip the request if we are still syncing
        if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
                "Node still syncing blockchain, skipping..."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "Received request: {request:?}");

        // Grab the fork header hash for the requested height
        let fork_header = match validator
            .consensus
            .get_fork_header_hash(request.height, &request.fork_header)
            .await
        {
            Ok(h) => h,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
                    "Getting fork header hash failed: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        };

        // Send the response
        handler
            .send_action(
                channel,
                ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header }),
            )
            .await;
    }
}

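/// Background handler task listening for `ForkHeadersRequest` messages and replying
/// with the requested fork headers, up to `BATCH` at a time.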
async fn handle_receive_fork_headers_request(
    handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "START");
    loop {
        // Wait for the next request
        let (channel, request) = match handler.receiver.recv().await {
            Ok(r) => r,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
                    "recv fail: {e}"
                );
                continue
            }
        };

        // Skip the request if we are still syncing
        if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
                "Node still syncing blockchain, skipping..."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        // Skip the request if it exceeds the allowed batch size
        if request.headers.len() > BATCH {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
                "Node requested more headers than allowed."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "Received request: {request:?}");

        // Grab the requested fork headers
        let headers = match validator
            .consensus
            .get_fork_headers(&request.headers, &request.fork_header)
            .await
        {
            Ok(h) => h,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
                    "Getting fork headers failed: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        };

        // Send the response
        handler
            .send_action(channel, ProtocolGenericAction::Response(ForkHeadersResponse { headers }))
            .await;
    }
}

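/// Background handler task listening for `ForkProposalsRequest` messages and replying
/// with the requested fork proposals, up to `BATCH` at a time.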
async fn handle_receive_fork_proposals_request(
    handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "START");
    loop {
        // Wait for the next request
        let (channel, request) = match handler.receiver.recv().await {
            Ok(r) => r,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
                    "recv fail: {e}"
                );
                continue
            }
        };

        // Skip the request if we are still syncing
        if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
                "Node still syncing blockchain, skipping..."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        // Skip the request if it exceeds the allowed batch size
        if request.headers.len() > BATCH {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
                "Node requested more proposals than allowed."
            );
            handler.send_action(channel, ProtocolGenericAction::Skip).await;
            continue
        }

        debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "Received request: {request:?}");

        // Grab the requested fork proposals
        let proposals = match validator
            .consensus
            .get_fork_proposals(&request.headers, &request.fork_header)
            .await
        {
            Ok(p) => p,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
                    "Getting fork proposals failed: {e}"
                );
                handler.send_action(channel, ProtocolGenericAction::Skip).await;
                continue
            }
        };

        // Send the response
        handler
            .send_action(
                channel,
                ProtocolGenericAction::Response(ForkProposalsResponse { proposals }),
            )
            .await;
    }
}