darkfid/
lib.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    sync::Arc,
22};
23
24use log::{debug, error, info, warn};
25use smol::lock::Mutex;
26use url::Url;
27
28use darkfi::{
29    net::settings::Settings,
30    rpc::{
31        jsonrpc::JsonSubscriber,
32        server::{listen_and_serve, RequestHandler},
33        settings::RpcSettings,
34    },
35    system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
36    validator::{Validator, ValidatorConfig, ValidatorPtr},
37    Error, Result,
38};
39
40#[cfg(test)]
41mod tests;
42
43mod error;
44use error::{server_error, RpcError};
45
46/// JSON-RPC requests handler and methods
47mod rpc;
48use rpc::{DefaultRpcHandler, MinerRpcClient, MmRpcHandler};
49mod rpc_blockchain;
50mod rpc_tx;
51mod rpc_xmr;
52
53/// Validator async tasks
54pub mod task;
55use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
56
57/// P2P net protocols
58mod proto;
59use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
60
61/// Atomic pointer to the DarkFi node
62pub type DarkfiNodePtr = Arc<DarkfiNode>;
63
64/// Structure representing a DarkFi node
65pub struct DarkfiNode {
66    /// P2P network protocols handler.
67    p2p_handler: DarkfidP2pHandlerPtr,
68    /// Validator(node) pointer
69    validator: ValidatorPtr,
70    /// Garbage collection task transactions batch size
71    txs_batch_size: usize,
72    /// A map of various subscribers exporting live info from the blockchain
73    subscribers: HashMap<&'static str, JsonSubscriber>,
74    /// JSON-RPC connection tracker
75    rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
76    /// JSON-RPC client to execute requests to the miner daemon
77    rpc_client: Option<Mutex<MinerRpcClient>>,
78    /// HTTP JSON-RPC connection tracker
79    mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
80}
81
82impl DarkfiNode {
83    pub async fn new(
84        p2p_handler: DarkfidP2pHandlerPtr,
85        validator: ValidatorPtr,
86        txs_batch_size: usize,
87        subscribers: HashMap<&'static str, JsonSubscriber>,
88        rpc_client: Option<Mutex<MinerRpcClient>>,
89    ) -> DarkfiNodePtr {
90        Arc::new(Self {
91            p2p_handler,
92            validator,
93            txs_batch_size,
94            subscribers,
95            rpc_connections: Mutex::new(HashSet::new()),
96            rpc_client,
97            mm_rpc_connections: Mutex::new(HashSet::new()),
98        })
99    }
100}
101
102/// Atomic pointer to the DarkFi daemon
103pub type DarkfidPtr = Arc<Darkfid>;
104
105/// Structure representing a DarkFi daemon
106pub struct Darkfid {
107    /// Darkfi node instance
108    node: DarkfiNodePtr,
109    /// `dnet` background task
110    dnet_task: StoppableTaskPtr,
111    /// JSON-RPC background task
112    rpc_task: StoppableTaskPtr,
113    /// HTTP JSON-RPC background task
114    mm_rpc_task: StoppableTaskPtr,
115    /// Consensus protocol background task
116    consensus_task: StoppableTaskPtr,
117}
118
119impl Darkfid {
120    /// Initialize a DarkFi daemon.
121    ///
122    /// Generates a new `DarkfiNode` for provided configuration,
123    /// along with all the corresponding background tasks.
124    pub async fn init(
125        sled_db: &sled_overlay::sled::Db,
126        config: &ValidatorConfig,
127        net_settings: &Settings,
128        minerd_endpoint: &Option<Url>,
129        txs_batch_size: &Option<usize>,
130        ex: &ExecutorPtr,
131    ) -> Result<DarkfidPtr> {
132        info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
133        // Initialize validator
134        let validator = Validator::new(sled_db, config).await?;
135
136        // Initialize P2P network
137        let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
138
139        // Grab blockchain network configured transactions batch size for garbage collection
140        let txs_batch_size = match txs_batch_size {
141            Some(b) => {
142                if *b > 0 {
143                    *b
144                } else {
145                    50
146                }
147            }
148            None => 50,
149        };
150
151        // Here we initialize various subscribers that can export live blockchain/consensus data.
152        let mut subscribers = HashMap::new();
153        subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
154        subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
155        subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
156        subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
157
158        // Initialize JSON-RPC client to perform requests to minerd
159        let rpc_client = match minerd_endpoint {
160            Some(endpoint) => {
161                Some(Mutex::new(MinerRpcClient::new(endpoint.clone(), ex.clone()).await))
162            }
163            None => None,
164        };
165
166        // Initialize node
167        let node =
168            DarkfiNode::new(p2p_handler, validator, txs_batch_size, subscribers, rpc_client).await;
169
170        // Generate the background tasks
171        let dnet_task = StoppableTask::new();
172        let rpc_task = StoppableTask::new();
173        let mm_rpc_task = StoppableTask::new();
174        let consensus_task = StoppableTask::new();
175
176        info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
177
178        Ok(Arc::new(Self { node, dnet_task, rpc_task, mm_rpc_task, consensus_task }))
179    }
180
181    /// Start the DarkFi daemon in the given executor, using the provided JSON-RPC listen url
182    /// and consensus initialization configuration.
183    pub async fn start(
184        &self,
185        executor: &ExecutorPtr,
186        rpc_settings: &RpcSettings,
187        mm_rpc_settings: &Option<RpcSettings>,
188        config: &ConsensusInitTaskConfig,
189    ) -> Result<()> {
190        info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
191
192        // Pinging minerd daemon to verify it listens
193        if self.node.rpc_client.is_some() {
194            if let Err(e) = self.node.ping_miner_daemon().await {
195                warn!(target: "darkfid::Darkfid::start", "Failed to ping miner daemon: {}", e);
196            }
197        }
198
199        // Start the `dnet` task
200        info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
201        let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
202        let p2p_ = self.node.p2p_handler.p2p.clone();
203        self.dnet_task.clone().start(
204            async move {
205                let dnet_sub = p2p_.dnet_subscribe().await;
206                loop {
207                    let event = dnet_sub.receive().await;
208                    debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {:?}", event);
209                    dnet_sub_.notify(vec![event.into()].into()).await;
210                }
211            },
212            |res| async {
213                match res {
214                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
215                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {}", e),
216                }
217            },
218            Error::DetachedTaskStopped,
219            executor.clone(),
220        );
221
222        // Start the JSON-RPC task
223        info!(target: "darkfid::Darkfid::start", "Starting JSON-RPC server");
224        let node_ = self.node.clone();
225        self.rpc_task.clone().start(
226            listen_and_serve::<DefaultRpcHandler>(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
227            |res| async move {
228                match res {
229                    Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::stop_connections(&node_).await,
230                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting JSON-RPC server: {}", e),
231                }
232            },
233            Error::RpcServerStopped,
234            executor.clone(),
235        );
236
237        // Start the HTTP JSON-RPC task
238        if let Some(mm_rpc) = mm_rpc_settings {
239            info!(target: "darkfid::Darkfid::start", "Starting HTTP JSON-RPC server");
240            let node_ = self.node.clone();
241            self.mm_rpc_task.clone().start(
242                listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), self.node.clone(), None, executor.clone()),
243                |res| async move {
244                    match res {
245                        Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
246                        Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting HTTP JSON-RPC server: {}", e),
247                    }
248                },
249                Error::RpcServerStopped,
250                executor.clone(),
251            );
252        } else {
253            // Create a dummy task
254            self.mm_rpc_task.clone().start(
255                async { Ok(()) },
256                |_| async { /* Do nothing */ },
257                Error::RpcServerStopped,
258                executor.clone(),
259            );
260        }
261
262        // Start the P2P network
263        info!(target: "darkfid::Darkfid::start", "Starting P2P network");
264        self.node
265            .p2p_handler
266            .clone()
267            .start(executor, &self.node.validator, &self.node.subscribers)
268            .await?;
269
270        // Start the consensus protocol
271        info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
272        self.consensus_task.clone().start(
273            consensus_init_task(
274                self.node.clone(),
275                config.clone(),
276                executor.clone(),
277            ),
278            |res| async move {
279                match res {
280                    Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { /* Do nothing */ }
281                    Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {}", e),
282                }
283            },
284            Error::ConsensusTaskStopped,
285            executor.clone(),
286        );
287
288        info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
289        Ok(())
290    }
291
292    /// Stop the DarkFi daemon.
293    pub async fn stop(&self) -> Result<()> {
294        info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
295
296        // Stop the `dnet` node
297        info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
298        self.dnet_task.stop().await;
299
300        // Stop the JSON-RPC task
301        info!(target: "darkfid::Darkfid::stop", "Stopping JSON-RPC server...");
302        self.rpc_task.stop().await;
303
304        // Stop the HTTP JSON-RPC task
305        info!(target: "darkfid::Darkfid::stop", "Stopping HTTP JSON-RPC server...");
306        self.rpc_task.stop().await;
307
308        // Stop the P2P network
309        info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
310        self.node.p2p_handler.stop().await;
311
312        // Stop the consensus task
313        info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
314        self.consensus_task.stop().await;
315
316        // Flush sled database data
317        info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
318        let flushed_bytes = self.node.validator.blockchain.sled_db.flush_async().await?;
319        info!(target: "darkfid::Darkfid::stop", "Flushed {} bytes", flushed_bytes);
320
321        // Close the JSON-RPC client, if it was initialized
322        if let Some(ref rpc_client) = self.node.rpc_client {
323            info!(target: "darkfid::Darkfid::stop", "Stopping JSON-RPC client...");
324            rpc_client.lock().await.stop().await;
325        };
326
327        info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
328        Ok(())
329    }
330}