1use std::{collections::HashSet, path::Path, sync::Arc};
20
21use log::{error, info};
22use smol::{lock::Mutex, stream::StreamExt};
23use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
24use url::Url;
25
26use darkfi::{
27 async_daemonize, cli_desc,
28 rpc::server::{listen_and_serve, RequestHandler},
29 system::{StoppableTask, StoppableTaskPtr},
30 util::path::get_config_path,
31 Error, Result,
32};
33
34use crate::{
35 config::ExplorerNetworkConfig,
36 rpc::DarkfidRpcClient,
37 service::{sync::subscribe_sync_blocks, ExplorerService},
38};
39
40mod config;
42
43mod rpc;
45
46mod service;
49
50mod store;
52
53mod error;
55
56#[cfg(test)]
58mod test_utils;
59
60const CONFIG_FILE: &str = "explorerd_config.toml";
61const CONFIG_FILE_CONTENTS: &str = include_str!("../explorerd_config.toml");
62
63#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
64#[serde(default)]
65#[structopt(name = "explorerd", about = cli_desc!())]
66struct Args {
67 #[structopt(short, long)]
68 config: Option<String>,
70
71 #[structopt(short, long, default_value = "testnet")]
72 network: String,
74
75 #[structopt(long)]
76 reset: bool,
78
79 #[structopt(short, long)]
80 log: Option<String>,
82
83 #[structopt(short, parse(from_occurrences))]
84 verbose: u8,
86
87 #[structopt(short, long)]
88 no_sync: bool,
92}
93
94pub struct Explorerd {
104 pub service: ExplorerService,
106 pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
108 pub darkfid_client: Arc<DarkfidRpcClient>,
110 darkfid_endpoint: Url,
112 executor: Arc<smol::Executor<'static>>,
114}
115
116impl Explorerd {
117 async fn new(
119 db_path: String,
120 darkfid_endpoint: Url,
121 ex: Arc<smol::Executor<'static>>,
122 ) -> Result<Self> {
123 let darkfid_client = Arc::new(DarkfidRpcClient::new());
125
126 let service = ExplorerService::new(db_path, darkfid_client.clone())?;
128
129 service.init().await?;
131
132 Ok(Self {
133 service,
134 rpc_connections: Mutex::new(HashSet::new()),
135 darkfid_client,
136 darkfid_endpoint,
137 executor: ex,
138 })
139 }
140
141 async fn connect(&self) -> Result<()> {
144 self.darkfid_client.connect(self.darkfid_endpoint.clone(), self.executor.clone()).await
145 }
146}
147
148async_daemonize!(realmain);
149async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
150 info!(target: "explorerd", "Initializing DarkFi blockchain explorer node...");
151
152 let config_path = get_config_path(args.config.clone(), CONFIG_FILE)?;
154
155 let config: ExplorerNetworkConfig = (&config_path, &args.network).try_into()?;
157
158 let explorer =
160 Explorerd::new(config.database.clone(), config.endpoint.clone(), ex.clone()).await?;
161 let explorer = Arc::new(explorer);
162 info!(target: "explorerd", "Node initialized successfully!");
163
164 let rpc_task = StoppableTask::new();
167 let explorer_ = explorer.clone();
168 rpc_task.clone().start(
169 listen_and_serve(config.rpc.clone().into(), explorer.clone(), None, ex.clone()),
170 |res| async move {
171 match res {
172 Ok(()) | Err(Error::RpcServerStopped) => explorer_.stop_connections().await,
173 Err(e) => {
174 error!(target: "explorerd", "Failed starting sync JSON-RPC server: {e}")
175 }
176 }
177 },
178 Error::RpcServerStopped,
179 ex.clone(),
180 );
181 info!(target: "explorerd", "Started JSON-RPC server: {}", config.rpc.rpc_listen.to_string().trim_end_matches("/"));
182
183 let mut subscriber_task = None;
185 let mut listener_task = None;
186
187 if !args.no_sync {
189 explorer.connect().await?;
190
191 sync_blocks(explorer.clone(), args.reset).await?;
193
194 (subscriber_task, listener_task) =
196 subscribe_blocks(explorer.clone(), config.endpoint.clone(), ex.clone(), args.reset)
197 .await?;
198 }
199
200 log_started_banner(explorer.clone(), &config, &args, &config_path, args.no_sync);
201 info!(target: "explorerd::", "All is good. Waiting for block notifications...");
202
203 let (signals_handler, signals_task) = SignalHandler::new(ex)?;
205 signals_handler.wait_termination(signals_task).await?;
206 info!(target: "explorerd", "Caught termination signal, cleaning up and exiting...");
207
208 info!(target: "explorerd", "Stopping JSON-RPC server...");
209 rpc_task.stop().await;
210
211 if let Some(task) = listener_task {
213 info!(target: "explorerd", "Stopping darkfid listener...");
214 task.stop().await;
215 }
216
217 if let Some(task) = subscriber_task {
219 info!(target: "explorerd", "Stopping darkfid subscriber...");
220 task.stop().await;
221 }
222
223 info!(target: "explorerd", "Stopping JSON-RPC client...");
224 let _ = explorer.darkfid_client.stop().await;
225
226 Ok(())
227}
228
229async fn sync_blocks(explorer: Arc<Explorerd>, reset: bool) -> Result<()> {
231 info!(target: "explorerd", "Syncing blocks from darkfid...");
232 if let Err(e) = explorer.service.sync_blocks(reset).await {
233 let error_message = format!("Error syncing blocks: {e:?}");
234 error!(target: "explorerd", "{error_message}");
235 return Err(Error::DatabaseError(error_message));
236 }
237 Ok(())
238}
239
240async fn subscribe_blocks(
243 explorer: Arc<Explorerd>,
244 endpoint: Url,
245 executor: Arc<smol::Executor<'static>>,
246 reset: bool,
247) -> Result<(Option<StoppableTaskPtr>, Option<StoppableTaskPtr>)> {
248 info!(target: "explorerd", "Subscribing to new blocks...");
249
250 let result = match subscribe_sync_blocks(explorer.clone(), endpoint.clone(), executor.clone())
251 .await
252 {
253 Ok((subscriber_task, listener_task)) => Ok((subscriber_task, listener_task)),
254 Err(e) => {
255 if e.to_string().contains("Blockchain not fully synced") {
257 sync_blocks(explorer.clone(), reset).await?;
258 subscribe_sync_blocks(explorer.clone(), endpoint.clone(), executor.clone()).await
259 } else {
260 let error_message = format!("Error setting up blocks subscriber: {e:?}");
261 error!(target: "explorerd", "{error_message}");
262 return Err(Error::DatabaseError(error_message));
263 }
264 }
265 };
266
267 let (subscriber_task, listener_task) = result?;
268 info!(target: "explorerd", "Successfully subscribed to new blocks!");
269 Ok((Some(subscriber_task), Some(listener_task)))
270}
271
272fn log_started_banner(
274 explorer: Arc<Explorerd>,
275 config: &ExplorerNetworkConfig,
276 args: &Args,
277 config_path: &Path,
278 no_sync: bool,
279) {
280 let connected_node = if no_sync {
282 "Not connected".to_string()
283 } else {
284 config.endpoint.to_string().trim_end_matches('/').to_string()
285 };
286
287 info!(target: "explorerd", "========================================================================================");
289 info!(target: "explorerd", " Started DarkFi Explorer Node{} ",
290 if no_sync { " (No-Sync Mode)" } else { "" });
291 info!(target: "explorerd", "========================================================================================");
292 info!(target: "explorerd", " - Network: {}", args.network);
293 info!(target: "explorerd", " - JSON-RPC Endpoint: {}", config.rpc.rpc_listen.to_string().trim_end_matches('/'));
294 info!(target: "explorerd", " - Database: {}", config.database);
295 info!(target: "explorerd", " - Configuration: {}", config_path.to_str().unwrap_or("Error: configuration path not found!"));
296 info!(target: "explorerd", " - Reset Blocks: {}", if args.reset { "Yes" } else { "No" });
297 info!(target: "explorerd", "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
298 info!(target: "explorerd", " - Synced Blocks: {}", explorer.service.db.blockchain.len());
299 info!(target: "explorerd", " - Synced Transactions: {}", explorer.service.db.blockchain.len());
300 info!(target: "explorerd", " - Connected Darkfi Node: {connected_node}");
301 info!(target: "explorerd", "========================================================================================");
302}