use std::collections::HashMap;

use darkfi::{
    blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
    util::encoding::base64, validator::consensus::Proposal, Error, Result,
};
use darkfi_serial::serialize_async;
use log::{debug, info, warn};
use rand::{prelude::SliceRandom, rngs::OsRng};
use tinyjson::JsonValue;

use crate::{
    proto::{
        ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
        SyncResponse, TipRequest, TipResponse, BATCH,
    },
    DarkfiNodePtr,
};
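
/// Async task to sync the blockchain with connected peers.
/// If a configured checkpoint is ahead of our last known block, blocks up to
/// the checkpoint are synced first as checkpoint blocks, after which normal
/// batched header and block syncing continues until no new blocks arrive.
///
/// A minimal usage sketch, assuming an already constructed node handle
/// (here named `node`, hypothetical) inside a running async task:
/// ```ignore
/// // Sync without a configured checkpoint
/// sync_task(&node, None).await?;
/// ```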
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");
let block_sub = node.subscribers.get("blocks").unwrap();
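    // Grab our last known block, i.e. the current canonical tip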
let mut last = node.validator.blockchain.last()?;
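    // If a checkpoint past our canonical tip is configured, drop any previously synced headers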
if let Some(checkpoint) = checkpoint {
if checkpoint.0 > last.0 {
node.validator.blockchain.headers.remove_all_sync()?;
}
}
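    // If previously synced headers extend our tip, resume from the last of them; otherwise drop them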
if let Some(next) = node.validator.blockchain.headers.get_first_sync()? {
if next.height == last.0 + 1 {
if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? {
last = (last_sync.height, last_sync.hash());
}
} else {
node.validator.blockchain.headers.remove_all_sync()?;
}
}
info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);
let (mut common_tip_height, mut common_tip_peers) =
most_common_tip(node, &last.1, checkpoint).await;
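    // If the configured checkpoint is ahead of us, sync up to it first as checkpoint blocks,
    // then refresh the common tip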
if let Some(checkpoint) = checkpoint {
if checkpoint.0 > last.0 {
info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;
last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);
(common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await;
}
}
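    // Sync headers and blocks in batches until no new blocks are received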
loop {
retrieve_headers(node, &common_tip_peers, last.0, common_tip_height + 1).await?;
let last_received =
retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);
if last == last_received {
break
}
last = last_received;
(common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await;
}
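    // Sync the proposals of the best fork on top of our new tip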
sync_best_fork(node, &common_tip_peers, &last.1).await;
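    // Perform finalization and notify subscribers about any finalized blocks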
let finalized = node.validator.finalization().await?;
if !finalized.is_empty() {
let mut notif_blocks = Vec::with_capacity(finalized.len());
for block in finalized {
notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
}
block_sub.notify(JsonValue::Array(notif_blocks)).await;
}
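    // Mark the node as synced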
*node.validator.synced.write().await = true;
info!(target: "darkfid::task::sync_task", "Blockchain synced!");
Ok(())
}
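
/// Auxiliary function to block until at least one synced peer reports its
/// current tip. If a checkpoint is provided, peers must also serve the
/// checkpoint header to be considered. Returns a map of each reported tip
/// (height, hash) to the peers sharing it.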
async fn synced_peers(
node: &DarkfiNodePtr,
last_tip: &HeaderHash,
checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
let mut tips = HashMap::new();
loop {
let peers = node.p2p_handler.p2p.hosts().channels();
for peer in peers {
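            // If a checkpoint is configured, verify the peer serves the checkpoint header
            // before asking for its tip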
if let Some(c) = checkpoint {
let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
continue
};
let request = HeaderSyncRequest { height: c.0 + 1 };
if let Err(e) = peer.send(&request).await {
debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
continue
};
let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
continue
};
if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
continue
}
}
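            // Ask the peer for its current tip, relative to our last known tip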
let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
continue
};
let request = TipRequest { tip: *last_tip };
if let Err(e) = peer.send(&request).await {
debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
continue
};
let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
continue
};
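            // Only record tips reported by peers that consider themselves synced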
            if response.synced && response.height.is_some() && response.hash.is_some() {
                let tip = (response.height.unwrap(), *response.hash.unwrap().inner());
                tips.entry(tip).or_default().push(peer.clone());
            }
}
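        // Stop once at least one tip was collected, otherwise wait for a new peer connection and retry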
if !tips.is_empty() {
break
}
warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
let _ = subscription.receive().await;
subscription.unsubscribe().await;
info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} to allow for more nodes to connect...");
sleep(comms_timeout).await;
}
tips
}
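
/// Auxiliary function to find the tip shared by the most synced peers,
/// preferring the highest height on ties. Returns the tip height and the
/// peers sharing it.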
async fn most_common_tip(
node: &DarkfiNodePtr,
last_tip: &HeaderHash,
checkpoint: Option<(u32, HeaderHash)>,
) -> (u32, Vec<ChannelPtr>) {
let tips = synced_peers(node, last_tip, checkpoint).await;
info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
    // Track the tip shared by the most peers, preferring the highest height on ties
    let mut common_tip = (0, [0u8; 32], vec![]);
    for (tip, peers) in tips {
        // Skip tips shared by fewer peers than the current candidate
        if peers.len() < common_tip.2.len() {
            continue;
        }
        // On an equal peer count, keep the candidate with the higher height
        if peers.len() == common_tip.2.len() && tip.0 < common_tip.0 {
            continue;
        }
        common_tip = (tip.0, tip.1, peers);
    }
info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
(common_tip.0, common_tip.2)
}
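
/// Auxiliary function to retrieve missing headers from the given peers in
/// batches, walking backwards from the provided tip height down to our last
/// known height, and then verify that the retrieved sequence extends our
/// best fork.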
async fn retrieve_headers(
node: &DarkfiNodePtr,
peers: &[ChannelPtr],
last_known: u32,
tip_height: u32,
) -> Result<()> {
info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
let mut peer_subs = vec![];
for peer in peers {
match peer.subscribe_msg::<HeaderSyncResponse>().await {
Ok(response_sub) => peer_subs.push(Some(response_sub)),
Err(e) => {
debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
peer_subs.push(None)
}
}
}
let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
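    // Retrieve headers in batches, walking backwards from the tip height
    // until we reach headers we already know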
let total = tip_height - last_known - 1;
let mut last_tip_height = tip_height;
'headers_loop: loop {
for (index, peer) in peers.iter().enumerate() {
let Some(ref response_sub) = peer_subs[index] else {
continue;
};
let request = HeaderSyncRequest { height: last_tip_height };
if let Err(e) = peer.send(&request).await {
debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
continue
};
let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
continue
};
let mut response_headers = response.headers.to_vec();
response_headers.retain(|h| h.height > last_known);
if response_headers.is_empty() {
break 'headers_loop
}
node.validator.blockchain.headers.insert_sync(&response_headers)?;
last_tip_height = response_headers[0].height;
info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{}", node.validator.blockchain.headers.len_sync(), total);
}
}
if node.validator.blockchain.headers.is_empty_sync() {
return Ok(());
}
info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
let mut verified_headers = 0;
let total = node.validator.blockchain.headers.len_sync();
let last_known = node.validator.consensus.best_fork_last_header().await?;
let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
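    // The first retrieved header must extend our best fork's last header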
if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
node.validator.blockchain.headers.remove_all_sync()?;
return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
}
verified_headers += 1;
for (index, header) in headers[1..].iter().enumerate() {
if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
node.validator.blockchain.headers.remove_all_sync()?;
return Err(Error::BlockIsInvalid(header.hash().as_string()))
}
verified_headers += 1;
}
info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total);
let mut last_checked = headers.last().unwrap().clone();
headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
while !headers.is_empty() {
if headers[0].previous != last_checked.hash() ||
headers[0].height != last_checked.height + 1
{
node.validator.blockchain.headers.remove_all_sync()?;
return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
}
verified_headers += 1;
for (index, header) in headers[1..].iter().enumerate() {
if header.previous != headers[index].hash() ||
header.height != headers[index].height + 1
{
node.validator.blockchain.headers.remove_all_sync()?;
return Err(Error::BlockIsInvalid(header.hash().as_string()))
}
verified_headers += 1;
}
last_checked = headers.last().unwrap().clone();
headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total);
}
info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
Ok(())
}
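
/// Auxiliary function to retrieve the blocks of all synced headers from the
/// given peers. When `checkpoint_blocks` is set, blocks are appended directly
/// as checkpoint blocks; otherwise they are appended as proposals and
/// finalization is checked after each batch. Returns the height and hash of
/// the last received block.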
async fn retrieve_blocks(
node: &DarkfiNodePtr,
peers: &[ChannelPtr],
last_known: (u32, HeaderHash),
block_sub: &JsonSubscriber,
checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
let mut last_received = last_known;
let mut peer_subs = vec![];
for peer in peers {
match peer.subscribe_msg::<SyncResponse>().await {
Ok(response_sub) => peer_subs.push(Some(response_sub)),
Err(e) => {
debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
peer_subs.push(None)
}
}
}
let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
let mut received_blocks = 0;
let total = node.validator.blockchain.headers.len_sync();
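    // Retrieve blocks in batches, following the synced headers sequence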
'blocks_loop: loop {
'peers_loop: for (index, peer) in peers.iter().enumerate() {
let Some(ref response_sub) = peer_subs[index] else {
continue;
};
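            // Grab the next batch of synced headers we still need blocks for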
let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
if headers.is_empty() {
break 'blocks_loop
}
let mut headers_hashes = Vec::with_capacity(headers.len());
let mut synced_headers = Vec::with_capacity(headers.len());
for header in &headers {
headers_hashes.push(header.hash());
synced_headers.push(header.height);
}
let request = SyncRequest { headers: headers_hashes.clone() };
if let Err(e) = peer.send(&request).await {
debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
continue
};
let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
continue
};
debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
received_blocks += response.blocks.len();
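            // Append the received blocks, either directly as checkpoint blocks
            // or as consensus proposals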
if checkpoint_blocks {
if let Err(e) =
node.validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
{
debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
continue
};
} else {
for block in &response.blocks {
if let Err(e) =
node.validator.append_proposal(&Proposal::new(block.clone())).await
{
debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
continue 'peers_loop
};
}
}
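            // Record the last received block and prune the headers we just consumed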
last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());
node.validator.blockchain.headers.remove_sync(&synced_headers)?;
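            // Notify subscribers: directly with the added blocks for checkpoints,
            // or with finalized blocks otherwise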
if checkpoint_blocks {
let mut notif_blocks = Vec::with_capacity(response.blocks.len());
info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
for (index, block) in response.blocks.iter().enumerate() {
info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
notif_blocks
.push(JsonValue::String(base64::encode(&serialize_async(block).await)));
}
block_sub.notify(JsonValue::Array(notif_blocks)).await;
} else {
let finalized = node.validator.finalization().await?;
if !finalized.is_empty() {
let mut notif_blocks = Vec::with_capacity(finalized.len());
for block in finalized {
notif_blocks.push(JsonValue::String(base64::encode(
&serialize_async(&block).await,
)));
}
block_sub.notify(JsonValue::Array(notif_blocks)).await;
}
}
info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {}/{}", received_blocks, total);
}
}
Ok(last_received)
}
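
/// Auxiliary function to retrieve the best fork proposals from a random
/// synced peer and append them to our consensus state, notifying the
/// proposals subscriber for each appended proposal.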
async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
    let peer = peers.choose(&mut OsRng).unwrap();
let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
return
};
let notif_sub = node.subscribers.get("proposals").unwrap();
let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
if let Err(e) = peer.send(&request).await {
debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
return
};
let Ok(response) = response_sub
.receive_with_timeout(node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout)
.await
else {
debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
return
};
debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
for proposal in &response.proposals {
if let Err(e) = node.validator.append_proposal(proposal).await {
debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
return
};
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
notif_sub.notify(vec![enc_prop].into()).await;
}
}