darkfi/net/protocol/
protocol_ping.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    sync::Arc,
21    time::{Duration, Instant},
22};
23
24use async_trait::async_trait;
25use log::{debug, error, warn};
26use rand::{rngs::OsRng, Rng};
27use smol::{lock::RwLock as AsyncRwLock, Executor};
28
29use super::{
30    super::{
31        channel::ChannelPtr,
32        message::{PingMessage, PongMessage},
33        message_publisher::MessageSubscription,
34        p2p::P2pPtr,
35        settings::Settings,
36    },
37    protocol_base::{ProtocolBase, ProtocolBasePtr},
38    protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
39};
40use crate::{
41    system::{sleep, timeout::timeout},
42    Error, Result,
43};
44
45/// Defines ping and pong messages
46pub struct ProtocolPing {
47    channel: ChannelPtr,
48    ping_sub: MessageSubscription<PingMessage>,
49    pong_sub: MessageSubscription<PongMessage>,
50    settings: Arc<AsyncRwLock<Settings>>,
51    jobsman: ProtocolJobsManagerPtr,
52}
53
54const PROTO_NAME: &str = "ProtocolPing";
55
56impl ProtocolPing {
57    /// Create a new ping-pong protocol.
58    pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
59        // Creates a subscription to ping message
60        let ping_sub =
61            channel.subscribe_msg::<PingMessage>().await.expect("Missing ping dispatcher!");
62
63        // Creates a subscription to pong message
64        let pong_sub =
65            channel.subscribe_msg::<PongMessage>().await.expect("Missing pong dispatcher!");
66
67        Arc::new(Self {
68            channel: channel.clone(),
69            ping_sub,
70            pong_sub,
71            settings: p2p.settings(),
72            jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
73        })
74    }
75
76    /// Runs the ping-pong protocol. Creates a subscription to pong, then
77    /// starts a loop. Loop sleeps for the duration of the channel heartbeat,
78    /// then sends a ping message with a random nonce. Loop starts a timer,
79    /// waits for the pong reply and ensures the nonce is the same.
80    async fn run_ping_pong(self: Arc<Self>) -> Result<()> {
81        debug!(
82            target: "net::protocol_ping::run_ping_pong()",
83            "START => address={}", self.channel.address(),
84        );
85
86        loop {
87            let settings = self.settings.read().await;
88            let outbound_connect_timeout = settings.outbound_connect_timeout;
89            let channel_heartbeat_interval = settings.channel_heartbeat_interval;
90            drop(settings);
91
92            // Create a random nonce.
93            let nonce = Self::random_nonce();
94
95            // Send ping message.
96            let ping = PingMessage { nonce };
97            self.channel.send(&ping).await?;
98
99            // Start the timer for the ping timer
100            let timer = Instant::now();
101
102            // Wait for pong, check nonce matches.
103            let pong_msg = match timeout(
104                Duration::from_secs(outbound_connect_timeout),
105                self.pong_sub.receive(),
106            )
107            .await
108            {
109                Ok(msg) => {
110                    // msg will be an error when the channel is stopped
111                    // so just yield out of this function.
112                    msg?
113                }
114                Err(_e) => {
115                    // Pong timeout. We didn't receive any message back
116                    // so close the connection.
117                    warn!(
118                        target: "net::protocol_ping::run_ping_pong()",
119                        "[P2P] Ping-Pong protocol timed out for {}", self.channel.address(),
120                    );
121                    self.channel.stop().await;
122                    return Err(Error::ChannelStopped)
123                }
124            };
125
126            if pong_msg.nonce != nonce {
127                error!(
128                    target: "net::protocol_ping::run_ping_pong()",
129                    "[P2P] Wrong nonce in pingpong, disconnecting {}",
130                    self.channel.address(),
131                );
132                self.channel.stop().await;
133                return Err(Error::ChannelStopped)
134            }
135
136            debug!(
137                target: "net::protocol_ping::run_ping_pong()",
138                "Received Pong from {}: {:?}",
139                self.channel.address(),
140                timer.elapsed(),
141            );
142
143            // Sleep until next heartbeat
144            sleep(channel_heartbeat_interval).await;
145        }
146    }
147
148    /// Waits for ping, then replies with pong.
149    /// Copies ping's nonce into the pong reply.
150    async fn reply_to_ping(self: Arc<Self>) -> Result<()> {
151        debug!(
152            target: "net::protocol_ping::reply_to_ping()",
153            "START => address={}", self.channel.address(),
154        );
155
156        loop {
157            // Wait for ping, reply with pong that has a matching nonce.
158            let ping = self.ping_sub.receive().await?;
159            debug!(
160                target: "net::protocol_ping::reply_to_ping()",
161                "Received Ping from {}", self.channel.address(),
162            );
163
164            // Send pong message
165            let pong = PongMessage { nonce: ping.nonce };
166            self.channel.send(&pong).await?;
167
168            debug!(
169                target: "net::protocol_ping::reply_to_ping()",
170                "Sent Pong reply to {}", self.channel.address(),
171            );
172        }
173    }
174
175    fn random_nonce() -> u16 {
176        OsRng::gen(&mut OsRng)
177    }
178}
179
180#[async_trait]
181impl ProtocolBase for ProtocolPing {
182    /// Starts ping-pong keepalive messages exchange. Runs ping-pong in the
183    /// protocol task manager, then queues the reply. Sends out a ping and
184    /// waits for pong reply. Waits for ping and replies with a pong.
185    async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
186        debug!(target: "net::protocol_ping::start()", "START => address={}", self.channel.address());
187        self.jobsman.clone().start(ex.clone());
188        self.jobsman.clone().spawn(self.clone().run_ping_pong(), ex.clone()).await;
189        self.jobsman.clone().spawn(self.clone().reply_to_ping(), ex).await;
190        debug!(target: "net::protocol_ping::start()", "END => address={}", self.channel.address());
191        Ok(())
192    }
193
194    fn name(&self) -> &'static str {
195        PROTO_NAME
196    }
197}