darkfid/proto/
protocol_tx.rs1use std::sync::Arc;
20
21use log::{debug, error};
22use tinyjson::JsonValue;
23
24use darkfi::{
25 net::{
26 protocol::protocol_generic::{
27 ProtocolGenericAction, ProtocolGenericHandler, ProtocolGenericHandlerPtr,
28 },
29 session::SESSION_DEFAULT,
30 P2pPtr,
31 },
32 rpc::jsonrpc::JsonSubscriber,
33 system::ExecutorPtr,
34 tx::Transaction,
35 util::encoding::base64,
36 validator::ValidatorPtr,
37 Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41pub type ProtocolTxHandlerPtr = Arc<ProtocolTxHandler>;
43
44pub struct ProtocolTxHandler {
46 handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
48}
49
50impl ProtocolTxHandler {
51 pub async fn init(p2p: &P2pPtr) -> ProtocolTxHandlerPtr {
54 debug!(
55 target: "darkfid::proto::protocol_tx::init",
56 "Adding ProtocolTx to the protocol registry"
57 );
58
59 let handler = ProtocolGenericHandler::new(p2p, "ProtocolTx", SESSION_DEFAULT).await;
60
61 Arc::new(Self { handler })
62 }
63
64 pub async fn start(
66 &self,
67 executor: &ExecutorPtr,
68 validator: &ValidatorPtr,
69 subscriber: JsonSubscriber,
70 ) -> Result<()> {
71 debug!(
72 target: "darkfid::proto::protocol_tx::start",
73 "Starting ProtocolTx handler task..."
74 );
75
76 self.handler.task.clone().start(
77 handle_receive_tx(self.handler.clone(), validator.clone(), subscriber),
78 |res| async move {
79 match res {
80 Ok(()) | Err(Error::DetachedTaskStopped) => { }
81 Err(e) => error!(target: "darkfid::proto::protocol_tx::start", "Failed starting ProtocolTx handler task: {e}"),
82 }
83 },
84 Error::DetachedTaskStopped,
85 executor.clone(),
86 );
87
88 debug!(
89 target: "darkfid::proto::protocol_tx::start",
90 "ProtocolTx handler task started!"
91 );
92
93 Ok(())
94 }
95
96 pub async fn stop(&self) {
98 debug!(target: "darkfid::proto::protocol_tx::stop", "Terminating ProtocolTx handler task...");
99 self.handler.task.stop().await;
100 debug!(target: "darkfid::proto::protocol_tx::stop", "ProtocolTx handler task terminated!");
101 }
102}
103
104async fn handle_receive_tx(
106 handler: ProtocolGenericHandlerPtr<Transaction, Transaction>,
107 validator: ValidatorPtr,
108 subscriber: JsonSubscriber,
109) -> Result<()> {
110 debug!(target: "darkfid::proto::protocol_tx::handle_receive_tx", "START");
111 loop {
112 let (channel, tx) = match handler.receiver.recv().await {
114 Ok(r) => r,
115 Err(e) => {
116 debug!(
117 target: "darkfid::proto::protocol_tx::handle_receive_tx",
118 "recv fail: {e}"
119 );
120 continue
121 }
122 };
123
124 if !*validator.synced.read().await {
126 debug!(
127 target: "darkfid::proto::protocol_tx::handle_receive_tx",
128 "Node still syncing blockchain, skipping..."
129 );
130 handler.send_action(channel, ProtocolGenericAction::Skip).await;
131 continue
132 }
133
134 if let Err(e) = validator.append_tx(&tx, true).await {
136 debug!(
137 target: "darkfid::proto::protocol_tx::handle_receive_tx",
138 "append_tx fail: {e}"
139 );
140 handler.send_action(channel, ProtocolGenericAction::Skip).await;
141 continue
142 }
143
144 handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
146
147 let encoded_tx = JsonValue::String(base64::encode(&serialize_async(&tx).await));
149 subscriber.notify(vec![encoded_tx].into()).await;
150 }
151}