use log::{debug, error, info, warn};
use tinyjson::JsonValue;
use darkfi::{
blockchain::BlockDifficulty,
net::{ChannelPtr, P2pPtr},
rpc::jsonrpc::JsonSubscriber,
util::encoding::base64,
validator::{
consensus::{Fork, Proposal},
pow::PoWModule,
utils::{best_fork_index, header_rank},
verification::verify_fork_proposal,
ValidatorPtr,
},
Error, Result,
};
use darkfi_serial::serialize_async;
use crate::proto::{
ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
ProposalMessage, BATCH,
};
pub async fn handle_unknown_proposal(
validator: ValidatorPtr,
p2p: P2pPtr,
proposals_sub: JsonSubscriber,
blocks_sub: JsonSubscriber,
channel: u32,
proposal: Proposal,
) -> Result<()> {
debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
let Some(channel) = p2p.get_channel(channel) else {
error!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
return Ok(())
};
let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
error!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
return Ok(())
};
let last = match validator.blockchain.last() {
Ok(l) => l,
Err(e) => {
debug!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
return Ok(())
}
};
let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
if let Err(e) = channel.send(&request).await {
debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
return Ok(())
};
let response = match response_sub
.receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
return Ok(())
}
};
debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
if response.proposals.is_empty() {
warn!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
}
if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
}
if response.proposals[0].block.header.previous != last.1 {
debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
}
if response.proposals.last().unwrap().hash != proposal.hash {
debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
}
for proposal in &response.proposals {
match validator.append_proposal(proposal).await {
Ok(()) => { }
Err(Error::ProposalAlreadyExists) => continue,
Err(e) => {
error!(
target: "darkfid::task::handle_unknown_proposal",
"Error while appending response proposal: {e}"
);
break;
}
};
let message = ProposalMessage(proposal.clone());
p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
proposals_sub.notify(vec![enc_prop].into()).await;
}
Ok(())
}
async fn handle_reorg(
validator: ValidatorPtr,
p2p: P2pPtr,
proposals_sub: JsonSubscriber,
blocks_sub: JsonSubscriber,
channel: ChannelPtr,
proposal: Proposal,
) -> Result<()> {
info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);
if proposal.block.header.height == 0 {
info!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
return Ok(())
}
let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
error!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
return Ok(())
};
let mut peer_header_hashes = vec![];
let mut previous_height = proposal.block.header.height;
let mut previous_hash = proposal.hash;
for height in (0..proposal.block.header.height).rev() {
let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
if let Err(e) = channel.send(&request).await {
debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
return Ok(())
};
let response = match response_sub
.receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
return Ok(())
}
};
debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
let Some(peer_header) = response.fork_header else {
info!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
return Ok(())
};
match validator.blockchain.blocks.get_order(&[height], false)?[0] {
Some(known_header) => {
if known_header == peer_header {
previous_height = height;
previous_hash = known_header;
break
}
peer_header_hashes.insert(0, peer_header);
}
None => peer_header_hashes.insert(0, peer_header),
}
}
if peer_header_hashes.is_empty() {
info!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
return Ok(())
}
let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
error!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
return Ok(())
};
let last_common_height = previous_height;
let last_difficulty = match previous_height {
0 => BlockDifficulty::genesis(validator.blockchain.genesis_block()?.header.timestamp),
_ => validator.blockchain.blocks.get_difficulty(&[last_common_height], true)?[0]
.clone()
.unwrap(),
};
let module = PoWModule::new(
validator.consensus.blockchain.clone(),
validator.consensus.module.read().await.target,
validator.consensus.module.read().await.fixed_difficulty.clone(),
Some(last_common_height + 1),
)?;
info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
let mut batch = Vec::with_capacity(BATCH);
let mut total_processed = 0;
let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
let mut headers_module = module.clone();
for (index, hash) in peer_header_hashes.iter().enumerate() {
batch.push(*hash);
if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
continue
}
let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
if let Err(e) = channel.send(&request).await {
debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
return Ok(())
};
let response = match response_sub
.receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
return Ok(())
}
};
debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
if response.headers.len() != batch.len() {
error!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
return Ok(())
}
for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
let peer_header_hash = peer_header.hash();
info!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
if peer_header_hash != batch[peer_header_index] {
error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
return Ok(())
}
if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
error!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
return Ok(())
}
let (next_target, next_difficulty) =
headers_module.next_mine_target_and_difficulty()?;
let (target_distance_sq, hash_distance_sq) = match header_rank(
peer_header,
&next_target,
) {
Ok(distances) => distances,
Err(e) => {
error!(target: "darkfid::task::handle_reorg", "Invalid header hash detected: {e}");
return Ok(())
}
};
targets_rank += target_distance_sq.clone();
hashes_rank += hash_distance_sq.clone();
headers_module.append(peer_header.timestamp, &next_difficulty);
previous_height = peer_header.height;
previous_hash = peer_header_hash;
}
total_processed += response.headers.len();
info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());
batch = Vec::with_capacity(BATCH);
}
let forks = validator.consensus.forks.read().await;
let best_fork = &forks[best_fork_index(&forks)?];
if targets_rank < best_fork.targets_rank ||
(targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
{
info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
drop(forks);
return Ok(())
}
drop(forks);
let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
error!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
return Ok(())
};
let mut peer_fork = Fork::new(validator.consensus.blockchain.clone(), module).await?;
peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();
let diffs = validator.blockchain.blocks.get_state_diffs_after(last_common_height)?;
for diff in diffs.iter().rev() {
peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(&diff.inverse())?;
}
info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
let mut batch = Vec::with_capacity(BATCH);
let mut total_processed = 0;
for (index, hash) in peer_header_hashes.iter().enumerate() {
batch.push(*hash);
if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
continue
}
let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
if let Err(e) = channel.send(&request).await {
debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
return Ok(())
};
let response = match response_sub
.receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
return Ok(())
}
};
debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
if response.proposals.len() != batch.len() {
error!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
return Ok(())
}
for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
if peer_proposal.hash != batch[peer_proposal_index] {
error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
return Ok(())
}
if let Err(e) =
verify_fork_proposal(&peer_fork, peer_proposal, validator.verify_fees).await
{
error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
return Ok(())
}
if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
return Ok(())
}
}
total_processed += response.proposals.len();
info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());
batch = Vec::with_capacity(BATCH);
}
if let Err(e) = verify_fork_proposal(&peer_fork, &proposal, validator.verify_fees).await {
error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
return Ok(())
}
if let Err(e) = peer_fork.append_proposal(&proposal).await {
error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
return Ok(())
}
let mut forks = validator.consensus.forks.write().await;
let best_fork = &forks[best_fork_index(&forks)?];
if peer_fork.targets_rank < best_fork.targets_rank ||
(peer_fork.targets_rank == best_fork.targets_rank &&
peer_fork.hashes_rank <= best_fork.hashes_rank)
{
info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
drop(forks);
return Ok(())
}
info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
*forks = vec![peer_fork];
drop(forks);
let confirmed = match validator.confirmation().await {
Ok(f) => f,
Err(e) => {
error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
return Ok(())
}
};
if !confirmed.is_empty() {
let mut notif_blocks = Vec::with_capacity(confirmed.len());
for block in confirmed {
notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
}
blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
}
let message = ProposalMessage(proposal.clone());
p2p.broadcast(&message).await;
let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
proposals_sub.notify(vec![enc_prop].into()).await;
Ok(())
}