1use std::{
20 collections::{HashMap, HashSet},
21 process::exit,
22 sync::Arc,
23 time::UNIX_EPOCH,
24};
25
26use async_trait::async_trait;
27use log::{debug, error, info, warn};
28use semver::Version;
29use smol::{
30 lock::{Mutex, MutexGuard},
31 stream::StreamExt,
32 Executor,
33};
34use structopt::StructOpt;
35use structopt_toml::StructOptToml;
36use tinyjson::JsonValue;
37use toml::Value;
38use url::Url;
39
40use darkfi::{
41 async_daemonize, cli_desc,
42 net::{self, hosts::HostColor, settings::BanPolicy, P2p, P2pPtr},
43 rpc::{
44 jsonrpc::*,
45 server::{listen_and_serve, RequestHandler},
46 settings::{RpcSettings, RpcSettingsOpt},
47 },
48 system::{sleep, StoppableTask, StoppableTaskPtr},
49 util::path::get_config_path,
50 Error, Result,
51};
52
/// File name of the daemon's configuration file; passed to `get_config_path`
/// in `realmain` together with the optional `--config` argument.
const CONFIG_FILE: &str = "lilith_config.toml";
/// Contents of `../lilith_config.toml`, embedded into the binary at compile time.
/// NOTE(review): not referenced by name in the visible code — presumably consumed
/// by the `async_daemonize!` expansion; confirm.
const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
55
// Command-line arguments of the `lilith` daemon (also deserializable via serde,
// with defaults for missing keys).
//
// NOTE(review): plain `//` comments are used here on purpose. With StructOpt,
// `///` doc comments are turned into `--help` text, which would change what the
// program prints — switch to `///` if that is desired.
#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "lilith", about = cli_desc!())]
struct Args {
    // JSON-RPC options, flattened into the top-level argument list; converted
    // into `RpcSettings` in `realmain`.
    #[structopt(flatten)]
    rpc: RpcSettingsOpt,

    // Optional configuration file path; handed to `get_config_path` in `realmain`.
    #[structopt(short, long)]
    config: Option<String>,

    // Optional log setting. Not read in the visible code — presumably a log file
    // path consumed by `async_daemonize!`; TODO confirm.
    #[structopt(short, long)]
    log: Option<String>,

    // Number of times the short `-v` flag was given (`parse(from_occurrences)`).
    #[structopt(short, parse(from_occurrences))]
    verbose: u8,

