1use std::{collections::HashSet, sync::Arc};
20
21use async_trait::async_trait;
22use log::{debug, error};
23use smol::{channel::Sender, lock::RwLock};
24use tinyjson::JsonValue;
25
26use darkfi::{
27 impl_p2p_message,
28 net::{
29 metering::MeteringConfiguration,
30 protocol::protocol_generic::{
31 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
32 },
33 session::SESSION_DEFAULT,
34 Message, P2pPtr,
35 },
36 rpc::jsonrpc::JsonSubscriber,
37 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
38 util::{encoding::base64, time::NanoTimestamp},
39 validator::{consensus::Proposal, ValidatorPtr},
40 Error, Result,
41};
42use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
43
44use crate::task::handle_unknown_proposals;
45
46#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
48pub struct ProposalMessage(pub Proposal);
49
50impl_p2p_message!(
55 ProposalMessage,
56 "proposal",
57 0,
58 1,
59 MeteringConfiguration {
60 threshold: 50,
61 sleep_step: 500,
62 expiry_time: NanoTimestamp::from_secs(5),
63 }
64);
65
66pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
68
69pub struct ProtocolProposalHandler {
71 proposals_handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
73 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
75 unknown_proposals_handler: StoppableTaskPtr,
77}
78
79impl ProtocolProposalHandler {
80 pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
83 debug!(
84 target: "darkfid::proto::protocol_proposal::init",
85 "Adding ProtocolProposal to the protocol registry"
86 );
87
88 let proposals_handler =
89 ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
90 let unknown_proposals = Arc::new(RwLock::new(HashSet::new()));
91 let unknown_proposals_handler = StoppableTask::new();
92
93 Arc::new(Self { proposals_handler, unknown_proposals, unknown_proposals_handler })
94 }
95
96 pub async fn start(
98 &self,
99 executor: &ExecutorPtr,
100 validator: &ValidatorPtr,
101 p2p: &P2pPtr,
102 proposals_sub: JsonSubscriber,
103 blocks_sub: JsonSubscriber,
104 ) -> Result<()> {
105 debug!(
106 target: "darkfid::proto::protocol_proposal::start",
107 "Starting ProtocolProposal handler task..."
108 );
109
110 let (sender, receiver) = smol::channel::unbounded::<(Proposal, u32)>();
112
113 self.unknown_proposals_handler.clone().start(
115 handle_unknown_proposals(receiver, self.unknown_proposals.clone(), validator.clone(), p2p.clone(), proposals_sub.clone(), blocks_sub),
116 |res| async move {
117 match res {
118 Ok(()) | Err(Error::DetachedTaskStopped) => { }
119 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting unknown proposals handler task: {e}"),
120 }
121 },
122 Error::DetachedTaskStopped,
123 executor.clone(),
124 );
125
126 self.proposals_handler.task.clone().start(
128 handle_receive_proposal(self.proposals_handler.clone(), sender, self.unknown_proposals.clone(), validator.clone(), proposals_sub),
129 |res| async move {
130 match res {
131 Ok(()) | Err(Error::DetachedTaskStopped) => { }
132 Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
133 }
134 },
135 Error::DetachedTaskStopped,
136 executor.clone(),
137 );
138
139 debug!(
140 target: "darkfid::proto::protocol_proposal::start",
141 "ProtocolProposal handler task started!"
142 );
143
144 Ok(())
145 }
146
147 pub async fn stop(&self) {
149 debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
150 self.unknown_proposals_handler.stop().await;
151 self.proposals_handler.task.stop().await;
152 let mut unknown_proposals = self.unknown_proposals.write().await;
153 *unknown_proposals = HashSet::new();
154 drop(unknown_proposals);
155 debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
156 }
157}
158
159async fn handle_receive_proposal(
161 handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
162 sender: Sender<(Proposal, u32)>,
163 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
164 validator: ValidatorPtr,
165 proposals_sub: JsonSubscriber,
166) -> Result<()> {
167 debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
168 loop {
169 let (channel, proposal) = match handler.receiver.recv().await {
171 Ok(r) => r,
172 Err(e) => {
173 debug!(
174 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
175 "recv fail: {e}"
176 );
177 continue
178 }
179 };
180
181 if !*validator.synced.read().await {
183 debug!(
184 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
185 "Node still syncing blockchain, skipping..."
186 );
187 handler.send_action(channel, ProtocolGenericAction::Skip).await;
188 continue
189 }
190
191 match validator.append_proposal(&proposal.0).await {
193 Ok(()) => {
194 handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
196
197 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
199 proposals_sub.notify(vec![enc_prop].into()).await;
200
201 continue
202 }
203 Err(e) => {
204 debug!(
205 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
206 "append_proposal fail: {e}",
207 );
208
209 handler.send_action(channel, ProtocolGenericAction::Skip).await;
210
211 match e {
212 Error::ExtendedChainIndexNotFound => { }
213 _ => continue,
214 }
215 }
216 };
217
218 let mut lock = unknown_proposals.write().await;
221 if lock.contains(proposal.0.hash.inner()) {
222 debug!(
223 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
224 "Proposal {} is already in our unknown proposals queue.",
225 proposal.0.hash,
226 );
227 drop(lock);
228 continue
229 }
230
231 lock.insert(proposal.0.hash.0);
233 drop(lock);
234
235 if let Err(e) = sender.send((proposal.0, channel)).await {
237 debug!(
238 target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
239 "Channel {channel} send fail: {e}"
240 );
241 };
242 }
243}