darkfid/proto/
protocol_proposal.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{collections::HashSet, sync::Arc};
20
21use async_trait::async_trait;
22use log::{debug, error};
23use smol::lock::RwLock;
24use tinyjson::JsonValue;
25
26use darkfi::{
27    impl_p2p_message,
28    net::{
29        metering::MeteringConfiguration,
30        protocol::protocol_generic::{
31            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
32        },
33        session::SESSION_DEFAULT,
34        Message, P2pPtr,
35    },
36    rpc::jsonrpc::JsonSubscriber,
37    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
38    util::{encoding::base64, time::NanoTimestamp},
39    validator::{consensus::Proposal, ValidatorPtr},
40    Error, Result,
41};
42use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
43
44use crate::task::handle_unknown_proposal;
45
/// Auxiliary [`Proposal`] wrapper structure used for messaging.
///
/// This is a newtype: the wrapped [`Proposal`] is the only field and is
/// reachable as `.0`. The derives provide cloning, debug formatting and
/// (de)serialization support for the wrapper.
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
pub struct ProposalMessage(pub Proposal);
49
// TODO: Fine tune
// Since messages are asynchronous we will define loose rules to prevent spamming.
// Each message score will be 1, with a threshold of 50 and an expiry time of 5 seconds.
// We are not limiting `Proposal` size.
// NOTE(review): the positional `0` and `1` below are presumably the message
// size limit (with 0 meaning "no limit") and the per-message score described
// above; confirm against the `impl_p2p_message!` definition. The unit of
// `sleep_step` is not visible from this file.
impl_p2p_message!(
    ProposalMessage,
    "proposal",
    0,
    1,
    MeteringConfiguration {
        threshold: 50,
        sleep_step: 500,
        expiry_time: NanoTimestamp::from_secs(5),
    }
);
65
/// Atomic pointer to the `ProtocolProposal` handler.
///
/// This is an [`Arc`] around [`ProtocolProposalHandler`], so cloning it only
/// shares the same handler instance.
pub type ProtocolProposalHandlerPtr = Arc<ProtocolProposalHandler>;
68
/// Handler managing [`Proposal`] messages, over a generic P2P protocol.
///
/// It owns the generic protocol handler that receives [`ProposalMessage`]s
/// and keeps track of the background tasks spawned while processing them.
pub struct ProtocolProposalHandler {
    /// The generic handler for [`Proposal`] messages.
    handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
    /// Background tasks invoked by the handler, shared behind a lock so
    /// they can be registered, unregistered and stopped from different tasks.
    tasks: Arc<RwLock<HashSet<StoppableTaskPtr>>>,
}
76
77impl ProtocolProposalHandler {
78    /// Initialize a generic prototocol handler for [`Proposal`] messages
79    /// and registers it to the provided P2P network, using the default session flag.
80    pub async fn init(p2p: &P2pPtr) -> ProtocolProposalHandlerPtr {
81        debug!(
82            target: "darkfid::proto::protocol_proposal::init",
83            "Adding ProtocolProposal to the protocol registry"
84        );
85
86        let handler = ProtocolGenericHandler::new(p2p, "ProtocolProposal", SESSION_DEFAULT).await;
87        let tasks = Arc::new(RwLock::new(HashSet::new()));
88
89        Arc::new(Self { handler, tasks })
90    }
91
92    /// Start the `ProtocolProposal` background task.
93    pub async fn start(
94        &self,
95        executor: &ExecutorPtr,
96        validator: &ValidatorPtr,
97        p2p: &P2pPtr,
98        proposals_sub: JsonSubscriber,
99        blocks_sub: JsonSubscriber,
100    ) -> Result<()> {
101        debug!(
102            target: "darkfid::proto::protocol_proposal::start",
103            "Starting ProtocolProposal handler task..."
104        );
105
106        self.handler.task.clone().start(
107            handle_receive_proposal(self.handler.clone(), self.tasks.clone(), validator.clone(), p2p.clone(), proposals_sub, blocks_sub, executor.clone()),
108            |res| async move {
109                match res {
110                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
111                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
112                }
113            },
114            Error::DetachedTaskStopped,
115            executor.clone(),
116        );
117
118        debug!(
119            target: "darkfid::proto::protocol_proposal::start",
120            "ProtocolProposal handler task started!"
121        );
122
123        Ok(())
124    }
125
126    /// Stop the `ProtocolProposal` background tasks.
127    pub async fn stop(&self) {
128        debug!(target: "darkfid::proto::protocol_proposal::stop", "Terminating ProtocolProposal handler task...");
129        self.handler.task.stop().await;
130        let mut tasks = self.tasks.write().await;
131        for task in tasks.iter() {
132            task.stop().await;
133        }
134        *tasks = HashSet::new();
135        drop(tasks);
136        debug!(target: "darkfid::proto::protocol_proposal::stop", "ProtocolProposal handler task terminated!");
137    }
138}
139
140/// Background handler function for ProtocolProposal.
141async fn handle_receive_proposal(
142    handler: ProtocolGenericHandlerPtr<ProposalMessage, ProposalMessage>,
143    tasks: Arc<RwLock<HashSet<StoppableTaskPtr>>>,
144    validator: ValidatorPtr,
145    p2p: P2pPtr,
146    proposals_sub: JsonSubscriber,
147    blocks_sub: JsonSubscriber,
148    executor: ExecutorPtr,
149) -> Result<()> {
150    debug!(target: "darkfid::proto::protocol_proposal::handle_receive_proposal", "START");
151    loop {
152        // Wait for a new proposal message
153        let (channel, proposal) = match handler.receiver.recv().await {
154            Ok(r) => r,
155            Err(e) => {
156                debug!(
157                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
158                    "recv fail: {e}"
159                );
160                continue
161            }
162        };
163
164        // Check if node has finished syncing its blockchain
165        if !*validator.synced.read().await {
166            debug!(
167                target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
168                "Node still syncing blockchain, skipping..."
169            );
170            handler.send_action(channel, ProtocolGenericAction::Skip).await;
171            continue
172        }
173
174        // Append proposal
175        match validator.append_proposal(&proposal.0).await {
176            Ok(()) => {
177                // Signal handler to broadcast the valid proposal to rest nodes
178                handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
179
180                // Notify proposals subscriber
181                let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
182                proposals_sub.notify(vec![enc_prop].into()).await;
183
184                continue
185            }
186            Err(e) => {
187                debug!(
188                    target: "darkfid::proto::protocol_proposal::handle_receive_proposal",
189                    "append_proposal fail: {e}",
190                );
191
192                handler.send_action(channel, ProtocolGenericAction::Skip).await;
193
194                match e {
195                    Error::ExtendedChainIndexNotFound => { /* Do nothing */ }
196                    _ => continue,
197                }
198            }
199        };
200
201        // Handle unknown proposal in the background
202        let task = StoppableTask::new();
203        let _tasks = tasks.clone();
204        let _task = task.clone();
205        task.clone().start(
206            handle_unknown_proposal(validator.clone(), p2p.clone(), proposals_sub.clone(), blocks_sub.clone(), channel, proposal.0),
207            |res| async move {
208                match res {
209                    Ok(()) | Err(Error::DetachedTaskStopped) => { _tasks.write().await.remove(&_task); }
210                    Err(e) => error!(target: "darkfid::proto::protocol_proposal::start", "Failed starting ProtocolProposal handler task: {e}"),
211                }
212            },
213            Error::DetachedTaskStopped,
214            executor.clone(),
215        );
216        tasks.write().await.insert(task);
217    }
218}