    // Pause between two whitelist refinery passes; passed unchanged to `sleep`
    // in `Lilith::whitelist_refinery`. Defaults to 120.
    // NOTE(review): presumably seconds — confirm against `darkfi::system::sleep`.
    #[structopt(long, default_value = "120")]
    whitelist_refinery_interval: u64,
}
80
/// One running P2P network instance managed by the daemon.
struct Spawn {
    /// Name of the network, taken from its configuration table key.
    pub name: String,
    /// Handle to the P2P instance created for this network in `spawn_net`.
    pub p2p: P2pPtr,
}
88
89impl Spawn {
90 async fn get_whitelist(&self) -> Vec<JsonValue> {
91 self.p2p
92 .hosts()
93 .container
94 .fetch_all(HostColor::White)
95 .iter()
96 .map(|(addr, _url)| JsonValue::String(addr.to_string()))
97 .collect()
98 }
99
100 async fn get_greylist(&self) -> Vec<JsonValue> {
101 self.p2p
102 .hosts()
103 .container
104 .fetch_all(HostColor::Grey)
105 .iter()
106 .map(|(addr, _url)| JsonValue::String(addr.to_string()))
107 .collect()
108 }
109
110 async fn get_goldlist(&self) -> Vec<JsonValue> {
111 self.p2p
112 .hosts()
113 .container
114 .fetch_all(HostColor::Gold)
115 .iter()
116 .map(|(addr, _url)| JsonValue::String(addr.to_string()))
117 .collect()
118 }
119
120 async fn info(&self) -> JsonValue {
121 let mut addr_vec = vec![];
122 for addr in &self.p2p.settings().read().await.inbound_addrs {
123 addr_vec.push(JsonValue::String(addr.as_ref().to_string()));
124 }
125
126 JsonValue::Object(HashMap::from([
127 ("name".to_string(), JsonValue::String(self.name.clone())),
128 ("urls".to_string(), JsonValue::Array(addr_vec)),
129 ("whitelist".to_string(), JsonValue::Array(self.get_whitelist().await)),
130 ("greylist".to_string(), JsonValue::Array(self.get_greylist().await)),
131 ("goldlist".to_string(), JsonValue::Array(self.get_goldlist().await)),
132 ]))
133 }
134}
135
/// Configuration of a single network, as produced by `parse_configured_networks`
/// and consumed by `spawn_net`.
#[derive(Clone)]
struct NetInfo {
    /// Addresses to accept connections on; used as `inbound_addrs` of the P2P settings.
    pub accept_addrs: Vec<Url>,
    /// Seed URLs, forwarded to the P2P settings' `seeds`.
    pub seeds: Vec<Url>,
    /// Peer URLs, forwarded to the P2P settings' `peers`.
    pub peers: Vec<Url>,
    /// Version used as the P2P settings' `app_version`.
    pub version: Version,
    /// Value of the P2P settings' `localnet` flag.
    pub localnet: bool,
    /// Datastore location, forwarded as the P2P settings' `p2p_datastore`.
    pub datastore: String,
    /// Hostlist location, forwarded as the P2P settings' `hostlist`.
    pub hostlist: String,
}
154
/// Daemon state shared between the JSON-RPC handler and `realmain`.
struct Lilith {
    /// All networks spawned at startup.
    pub networks: Vec<Spawn>,
    /// Set of JSON-RPC connection tasks; exposed through
    /// `RequestHandler::connections_mut`.
    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
162
163impl Lilith {
164 async fn whitelist_refinery(
177 network_name: String,
178 p2p: P2pPtr,
179 refinery_interval: u64,
180 ) -> Result<()> {
181 debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{network_name}\"");
182
183 let hosts = p2p.hosts();
184
185 loop {
186 sleep(refinery_interval).await;
187
188 match hosts.container.fetch_last(HostColor::White) {
189 Some(entry) => {
190 let url = &entry.0;
191 let last_seen = &entry.1;
192
193 if !hosts.refinable(url.clone()) {
194 debug!(target: "net::refinery::whitelist_refinery", "Addr={} not available!",
195 url.clone());
196
197 continue
198 }
199
200 if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
201 debug!(target: "net::refinery:::whitelist_refinery",
202 "Host {url} is not responsive. Downgrading from whitelist");
203
204 hosts.greylist_host(url, *last_seen).await?;
205
206 continue
207 }
208
209 debug!(target: "net::refinery::whitelist_refinery",
210 "Peer {url} is responsive. Updating last_seen");
211
212 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
214
215 hosts.whitelist_host(url, last_seen).await?;
216 }
217 None => {
218 debug!(target: "net::refinery::whitelist_refinery",
219 "Whitelist is empty! Cannot start refinery process");
220
221 continue
222 }
223 }
224 }
225 }
226 async fn spawns(&self, id: u16, _params: JsonValue) -> JsonResult {
231 let mut spawns = vec![];
232 for spawn in &self.networks {
233 spawns.push(spawn.info().await);
234 }
235
236 let json =
237 JsonValue::Object(HashMap::from([("spawns".to_string(), JsonValue::Array(spawns))]));
238
239 JsonResponse::new(json, id).into()
240 }
241}
242
243#[async_trait]
244impl RequestHandler<()> for Lilith {
245 async fn handle_request(&self, req: JsonRequest) -> JsonResult {
246 return match req.method.as_str() {
247 "ping" => self.pong(req.id, req.params).await,
248 "spawns" => self.spawns(req.id, req.params).await,
249 _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
250 }
251 }
252
253 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
254 self.rpc_connections.lock().await
255 }
256}
257
258fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
261 let mut ret = HashMap::new();
262
263 if let Value::Table(map) = toml::from_str(data)? {
264 if map.contains_key("network") && map["network"].is_table() {
265 for net in map["network"].as_table().unwrap() {
266 info!(target: "lilith", "Found configuration for network: {}", net.0);
267 let table = net.1.as_table().unwrap();
268 if !table.contains_key("accept_addrs") {
269 warn!(target: "lilith", "Network accept addrs are mandatory, skipping network.");
270 continue
271 }
272
273 if !table.contains_key("hostlist") {
274 error!(target: "lilith", "Hostlist path is mandatory! Configure and try again.");
275 exit(1)
276 }
277
278 let name = net.0.to_string();
279 let accept_addrs: Vec<Url> = table["accept_addrs"]
280 .as_array()
281 .unwrap()
282 .iter()
283 .map(|x| Url::parse(x.as_str().unwrap()).unwrap())
284 .collect();
285
286 let mut seeds = vec![];
287 if table.contains_key("seeds") {
288 if let Some(s) = table["seeds"].as_array() {
289 for seed in s {
290 if let Some(u) = seed.as_str() {
291 if let Ok(url) = Url::parse(u) {
292 seeds.push(url);
293 }
294 }
295 }
296 }
297 }
298
299 let mut peers = vec![];
300 if table.contains_key("peers") {
301 if let Some(p) = table["peers"].as_array() {
302 for peer in p {
303 if let Some(u) = peer.as_str() {
304 if let Ok(url) = Url::parse(u) {
305 peers.push(url);
306 }
307 }
308 }
309 }
310 }
311
312 let localnet = if table.contains_key("localnet") {
313 table["localnet"].as_bool().unwrap()
314 } else {
315 false
316 };
317
318 let version = if table.contains_key("version") {
319 semver::Version::parse(table["version"].as_str().unwrap())?
320 } else {
321 semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))?
322 };
323
324 let datastore: String = table["datastore"].as_str().unwrap().to_string();
325
326 let hostlist: String = table["hostlist"].as_str().unwrap().to_string();
327
328 let net_info =
329 NetInfo { accept_addrs, seeds, peers, version, localnet, datastore, hostlist };
330 ret.insert(name, net_info);
331 }
332 }
333 }
334
335 Ok(ret)
336}
337
338async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> {
339 let mut listen_urls = vec![];
340
341 for url in &info.accept_addrs {
343 listen_urls.push(url.clone());
344 }
345
346 let settings = net::Settings {
348 inbound_addrs: listen_urls.clone(),
349 seeds: info.seeds.clone(),
350 peers: info.peers.clone(),
351 outbound_connections: 0,
352 outbound_connect_timeout: 30,
353 inbound_connections: 512,
354 app_version: info.version.clone(),
355 localnet: info.localnet,
356 p2p_datastore: Some(info.datastore.clone()),
357 hostlist: Some(info.hostlist.clone()),
358 allowed_transports: vec![
359 "tcp".to_string(),
360 "tcp+tls".to_string(),
361 "tor".to_string(),
362 "tor+tls".to_string(),
363 "nym".to_string(),
364 "nym+tls".to_string(),
365 "i2p".to_string(),
366 "i2p+tls".to_string(),
367 ],
368 ban_policy: BanPolicy::Relaxed,
369 ..Default::default()
370 };
371
372 let p2p = P2p::new(settings, ex.clone()).await?;
374
375 let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect();
376 info!(target: "lilith", "Starting seed network node for \"{name}\" on {addrs_str:?}");
377 p2p.clone().start().await?;
378
379 let spawn = Spawn { name, p2p };
380 Ok(spawn)
381}
382
383async_daemonize!(realmain);
384async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
385 let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
387 let toml_contents = std::fs::read_to_string(cfg_path)?;
388 let configured_nets = parse_configured_networks(&toml_contents)?;
389
390 if configured_nets.is_empty() {
391 error!(target: "lilith", "No networks are enabled in config");
392 exit(1);
393 }
394
395 let mut networks = vec![];
397 for (name, info) in &configured_nets {
398 match spawn_net(name.to_string(), info, ex.clone()).await {
399 Ok(spawn) => networks.push(spawn),
400 Err(e) => {
401 error!(target: "lilith", "Failed to start P2P network seed for \"{name}\": {e}");
402 exit(1);
403 }
404 }
405 }
406
407 let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) });
409 let mut refinery_tasks = HashMap::new();
410 for network in &lilith.networks {
411 let name = network.name.clone();
412 let task = StoppableTask::new();
413 task.clone().start(
414 Lilith::whitelist_refinery(name.clone(), network.p2p.clone(), args.whitelist_refinery_interval),
415 |res| async move {
416 match res {
417 Ok(()) | Err(Error::DetachedTaskStopped) => { }
418 Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{name}\": {e}"),
419 }
420 },
421 Error::DetachedTaskStopped,
422 ex.clone(),
423 );
424 refinery_tasks.insert(network.name.clone(), task);
425 }
426
427 let rpc_settings: RpcSettings = args.rpc.into();
429 info!(target: "lilith", "Starting JSON-RPC server on {}", rpc_settings.listen);
430 let lilith_ = lilith.clone();
431 let rpc_task = StoppableTask::new();
432 rpc_task.clone().start(
433 listen_and_serve(rpc_settings, lilith.clone(), None, ex.clone()),
434 |res| async move {
435 match res {
436 Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await,
437 Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {e}"),
438 }
439 },
440 Error::RpcServerStopped,
441 ex.clone(),
442 );
443
444 let (signals_handler, signals_task) = SignalHandler::new(ex)?;
446 signals_handler.wait_termination(signals_task).await?;
447 info!(target: "lilith", "Caught termination signal, cleaning up and exiting...");
448
449 info!(target: "lilith", "Stopping JSON-RPC server...");
450 rpc_task.stop().await;
451
452 for spawn in &lilith.networks {
454 info!(target: "lilith", "Stopping \"{}\" task", spawn.name);
455 refinery_tasks.get(&spawn.name).unwrap().stop().await;
456 info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name);
457 spawn.p2p.stop().await;
458 }
459
460 info!(target: "lilith", "Bye!");
461 Ok(())
462}