use log::{debug, error, info};
use smol::{lock::Mutex, stream::StreamExt};
use std::{collections::HashSet, sync::Arc};
use url::Url;
use darkfi::{
async_daemonize, cli_desc, net,
net::settings::SettingsOpt,
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{StoppableTask, StoppableTaskPtr},
Error, Result,
};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use crate::{
dchatmsg::{DchatMsg, DchatMsgsBuffer},
protocol_dchat::ProtocolDchat,
};
// Submodules of this crate; their bodies live in separate files not shown here.
pub mod dchat_error;
// Provides `DchatMsg` and `DchatMsgsBuffer`, which this file imports.
pub mod dchatmsg;
// Provides `ProtocolDchat`, which this file registers with the P2P protocol registry.
pub mod protocol_dchat;
pub mod rpc;
// File name of the daemon's TOML configuration file.
// NOTE(review): neither constant is referenced by the code visible in this file;
// presumably both are consumed by the `async_daemonize!` expansion — confirm.
const CONFIG_FILE: &str = "dchatd_config.toml";
// Contents of `../dchatd_config.toml`, embedded into the binary at compile time.
const CONFIG_FILE_CONTENTS: &str = include_str!("../dchatd_config.toml");
// Command-line options of the dchat daemon. The `Deserialize` and
// `StructOptToml` derives additionally allow the same fields to be read from
// a TOML document.
//
// Plain `//` comments are used on purpose: structopt turns `///` doc comments
// on fields into `--help` text, which would change the program's output.
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "dchat", about = cli_desc!())]
struct Args {
// URL the JSON-RPC server listens on (`--rpc-listen`); passed to
// `listen_and_serve` in `realmain`.
#[structopt(long, default_value = "tcp://127.0.0.1:51054")]
rpc_listen: Url,
// `-c` / `--config`.
// NOTE(review): presumably the path of the configuration file; it is not
// read by the code visible in this file — confirm against `async_daemonize!`.
#[structopt(short, long)]
config: Option<String>,
// `-l` / `--log`.
// NOTE(review): presumably the path of a log file; it is not read by the
// code visible in this file — confirm against `async_daemonize!`.
#[structopt(short, long)]
log: Option<String>,
// Number of times `-v` was given on the command line.
#[structopt(short, parse(from_occurrences))]
verbose: u8,
// P2P network options, flattened into this option set and converted into
// the settings passed to `net::P2p::new` in `realmain`.
#[structopt(flatten)]
net: SettingsOpt,
}
/// Shared state of the dchat daemon.
///
/// `realmain` wraps one instance in an `Arc` and hands it to the JSON-RPC
/// server (`listen_and_serve`).
struct Dchat {
    /// Handle to the P2P network instance.
    p2p: net::P2pPtr,

    /// Buffer of received chat messages. `realmain` gives a clone of the same
    /// buffer to every `ProtocolDchat` instance it creates.
    recv_msgs: DchatMsgsBuffer,

    /// Set of stoppable tasks guarded by an async mutex.
    // NOTE(review): presumably one task per open JSON-RPC connection, managed
    // by the RPC server code outside this file — confirm.
    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,

    /// Subscriber through which dnet events are published; `realmain` notifies
    /// a clone of it for every event received from the P2P layer.
    pub dnet_sub: JsonSubscriber,
}
impl Dchat {
fn new(
p2p: net::P2pPtr,
recv_msgs: DchatMsgsBuffer,
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
dnet_sub: JsonSubscriber,
) -> Self {
Self { p2p, recv_msgs, rpc_connections, dnet_sub }
}
}
// Invokes darkfi's `async_daemonize!` macro with `realmain` as the daemon body.
// NOTE(review): the expansion is not visible here; presumably it generates the
// process entry point (argument/config parsing, logging, executor set-up) and
// brings `SignalHandler` into scope for `realmain` — confirm.
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
let p2p = net::P2p::new(args.net.into(), ex.clone()).await?;
info!("Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
let dnet_task = StoppableTask::new();
dnet_task.clone().start(
async move {
let dnet_sub = p2p_.dnet_subscribe().await;
loop {
let event = dnet_sub.receive().await;
debug!("Got dnet event: {:?}", event);
dnet_sub_.notify(vec![event.into()].into()).await;
}
},
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => panic!("{}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
info!("Starting JSON-RPC server on port {}", args.rpc_listen);
let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
let rpc_connections = Mutex::new(HashSet::new());
let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub));
let _ex = ex.clone();
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
ex.clone(),
);
info!("Registering Dchat protocol");
let registry = p2p.protocol_registry();
registry
.register(net::session::SESSION_DEFAULT, move |channel, _p2p| {
let msgs_ = msgs.clone();
async move { ProtocolDchat::init(channel, msgs_).await }
})
.await;
info!("Starting P2P network");
p2p.clone().start().await?;
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
info!("Caught termination signal, cleaning up and exiting...");
info!("Stopping JSON-RPC server");
rpc_task.stop().await;
info!("Stopping dnet tasks");
dnet_task.stop().await;
info!("Stopping P2P network");
p2p.stop().await;
info!("Shut down successfully");
Ok(())
}