use log::{debug, error, warn};
use tinyjson::JsonValue;
use darkfi::{
net::P2pPtr,
rpc::jsonrpc::JsonSubscriber,
util::encoding::base64,
validator::{consensus::Proposal, ValidatorPtr},
Error, Result,
};
use darkfi_serial::serialize_async;
use crate::proto::{ForkSyncRequest, ForkSyncResponse, ProposalMessage};
pub async fn handle_unknown_proposal(
validator: ValidatorPtr,
p2p: P2pPtr,
subscriber: JsonSubscriber,
channel: u32,
proposal: Proposal,
) -> Result<()> {
debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
let Some(channel) = p2p.get_channel(channel) else {
error!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
return Ok(())
};
let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
error!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
return Ok(())
};
let last = match validator.blockchain.last() {
Ok(l) => l,
Err(e) => {
debug!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
return Ok(())
}
};
let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
if let Err(e) = channel.send(&request).await {
debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
return Ok(())
};
let response = match response_sub
.receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
.await
{
Ok(r) => r,
Err(e) => {
debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
return Ok(())
}
};
debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
if response.proposals.is_empty() {
warn!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
return Ok(())
}
if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
return Ok(())
}
if response.proposals[0].block.header.previous != last.1 {
debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
return Ok(())
}
if response.proposals.last().unwrap().hash != proposal.hash {
debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
return Ok(())
}
for proposal in &response.proposals {
match validator.append_proposal(proposal).await {
Ok(()) => { }
Err(Error::ProposalAlreadyExists) => continue,
Err(e) => {
error!(
target: "darkfid::task::handle_unknown_proposal",
"Error while appending response proposal: {e}"
);
break;
}
};
let message = ProposalMessage(proposal.clone());
p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
subscriber.notify(vec![enc_prop].into()).await;
}
Ok(())
}