dchatd/
protocol_dchat.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19// ANCHOR: protocol_dchat
20use async_trait::async_trait;
21use darkfi::{net, Result};
22use log::debug;
23use smol::Executor;
24use std::sync::Arc;
25
26use crate::dchatmsg::{DchatMsg, DchatMsgsBuffer};
27
28pub struct ProtocolDchat {
29    jobsman: net::ProtocolJobsManagerPtr,
30    msg_sub: net::MessageSubscription<DchatMsg>,
31    msgs: DchatMsgsBuffer,
32}
33// ANCHOR_END: protocol_dchat
34
35// ANCHOR: constructor
36impl ProtocolDchat {
37    pub async fn init(channel: net::ChannelPtr, msgs: DchatMsgsBuffer) -> net::ProtocolBasePtr {
38        debug!(target: "dchat", "ProtocolDchat::init() [START]");
39        let message_subsytem = channel.message_subsystem();
40        message_subsytem.add_dispatch::<DchatMsg>().await;
41
42        let msg_sub =
43            channel.subscribe_msg::<DchatMsg>().await.expect("Missing DchatMsg dispatcher!");
44
45        Arc::new(Self {
46            jobsman: net::ProtocolJobsManager::new("ProtocolDchat", channel.clone()),
47            msg_sub,
48            msgs,
49        })
50    }
51    // ANCHOR_END: constructor
52
53    // ANCHOR: receive
54    async fn handle_receive_msg(self: Arc<Self>) -> Result<()> {
55        debug!(target: "dchat", "ProtocolDchat::handle_receive_msg() [START]");
56        while let Ok(msg) = self.msg_sub.receive().await {
57            let msg = (*msg).to_owned();
58            self.msgs.lock().await.push(msg);
59        }
60
61        Ok(())
62    }
63    // ANCHOR_END: receive
64}
65
66#[async_trait]
67impl net::ProtocolBase for ProtocolDchat {
68    // ANCHOR: start
69    async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
70        debug!(target: "dchat", "ProtocolDchat::ProtocolBase::start() [START]");
71        self.jobsman.clone().start(executor.clone());
72        self.jobsman.clone().spawn(self.clone().handle_receive_msg(), executor.clone()).await;
73        debug!(target: "dchat", "ProtocolDchat::ProtocolBase::start() [STOP]");
74        Ok(())
75    }
76    // ANCHOR_END: start
77
78    fn name(&self) -> &'static str {
79        "ProtocolDchat"
80    }
81}