1use std::{
20 collections::{HashMap, HashSet},
21 sync::Arc,
22};
23
24use smol::lock::Mutex;
25use tracing::{debug, error, info};
26
27use darkfi::{
28 blockchain::BlockInfo,
29 net::settings::Settings,
30 rpc::{
31 jsonrpc::JsonSubscriber,
32 server::{listen_and_serve, RequestHandler},
33 settings::RpcSettings,
34 },
35 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
36 validator::{Validator, ValidatorConfig, ValidatorPtr},
37 zk::{empty_witnesses, ProvingKey, ZkCircuit},
38 zkas::ZkBinary,
39 Error, Result,
40};
41use darkfi_money_contract::MONEY_CONTRACT_ZKAS_MINT_NS_V1;
42use darkfi_sdk::crypto::{keypair::SecretKey, MONEY_CONTRACT_ID};
43
44#[cfg(test)]
45mod tests;
46
47mod error;
48use error::{server_error, RpcError};
49
50mod rpc;
52use rpc::{DefaultRpcHandler, MmRpcHandler};
53mod rpc_blockchain;
54mod rpc_miner;
55mod rpc_tx;
56use rpc_miner::BlockTemplate;
57mod rpc_xmr;
58
59pub mod task;
61use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
62
63mod proto;
65use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
66
/// Reference-counted (`Arc`) handle to a [`DarkfiNode`].
pub type DarkfiNodePtr = Arc<DarkfiNode>;

/// State shared by the daemon's tasks and RPC handlers.
///
/// Instances are created through [`DarkfiNode::new`], which returns the node
/// already wrapped in a [`DarkfiNodePtr`].
pub struct DarkfiNode {
    /// P2P network protocols handler; `Darkfid::start` and `Darkfid::stop`
    /// start and stop it.
    p2p_handler: DarkfidP2pHandlerPtr,
    /// Validator handle. Its `blockchain` is read when building the PoW
    /// reward zk material and when flushing the sled database on shutdown.
    validator: ValidatorPtr,
    /// Transactions batch size. `Darkfid::init` resolves it to 50 when the
    /// configured value is unset or zero.
    /// NOTE(review): the consumer of this value is outside this file section.
    txs_batch_size: usize,
    /// JSON-RPC subscribers, keyed by a short static name. `Darkfid::init`
    /// registers `"blocks"`, `"txs"`, `"proposals"` and `"dnet"`.
    subscribers: HashMap<&'static str, JsonSubscriber>,
    /// Mutex-guarded block templates, keyed by a byte vector.
    /// NOTE(review): the meaning of the key is defined by the miner RPC code
    /// (`rpc_miner`) — confirm there.
    blocktemplates: Mutex<HashMap<Vec<u8>, BlockTemplate>>,
    /// Mutex-guarded `(BlockInfo, f64, SecretKey)` entries, keyed by a byte
    /// vector.
    /// NOTE(review): `mm` presumably stands for merge mining (cf. the
    /// `rpc_xmr` module); the meaning of the `f64` is not visible here —
    /// confirm against the users of this map.
    mm_blocktemplates: Mutex<HashMap<Vec<u8>, (BlockInfo, f64, SecretKey)>>,
    /// Set of stoppable tasks associated with the `DefaultRpcHandler` server.
    /// NOTE(review): presumably one task per active RPC connection — confirm
    /// against the `RequestHandler` implementation.
    rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// Set of stoppable tasks associated with the `MmRpcHandler` server.
    /// NOTE(review): presumably one task per active connection — confirm
    /// against the `RequestHandler` implementation.
    mm_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// PoW reward zk binary and proving key, generated once in
    /// [`DarkfiNode::new`].
    powrewardv1_zk: PowRewardV1Zk,
}
91
92impl DarkfiNode {
93 pub async fn new(
94 p2p_handler: DarkfidP2pHandlerPtr,
95 validator: ValidatorPtr,
96 txs_batch_size: usize,
97 subscribers: HashMap<&'static str, JsonSubscriber>,
98 ) -> Result<DarkfiNodePtr> {
99 let powrewardv1_zk = PowRewardV1Zk::new(validator.clone())?;
100
101 Ok(Arc::new(Self {
102 p2p_handler,
103 validator,
104 txs_batch_size,
105 subscribers,
106 blocktemplates: Mutex::new(HashMap::new()),
107 mm_blocktemplates: Mutex::new(HashMap::new()),
108 rpc_connections: Mutex::new(HashSet::new()),
109 mm_rpc_connections: Mutex::new(HashSet::new()),
110 powrewardv1_zk,
111 }))
112 }
113}
114
/// Zk material used for PoW rewards, generated once by
/// [`PowRewardV1Zk::new`] and kept for reuse.
pub(crate) struct PowRewardV1Zk {
    /// zkas binary fetched from the validator's contract store for
    /// `MONEY_CONTRACT_ID` under `MONEY_CONTRACT_ZKAS_MINT_NS_V1`.
    pub zkbin: ZkBinary,
    /// Proving key built for the circuit described by `zkbin`.
    pub provingkey: ProvingKey,
}
120
121impl PowRewardV1Zk {
122 pub fn new(validator: ValidatorPtr) -> Result<Self> {
123 info!(
124 target: "darkfid::PowRewardV1Zk::new",
125 "Generating PowRewardV1 ZkCircuit and ProvingKey...",
126 );
127
128 let (zkbin, _) = validator.blockchain.contracts.get_zkas(
129 &validator.blockchain.sled_db,
130 &MONEY_CONTRACT_ID,
131 MONEY_CONTRACT_ZKAS_MINT_NS_V1,
132 )?;
133
134 let circuit = ZkCircuit::new(empty_witnesses(&zkbin)?, &zkbin);
135 let provingkey = ProvingKey::build(zkbin.k, &circuit);
136
137 Ok(Self { zkbin, provingkey })
138 }
139}
140
/// Reference-counted (`Arc`) handle to a [`Darkfid`] daemon.
pub type DarkfidPtr = Arc<Darkfid>;

