1use std::{collections::HashSet, sync::Arc};
20
21use async_trait::async_trait;
22use log::{debug, error};
23use smol::lock::RwLock;
24use tinyjson::JsonValue;
25
26use darkfi::{
27 impl_p2p_message,
28 net::{
29 metering::MeteringConfiguration,
30 protocol::protocol_generic::{
31 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
32 },
33 session::SESSION_DEFAULT,
34 Message, P2pPtr,
35 },
36 rpc::jsonrpc::JsonSubscriber,
37 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
38 util::{encoding::base64, time::NanoTimestamp},
39 validator::{consensus::Proposal, ValidatorPtr},
40 Error, Result,
41};
42use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
43
44use crate::task::handle_unknown_proposal;
45
46#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
48pub struct ProposalMessage(pub Proposal);
49
50impl_p2p_message!(
55 ProposalMessage,
56 "proposal",
57 0,
58 1,
59 MeteringConfiguration {
60 threshold: 50,
61 sleep_step: 500,
62 expiry_time: NanoTimestamp::from_secs(5),
63 }
64);
65
66pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
68
69pub struct ProtocolProposalHandler {
71 handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
73 tasks: Arc<RwLock<HashSet<StoppableTaskPtr>>>,
75}
76
77impl ProtocolProposalHandler {
78 pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
81 debug!(
82 target: "darkfid::proto::protocol_proposal::init",
83 "Adding ProtocolProposal to the protocol registry"
84 );
85
86 let handler = ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
87 let tasks = Arc::new(RwLock::new(HashSet::new()));
88
89 Arc::new(Self { handler, tasks })
90 }
91
92 pub async fn start(
94 &self,
95 executor: &ExecutorPtr,
96 validator: &ValidatorPtr,
97 p2p: &P2pPtr,
98 proposals_sub: JsonSubscriber,
99 blocks_sub: JsonSubscriber,
100 ) -> Result<()> {
101 debug!(
102 target: "darkfid::proto::protocol_proposal::start",
103 "Starting ProtocolProposal handler task..."
104 );
105
106 self.handler.task.clone().start(
107 handle_receive_proposal(self.handler.clone(), self.tasks.clone(), validator.clone(), p2p.clone(), proposals_sub, blocks_sub, executor.clone()),
108 |res| async move {
109 match res {
110 Ok(()) | Err(Error::DetachedTaskStopped) => { }
111 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
112 }
113 },
114 Error::DetachedTaskStopped,
115 executor.clone(),
116 );
117
118 debug!(
119 target: "darkfid::proto::protocol_proposal::start",
120 "ProtocolProposal handler task started!"
121 );
122
123 Ok(())
124 }
125
126 pub async fn stop(&self) {
128 debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
129 self.handler.task.stop().await;
130 let mut tasks = self.tasks.write().await;
131 for task in tasks.iter() {
132 task.stop().await;
133 }
134 *tasks = HashSet::new();
135 drop(tasks);
136 debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
137 }
138}
139
140async fn handle_receive_proposal(
142 handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
143 tasks: Arc<RwLock<HashSet<StoppableTaskPtr>>>,
144 validator: ValidatorPtr,
145 p2p: P2pPtr,
146 proposals_sub: JsonSubscriber,
147 blocks_sub: JsonSubscriber,
148 executor: ExecutorPtr,
149) -> Result<()> {
150 debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
151 loop {
152 let (channel, proposal) = match handler.receiver.recv().await {
154 Ok(r) => r,
155 Err(e) => {
156 debug!(
157 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
158 "recv fail: {e}"
159 );
160 continue
161 }
162 };
163
164 if !*validator.synced.read().await {
166 debug!(
167 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
168 "Node still syncing blockchain, skipping..."
169 );
170 handler.send_action(channel, ProtocolGenericAction::Skip).await;
171 continue
172 }
173
174 match validator.append_proposal(&proposal.0).await {
176 Ok(()) => {
177 handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
179
180 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
182 proposals_sub.notify(vec![enc_prop].into()).await;
183
184 continue
185 }
186 Err(e) => {
187 debug!(
188 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
189 "append_proposal fail: {e}",
190 );
191
192 handler.send_action(channel, ProtocolGenericAction::Skip).await;
193
194 match e {
195 Error::ExtendedChainIndexNotFound => { }
196 _ => continue,
197 }
198 }
199 };
200
201 let task = StoppableTask::new();
203 let _tasks = tasks.clone();
204 let _task = task.clone();
205 task.clone().start(
206 handle_unknown_proposal(validator.clone(), p2p.clone(), proposals_sub.clone(), blocks_sub.clone(), channel, proposal.0),
207 |res| async move {
208 match res {
209 Ok(()) | Err(Error::DetachedTaskStopped) => { _tasks.write().await.remove(&_task); }
210 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
211 }
212 },
213 Error::DetachedTaskStopped,
214 executor.clone(),
215 );
216 tasks.write().await.insert(task);
217 }
218}