1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use log::{debug, error, info, warn};
25use smol::lock::Mutex;
26use url::Url;
27
28use darkfi::{
29 net::settings::Settings,
30 rpc::{
31 jsonrpc::JsonSubscriber,
32 server::{listen_and_serve, RequestHandler},
33 settings::RpcSettings,
34 },
35 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
36 validator::{Validator, ValidatorConfig, ValidatorPtr},
37 Error, Result,
38};
39
40#[cfg(test)]
41mod tests;
42
43mod error;
44use error::{server_error, RpcError};
45
46mod rpc;
48use rpc::{DefaultRpcHandler, MinerRpcClient, MmRpcHandler};
49mod rpc_blockchain;
50mod rpc_tx;
51mod rpc_xmr;
52
53pub mod task;
55use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
56
57mod proto;
59use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
60
61pub type DarkfiNodePtr = Arc<DarkfiNode>;
63
64pub struct DarkfiNode {
66 p2p_handler: DarkfidP2pHandlerPtr,
68 validator: ValidatorPtr,
70 txs_batch_size: usize,
72 subscribers: HashMap<&'static str, JsonSubscriber>,
74 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
76 rpc_client: Option<Mutex<MinerRpcClient>>,
78 mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
80}
81
82impl DarkfiNode {
83 pub async fn new(
84 p2p_handler: DarkfidP2pHandlerPtr,
85 validator: ValidatorPtr,
86 txs_batch_size: usize,
87 subscribers: HashMap<&'static str, JsonSubscriber>,
88 rpc_client: Option<Mutex<MinerRpcClient>>,
89 ) -> DarkfiNodePtr {
90 Arc::new(Self {
91 p2p_handler,
92 validator,
93 txs_batch_size,
94 subscribers,
95 rpc_connections: Mutex::new(HashSet::new()),
96 rpc_client,
97 mm_rpc_connections: Mutex::new(HashSet::new()),
98 })
99 }
100}
101
102pub type DarkfidPtr = Arc<Darkfid>;
104
105pub struct Darkfid {
107 node: DarkfiNodePtr,
109 dnet_task: StoppableTaskPtr,
111 rpc_task: StoppableTaskPtr,
113 mm_rpc_task: StoppableTaskPtr,
115 consensus_task: StoppableTaskPtr,
117}
118
119impl Darkfid {
120 pub async fn init(
125 sled_db: &sled_overlay::sled::Db,
126 config: &ValidatorConfig,
127 net_settings: &Settings,
128 minerd_endpoint: &Option<Url>,
129 txs_batch_size: &Option<usize>,
130 ex: &ExecutorPtr,
131 ) -> Result<DarkfidPtr> {
132 info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
133 let validator = Validator::new(sled_db, config).await?;
135
136 let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
138
139 let txs_batch_size = match txs_batch_size {
141 Some(b) => {
142 if *b > 0 {
143 *b
144 } else {
145 50
146 }
147 }
148 None => 50,
149 };
150
151 let mut subscribers = HashMap::new();
153 subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
154 subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
155 subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
156 subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
157
158 let rpc_client = match minerd_endpoint {
160 Some(endpoint) => {
161 Some(Mutex::new(MinerRpcClient::new(endpoint.clone(), ex.clone()).await))
162 }
163 None => None,
164 };
165
166 let node =
168 DarkfiNode::new(p2p_handler, validator, txs_batch_size, subscribers, rpc_client).await;
169
170 let dnet_task = StoppableTask::new();
172 let rpc_task = StoppableTask::new();
173 let mm_rpc_task = StoppableTask::new();
174 let consensus_task = StoppableTask::new();
175
176 info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
177
178 Ok(Arc::new(Self { node, dnet_task, rpc_task, mm_rpc_task, consensus_task }))
179 }
180
181 pub async fn start(
184 &self,
185 executor: &ExecutorPtr,
186 rpc_settings: &RpcSettings,
187 mm_rpc_settings: &Option<RpcSettings>,
188 config: &ConsensusInitTaskConfig,
189 ) -> Result<()> {
190 info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
191
192 if self.node.rpc_client.is_some() {
194 if let Err(e) = self.node.ping_miner_daemon().await {
195 warn!(target: "darkfid::Darkfid::start", "Failed to ping miner daemon: {}", e);
196 }
197 }
198
199 info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
201 let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
202 let p2p_ = self.node.p2p_handler.p2p.clone();
203 self.dnet_task.clone().start(
204 async move {
205 let dnet_sub = p2p_.dnet_subscribe().await;
206 loop {
207 let event = dnet_sub.receive().await;
208 debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {:?}", event);
209 dnet_sub_.notify(vec![event.into()].into()).await;
210 }
211 },
212 |res| async {
213 match res {
214 Ok(()) | Err(Error::DetachedTaskStopped) => { }
215 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {}", e),
216 }
217 },
218 Error::DetachedTaskStopped,
219 executor.clone(),
220 );
221
222 info!(target: "darkfid::Darkfid::start", "Starting JSON-RPC server");
224 let node_ = self.node.clone();
225 self.rpc_task.clone().start(
226 listen_and_serve::<DefaultRpcHandler>(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
227 |res| async move {
228 match res {
229 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::stop_connections(&node_).await,
230 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting JSON-RPC server: {}", e),
231 }
232 },
233 Error::RpcServerStopped,
234 executor.clone(),
235 );
236
237 if let Some(mm_rpc) = mm_rpc_settings {
239 info!(target: "darkfid::Darkfid::start", "Starting HTTP JSON-RPC server");
240 let node_ = self.node.clone();
241 self.mm_rpc_task.clone().start(
242 listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), self.node.clone(), None, executor.clone()),
243 |res| async move {
244 match res {
245 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
246 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting HTTP JSON-RPC server: {}", e),
247 }
248 },
249 Error::RpcServerStopped,
250 executor.clone(),
251 );
252 } else {
253 self.mm_rpc_task.clone().start(
255 async { Ok(()) },
256 |_| async { },
257 Error::RpcServerStopped,
258 executor.clone(),
259 );
260 }
261
262 info!(target: "darkfid::Darkfid::start", "Starting P2P network");
264 self.node
265 .p2p_handler
266 .clone()
267 .start(executor, &self.node.validator, &self.node.subscribers)
268 .await?;
269
270 info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
272 self.consensus_task.clone().start(
273 consensus_init_task(
274 self.node.clone(),
275 config.clone(),
276 executor.clone(),
277 ),
278 |res| async move {
279 match res {
280 Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { }
281 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {}", e),
282 }
283 },
284 Error::ConsensusTaskStopped,
285 executor.clone(),
286 );
287
288 info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
289 Ok(())
290 }
291
292 pub async fn stop(&self) -> Result<()> {
294 info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
295
296 info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
298 self.dnet_task.stop().await;
299
300 info!(target: "darkfid::Darkfid::stop", "Stopping JSON-RPC server...");
302 self.rpc_task.stop().await;
303
304 info!(target: "darkfid::Darkfid::stop", "Stopping HTTP JSON-RPC server...");
306 self.rpc_task.stop().await;
307
308 info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
310 self.node.p2p_handler.stop().await;
311
312 info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
314 self.consensus_task.stop().await;
315
316 info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
318 let flushed_bytes = self.node.validator.blockchain.sled_db.flush_async().await?;
319 info!(target: "darkfid::Darkfid::stop", "Flushed {} bytes", flushed_bytes);
320
321 if let Some(ref rpc_client) = self.node.rpc_client {
323 info!(target: "darkfid::Darkfid::stop", "Stopping JSON-RPC client...");
324 rpc_client.lock().await.stop().await;
325 };
326
327 info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
328 Ok(())
329 }
330}