darkfid/proto/
protocol_tx.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use log::{debug, error};
22use tinyjson::JsonValue;
23
24use darkfi::{
25    net::{
26        protocol::protocol_generic::{
27            ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
28        },
29        session::SESSION_DEFAULT,
30        P2pPtr,
31    },
32    rpc::jsonrpc::JsonSubscriber,
33    system::ExecutorPtr,
34    tx::Transaction,
35    util::encoding::base64,
36    validator::ValidatorPtr,
37    Error, Result,
38};
39use darkfi_serial::serialize_async;
40
/// Shared, atomically reference-counted pointer ([`Arc`]) to a
/// [`ProtocolTxHandler`], as returned by [`ProtocolTxHandler::init`].
pub type ProtocolTxHandlerPtr = Arc<ProtocolTxHandler>;
43
/// Handler managing [`Transaction`] messages, over a generic P2P protocol.
///
/// It is a thin wrapper around a generic protocol handler: the wrapped
/// handler provides the message receiver, the action sender and the
/// background task that this type starts and stops.
pub struct ProtocolTxHandler {
    /// The generic handler for [`Transaction`] messages.
    ///
    /// Both type parameters are [`Transaction`].
    /// NOTE(review): presumably the first is the received message type and
    /// the second the type carried by the response action; confirm against
    /// the `ProtocolGenericHandler` definition.
    handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
}
49
50impl ProtocolTxHandler {
51    /// Initialize a generic prototocol handler for [`Transaction`] messages
52    /// and registers it to the provided P2P network, using the default session flag.
53    pub async fn init(p2p: &P2pPtr) -> ProtocolTxHandlerPtr {
54        debug!(
55            target: "darkfid::proto::protocol_tx::init",
56            "Adding ProtocolTx to the protocol registry"
57        );
58
59        let handler = ProtocolGenericHandler::new(p2p, "ProtocolTx", SESSION_DEFAULT).await;
60
61        Arc::new(Self { handler })
62    }
63
64    /// Start the `ProtocolTx` background task.
65    pub async fn start(
66        &self,
67        executor: &ExecutorPtr,
68        validator: &ValidatorPtr,
69        subscriber: JsonSubscriber,
70    ) -> Result<()> {
71        debug!(
72            target: "darkfid::proto::protocol_tx::start",
73            "Starting ProtocolTx handler task..."
74        );
75
76        self.handler.task.clone().start(
77            handle_receive_tx(self.handler.clone(), validator.clone(), subscriber),
78            |res| async move {
79                match res {
80                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
81                    Err(e) => error!(target: "darkfid::proto::protocol_tx::start", "Failed starting ProtocolTx handler task: {e}"),
82                }
83            },
84            Error::DetachedTaskStopped,
85            executor.clone(),
86        );
87
88        debug!(
89            target: "darkfid::proto::protocol_tx::start",
90            "ProtocolTx handler task started!"
91        );
92
93        Ok(())
94    }
95
96    /// Stop the `ProtocolTx` background task.
97    pub async fn stop(&self) {
98        debug!(target: "darkfid::proto::protocol_tx::stop", "Terminating ProtocolTx handler task...");
99        self.handler.task.stop().await;
100        debug!(target: "darkfid::proto::protocol_tx::stop", "ProtocolTx handler task terminated!");
101    }
102}
103
104/// Background handler function for ProtocolTx.
105async fn handle_receive_tx(
106    handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
107    validator: ValidatorPtr,
108    subscriber: JsonSubscriber,
109) -> Result<()> {
110    debug!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "START");
111    loop {
112        // Wait for a new transaction message
113        let (channel, tx) = match handler.receiver.recv().await {
114            Ok(r) => r,
115            Err(e) => {
116                debug!(
117                    target: "darkfid::proto::protocol_tx::handle_receive_tx",
118                    "recv fail: {e}"
119                );
120                continue
121            }
122        };
123
124        // Check if node has finished syncing its blockchain
125        if !*validator.synced.read().await {
126            debug!(
127                target: "darkfid::proto::protocol_tx::handle_receive_tx",
128                "Node still syncing blockchain, skipping..."
129            );
130            handler.send_action(channel, ProtocolGenericAction::Skip).await;
131            continue
132        }
133
134        // Append transaction
135        if let Err(e) = validator.append_tx(&tx, true).await {
136            debug!(
137                target: "darkfid::proto::protocol_tx::handle_receive_tx",
138                "append_tx fail: {e}"
139            );
140            handler.send_action(channel, ProtocolGenericAction::Skip).await;
141            continue
142        }
143
144        // Signal handler to broadcast the valid transaction to rest nodes
145        handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
146
147        // Notify subscriber
148        let encoded_tx = JsonValue::String(base64::encode(&serialize_async(&tx).await));
149        subscriber.notify(vec![encoded_tx].into()).await;
150    }
151}