1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
1819use std::sync::Arc;
2021use log::{debug, error};
22use tinyjson::JsonValue;
2324use darkfi::{
25 net::{
26 protocol::protocol_generic::{
27 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
28 },
29 session::SESSION_DEFAULT,
30 P2pPtr,
31 },
32 rpc::jsonrpc::JsonSubscriber,
33 system::ExecutorPtr,
34 tx::Transaction,
35 util::encoding::base64,
36 validator::ValidatorPtr,
37 Error, Result,
38};
39use darkfi_serial::serialize_async;
4041/// Atomic pointer to the `ProtocolTx` handler.
42pub type ProtocolTxHandlerPtr = Arc<ProtocolTxHandler>;
4344/// Handler managing [`Transaction`] messages, over a generic P2P protocol.
45pub struct ProtocolTxHandler {
46/// The generic handler for [`Transaction`] messages.
47handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
48}
4950impl ProtocolTxHandler {
51/// Initialize a generic prototocol handler for [`Transaction`] messages
52 /// and registers it to the provided P2P network, using the default session flag.
53pub async fn init(p2p: &P2pPtr) -> ProtocolTxHandlerPtr {
54debug!(
55 target: "darkfid::proto::protocol_tx::init",
56"Adding ProtocolTx to the protocol registry"
57);
5859let handler = ProtocolGenericHandler::new(p2p, "ProtocolTx", SESSION_DEFAULT).await;
6061 Arc::new(Self { handler })
62 }
6364/// Start the `ProtocolTx` background task.
65pub async fn start(
66&self,
67 executor: &ExecutorPtr,
68 validator: &ValidatorPtr,
69 subscriber: JsonSubscriber,
70 ) -> Result<()> {
71debug!(
72 target: "darkfid::proto::protocol_tx::start",
73"Starting ProtocolTx handler task..."
74);
7576self.handler.task.clone().start(
77 handle_receive_tx(self.handler.clone(), validator.clone(), subscriber),
78 |res| async move {
79match res {
80Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
81Err(e) => error!(target: "darkfid::proto::protocol_tx::start", "Failed starting ProtocolTx handler task: {e}"),
82 }
83 },
84 Error::DetachedTaskStopped,
85 executor.clone(),
86 );
8788debug!(
89 target: "darkfid::proto::protocol_tx::start",
90"ProtocolTx handler task started!"
91);
9293Ok(())
94 }
9596/// Stop the `ProtocolTx` background task.
97pub async fn stop(&self) {
98debug!(target: "darkfid::proto::protocol_tx::stop", "Terminating ProtocolTx handler task...");
99self.handler.task.stop().await;
100debug!(target: "darkfid::proto::protocol_tx::stop", "ProtocolTx handler task terminated!");
101 }
102}
103104/// Background handler function for ProtocolTx.
105async fn handle_receive_tx(
106 handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
107 validator: ValidatorPtr,
108 subscriber: JsonSubscriber,
109) -> Result<()> {
110debug!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "START");
111loop {
112// Wait for a new transaction message
113let (channel, tx) = match handler.receiver.recv().await {
114Ok(r) => r,
115Err(e) => {
116debug!(
117 target: "darkfid::proto::protocol_tx::handle_receive_tx",
118"recv fail: {e}"
119);
120continue
121}
122 };
123124// Check if node has finished syncing its blockchain
125if !*validator.synced.read().await {
126debug!(
127 target: "darkfid::proto::protocol_tx::handle_receive_tx",
128"Node still syncing blockchain, skipping..."
129);
130 handler.send_action(channel, ProtocolGenericAction::Skip).await;
131continue
132}
133134// Append transaction
135if let Err(e) = validator.append_tx(&tx, true).await {
136debug!(
137 target: "darkfid::proto::protocol_tx::handle_receive_tx",
138"append_tx fail: {e}"
139);
140 handler.send_action(channel, ProtocolGenericAction::Skip).await;
141continue
142}
143144// Signal handler to broadcast the valid transaction to rest nodes
145handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
146147// Notify subscriber
148let encoded_tx = JsonValue::String(base64::encode(&serialize_async(&tx).await));
149 subscriber.notify(vec![encoded_tx].into()).await;
150 }
151}