// dchatd/main.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2024 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

// ANCHOR: imports
use log::{debug, error, info};
use smol::{lock::Mutex, stream::StreamExt};
use std::{collections::HashSet, sync::Arc};
use url::Url;

use darkfi::{
    async_daemonize, cli_desc, net,
    net::settings::SettingsOpt,
    rpc::{
        jsonrpc::JsonSubscriber,
        server::{listen_and_serve, RequestHandler},
    },
    system::{StoppableTask, StoppableTaskPtr},
    Error, Result,
};

use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};

use crate::{
    dchatmsg::{DchatMsg, DchatMsgsBuffer},
    protocol_dchat::ProtocolDchat,
};
// ANCHOR_END: imports

// Submodules of the daemon crate.
// NOTE(review): `dchat_error` and `rpc` are not referenced by name in the
// visible part of this file; their contents cannot be described from here.
pub mod dchat_error;
// Provides `DchatMsg` and `DchatMsgsBuffer`, imported via `crate::dchatmsg`.
pub mod dchatmsg;
// Provides `ProtocolDchat`, imported via `crate::protocol_dchat`.
pub mod protocol_dchat;
pub mod rpc;

// Name of the configuration file.
// NOTE(review): neither constant is referenced in the visible code of this
// file; presumably both are consumed by the `async_daemonize!` expansion —
// confirm before renaming or removing them.
const CONFIG_FILE: &str = "dchatd_config.toml";
// Text of `../dchatd_config.toml`, embedded at compile time by `include_str!`.
const CONFIG_FILE_CONTENTS: &str = include_str!("../dchatd_config.toml");

// ANCHOR: args
// Command-line / TOML options accepted by the dchat daemon.
//
// Maintainer notes in this struct use plain `//` comments on purpose: the
// `///` comments on the fields are picked up by structopt as the user-facing
// help text, so editing them changes what `--help` prints.
//
// NOTE(review): `config`, `log` and `verbose` are not read anywhere in the
// visible code of this file; presumably the `async_daemonize!` expansion
// consumes them — confirm.
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "dchat", about = cli_desc!())]
struct Args {
    #[structopt(long, default_value = "tcp://127.0.0.1:51054")]
    /// RPC server listen address
    rpc_listen: Url,

    #[structopt(short, long)]
    /// Configuration file to use
    config: Option<String>,

    #[structopt(short, long)]
    /// Set log file to output into
    log: Option<String>,

    #[structopt(short, parse(from_occurrences))]
    /// Increase verbosity (-vvv supported)
    verbose: u8,

    /// P2P network settings
    #[structopt(flatten)]
    net: SettingsOpt,
}
// ANCHOR_END: args

// ANCHOR: dchat
/// State shared by the daemon: the P2P handle, the received-message buffer
/// and the JSON-RPC bookkeeping. `realmain` wraps one instance in an `Arc`
/// and passes it to `listen_and_serve`.
struct Dchat {
    /// Handle to the P2P network instance created in `realmain`.
    p2p: net::P2pPtr,
    /// Message buffer; `realmain` passes clones of the same buffer to every
    /// `ProtocolDchat::init` call.
    recv_msgs: DchatMsgsBuffer,
    /// Set of stoppable tasks guarded by an async mutex.
    /// NOTE(review): presumably one task per open RPC connection — confirm
    /// against the `rpc` module.
    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// Subscriber created with the "dnet.subscribe_events" name; the dnet
    /// task in `realmain` forwards every dnet event to it via `notify`.
    pub dnet_sub: JsonSubscriber,
}

impl Dchat {
    fn new(
        p2p: net::P2pPtr,
        recv_msgs: DchatMsgsBuffer,
        rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
        dnet_sub: JsonSubscriber,
    ) -> Self {
        Self { p2p, recv_msgs, rpc_connections, dnet_sub }
    }
}
// ANCHOR_END: dchat

// ANCHOR: main
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
    let p2p = net::P2p::new(args.net.into(), ex.clone()).await?;

    // ANCHOR: dnet
    info!("Starting dnet subs task");
    let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
    let dnet_sub_ = dnet_sub.clone();
    let p2p_ = p2p.clone();
    let dnet_task = StoppableTask::new();
    dnet_task.clone().start(
        async move {
            let dnet_sub = p2p_.dnet_subscribe().await;
            loop {
                let event = dnet_sub.receive().await;
                debug!("Got dnet event: {:?}", event);
                dnet_sub_.notify(vec![event.into()].into()).await;
            }
        },
        |res| async {
            match res {
                Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                Err(e) => panic!("{}", e),
            }
        },
        Error::DetachedTaskStopped,
        ex.clone(),
    );
    // ANCHOR_end: dnet

    // ANCHOR: rpc
    info!("Starting JSON-RPC server on port {}", args.rpc_listen);
    let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
    let rpc_connections = Mutex::new(HashSet::new());
    let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub));
    let _ex = ex.clone();

    let rpc_task = StoppableTask::new();
    rpc_task.clone().start(
        listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()),
        |res| async move {
            match res {
                Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
                Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
            }
        },
        Error::RpcServerStopped,
        ex.clone(),
    );
    // ANCHOR_end: rpc

    // ANCHOR: register_protocol
    info!("Registering Dchat protocol");
    let registry = p2p.protocol_registry();
    registry
        .register(net::session::SESSION_DEFAULT, move |channel, _p2p| {
            let msgs_ = msgs.clone();
            async move { ProtocolDchat::init(channel, msgs_).await }
        })
        .await;
    // ANCHOR_END: register_protocol

    // ANCHOR: p2p_start
    info!("Starting P2P network");
    p2p.clone().start().await?;
    // ANCHOR_END: p2p_start

    // ANCHOR: shutdown
    let (signals_handler, signals_task) = SignalHandler::new(ex)?;
    signals_handler.wait_termination(signals_task).await?;
    info!("Caught termination signal, cleaning up and exiting...");

    info!("Stopping JSON-RPC server");
    rpc_task.stop().await;

    info!("Stopping dnet tasks");
    dnet_task.stop().await;

    info!("Stopping P2P network");
    p2p.stop().await;

    info!("Shut down successfully");
    // ANCHOR_END: shutdown
    Ok(())
}
// ANCHOR_END: main