use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error};
use darkfi::{
blockchain::{BlockInfo, Header, HeaderHash},
impl_p2p_message,
net::{
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
session::SESSION_DEFAULT,
Message, P2pPtr,
},
system::ExecutorPtr,
validator::{consensus::Proposal, ValidatorPtr},
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
pub const BATCH: usize = 20;
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipRequest {
pub tip: HeaderHash,
}
impl_p2p_message!(TipRequest, "tiprequest");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct TipResponse {
pub synced: bool,
pub height: Option<u32>,
pub hash: Option<HeaderHash>,
}
impl_p2p_message!(TipResponse, "tipresponse");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncRequest {
pub height: u32,
}
impl_p2p_message!(HeaderSyncRequest, "headersyncrequest");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncResponse {
pub headers: Vec<Header>,
}
impl_p2p_message!(HeaderSyncResponse, "headersyncresponse");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
pub headers: Vec<HeaderHash>,
}
impl_p2p_message!(SyncRequest, "syncrequest");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct SyncResponse {
pub blocks: Vec<BlockInfo>,
}
impl_p2p_message!(SyncResponse, "syncresponse");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
pub tip: HeaderHash,
pub fork_tip: Option<HeaderHash>,
}
impl_p2p_message!(ForkSyncRequest, "forksyncrequest");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
pub proposals: Vec<Proposal>,
}
impl_p2p_message!(ForkSyncResponse, "forksyncresponse");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashRequest {
pub height: u32,
pub fork_header: HeaderHash,
}
impl_p2p_message!(ForkHeaderHashRequest, "forkheaderhashrequest");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeaderHashResponse {
pub fork_header: Option<HeaderHash>,
}
impl_p2p_message!(ForkHeaderHashResponse, "forkheaderhashresponse");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersRequest {
pub headers: Vec<HeaderHash>,
pub fork_header: HeaderHash,
}
impl_p2p_message!(ForkHeadersRequest, "forkheadersrequest");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkHeadersResponse {
pub headers: Vec<Header>,
}
impl_p2p_message!(ForkHeadersResponse, "forkheadersresponse");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsRequest {
pub headers: Vec<HeaderHash>,
pub fork_header: HeaderHash,
}
impl_p2p_message!(ForkProposalsRequest, "forkproposalsrequest");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ForkProposalsResponse {
pub proposals: Vec<Proposal>,
}
impl_p2p_message!(ForkProposalsResponse, "forkproposalsresponse");
pub type ProtocolSyncHandlerPtr = Arc<ProtocolSyncHandler>;
pub struct ProtocolSyncHandler {
tip_handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
header_handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
sync_handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
fork_sync_handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
fork_header_hash_handler:
ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
fork_headers_handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
fork_proposals_handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
}
impl ProtocolSyncHandler {
pub async fn init(p2p: &P2pPtr) -> ProtocolSyncHandlerPtr {
debug!(
target: "darkfid::proto::protocol_sync::init",
"Adding all sync protocols to the protocol registry"
);
let tip_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncTip", SESSION_DEFAULT).await;
let header_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncHeader", SESSION_DEFAULT).await;
let sync_handler = ProtocolGenericHandler::new(p2p, "ProtocolSync", SESSION_DEFAULT).await;
let fork_sync_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncFork", SESSION_DEFAULT).await;
let fork_header_hash_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaderHash", SESSION_DEFAULT).await;
let fork_headers_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncForkHeaders", SESSION_DEFAULT).await;
let fork_proposals_handler =
ProtocolGenericHandler::new(p2p, "ProtocolSyncForkProposals", SESSION_DEFAULT).await;
Arc::new(Self {
tip_handler,
header_handler,
sync_handler,
fork_sync_handler,
fork_header_hash_handler,
fork_headers_handler,
fork_proposals_handler,
})
}
pub async fn start(&self, executor: &ExecutorPtr, validator: &ValidatorPtr) -> Result<()> {
debug!(
target: "darkfid::proto::protocol_sync::start",
"Starting sync protocols handlers tasks..."
);
self.tip_handler.task.clone().start(
handle_receive_tip_request(self.tip_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncTip handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.header_handler.task.clone().start(
handle_receive_header_request(self.header_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncHeader handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.sync_handler.task.clone().start(
handle_receive_request(self.sync_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSync handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.fork_sync_handler.task.clone().start(
handle_receive_fork_request(self.fork_sync_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncFork handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.fork_header_hash_handler.task.clone().start(
handle_receive_fork_header_hash_request(self.fork_header_hash_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaderHash handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.fork_headers_handler.task.clone().start(
handle_receive_fork_headers_request(self.fork_headers_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkHeaders handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
self.fork_proposals_handler.task.clone().start(
handle_receive_fork_proposals_request(self.fork_proposals_handler.clone(), validator.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_sync::start", "Failed starting ProtocolSyncForkProposals handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
debug!(
target: "darkfid::proto::protocol_sync::start",
"Sync protocols handlers tasks started!"
);
Ok(())
}
pub async fn stop(&self) {
debug!(target: "darkfid::proto::protocol_sync::stop", "Terminating sync protocols handlers tasks...");
self.tip_handler.task.stop().await;
self.header_handler.task.stop().await;
self.sync_handler.task.stop().await;
self.fork_sync_handler.task.stop().await;
self.fork_header_hash_handler.task.stop().await;
self.fork_headers_handler.task.stop().await;
self.fork_proposals_handler.task.stop().await;
debug!(target: "darkfid::proto::protocol_sync::stop", "Sync protocols handlers tasks terminated!");
}
}
async fn handle_receive_tip_request(
handler: ProtocolGenericHandlerPtr<TipRequest, TipResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
loop {
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"recv fail: {e}"
);
continue
}
};
debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "Received request: {request:?}");
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Node still syncing blockchain"
);
handler
.send_action(
channel,
ProtocolGenericAction::Response(TipResponse {
synced: false,
height: None,
hash: None,
}),
)
.await;
continue
}
match validator.blockchain.blocks.contains(&request.tip) {
Ok(contains) => {
if !contains {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Node doesn't follow request sequence"
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
}
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"block_store.contains fail: {e}"
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
}
let tip = match validator.blockchain.last() {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"blockchain.last fail: {e}"
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
handler
.send_action(
channel,
ProtocolGenericAction::Response(TipResponse {
synced: true,
height: Some(tip.0),
hash: Some(tip.1),
}),
)
.await;
}
}
async fn handle_receive_header_request(
handler: ProtocolGenericHandlerPtr<HeaderSyncRequest, HeaderSyncResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
loop {
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"recv fail: {e}"
);
continue
}
};
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "Received request: {request:?}");
let headers = match validator.blockchain.get_headers_before(request.height, BATCH) {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"get_headers_before fail: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
handler
.send_action(channel, ProtocolGenericAction::Response(HeaderSyncResponse { headers }))
.await;
}
}
async fn handle_receive_request(
handler: ProtocolGenericHandlerPtr<SyncRequest, SyncResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
loop {
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"recv fail: {e}"
);
continue
}
};
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
if request.headers.len() > BATCH {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Node requested more blocks than allowed."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "Received request: {request:?}");
let blocks = match validator.blockchain.get_blocks_by_hash(&request.headers) {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"get_blocks_after fail: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
handler
.send_action(channel, ProtocolGenericAction::Response(SyncResponse { blocks }))
.await;
}
}
async fn handle_receive_fork_request(
handler: ProtocolGenericHandlerPtr<ForkSyncRequest, ForkSyncResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "START");
loop {
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"recv fail: {e}"
);
continue
}
};
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_request", "Received request: {request:?}");
let proposals = match validator
.consensus
.get_fork_proposals_after(request.tip, request.fork_tip, BATCH as u32)
.await
{
Ok(p) => p,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_request",
"Getting fork proposals failed: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
handler
.send_action(channel, ProtocolGenericAction::Response(ForkSyncResponse { proposals }))
.await;
}
}
async fn handle_receive_fork_header_hash_request(
handler: ProtocolGenericHandlerPtr<ForkHeaderHashRequest, ForkHeaderHashResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "START");
loop {
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
"recv fail: {e}"
);
continue
}
};
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request", "Received request: {request:?}");
let fork_header = match validator
.consensus
.get_fork_header_hash(request.height, &request.fork_header)
.await
{
Ok(h) => h,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_header_hash_request",
"Getting fork header hash failed: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
handler
.send_action(
channel,
ProtocolGenericAction::Response(ForkHeaderHashResponse { fork_header }),
)
.await;
}
}
async fn handle_receive_fork_headers_request(
handler: ProtocolGenericHandlerPtr<ForkHeadersRequest, ForkHeadersResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "START");
loop {
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
"recv fail: {e}"
);
continue
}
};
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
if request.headers.len() > BATCH {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
"Node requested more headers than allowed."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request", "Received request: {request:?}");
let headers = match validator
.consensus
.get_fork_headers(&request.headers, &request.fork_header)
.await
{
Ok(h) => h,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_headers_request",
"Getting fork headers failed: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
handler
.send_action(channel, ProtocolGenericAction::Response(ForkHeadersResponse { headers }))
.await;
}
}
async fn handle_receive_fork_proposals_request(
handler: ProtocolGenericHandlerPtr<ForkProposalsRequest, ForkProposalsResponse>,
validator: ValidatorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "START");
loop {
let (channel, request) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
"recv fail: {e}"
);
continue
}
};
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
if request.headers.len() > BATCH {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
"Node requested more proposals than allowed."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
debug!(target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request", "Received request: {request:?}");
let proposals = match validator
.consensus
.get_fork_proposals(&request.headers, &request.fork_header)
.await
{
Ok(p) => p,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_fork_proposals_request",
"Getting fork proposals failed: {}",
e
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
};
handler
.send_action(
channel,
ProtocolGenericAction::Response(ForkProposalsResponse { proposals }),
)
.await;
}
}