/// The daemon: the shared node state plus the background tasks driven by
/// `Darkfid::start` and `Darkfid::stop`.
pub struct Darkfid {
    /// Shared node state handed to the RPC servers and the consensus task.
    node: DarkfiNodePtr,
    /// Task forwarding P2P dnet events to the `"dnet"` JSON-RPC subscriber.
    dnet_task: StoppableTaskPtr,
    /// Task running the JSON-RPC server backed by `DefaultRpcHandler`.
    rpc_task: StoppableTaskPtr,
    /// Task running the JSON-RPC server backed by `MmRpcHandler` when
    /// settings for it are supplied to `start`; otherwise a task that
    /// completes immediately.
    mm_rpc_task: StoppableTaskPtr,
    /// Task running `consensus_init_task`.
    consensus_task: StoppableTaskPtr,
}
157
158impl Darkfid {
159 pub async fn init(
164 sled_db: &sled_overlay::sled::Db,
165 config: &ValidatorConfig,
166 net_settings: &Settings,
167 txs_batch_size: &Option<usize>,
168 ex: &ExecutorPtr,
169 ) -> Result<DarkfidPtr> {
170 info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
171 let validator = Validator::new(sled_db, config).await?;
173
174 let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
176
177 let txs_batch_size = match txs_batch_size {
179 Some(b) => {
180 if *b > 0 {
181 *b
182 } else {
183 50
184 }
185 }
186 None => 50,
187 };
188
189 let mut subscribers = HashMap::new();
191 subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
192 subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
193 subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
194 subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
195
196 let node = DarkfiNode::new(p2p_handler, validator, txs_batch_size, subscribers).await?;
198
199 let dnet_task = StoppableTask::new();
201 let rpc_task = StoppableTask::new();
202 let mm_rpc_task = StoppableTask::new();
203 let consensus_task = StoppableTask::new();
204
205 info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
206
207 Ok(Arc::new(Self { node, dnet_task, rpc_task, mm_rpc_task, consensus_task }))
208 }
209
210 pub async fn start(
213 &self,
214 executor: &ExecutorPtr,
215 rpc_settings: &RpcSettings,
216 mm_rpc_settings: &Option<RpcSettings>,
217 config: &ConsensusInitTaskConfig,
218 ) -> Result<()> {
219 info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
220
221 info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
223 let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
224 let p2p_ = self.node.p2p_handler.p2p.clone();
225 self.dnet_task.clone().start(
226 async move {
227 let dnet_sub = p2p_.dnet_subscribe().await;
228 loop {
229 let event = dnet_sub.receive().await;
230 debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {event:?}");
231 dnet_sub_.notify(vec![event.into()].into()).await;
232 }
233 },
234 |res| async {
235 match res {
236 Ok(()) | Err(Error::DetachedTaskStopped) => { }
237 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {e}"),
238 }
239 },
240 Error::DetachedTaskStopped,
241 executor.clone(),
242 );
243
244 info!(target: "darkfid::Darkfid::start", "Starting JSON-RPC server");
246 let node_ = self.node.clone();
247 self.rpc_task.clone().start(
248 listen_and_serve::<DefaultRpcHandler>(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
249 |res| async move {
250 match res {
251 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::stop_connections(&node_).await,
252 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting JSON-RPC server: {e}"),
253 }
254 },
255 Error::RpcServerStopped,
256 executor.clone(),
257 );
258
259 if let Some(mm_rpc) = mm_rpc_settings {
261 info!(target: "darkfid::Darkfid::start", "Starting HTTP JSON-RPC server");
262 let node_ = self.node.clone();
263 self.mm_rpc_task.clone().start(
264 listen_and_serve::<MmRpcHandler>(mm_rpc.clone(), self.node.clone(), None, executor.clone()),
265 |res| async move {
266 match res {
267 Ok(()) | Err(Error::RpcServerStopped) => <DarkfiNode as RequestHandler<MmRpcHandler>>::stop_connections(&node_).await,
268 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting HTTP JSON-RPC server: {e}"),
269 }
270 },
271 Error::RpcServerStopped,
272 executor.clone(),
273 );
274 } else {
275 self.mm_rpc_task.clone().start(
277 async { Ok(()) },
278 |_| async { },
279 Error::RpcServerStopped,
280 executor.clone(),
281 );
282 }
283
284 info!(target: "darkfid::Darkfid::start", "Starting P2P network");
286 self.node
287 .p2p_handler
288 .clone()
289 .start(executor, &self.node.validator, &self.node.subscribers)
290 .await?;
291
292 info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
294 self.consensus_task.clone().start(
295 consensus_init_task(
296 self.node.clone(),
297 config.clone(),
298 executor.clone(),
299 ),
300 |res| async move {
301 match res {
302 Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { }
303 Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {e}"),
304 }
305 },
306 Error::ConsensusTaskStopped,
307 executor.clone(),
308 );
309
310 info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
311 Ok(())
312 }
313
314 pub async fn stop(&self) -> Result<()> {
316 info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
317
318 info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
320 self.dnet_task.stop().await;
321
322 info!(target: "darkfid::Darkfid::stop", "Stopping JSON-RPC server...");
324 self.rpc_task.stop().await;
325
326 info!(target: "darkfid::Darkfid::stop", "Stopping HTTP JSON-RPC server...");
328 self.rpc_task.stop().await;
329
330 info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
332 self.node.p2p_handler.stop().await;
333
334 info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
336 self.consensus_task.stop().await;
337
338 info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
340 let flushed_bytes = self.node.validator.blockchain.sled_db.flush_async().await?;
341 info!(target: "darkfid::Darkfid::stop", "Flushed {flushed_bytes} bytes");
342
343 info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
344 Ok(())
345 }
346}