1use log::{debug, error, info};
21use smol::{lock::Mutex, stream::StreamExt};
22use std::{collections::HashSet, sync::Arc};
23
24use darkfi::{
25 async_daemonize, cli_desc, net,
26 net::settings::SettingsOpt,
27 rpc::{
28 jsonrpc::JsonSubscriber,
29 server::{listen_and_serve, RequestHandler},
30 settings::{RpcSettings, RpcSettingsOpt},
31 },
32 system::{StoppableTask, StoppableTaskPtr},
33 Error, Result,
34};
35
36use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
37
38use crate::{
39 dchatmsg::{DchatMsg, DchatMsgsBuffer},
40 protocol_dchat::ProtocolDchat,
41};
// Submodules that make up the dchat daemon.

// NOTE(review): nothing from `dchat_error` is used in this file; judging by
// the name it presumably holds dchat-specific error types — confirm.
pub mod dchat_error;
// Provides `DchatMsg` and `DchatMsgsBuffer`, both imported above.
pub mod dchatmsg;
// Provides `ProtocolDchat`, which is registered with the P2P protocol
// registry during daemon startup.
pub mod protocol_dchat;
// NOTE(review): presumably contains the JSON-RPC `RequestHandler`
// implementation for `Dchat` (this file calls `dchat.stop_connections()` but
// does not define it) — confirm.
pub mod rpc;
48
// File name of the daemon's TOML configuration file.
// NOTE(review): neither constant is referenced directly in this file;
// presumably both are consumed by the `async_daemonize!` expansion — confirm.
const CONFIG_FILE: &str = "dchatd_config.toml";
// Contents of `../dchatd_config.toml` (path relative to this source file),
// embedded into the binary at compile time by `include_str!`.
const CONFIG_FILE_CONTENTS: &str = include_str!("../dchatd_config.toml");
51
52#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
54#[serde(default)]
55#[structopt(name = "dchat", about = cli_desc!())]
56struct Args {
57 #[structopt(flatten)]
58 rpc: RpcSettingsOpt,
60
61 #[structopt(short, long)]
62 config: Option<String>,
64
65 #[structopt(short, long)]
66 log: Option<String>,
68
69 #[structopt(short, parse(from_occurrences))]
70 verbose: u8,
72
73 #[structopt(flatten)]
74 net: SettingsOpt,
76}
77struct Dchat {
81 p2p: net::P2pPtr,
82 recv_msgs: DchatMsgsBuffer,
83 pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
84 pub dnet_sub: JsonSubscriber,
85}
86
87impl Dchat {
88 fn new(
89 p2p: net::P2pPtr,
90 recv_msgs: DchatMsgsBuffer,
91 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
92 dnet_sub: JsonSubscriber,
93 ) -> Self {
94 Self { p2p, recv_msgs, rpc_connections, dnet_sub }
95 }
96}
97async_daemonize!(realmain);
101async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
102 let p2p = net::P2p::new(args.net.into(), ex.clone()).await?;
103
104 info!("Starting dnet subs task");
106 let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
107 let dnet_sub_ = dnet_sub.clone();
108 let p2p_ = p2p.clone();
109 let dnet_task = StoppableTask::new();
110 dnet_task.clone().start(
111 async move {
112 let dnet_sub = p2p_.dnet_subscribe().await;
113 loop {
114 let event = dnet_sub.receive().await;
115 debug!("Got dnet event: {:?}", event);
116 dnet_sub_.notify(vec![event.into()].into()).await;
117 }
118 },
119 |res| async {
120 match res {
121 Ok(()) | Err(Error::DetachedTaskStopped) => { }
122 Err(e) => panic!("{}", e),
123 }
124 },
125 Error::DetachedTaskStopped,
126 ex.clone(),
127 );
128 let rpc_settings: RpcSettings = args.rpc.into();
132 info!("Starting JSON-RPC server on port {}", rpc_settings.listen);
133 let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
134 let rpc_connections = Mutex::new(HashSet::new());
135 let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub));
136 let _ex = ex.clone();
137
138 let rpc_task = StoppableTask::new();
139 rpc_task.clone().start(
140 listen_and_serve(rpc_settings, dchat.clone(), None, ex.clone()),
141 |res| async move {
142 match res {
143 Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
144 Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
145 }
146 },
147 Error::RpcServerStopped,
148 ex.clone(),
149 );
150 info!("Registering Dchat protocol");
154 let registry = p2p.protocol_registry();
155 registry
156 .register(net::session::SESSION_DEFAULT, move |channel, _p2p| {
157 let msgs_ = msgs.clone();
158 async move { ProtocolDchat::init(channel, msgs_).await }
159 })
160 .await;
161 info!("Starting P2P network");
165 p2p.clone().start().await?;
166 let (signals_handler, signals_task) = SignalHandler::new(ex)?;
170 signals_handler.wait_termination(signals_task).await?;
171 info!("Caught termination signal, cleaning up and exiting...");
172
173 info!("Stopping JSON-RPC server");
174 rpc_task.stop().await;
175
176 info!("Stopping dnet tasks");
177 dnet_task.stop().await;
178
179 info!("Stopping P2P network");
180 p2p.stop().await;
181
182 info!("Shut down successfully");
183 Ok(())
185}
186