darkfi/net/protocol/
protocol_ping.rs1use std::{
20 sync::Arc,
21 time::{Duration, Instant},
22};
23
24use async_trait::async_trait;
25use log::{debug, error, warn};
26use rand::{rngs::OsRng, Rng};
27use smol::{lock::RwLock as AsyncRwLock, Executor};
28
29use super::{
30 super::{
31 channel::ChannelPtr,
32 message::{PingMessage, PongMessage},
33 message_publisher::MessageSubscription,
34 p2p::P2pPtr,
35 settings::Settings,
36 },
37 protocol_base::{ProtocolBase, ProtocolBasePtr},
38 protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
39};
40use crate::{
41 system::{sleep, timeout::timeout},
42 Error, Result,
43};
44
45pub struct ProtocolPing {
47 channel: ChannelPtr,
48 ping_sub: MessageSubscription<PingMessage>,
49 pong_sub: MessageSubscription<PongMessage>,
50 settings: Arc<AsyncRwLock<Settings>>,
51 jobsman: ProtocolJobsManagerPtr,
52}
53
/// Protocol name: handed to the jobs manager on construction and returned
/// by `ProtocolBase::name()`.
const PROTO_NAME: &str = "ProtocolPing";
55
56impl ProtocolPing {
57 pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
59 let ping_sub =
61 channel.subscribe_msg::<PingMessage>().await.expect("Missing ping dispatcher!");
62
63 let pong_sub =
65 channel.subscribe_msg::<PongMessage>().await.expect("Missing pong dispatcher!");
66
67 Arc::new(Self {
68 channel: channel.clone(),
69 ping_sub,
70 pong_sub,
71 settings: p2p.settings(),
72 jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
73 })
74 }
75
76 async fn run_ping_pong(self: Arc<Self>) -> Result<()> {
81 debug!(
82 target: "net::protocol_ping::run_ping_pong()",
83 "START => address={}", self.channel.address(),
84 );
85
86 loop {
87 let settings = self.settings.read().await;
88 let outbound_connect_timeout = settings.outbound_connect_timeout;
89 let channel_heartbeat_interval = settings.channel_heartbeat_interval;
90 drop(settings);
91
92 let nonce = Self::random_nonce();
94
95 let ping = PingMessage { nonce };
97 self.channel.send(&ping).await?;
98
99 let timer = Instant::now();
101
102 let pong_msg = match timeout(
104 Duration::from_secs(outbound_connect_timeout),
105 self.pong_sub.receive(),
106 )
107 .await
108 {
109 Ok(msg) => {
110 msg?
113 }
114 Err(_e) => {
115 warn!(
118 target: "net::protocol_ping::run_ping_pong()",
119 "[P2P] Ping-Pong protocol timed out for {}", self.channel.address(),
120 );
121 self.channel.stop().await;
122 return Err(Error::ChannelStopped)
123 }
124 };
125
126 if pong_msg.nonce != nonce {
127 error!(
128 target: "net::protocol_ping::run_ping_pong()",
129 "[P2P] Wrong nonce in pingpong, disconnecting {}",
130 self.channel.address(),
131 );
132 self.channel.stop().await;
133 return Err(Error::ChannelStopped)
134 }
135
136 debug!(
137 target: "net::protocol_ping::run_ping_pong()",
138 "Received Pong from {}: {:?}",
139 self.channel.address(),
140 timer.elapsed(),
141 );
142
143 sleep(channel_heartbeat_interval).await;
145 }
146 }
147
148 async fn reply_to_ping(self: Arc<Self>) -> Result<()> {
151 debug!(
152 target: "net::protocol_ping::reply_to_ping()",
153 "START => address={}", self.channel.address(),
154 );
155
156 loop {
157 let ping = self.ping_sub.receive().await?;
159 debug!(
160 target: "net::protocol_ping::reply_to_ping()",
161 "Received Ping from {}", self.channel.address(),
162 );
163
164 let pong = PongMessage { nonce: ping.nonce };
166 self.channel.send(&pong).await?;
167
168 debug!(
169 target: "net::protocol_ping::reply_to_ping()",
170 "Sent Pong reply to {}", self.channel.address(),
171 );
172 }
173 }
174
175 fn random_nonce() -> u16 {
176 OsRng::gen(&mut OsRng)
177 }
178}
179
180#[async_trait]
181impl ProtocolBase for ProtocolPing {
182 async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
186 debug!(target: "net::protocol_ping::start()", "START => address={}", self.channel.address());
187 self.jobsman.clone().start(ex.clone());
188 self.jobsman.clone().spawn(self.clone().run_ping_pong(), ex.clone()).await;
189 self.jobsman.clone().spawn(self.clone().reply_to_ping(), ex).await;
190 debug!(target: "net::protocol_ping::start()", "END => address={}", self.channel.address());
191 Ok(())
192 }
193
194 fn name(&self) -> &'static str {
195 PROTO_NAME
196 }
197}