dchatd/
main.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19// ANCHOR: imports
20use log::{debug, error, info};
21use smol::{lock::Mutex, stream::StreamExt};
22use std::{collections::HashSet, sync::Arc};
23
24use darkfi::{
25    async_daemonize, cli_desc, net,
26    net::settings::SettingsOpt,
27    rpc::{
28        jsonrpc::JsonSubscriber,
29        server::{listen_and_serve, RequestHandler},
30        settings::{RpcSettings, RpcSettingsOpt},
31    },
32    system::{StoppableTask, StoppableTaskPtr},
33    Error, Result,
34};
35
36use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
37
38use crate::{
39    dchatmsg::{DchatMsg, DchatMsgsBuffer},
40    protocol_dchat::ProtocolDchat,
41};
42// ANCHOR_END: imports
43
44pub mod dchat_error;
45pub mod dchatmsg;
46pub mod protocol_dchat;
47pub mod rpc;
48
49const CONFIG_FILE: &str = "dchatd_config.toml";
50const CONFIG_FILE_CONTENTS: &str = include_str!("../dchatd_config.toml");
51
52// ANCHOR: args
53#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
54#[serde(default)]
55#[structopt(name = "dchat", about = cli_desc!())]
56struct Args {
57    #[structopt(flatten)]
58    /// JSON-RPC settings
59    rpc: RpcSettingsOpt,
60
61    #[structopt(short, long)]
62    /// Configuration file to use
63    config: Option<String>,
64
65    #[structopt(short, long)]
66    /// Set log file to ouput into
67    log: Option<String>,
68
69    #[structopt(short, parse(from_occurrences))]
70    /// Increase verbosity (-vvv supported)
71    verbose: u8,
72
73    #[structopt(flatten)]
74    /// P2P network settings
75    net: SettingsOpt,
76}
77// ANCHOR_END: args
78
79// ANCHOR: dchat
80struct Dchat {
81    p2p: net::P2pPtr,
82    recv_msgs: DchatMsgsBuffer,
83    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
84    pub dnet_sub: JsonSubscriber,
85}
86
87impl Dchat {
88    fn new(
89        p2p: net::P2pPtr,
90        recv_msgs: DchatMsgsBuffer,
91        rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
92        dnet_sub: JsonSubscriber,
93    ) -> Self {
94        Self { p2p, recv_msgs, rpc_connections, dnet_sub }
95    }
96}
97// ANCHOR_END: dchat
98
99// ANCHOR: main
100async_daemonize!(realmain);
101async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
102    let p2p = net::P2p::new(args.net.into(), ex.clone()).await?;
103
104    // ANCHOR: dnet
105    info!("Starting dnet subs task");
106    let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
107    let dnet_sub_ = dnet_sub.clone();
108    let p2p_ = p2p.clone();
109    let dnet_task = StoppableTask::new();
110    dnet_task.clone().start(
111        async move {
112            let dnet_sub = p2p_.dnet_subscribe().await;
113            loop {
114                let event = dnet_sub.receive().await;
115                debug!("Got dnet event: {:?}", event);
116                dnet_sub_.notify(vec![event.into()].into()).await;
117            }
118        },
119        |res| async {
120            match res {
121                Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
122                Err(e) => panic!("{}", e),
123            }
124        },
125        Error::DetachedTaskStopped,
126        ex.clone(),
127    );
128    // ANCHOR_end: dnet
129
130    // ANCHOR: rpc
131    let rpc_settings: RpcSettings = args.rpc.into();
132    info!("Starting JSON-RPC server on port {}", rpc_settings.listen);
133    let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
134    let rpc_connections = Mutex::new(HashSet::new());
135    let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub));
136    let _ex = ex.clone();
137
138    let rpc_task = StoppableTask::new();
139    rpc_task.clone().start(
140        listen_and_serve(rpc_settings, dchat.clone(), None, ex.clone()),
141        |res| async move {
142            match res {
143                Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
144                Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
145            }
146        },
147        Error::RpcServerStopped,
148        ex.clone(),
149    );
150    // ANCHOR_end: rpc
151
152    // ANCHOR: register_protocol
153    info!("Registering Dchat protocol");
154    let registry = p2p.protocol_registry();
155    registry
156        .register(net::session::SESSION_DEFAULT, move |channel, _p2p| {
157            let msgs_ = msgs.clone();
158            async move { ProtocolDchat::init(channel, msgs_).await }
159        })
160        .await;
161    // ANCHOR_END: register_protocol
162
163    // ANCHOR: p2p_start
164    info!("Starting P2P network");
165    p2p.clone().start().await?;
166    // ANCHOR_END: p2p_start
167
168    // ANCHOR: shutdown
169    let (signals_handler, signals_task) = SignalHandler::new(ex)?;
170    signals_handler.wait_termination(signals_task).await?;
171    info!("Caught termination signal, cleaning up and exiting...");
172
173    info!("Stopping JSON-RPC server");
174    rpc_task.stop().await;
175
176    info!("Stopping dnet tasks");
177    dnet_task.stop().await;
178
179    info!("Stopping P2P network");
180    p2p.stop().await;
181
182    info!("Shut down successfully");
183    // ANCHOR_END: shutdown
184    Ok(())
185}
186// ANCHOR_END: main