use std::{collections::HashSet, sync::Arc};
use async_trait::async_trait;
use log::{debug, error};
use smol::lock::RwLock;
use tinyjson::JsonValue;
use darkfi::{
impl_p2p_message,
net::{
protocol::protocol_generic::{
ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
},
session::SESSION_DEFAULT,
Message, P2pPtr,
},
rpc::jsonrpc::JsonSubscriber,
system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
util::encoding::base64,
validator::{consensus::Proposal, ValidatorPtr},
Error, Result,
};
use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
use crate::task::handle_unknown_proposal;
/// Newtype around a consensus [`Proposal`] so it can be exchanged as a
/// P2P message. The wrapped proposal is accessible as field `.0`.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ProposalMessage(pub Proposal);
// Declares `ProposalMessage` as a P2P message under the name "proposal"
// (presumably by implementing the `Message` trait; see `impl_p2p_message!`).
impl_p2p_message!(ProposalMessage, "proposal");
/// Atomically reference-counted pointer to a [`ProtocolProposalHandler`].
pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
/// Handler for the proposal protocol: owns the generic message handler and
/// keeps track of the background tasks spawned while processing proposals.
pub struct ProtocolProposalHandler {
/// Generic protocol handler through which `ProposalMessage`s are received
/// and per-message actions (broadcast/skip) are sent back.
handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
/// Background tasks spawned for proposals whose append failed with
/// `Error::ExtendedChainIndexNotFound`; shared with the receive loop.
tasks: Arc<RwLock<HashSet<StoppableTaskPtr>>>,
}
impl ProtocolProposalHandler {
pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
debug!(
target: "darkfid::proto::protocol_proposal::init",
"Adding ProtocolProposal to the protocol registry"
);
let handler = ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
let tasks = Arc::new(RwLock::new(HashSet::new()));
Arc::new(Self { handler, tasks })
}
pub async fn start(
&self,
executor: &ExecutorPtr,
validator: &ValidatorPtr,
p2p: &P2pPtr,
proposals_sub: JsonSubscriber,
blocks_sub: JsonSubscriber,
) -> Result<()> {
debug!(
target: "darkfid::proto::protocol_proposal::start",
"Starting ProtocolProposal handler task..."
);
self.handler.task.clone().start(
handle_receive_proposal(self.handler.clone(), self.tasks.clone(), validator.clone(), p2p.clone(), proposals_sub, blocks_sub, executor.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
debug!(
target: "darkfid::proto::protocol_proposal::start",
"ProtocolProposal handler task started!"
);
Ok(())
}
pub async fn stop(&self) {
debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
self.handler.task.stop().await;
let mut tasks = self.tasks.write().await;
for task in tasks.iter() {
task.stop().await;
}
*tasks = HashSet::new();
drop(tasks);
debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
}
}
async fn handle_receive_proposal(
handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
tasks: Arc<RwLock<HashSet<StoppableTaskPtr>>>,
validator: ValidatorPtr,
p2p: P2pPtr,
proposals_sub: JsonSubscriber,
blocks_sub: JsonSubscriber,
executor: ExecutorPtr,
) -> Result<()> {
debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
loop {
let (channel, proposal) = match handler.receiver.recv().await {
Ok(r) => r,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"recv fail: {e}"
);
continue
}
};
if !*validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"Node still syncing blockchain, skipping..."
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
continue
}
match validator.append_proposal(&proposal.0).await {
Ok(()) => {
handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
proposals_sub.notify(vec![enc_prop].into()).await;
continue
}
Err(e) => {
debug!(
target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
"append_proposal fail: {e}",
);
handler.send_action(channel, ProtocolGenericAction::Skip).await;
match e {
Error::ExtendedChainIndexNotFound => { }
_ => continue,
}
}
};
let task = StoppableTask::new();
let _tasks = tasks.clone();
let _task = task.clone();
task.clone().start(
handle_unknown_proposal(validator.clone(), p2p.clone(), proposals_sub.clone(), blocks_sub.clone(), channel, proposal.0),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { _tasks.write().await.remove(&_task); }
Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
tasks.write().await.insert(task);
}
}