lilith/
main.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    process::exit,
22    sync::Arc,
23    time::UNIX_EPOCH,
24};
25
26use async_trait::async_trait;
27use log::{debug, error, info, warn};
28use semver::Version;
29use smol::{
30    lock::{Mutex, MutexGuard},
31    stream::StreamExt,
32    Executor,
33};
34use structopt::StructOpt;
35use structopt_toml::StructOptToml;
36use tinyjson::JsonValue;
37use toml::Value;
38use url::Url;
39
40use darkfi::{
41    async_daemonize, cli_desc,
42    net::{self, hosts::HostColor, settings::BanPolicy, P2p, P2pPtr},
43    rpc::{
44        jsonrpc::*,
45        server::{listen_and_serve, RequestHandler},
46        settings::{RpcSettings, RpcSettingsOpt},
47    },
48    system::{sleep, StoppableTask, StoppableTaskPtr},
49    util::path::get_config_path,
50    Error, Result,
51};
52
53const CONFIG_FILE: &str = "lilith_config.toml";
54const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
55
56#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
57#[serde(default)]
58#[structopt(name = "lilith", about = cli_desc!())]
59struct Args {
60    #[structopt(flatten)]
61    /// JSON-RPC settings
62    rpc: RpcSettingsOpt,
63
64    #[structopt(short, long)]
65    /// Configuration file to use
66    config: Option<String>,
67
68    #[structopt(short, long)]
69    /// Set log file to ouput into
70    log: Option<String>,
71
72    #[structopt(short, parse(from_occurrences))]
73    /// Increase verbosity (-vvv supported)
74    verbose: u8,
75
76    #[structopt(long, default_value = "120")]
77    /// Interval after which to check whitelist peers
78    whitelist_refinery_interval: u64,
79}
80
81/// Struct representing a spawned P2P network
82struct Spawn {
83    /// String identifier,
84    pub name: String,
85    /// P2P pointer
86    pub p2p: P2pPtr,
87}
88
89impl Spawn {
90    async fn get_whitelist(&self) -> Vec<JsonValue> {
91        self.p2p
92            .hosts()
93            .container
94            .fetch_all(HostColor::White)
95            .iter()
96            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
97            .collect()
98    }
99
100    async fn get_greylist(&self) -> Vec<JsonValue> {
101        self.p2p
102            .hosts()
103            .container
104            .fetch_all(HostColor::Grey)
105            .iter()
106            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
107            .collect()
108    }
109
110    async fn get_goldlist(&self) -> Vec<JsonValue> {
111        self.p2p
112            .hosts()
113            .container
114            .fetch_all(HostColor::Gold)
115            .iter()
116            .map(|(addr, _url)| JsonValue::String(addr.to_string()))
117            .collect()
118    }
119
120    async fn info(&self) -> JsonValue {
121        let mut addr_vec = vec![];
122        for addr in &self.p2p.settings().read().await.inbound_addrs {
123            addr_vec.push(JsonValue::String(addr.as_ref().to_string()));
124        }
125
126        JsonValue::Object(HashMap::from([
127            ("name".to_string(), JsonValue::String(self.name.clone())),
128            ("urls".to_string(), JsonValue::Array(addr_vec)),
129            ("whitelist".to_string(), JsonValue::Array(self.get_whitelist().await)),
130            ("greylist".to_string(), JsonValue::Array(self.get_greylist().await)),
131            ("goldlist".to_string(), JsonValue::Array(self.get_goldlist().await)),
132        ]))
133    }
134}
135
136/// Defines the network-specific settings
137#[derive(Clone)]
138struct NetInfo {
139    /// Accept addresses the network will use
140    pub accept_addrs: Vec<Url>,
141    /// Other seeds to connect to
142    pub seeds: Vec<Url>,
143    /// Manual peers to connect to
144    pub peers: Vec<Url>,
145    /// Supported network version
146    pub version: Version,
147    /// Enable localnet hosts
148    pub localnet: bool,
149    /// Path to P2P datastore
150    pub datastore: String,
151    /// Path to hostlist
152    pub hostlist: String,
153}
154
155/// Struct representing the daemon
156struct Lilith {
157    /// Spawned networks
158    pub networks: Vec<Spawn>,
159    /// JSON-RPC connection tracker
160    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
161}
162
163impl Lilith {
164    /// Since `Lilith` does not make outbound connections, if a peer is
165    /// upgraded to whitelist it will remain on the whitelist even if the
166    /// give peer is no longer online.
167    ///
168    /// To protect `Lilith` from sharing potentially offline nodes,
169    /// `whitelist_refinery` periodically ping nodes on the whitelist. If they
170    /// are reachable, we update their last seen field. Otherwise, we downgrade
171    /// them to the greylist.
172    ///
173    /// Note: if `Lilith` loses connectivity this method will delete peers from
174    /// the whitelist, meaning `Lilith` will need to rebuild its hostlist when
175    /// it comes back online.
176    async fn whitelist_refinery(
177        network_name: String,
178        p2p: P2pPtr,
179        refinery_interval: u64,
180    ) -> Result<()> {
181        debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{network_name}\"");
182
183        let hosts = p2p.hosts();
184
185        loop {
186            sleep(refinery_interval).await;
187
188            match hosts.container.fetch_last(HostColor::White) {
189                Some(entry) => {
190                    let url = &entry.0;
191                    let last_seen = &entry.1;
192
193                    if !hosts.refinable(url.clone()) {
194                        debug!(target: "net::refinery::whitelist_refinery", "Addr={} not available!",
195                       url.clone());
196
197                        continue
198                    }
199
200                    if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
201                        debug!(target: "net::refinery:::whitelist_refinery",
202                       "Host {url} is not responsive. Downgrading from whitelist");
203
204                        hosts.greylist_host(url, *last_seen).await?;
205
206                        continue
207                    }
208
209                    debug!(target: "net::refinery::whitelist_refinery",
210                   "Peer {url} is responsive. Updating last_seen");
211
212                    // This node is active. Update the last seen field.
213                    let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
214
215                    hosts.whitelist_host(url, last_seen).await?;
216                }
217                None => {
218                    debug!(target: "net::refinery::whitelist_refinery",
219                              "Whitelist is empty! Cannot start refinery process");
220
221                    continue
222                }
223            }
224        }
225    }
226    // RPCAPI:
227    // Returns all spawned networks names with their node addresses.
228    // --> {"jsonrpc": "2.0", "method": "spawns", "params": [], "id": 42}
229    // <-- {"jsonrpc": "2.0", "result": {"spawns": spawns_info}, "id": 42}
230    async fn spawns(&self, id: u16, _params: JsonValue) -> JsonResult {
231        let mut spawns = vec![];
232        for spawn in &self.networks {
233            spawns.push(spawn.info().await);
234        }
235
236        let json =
237            JsonValue::Object(HashMap::from([("spawns".to_string(), JsonValue::Array(spawns))]));
238
239        JsonResponse::new(json, id).into()
240    }
241}
242
243#[async_trait]
244impl RequestHandler<()> for Lilith {
245    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
246        return match req.method.as_str() {
247            "ping" => self.pong(req.id, req.params).await,
248            "spawns" => self.spawns(req.id, req.params).await,
249            _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
250        }
251    }
252
253    async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
254        self.rpc_connections.lock().await
255    }
256}
257
258/// Parse a TOML string for any configured network and return a map containing
259/// said configurations.
260fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
261    let mut ret = HashMap::new();
262
263    if let Value::Table(map) = toml::from_str(data)? {
264        if map.contains_key("network") && map["network"].is_table() {
265            for net in map["network"].as_table().unwrap() {
266                info!(target: "lilith", "Found configuration for network: {}", net.0);
267                let table = net.1.as_table().unwrap();
268                if !table.contains_key("accept_addrs") {
269                    warn!(target: "lilith", "Network accept addrs are mandatory, skipping network.");
270                    continue
271                }
272
273                if !table.contains_key("hostlist") {
274                    error!(target: "lilith", "Hostlist path is mandatory! Configure and try again.");
275                    exit(1)
276                }
277
278                let name = net.0.to_string();
279                let accept_addrs: Vec<Url> = table["accept_addrs"]
280                    .as_array()
281                    .unwrap()
282                    .iter()
283                    .map(|x| Url::parse(x.as_str().unwrap()).unwrap())
284                    .collect();
285
286                let mut seeds = vec![];
287                if table.contains_key("seeds") {
288                    if let Some(s) = table["seeds"].as_array() {
289                        for seed in s {
290                            if let Some(u) = seed.as_str() {
291                                if let Ok(url) = Url::parse(u) {
292                                    seeds.push(url);
293                                }
294                            }
295                        }
296                    }
297                }
298
299                let mut peers = vec![];
300                if table.contains_key("peers") {
301                    if let Some(p) = table["peers"].as_array() {
302                        for peer in p {
303                            if let Some(u) = peer.as_str() {
304                                if let Ok(url) = Url::parse(u) {
305                                    peers.push(url);
306                                }
307                            }
308                        }
309                    }
310                }
311
312                let localnet = if table.contains_key("localnet") {
313                    table["localnet"].as_bool().unwrap()
314                } else {
315                    false
316                };
317
318                let version = if table.contains_key("version") {
319                    semver::Version::parse(table["version"].as_str().unwrap())?
320                } else {
321                    semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))?
322                };
323
324                let datastore: String = table["datastore"].as_str().unwrap().to_string();
325
326                let hostlist: String = table["hostlist"].as_str().unwrap().to_string();
327
328                let net_info =
329                    NetInfo { accept_addrs, seeds, peers, version, localnet, datastore, hostlist };
330                ret.insert(name, net_info);
331            }
332        }
333    }
334
335    Ok(ret)
336}
337
338async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> {
339    let mut listen_urls = vec![];
340
341    // Configure listen addrs for this network
342    for url in &info.accept_addrs {
343        listen_urls.push(url.clone());
344    }
345
346    // P2P network settings
347    let settings = net::Settings {
348        inbound_addrs: listen_urls.clone(),
349        seeds: info.seeds.clone(),
350        peers: info.peers.clone(),
351        outbound_connections: 0,
352        outbound_connect_timeout: 30,
353        inbound_connections: 512,
354        app_version: info.version.clone(),
355        localnet: info.localnet,
356        p2p_datastore: Some(info.datastore.clone()),
357        hostlist: Some(info.hostlist.clone()),
358        allowed_transports: vec![
359            "tcp".to_string(),
360            "tcp+tls".to_string(),
361            "tor".to_string(),
362            "tor+tls".to_string(),
363            "nym".to_string(),
364            "nym+tls".to_string(),
365            "i2p".to_string(),
366            "i2p+tls".to_string(),
367        ],
368        ban_policy: BanPolicy::Relaxed,
369        ..Default::default()
370    };
371
372    // Create P2P instance
373    let p2p = P2p::new(settings, ex.clone()).await?;
374
375    let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect();
376    info!(target: "lilith", "Starting seed network node for \"{name}\" on {addrs_str:?}");
377    p2p.clone().start().await?;
378
379    let spawn = Spawn { name, p2p };
380    Ok(spawn)
381}
382
383async_daemonize!(realmain);
384async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
385    // Pick up network settings from the TOML config
386    let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
387    let toml_contents = std::fs::read_to_string(cfg_path)?;
388    let configured_nets = parse_configured_networks(&toml_contents)?;
389
390    if configured_nets.is_empty() {
391        error!(target: "lilith", "No networks are enabled in config");
392        exit(1);
393    }
394
395    // Spawn configured networks
396    let mut networks = vec![];
397    for (name, info) in &configured_nets {
398        match spawn_net(name.to_string(), info, ex.clone()).await {
399            Ok(spawn) => networks.push(spawn),
400            Err(e) => {
401                error!(target: "lilith", "Failed to start P2P network seed for \"{name}\": {e}");
402                exit(1);
403            }
404        }
405    }
406
407    // Set up main daemon and background refinery_tasks
408    let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) });
409    let mut refinery_tasks = HashMap::new();
410    for network in &lilith.networks {
411        let name = network.name.clone();
412        let task = StoppableTask::new();
413        task.clone().start(
414            Lilith::whitelist_refinery(name.clone(), network.p2p.clone(), args.whitelist_refinery_interval),
415            |res| async move {
416                match res {
417                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
418                    Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{name}\": {e}"),
419                }
420            },
421            Error::DetachedTaskStopped,
422            ex.clone(),
423        );
424        refinery_tasks.insert(network.name.clone(), task);
425    }
426
427    // JSON-RPC server
428    let rpc_settings: RpcSettings = args.rpc.into();
429    info!(target: "lilith", "Starting JSON-RPC server on {}", rpc_settings.listen);
430    let lilith_ = lilith.clone();
431    let rpc_task = StoppableTask::new();
432    rpc_task.clone().start(
433        listen_and_serve(rpc_settings, lilith.clone(), None, ex.clone()),
434        |res| async move {
435            match res {
436                Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await,
437                Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {e}"),
438            }
439        },
440        Error::RpcServerStopped,
441        ex.clone(),
442    );
443
444    // Signal handling for graceful termination.
445    let (signals_handler, signals_task) = SignalHandler::new(ex)?;
446    signals_handler.wait_termination(signals_task).await?;
447    info!(target: "lilith", "Caught termination signal, cleaning up and exiting...");
448
449    info!(target: "lilith", "Stopping JSON-RPC server...");
450    rpc_task.stop().await;
451
452    // Cleanly stop p2p networks
453    for spawn in &lilith.networks {
454        info!(target: "lilith", "Stopping \"{}\" task", spawn.name);
455        refinery_tasks.get(&spawn.name).unwrap().stop().await;
456        info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name);
457        spawn.p2p.stop().await;
458    }
459
460    info!(target: "lilith", "Bye!");
461    Ok(())
462}