1use std::sync::Arc;
20
21use async_trait::async_trait;
22use log::{debug, error};
23
24use darkfi::{
25 blockchain::{BlockInfo, Header, HeaderHash},
26 impl_p2p_message,
27 net::{
28 metering::MeteringConfiguration,
29 protocol::protocol_generic::{
30 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
31 },
32 session::SESSION_DEFAULT,
33 Message, P2pPtr,
34 },
35 system::ExecutorPtr,
36 util::time::NanoTimestamp,
37 validator::{consensus::Proposal, ValidatorPtr},
38 Error, Result,
39};
40use darkfi_serial::{SerialDecodable, SerialEncodable};
41
/// Maximum number of items handled per sync request.
///
/// The handlers in this file use it as the upper bound on `request.headers.len()`
/// and as the limit argument passed to the blockchain and consensus lookups.
pub const BATCH: usize = 20;
44
/// Metering configuration shared by every sync message declared in this file;
/// it is passed as the last argument of each `impl_p2p_message!` invocation.
// NOTE(review): the exact semantics and units of `threshold` and `sleep_step`
// are defined by `MeteringConfiguration` and are not visible here — confirm there.
const PROTOCOL_SYNC_METERING_CONFIGURATION: MeteringConfiguration = MeteringConfiguration {
    threshold: 20,
    sleep_step: 500,
    // Constructed from 5 seconds.
    expiry_time: NanoTimestamp::from_secs(5),
};
56
/// Request asking a peer for its current blockchain tip.
///
/// Served by `handle_receive_tip_request`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipRequest {
    /// Header hash sent by the requester; the responder checks whether its
    /// block store contains it before answering with its own tip.
    pub tip: HeaderHash,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(TipRequest, "tiprequest", 32, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
67
/// Response to a [`TipRequest`], produced by `handle_receive_tip_request`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipResponse {
    /// `false` when the responder's `validator.synced` flag is not set,
    /// `true` otherwise.
    pub synced: bool,
    /// First element of the pair returned by the responder's `blockchain.last()`;
    /// `None` when the responder is not synced or does not know the requested tip.
    pub height: Option<u32>,
    /// Second element of the pair returned by the responder's `blockchain.last()`;
    /// `None` under the same conditions as `height`.
    pub hash: Option<HeaderHash>,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(TipResponse, "tipresponse", 39, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
82
/// Request for a batch of headers, served by `handle_receive_header_request`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncRequest {
    /// Height handed to the responder's `blockchain.get_headers_before`
    /// together with `BATCH`.
    pub height: u32,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    HeaderSyncRequest,
    "headersyncrequest",
    4,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
98
/// Response to a [`HeaderSyncRequest`].
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncResponse {
    /// Headers returned by the responder's `blockchain.get_headers_before`.
    pub headers: Vec<Header>,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    HeaderSyncResponse,
    "headersyncresponse",
    8192,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
114
/// Request for blocks by header hash, served by `handle_receive_request`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
    /// Header hashes of the requested blocks. The responder skips the request
    /// when this holds more than `BATCH` entries.
    pub headers: Vec<HeaderHash>,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(SyncRequest, "syncrequest", 641, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
124
/// Response to a [`SyncRequest`].
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncResponse {
    /// Blocks returned by the responder's `blockchain.get_blocks_by_hash`.
    pub blocks: Vec<BlockInfo>,
}

// NOTE(review): the `0` is presumably "no fixed size bound" and the `1` the
// metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(SyncResponse, "syncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
134
/// Request for fork proposals, served by `handle_receive_fork_request`.
///
/// Both fields are forwarded to the responder's
/// `consensus.get_fork_proposals_after` together with `BATCH`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
    /// Header hash passed as the first argument of the lookup.
    pub tip: HeaderHash,
    /// Optional header hash passed as the second argument of the lookup.
    // NOTE(review): presumably selects a specific fork when set — confirm
    // against `get_fork_proposals_after`.
    pub fork_tip: Option<HeaderHash>,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(ForkSyncRequest, "forksyncrequest", 65, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
149
/// Response to a [`ForkSyncRequest`].
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
    /// Proposals returned by the responder's `consensus.get_fork_proposals_after`.
    pub proposals: Vec<Proposal>,
}

// NOTE(review): the `0` is presumably "no fixed size bound" and the `1` the
// metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(ForkSyncResponse, "forksyncresponse", 0, 1, PROTOCOL_SYNC_METERING_CONFIGURATION);
159
/// Request for the header hash at a given height, served by
/// `handle_receive_fork_header_hash_request`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashRequest {
    /// Height whose header hash is requested.
    pub height: u32,
    /// Header hash passed to `consensus.get_fork_header_hash`. When that lookup
    /// yields nothing, the responder also looks this hash up in its headers store
    /// before answering from `blocks.get_order`.
    pub fork_header: HeaderHash,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    ForkHeaderHashRequest,
    "forkheaderhashrequest",
    36,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
177
/// Response to a [`ForkHeaderHashRequest`].
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashResponse {
    /// Header hash found for the requested height, or `None` if neither lookup
    /// performed by the responder produced one.
    pub fork_header: Option<HeaderHash>,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    ForkHeaderHashResponse,
    "forkheaderhashresponse",
    33,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
193
/// Request for headers by hash, served by `handle_receive_fork_headers_request`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersRequest {
    /// Hashes of the requested headers. The responder skips the request when
    /// this holds more than `BATCH` entries.
    pub headers: Vec<HeaderHash>,
    /// Header hash passed to `consensus.get_fork_headers` alongside `headers`.
    pub fork_header: HeaderHash,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    ForkHeadersRequest,
    "forkheadersrequest",
    673,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
212
/// Response to a [`ForkHeadersRequest`].
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersResponse {
    /// Headers found for the requested hashes, taken either from
    /// `consensus.get_fork_headers` or from the responder's headers store.
    pub headers: Vec<Header>,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    ForkHeadersResponse,
    "forkheadersresponse",
    8192,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
228
/// Request for proposals by header hash, served by
/// `handle_receive_fork_proposals_request`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsRequest {
    /// Header hashes of the requested proposals. The responder skips the request
    /// when this holds more than `BATCH` entries.
    pub headers: Vec<HeaderHash>,
    /// Header hash passed to `consensus.get_fork_proposals` alongside `headers`.
    pub fork_header: HeaderHash,
}

// NOTE(review): the two numeric arguments are presumably the maximum serialized
// size in bytes and the metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    ForkProposalsRequest,
    "forkproposalsrequest",
    673,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
247
/// Response to a [`ForkProposalsRequest`].
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsResponse {
    /// Proposals found for the requested hashes, taken either from
    /// `consensus.get_fork_proposals` or built from blocks in the responder's
    /// blockchain.
    pub proposals: Vec<Proposal>,
}

// NOTE(review): the `0` is presumably "no fixed size bound" and the `1` the
// metering score — confirm against `impl_p2p_message!`.
impl_p2p_message!(
    ForkProposalsResponse,
    "forkproposalsresponse",
    0,
    1,
    PROTOCOL_SYNC_METERING_CONFIGURATION
);
263
/// Atomically reference-counted pointer to a [`ProtocolSyncHandler`], as returned by
/// `ProtocolSyncHandler::init`.
pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
266
/// Bundle of the generic request/response handlers backing the sync protocols.
///
/// Each field pairs one request message type with its response type.
pub struct ProtocolSyncHandler {
    /// Handler for `TipRequest` -> `TipResponse`.
    tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
    /// Handler for `HeaderSyncRequest` -> `HeaderSyncResponse`.
    header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
    /// Handler for `SyncRequest` -> `SyncResponse`.
    sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
    /// Handler for `ForkSyncRequest` -> `ForkSyncResponse`.
    fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
    /// Handler for `ForkHeaderHashRequest` -> `ForkHeaderHashResponse`.
    fork_header_hash_handler:
        ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
    /// Handler for `ForkHeadersRequest` -> `ForkHeadersResponse`.
    fork_headers_handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
    /// Handler for `ForkProposalsRequest` -> `ForkProposalsResponse`.
    fork_proposals_handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
}
285
286impl ProtocolSyncHandler {
287 pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
290 debug!(
291 target: "darkfid::proto::protocol_sync::init",
292 "Adding all sync protocols to the protocol registry"
293 );
294
295 let tip_handler =
296 ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
297 let header_handler =
298 ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
299 let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
300 let fork_sync_handler =
301 ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
302 let fork_header_hash_handler =
303 ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaderHash", SESSION_DEFAULT).await;
304 let fork_headers_handler =
305 ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaders", SESSION_DEFAULT).await;
306 let fork_proposals_handler =
307 ProtocolGenericHandler::new(p2p, "ProtocolSyncForkProposals", SESSION_DEFAULT).await;
308
309 Arc::new(Self {
310 tip_handler,
311 header_handler,
312 sync_handler,
313 fork_sync_handler,
314 fork_header_hash_handler,
315 fork_headers_handler,
316 fork_proposals_handler,
317 })
318 }
319
320 pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
322 debug!(
323 target: "darkfid::proto::protocol_sync::start",
324 "Starting sync protocols handlers tasks..."
325 );
326
327 self.tip_handler.task.clone().start(
328 handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
329 |res| async move {
330 match res {
331 Ok(()) | Err(Error::DetachedTaskStopped) => { }
332 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
333 }
334 },
335 Error::DetachedTaskStopped,
336 executor.clone(),
337 );
338
339 self.header_handler.task.clone().start(
340 handle_receive_header_request(self.header_handler.clone(), validator.clone()),
341 |res| async move {
342 match res {
343 Ok(()) | Err(Error::DetachedTaskStopped) => { }
344 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
345 }
346 },
347 Error::DetachedTaskStopped,
348 executor.clone(),
349 );
350
351 self.sync_handler.task.clone().start(
352 handle_receive_request(self.sync_handler.clone(), validator.clone()),
353 |res| async move {
354 match res {
355 Ok(()) | Err(Error::DetachedTaskStopped) => { }
356 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
357 }
358 },
359 Error::DetachedTaskStopped,
360 executor.clone(),
361 );
362
363 self.fork_sync_handler.task.clone().start(
364 handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
365 |res| async move {
366 match res {
367 Ok(()) | Err(Error::DetachedTaskStopped) => { }
368 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
369 }
370 },
371 Error::DetachedTaskStopped,
372 executor.clone(),
373 );
374
375 self.fork_header_hash_handler.task.clone().start(
376 handle_receive_fork_header_hash_request(self.fork_header_hash_handler.clone(), validator.clone()),
377 |res| async move {
378 match res {
379 Ok(()) | Err(Error::DetachedTaskStopped) => { }
380 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaderHash handler task: {e}"),
381 }
382 },
383 Error::DetachedTaskStopped,
384 executor.clone(),
385 );
386
387 self.fork_headers_handler.task.clone().start(
388 handle_receive_fork_headers_request(self.fork_headers_handler.clone(), validator.clone()),
389 |res| async move {
390 match res {
391 Ok(()) | Err(Error::DetachedTaskStopped) => { }
392 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaders handler task: {e}"),
393 }
394 },
395 Error::DetachedTaskStopped,
396 executor.clone(),
397 );
398
399 self.fork_proposals_handler.task.clone().start(
400 handle_receive_fork_proposals_request(self.fork_proposals_handler.clone(), validator.clone()),
401 |res| async move {
402 match res {
403 Ok(()) | Err(Error::DetachedTaskStopped) => { }
404 Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkProposals handler task: {e}"),
405 }
406 },
407 Error::DetachedTaskStopped,
408 executor.clone(),
409 );
410
411 debug!(
412 target: "darkfid::proto::protocol_sync::start",
413 "Sync protocols handlers tasks started!"
414 );
415
416 Ok(())
417 }
418
419 pub async fn stop(&self) {
421 debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
422 self.tip_handler.task.stop().await;
423 self.header_handler.task.stop().await;
424 self.sync_handler.task.stop().await;
425 self.fork_sync_handler.task.stop().await;
426 self.fork_header_hash_handler.task.stop().await;
427 self.fork_headers_handler.task.stop().await;
428 self.fork_proposals_handler.task.stop().await;
429 debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
430 }
431}
432
433async fn handle_receive_tip_request(
435 handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
436 validator: ValidatorPtr,
437) -> Result<()> {
438 debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
439 loop {
440 let (channel, request) = match handler.receiver.recv().await {
442 Ok(r) => r,
443 Err(e) => {
444 debug!(
445 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
446 "recv fail: {e}"
447 );
448 continue
449 }
450 };
451
452 debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "Received request: {request:?}");
453
454 if !*validator.synced.read().await {
456 debug!(
457 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
458 "Node still syncing blockchain"
459 );
460 handler
461 .send_action(
462 channel,
463 ProtocolGenericAction::Response(TipResponse {
464 synced: false,
465 height: None,
466 hash: None,
467 }),
468 )
469 .await;
470 continue
471 }
472
473 match validator.blockchain.blocks.contains(&request.tip) {
475 Ok(contains) => {
476 if !contains {
477 debug!(
478 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
479 "Node doesn't follow request sequence"
480 );
481 handler
482 .send_action(
483 channel,
484 ProtocolGenericAction::Response(TipResponse {
485 synced: true,
486 height: None,
487 hash: None,
488 }),
489 )
490 .await;
491 continue
492 }
493 }
494 Err(e) => {
495 error!(
496 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
497 "block_store.contains fail: {e}"
498 );
499 handler.send_action(channel, ProtocolGenericAction::Skip).await;
500 continue
501 }
502 }
503
504 let tip = match validator.blockchain.last() {
506 Ok(v) => v,
507 Err(e) => {
508 error!(
509 target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
510 "blockchain.last fail: {e}"
511 );
512 handler.send_action(channel, ProtocolGenericAction::Skip).await;
513 continue
514 }
515 };
516
517 handler
519 .send_action(
520 channel,
521 ProtocolGenericAction::Response(TipResponse {
522 synced: true,
523 height: Some(tip.0),
524 hash: Some(tip.1),
525 }),
526 )
527 .await;
528 }
529}
530
531async fn handle_receive_header_request(
533 handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
534 validator: ValidatorPtr,
535) -> Result<()> {
536 debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
537 loop {
538 let (channel, request) = match handler.receiver.recv().await {
540 Ok(r) => r,
541 Err(e) => {
542 debug!(
543 target: "darkfid::proto::protocol_sync::handle_receive_header_request",
544 "recv fail: {e}"
545 );
546 continue
547 }
548 };
549
550 if !*validator.synced.read().await {
552 debug!(
553 target: "darkfid::proto::protocol_sync::handle_receive_header_request",
554 "Node still syncing blockchain, skipping..."
555 );
556 handler.send_action(channel, ProtocolGenericAction::Skip).await;
557 continue
558 }
559
560 debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "Received request: {request:?}");
561
562 let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
564 Ok(v) => v,
565 Err(e) => {
566 error!(
567 target: "darkfid::proto::protocol_sync::handle_receive_header_request",
568 "get_headers_before fail: {e}"
569 );
570 handler.send_action(channel, ProtocolGenericAction::Skip).await;
571 continue
572 }
573 };
574
575 handler
577 .send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
578 .await;
579 }
580}
581
582async fn handle_receive_request(
584 handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
585 validator: ValidatorPtr,
586) -> Result<()> {
587 debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
588 loop {
589 let (channel, request) = match handler.receiver.recv().await {
591 Ok(r) => r,
592 Err(e) => {
593 debug!(
594 target: "darkfid::proto::protocol_sync::handle_receive_request",
595 "recv fail: {e}"
596 );
597 continue
598 }
599 };
600
601 if !*validator.synced.read().await {
603 debug!(
604 target: "darkfid::proto::protocol_sync::handle_receive_request",
605 "Node still syncing blockchain, skipping..."
606 );
607 handler.send_action(channel, ProtocolGenericAction::Skip).await;
608 continue
609 }
610
611 if request.headers.len() > BATCH {
613 debug!(
614 target: "darkfid::proto::protocol_sync::handle_receive_request",
615 "Node requested more blocks than allowed."
616 );
617 handler.send_action(channel, ProtocolGenericAction::Skip).await;
618 continue
619 }
620
621 debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");
622
623 let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
625 Ok(v) => v,
626 Err(e) => {
627 error!(
628 target: "darkfid::proto::protocol_sync::handle_receive_request",
629 "get_blocks_after fail: {e}"
630 );
631 handler.send_action(channel, ProtocolGenericAction::Skip).await;
632 continue
633 }
634 };
635
636 handler
638 .send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
639 .await;
640 }
641}
642
643async fn handle_receive_fork_request(
645 handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
646 validator: ValidatorPtr,
647) -> Result<()> {
648 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
649 loop {
650 let (channel, request) = match handler.receiver.recv().await {
652 Ok(r) => r,
653 Err(e) => {
654 debug!(
655 target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
656 "recv fail: {e}"
657 );
658 continue
659 }
660 };
661
662 if !*validator.synced.read().await {
664 debug!(
665 target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
666 "Node still syncing blockchain, skipping..."
667 );
668 handler.send_action(channel, ProtocolGenericAction::Skip).await;
669 continue
670 }
671
672 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");
673
674 let proposals = match validator
676 .consensus
677 .get_fork_proposals_after(request.tip, request.fork_tip, BATCH as u32)
678 .await
679 {
680 Ok(p) => p,
681 Err(e) => {
682 debug!(
683 target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
684 "Getting fork proposals failed: {e}"
685 );
686 handler.send_action(channel, ProtocolGenericAction::Skip).await;
687 continue
688 }
689 };
690
691 handler
693 .send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
694 .await;
695 }
696}
697
698async fn handle_receive_fork_header_hash_request(
700 handler: ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
701 validator: ValidatorPtr,
702) -> Result<()> {
703 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "START");
704 loop {
705 let (channel, request) = match handler.receiver.recv().await {
707 Ok(r) => r,
708 Err(e) => {
709 debug!(
710 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
711 "recv fail: {e}"
712 );
713 continue
714 }
715 };
716
717 if !*validator.synced.read().await {
719 debug!(
720 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
721 "Node still syncing blockchain, skipping..."
722 );
723 handler.send_action(channel, ProtocolGenericAction::Skip).await;
724 continue
725 }
726
727 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "Received request: {request:?}");
728
729 let fork_header = match validator
731 .consensus
732 .get_fork_header_hash(request.height, &request.fork_header)
733 .await
734 {
735 Ok(h) => h,
736 Err(e) => {
737 debug!(
738 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
739 "Getting fork header hash failed: {e}"
740 );
741 handler.send_action(channel, ProtocolGenericAction::Skip).await;
742 continue
743 }
744 };
745
746 if fork_header.is_some() {
748 handler
749 .send_action(
750 channel,
751 ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header }),
752 )
753 .await;
754 continue
755 }
756
757 if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
759 debug!(
760 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
761 "Getting fork header hash failed: {e}"
762 );
763 handler.send_action(channel, ProtocolGenericAction::Skip).await;
764 continue
765 };
766
767 let response = match validator.blockchain.blocks.get_order(&[request.height], false) {
768 Ok(h) => ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header: h[0] }),
769 Err(e) => {
770 debug!(
771 target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
772 "Getting fork header hash failed: {e}"
773 );
774 ProtocolGenericAction::Skip
775 }
776 };
777
778 handler.send_action(channel, response).await;
780 }
781}
782
783async fn handle_receive_fork_headers_request(
785 handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
786 validator: ValidatorPtr,
787) -> Result<()> {
788 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "START");
789 loop {
790 let (channel, request) = match handler.receiver.recv().await {
792 Ok(r) => r,
793 Err(e) => {
794 debug!(
795 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
796 "recv fail: {e}"
797 );
798 continue
799 }
800 };
801
802 if !*validator.synced.read().await {
804 debug!(
805 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
806 "Node still syncing blockchain, skipping..."
807 );
808 handler.send_action(channel, ProtocolGenericAction::Skip).await;
809 continue
810 }
811
812 if request.headers.len() > BATCH {
814 debug!(
815 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
816 "Node requested more headers than allowed."
817 );
818 handler.send_action(channel, ProtocolGenericAction::Skip).await;
819 continue
820 }
821
822 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "Received request: {request:?}");
823
824 let headers = match validator
826 .consensus
827 .get_fork_headers(&request.headers, &request.fork_header)
828 .await
829 {
830 Ok(h) => h,
831 Err(e) => {
832 debug!(
833 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
834 "Getting fork headers failed: {e}"
835 );
836 handler.send_action(channel, ProtocolGenericAction::Skip).await;
837 continue
838 }
839 };
840
841 if !headers.is_empty() {
843 handler
844 .send_action(
845 channel,
846 ProtocolGenericAction::Response(ForkHeadersResponse { headers }),
847 )
848 .await;
849 continue
850 }
851
852 if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
854 debug!(
855 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
856 "Getting fork header hash failed: {e}"
857 );
858 handler.send_action(channel, ProtocolGenericAction::Skip).await;
859 continue
860 };
861
862 let response = match validator.blockchain.headers.get(&request.headers, true) {
863 Ok(h) => ProtocolGenericAction::Response(ForkHeadersResponse {
864 headers: h.iter().map(|x| x.clone().unwrap()).collect(),
865 }),
866 Err(e) => {
867 debug!(
868 target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
869 "Getting fork headers failed: {e}"
870 );
871 ProtocolGenericAction::Skip
872 }
873 };
874
875 handler.send_action(channel, response).await;
877 }
878}
879
880async fn handle_receive_fork_proposals_request(
882 handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
883 validator: ValidatorPtr,
884) -> Result<()> {
885 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "START");
886 loop {
887 let (channel, request) = match handler.receiver.recv().await {
889 Ok(r) => r,
890 Err(e) => {
891 debug!(
892 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
893 "recv fail: {e}"
894 );
895 continue
896 }
897 };
898
899 if !*validator.synced.read().await {
901 debug!(
902 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
903 "Node still syncing blockchain, skipping..."
904 );
905 handler.send_action(channel, ProtocolGenericAction::Skip).await;
906 continue
907 }
908
909 if request.headers.len() > BATCH {
911 debug!(
912 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
913 "Node requested more proposals than allowed."
914 );
915 handler.send_action(channel, ProtocolGenericAction::Skip).await;
916 continue
917 }
918
919 debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "Received request: {request:?}");
920
921 let proposals = match validator
923 .consensus
924 .get_fork_proposals(&request.headers, &request.fork_header)
925 .await
926 {
927 Ok(p) => p,
928 Err(e) => {
929 debug!(
930 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
931 "Getting fork proposals failed: {e}"
932 );
933 handler.send_action(channel, ProtocolGenericAction::Skip).await;
934 continue
935 }
936 };
937
938 if !proposals.is_empty() {
940 handler
941 .send_action(
942 channel,
943 ProtocolGenericAction::Response(ForkProposalsResponse { proposals }),
944 )
945 .await;
946 continue
947 }
948
949 if let Err(e) = validator.blockchain.headers.get(&[request.fork_header], true) {
951 debug!(
952 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
953 "Getting fork header hash failed: {e}"
954 );
955 handler.send_action(channel, ProtocolGenericAction::Skip).await;
956 continue
957 };
958
959 let response = match validator.blockchain.get_blocks_by_hash(&request.headers) {
960 Ok(blocks) => {
961 let mut proposals = Vec::with_capacity(blocks.len());
962 for block in blocks {
963 proposals.push(Proposal::new(block));
964 }
965 ProtocolGenericAction::Response(ForkProposalsResponse { proposals })
966 }
967 Err(e) => {
968 debug!(
969 target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
970 "Getting fork proposals failed: {e}"
971 );
972 ProtocolGenericAction::Skip
973 }
974 };
975
976 handler.send_action(channel, response).await;
978 }
979}