use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error};
use darkfi::{
blockchain::{BlockInfo, Header, HeaderHash},
impl_p2p_message,
net::{
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
session::SESSION_DEFAULT,
Message, P2pPtr,
},
system::ExecutorPtr,
validator::{consensus::Proposal, ValidatorPtr},
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
/// Batch size shared by the sync handlers in this file: the number of headers
/// asked of `get_headers_before`, the maximum number of block hashes accepted
/// in a `SyncRequest`, and the limit passed to the fork proposal lookups.
pub const BATCH: usize = 20;
/// Request asking a peer for its current chain tip.
///
/// A synced responder only answers when its own block store contains `tip`;
/// otherwise the request is skipped (see `handle_receive_tip_request`).
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipRequest {
/// Block hash supplied by the requester, looked up in the responder's block store.
pub tip: HeaderHash,
}
// Declares `TipRequest` as a P2P message with the name "tiprequest".
impl_p2p_message!(TipRequest, "tiprequest");
/// Response to a `TipRequest`.
///
/// When the responder is not synced, `synced` is `false` and both optional
/// fields are `None`. When it is synced, they carry the two elements returned
/// by `validator.blockchain.last()`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipResponse {
/// Whether the responding node reported itself as synced.
pub synced: bool,
/// Height of the responder's last block, if synced.
pub height: Option<u32>,
/// Hash of the responder's last block, if synced.
pub hash: Option<HeaderHash>,
}
// Declares `TipResponse` as a P2P message with the name "tipresponse".
impl_p2p_message!(TipResponse, "tipresponse");
/// Request asking a peer for a batch of block headers.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncRequest {
/// Height handed to `get_headers_before`, together with `BATCH`, by the responder.
pub height: u32,
}
// Declares `HeaderSyncRequest` as a P2P message with the name "headersyncrequest".
impl_p2p_message!(HeaderSyncRequest, "headersyncrequest");
/// Response to a `HeaderSyncRequest`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncResponse {
/// Headers returned by the responder's `get_headers_before` lookup.
pub headers: Vec<Header>,
}
// Declares `HeaderSyncResponse` as a P2P message with the name "headersyncresponse".
impl_p2p_message!(HeaderSyncResponse, "headersyncresponse");
/// Request asking a peer for the blocks matching a list of header hashes.
///
/// The responder skips requests that list more than `BATCH` hashes.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
/// Header hashes of the requested blocks.
pub headers: Vec<HeaderHash>,
}
// Declares `SyncRequest` as a P2P message with the name "syncrequest".
impl_p2p_message!(SyncRequest, "syncrequest");
/// Response to a `SyncRequest`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncResponse {
/// Blocks returned by the responder's `get_blocks_by_hash` lookup.
pub blocks: Vec<BlockInfo>,
}
// Declares `SyncResponse` as a P2P message with the name "syncresponse".
impl_p2p_message!(SyncResponse, "syncresponse");
/// Request asking a peer for fork proposals.
///
/// With `fork_tip` set, the responder calls `get_fork_proposals`; without it,
/// the responder calls `get_best_fork_proposals`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
/// Hash passed as the first argument to either proposal lookup.
pub tip: HeaderHash,
/// Optional fork tip hash selecting which of the two lookups is used.
pub fork_tip: Option<HeaderHash>,
}
// Declares `ForkSyncRequest` as a P2P message with the name "forksyncrequest".
impl_p2p_message!(ForkSyncRequest, "forksyncrequest");
/// Response to a `ForkSyncRequest`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
/// Proposals returned by the responder's consensus lookup.
pub proposals: Vec<Proposal>,
}
// Declares `ForkSyncResponse` as a P2P message with the name "forksyncresponse".
impl_p2p_message!(ForkSyncResponse, "forksyncresponse");
/// Atomically reference-counted pointer to a `ProtocolSyncHandler`.
pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
/// Bundles the generic protocol handlers used by the sync protocols, one per
/// request/response message pair.
pub struct ProtocolSyncHandler {
/// Handler for `TipRequest` / `TipResponse` messages.
tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
/// Handler for `HeaderSyncRequest` / `HeaderSyncResponse` messages.
header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
/// Handler for `SyncRequest` / `SyncResponse` messages.
sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
/// Handler for `ForkSyncRequest` / `ForkSyncResponse` messages.
fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
}
impl ProtocolSyncHandler {
pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
debug!(
target: "darkfid::proto::protocol_sync::init",
"Adding all sync protocols to the protocol registry"
);
let tip_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
let header_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
let fork_sync_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
Arc::new(Self { tip_handler, header_handler, sync_handler, fork_sync_handler })
}
pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
debug!(
target: "darkfid::proto::protocol_sync::start",
"Starting sync protocols handlers tasks..."
);
self.tip_handler.task.clone().start(
handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.header_handler.task.clone().start(
handle_receive_header_request(self.header_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.sync_handler.task.clone().start(
handle_receive_request(self.sync_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.fork_sync_handler.task.clone().start(
handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
debug!(
target: "darkfid::proto::protocol_sync::start",
"Sync protocols handlers tasks started!"
);
Ok(())
}
pub async fn stop(&self) {
debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
self.tip_handler.task.stop().await;
self.header_handler.task.stop().await;
self.sync_handler.task.stop().await;
self.fork_sync_handler.task.stop().await;
debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
}
}
/// Background loop answering `TipRequest` messages.
///
/// For every received request exactly one action is sent back on the
/// originating channel:
/// * not synced: a `TipResponse` with `synced: false` and no tip data;
/// * synced, but the block store lookup of the requested tip fails or reports
///   the block as missing: `Skip`;
/// * synced and the tip is known: a `TipResponse` carrying the result of
///   `blockchain.last()`, or `Skip` if that call fails.
///
/// The loop never exits on its own; receive errors are logged and the loop
/// goes on to the next receive.
async fn handle_receive_tip_request(
    handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");

    loop {
        // Wait for the next incoming request.
        let (channel, request) = match handler.receiver.recv().await {
            Ok(incoming) => incoming,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                    "recv fail: {e}"
                );
                continue;
            }
        };

        // Decide what to send back; every path yields a single action.
        let action = if !*validator.synced.read().await {
            ProtocolGenericAction::Response(TipResponse { synced: false, height: None, hash: None })
        } else {
            match validator.blockchain.blocks.contains(&request.tip) {
                Err(e) => {
                    error!(
                        target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                        "block_store.contains fail: {e}"
                    );
                    ProtocolGenericAction::Skip
                }
                Ok(false) => {
                    // The requested tip is not in our block store.
                    debug!(
                        target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                        "Node doesn't follow request sequence"
                    );
                    ProtocolGenericAction::Skip
                }
                Ok(true) => match validator.blockchain.last() {
                    Ok(tip) => ProtocolGenericAction::Response(TipResponse {
                        synced: true,
                        height: Some(tip.0),
                        hash: Some(tip.1),
                    }),
                    Err(e) => {
                        error!(
                            target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
                            "blockchain.last fail: {e}"
                        );
                        ProtocolGenericAction::Skip
                    }
                },
            }
        };

        handler.send_action(channel, action).await;
    }
}
/// Background loop answering `HeaderSyncRequest` messages.
///
/// While the node is not synced, or when the header lookup fails, the request
/// is answered with `Skip`. Otherwise the headers returned by
/// `get_headers_before(request.height, BATCH)` are sent back in a
/// `HeaderSyncResponse`.
///
/// The loop never exits on its own; receive errors are logged and the loop
/// goes on to the next receive.
async fn handle_receive_header_request(
    handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");

    loop {
        // Wait for the next incoming request.
        let (channel, request) = match handler.receiver.recv().await {
            Ok(incoming) => incoming,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_header_request",
                    "recv fail: {e}"
                );
                continue;
            }
        };

        // Decide what to send back; every path yields a single action.
        let action = if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_header_request",
                "Node still syncing blockchain, skipping..."
            );
            ProtocolGenericAction::Skip
        } else {
            match validator.blockchain.get_headers_before(request.height, BATCH) {
                Ok(headers) => ProtocolGenericAction::Response(HeaderSyncResponse { headers }),
                Err(e) => {
                    error!(
                        target: "darkfid::proto::protocol_sync::handle_receive_header_request",
                        "get_headers_before fail: {}",
                        e
                    );
                    ProtocolGenericAction::Skip
                }
            }
        };

        handler.send_action(channel, action).await;
    }
}
/// Background loop answering `SyncRequest` (block) messages.
///
/// A request is answered with `Skip` when the node is not synced, when it
/// lists more than `BATCH` header hashes, or when the block lookup fails.
/// Otherwise the blocks returned by `get_blocks_by_hash` are sent back in a
/// `SyncResponse`.
///
/// The loop never exits on its own; receive errors are logged and the loop
/// goes on to the next receive.
async fn handle_receive_request(
    handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");

    loop {
        // Wait for the next incoming request.
        let (channel, request) = match handler.receiver.recv().await {
            Ok(incoming) => incoming,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_request",
                    "recv fail: {e}"
                );
                continue;
            }
        };

        // Decide what to send back; every path yields a single action.
        let action = if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_request",
                "Node still syncing blockchain, skipping..."
            );
            ProtocolGenericAction::Skip
        } else if request.headers.len() > BATCH {
            // Refuse oversized requests instead of serving a truncated batch.
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_request",
                "Node requested more blocks than allowed."
            );
            ProtocolGenericAction::Skip
        } else {
            match validator.blockchain.get_blocks_by_hash(&request.headers) {
                Ok(blocks) => ProtocolGenericAction::Response(SyncResponse { blocks }),
                Err(e) => {
                    // Name the call that actually failed so logs point at the
                    // right lookup.
                    error!(
                        target: "darkfid::proto::protocol_sync::handle_receive_request",
                        "get_blocks_by_hash fail: {e}"
                    );
                    ProtocolGenericAction::Skip
                }
            }
        };

        handler.send_action(channel, action).await;
    }
}
/// Background loop answering `ForkSyncRequest` messages.
///
/// While the node is not synced, or when the proposal lookup fails, the
/// request is answered with `Skip`. Otherwise the proposals are sent back in
/// a `ForkSyncResponse`. The lookup used is `get_fork_proposals` when the
/// request names a fork tip and `get_best_fork_proposals` when it does not;
/// both are called with `BATCH` as the limit.
///
/// The loop never exits on its own; receive errors are logged and the loop
/// goes on to the next receive.
async fn handle_receive_fork_request(
    handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
    validator: ValidatorPtr,
) -> Result<()> {
    debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");

    loop {
        // Wait for the next incoming request.
        let (channel, request) = match handler.receiver.recv().await {
            Ok(incoming) => incoming,
            Err(e) => {
                debug!(
                    target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
                    "recv fail: {e}"
                );
                continue;
            }
        };

        // Decide what to send back; every path yields a single action.
        let action = if !*validator.synced.read().await {
            debug!(
                target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
                "Node still syncing blockchain, skipping..."
            );
            ProtocolGenericAction::Skip
        } else {
            debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");

            // Pick the lookup based on whether a specific fork was named.
            let lookup = if let Some(fork_tip) = request.fork_tip {
                validator.consensus.get_fork_proposals(request.tip, fork_tip, BATCH as u32).await
            } else {
                validator.consensus.get_best_fork_proposals(request.tip, BATCH as u32).await
            };

            match lookup {
                Ok(proposals) => ProtocolGenericAction::Response(ForkSyncResponse { proposals }),
                Err(e) => {
                    debug!(
                        target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
                        "Getting fork proposals failed: {}",
                        e
                    );
                    ProtocolGenericAction::Skip
                }
            }
        };

        handler.send_action(channel, action).await;
    }
}