darkfid/proto/
protocol_proposal.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{collections::HashSet, sync::Arc};
20
21use async_trait::async_trait;
22use log::{debug, error};
23use smol::{channel::Sender, lock::RwLock};
24use tinyjson::JsonValue;
25
26use darkfi::{
27    impl_p2p_message,
28    net::{
29        metering::MeteringConfiguration,
30        protocol::protocol_generic::{
31            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
32        },
33        session::SESSION_DEFAULT,
34        Message, P2pPtr,
35    },
36    rpc::jsonrpc::JsonSubscriber,
37    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
38    util::{encoding::base64, time::NanoTimestamp},
39    validator::{consensus::Proposal, ValidatorPtr},
40    Error, Result,
41};
42use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
43
44use crate::task::handle_unknown_proposals;
45
46/// Auxiliary [`Proposal`] wrapper structure used for messaging.
47#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
48pub struct ProposalMessage(pub Proposal);
49
50// TODO: Fine tune
51// Since messages are asynchronous we will define loose rules to prevent spamming.
52// Each message score will be 1, with a threshold of 50 and expiry time of 5.
53// We are not limiting `Proposal` size.
54impl_p2p_message!(
55    ProposalMessage,
56    "proposal",
57    0,
58    1,
59    MeteringConfiguration {
60        threshold: 50,
61        sleep_step: 500,
62        expiry_time: NanoTimestamp::from_secs(5),
63    }
64);
65
66/// Atomic pointer to the `ProtocolProposal` handler.
67pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
68
69/// Handler managing [`Proposal`] messages, over a generic P2P protocol.
70pub struct ProtocolProposalHandler {
71    /// The generic handler for [`Proposal`] messages.
72    proposals_handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
73    /// Unknown proposals queue to be checked for reorg.
74    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
75    /// Handler background task to process unknown proposals queue.
76    unknown_proposals_handler: StoppableTaskPtr,
77}
78
79impl ProtocolProposalHandler {
80    /// Initialize a generic prototocol handler for [`Proposal`] messages
81    /// and registers it to the provided P2P network, using the default session flag.
82    pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
83        debug!(
84            target: "darkfid::proto::protocol_proposal::init",
85            "Adding ProtocolProposal to the protocol registry"
86        );
87
88        let proposals_handler =
89            ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
90        let unknown_proposals = Arc::new(RwLock::new(HashSet::new()));
91        let unknown_proposals_handler = StoppableTask::new();
92
93        Arc::new(Self { proposals_handler, unknown_proposals, unknown_proposals_handler })
94    }
95
96    /// Start the `ProtocolProposal` background task.
97    pub async fn start(
98        &self,
99        executor: &ExecutorPtr,
100        validator: &ValidatorPtr,
101        p2p: &P2pPtr,
102        proposals_sub: JsonSubscriber,
103        blocks_sub: JsonSubscriber,
104    ) -> Result<()> {
105        debug!(
106            target: "darkfid::proto::protocol_proposal::start",
107            "Starting ProtocolProposal handler task..."
108        );
109
110        // Generate the message queue smol channel
111        let (sender, receiver) = smol::channel::unbounded::<(Proposal, u32)>();
112
113        // Start the unkown proposals handler task
114        self.unknown_proposals_handler.clone().start(
115            handle_unknown_proposals(receiver, self.unknown_proposals.clone(), validator.clone(), p2p.clone(), proposals_sub.clone(), blocks_sub),
116            |res| async move {
117                match res {
118                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
119                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting unknown proposals handler task: {e}"),
120                }
121            },
122            Error::DetachedTaskStopped,
123            executor.clone(),
124        );
125
126        // Start the proposals handler task
127        self.proposals_handler.task.clone().start(
128            handle_receive_proposal(self.proposals_handler.clone(), sender, self.unknown_proposals.clone(), validator.clone(), proposals_sub),
129            |res| async move {
130                match res {
131                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
132                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
133                }
134            },
135            Error::DetachedTaskStopped,
136            executor.clone(),
137        );
138
139        debug!(
140            target: "darkfid::proto::protocol_proposal::start",
141            "ProtocolProposal handler task started!"
142        );
143
144        Ok(())
145    }
146
147    /// Stop the `ProtocolProposal` background tasks.
148    pub async fn stop(&self) {
149        debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
150        self.unknown_proposals_handler.stop().await;
151        self.proposals_handler.task.stop().await;
152        let mut unknown_proposals = self.unknown_proposals.write().await;
153        *unknown_proposals = HashSet::new();
154        drop(unknown_proposals);
155        debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
156    }
157}
158
159/// Background handler function for ProtocolProposal.
160async fn handle_receive_proposal(
161    handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
162    sender: Sender<(Proposal, u32)>,
163    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
164    validator: ValidatorPtr,
165    proposals_sub: JsonSubscriber,
166) -> Result<()> {
167    debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
168    loop {
169        // Wait for a new proposal message
170        let (channel, proposal) = match handler.receiver.recv().await {
171            Ok(r) => r,
172            Err(e) => {
173                debug!(
174                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
175                    "recv fail: {e}"
176                );
177                continue
178            }
179        };
180
181        // Check if node has finished syncing its blockchain
182        if !*validator.synced.read().await {
183            debug!(
184                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
185                "Node still syncing blockchain, skipping..."
186            );
187            handler.send_action(channel, ProtocolGenericAction::Skip).await;
188            continue
189        }
190
191        // Append proposal
192        match validator.append_proposal(&proposal.0).await {
193            Ok(()) => {
194                // Signal handler to broadcast the valid proposal to rest nodes
195                handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
196
197                // Notify proposals subscriber
198                let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
199                proposals_sub.notify(vec![enc_prop].into()).await;
200
201                continue
202            }
203            Err(e) => {
204                debug!(
205                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
206                    "append_proposal fail: {e}",
207                );
208
209                handler.send_action(channel, ProtocolGenericAction::Skip).await;
210
211                match e {
212                    Error::ExtendedChainIndexNotFound => { /* Do nothing */ }
213                    _ => continue,
214                }
215            }
216        };
217
218        // Check if we already have the unknown proposal record in our
219        // queue.
220        let mut lock = unknown_proposals.write().await;
221        if lock.contains(proposal.0.hash.inner()) {
222            debug!(
223                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
224                "Proposal {} is already in our unknown proposals queue.",
225                proposal.0.hash,
226            );
227            drop(lock);
228            continue
229        }
230
231        // Insert new record in our queue
232        lock.insert(proposal.0.hash.0);
233        drop(lock);
234
235        // Notify the unknown proposals handler task
236        if let Err(e) = sender.send((proposal.0, channel)).await {
237            debug!(
238                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
239                "Channel {channel} send fail: {e}"
240            );
241        };
242    }
243}