darkfi/net/session/
refine_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
//! `RefineSession` manages the `GreylistRefinery`, which randomly selects
//! entries on the greylist and moves them to the whitelist if they respond
//! to a handshake.
//!
//! `GreylistRefinery` makes use of a `RefineSession` method called
//! `handshake_node()`, which uses a `Connector` to establish a `Channel` with
//! a provided address, and then does a version exchange across the channel
//! (`perform_handshake_protocols`). `handshake_node()` can either succeed,
//! fail, or time out.
27
28use futures::{
29    future::{select, Either},
30    pin_mut,
31};
32use smol::Timer;
33use std::{
34    sync::{Arc, Weak},
35    time::{Duration, Instant, UNIX_EPOCH},
36};
37
38use async_trait::async_trait;
39use log::{debug, warn};
40use url::Url;
41
42use super::super::p2p::{P2p, P2pPtr};
43
44use crate::{
45    net::{
46        connector::Connector,
47        hosts::{HostColor, HostState},
48        protocol::ProtocolVersion,
49        session::{Session, SessionBitFlag, SESSION_REFINE},
50    },
51    system::{sleep, StoppableTask, StoppableTaskPtr},
52    Error,
53};
54
55pub type RefineSessionPtr = Arc<RefineSession>;
56
57pub struct RefineSession {
58    /// Weak pointer to parent p2p object
59    pub(in crate::net) p2p: Weak<P2p>,
60
61    /// Task that periodically checks entries in the greylist.
62    pub(in crate::net) refinery: Arc<GreylistRefinery>,
63}
64
65impl RefineSession {
66    pub fn new(p2p: Weak<P2p>) -> RefineSessionPtr {
67        Arc::new_cyclic(|session| Self { p2p, refinery: GreylistRefinery::new(session.clone()) })
68    }
69
70    /// Start the refinery and self handshake processes.
71    pub(crate) async fn start(self: Arc<Self>) {
72        if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
73            match self.p2p().hosts().container.load_all(hostlist) {
74                Ok(()) => {
75                    debug!(target: "net::refine_session::start", "Load hosts successful!");
76                }
77                Err(e) => {
78                    warn!(target: "net::refine_session::start", "Error loading hosts {}", e);
79                }
80            }
81        }
82
83        match self.p2p().hosts().import_blacklist().await {
84            Ok(()) => {
85                debug!(target: "net::refine_session::start", "Import blacklist successful!");
86            }
87            Err(e) => {
88                warn!(target: "net::refine_session::start",
89                    "Error importing blacklist from config file {}", e);
90            }
91        }
92
93        debug!(target: "net::refine_session", "Starting greylist refinery process");
94        self.refinery.clone().start().await;
95    }
96
97    /// Stop the refinery and self handshake processes.
98    pub(crate) async fn stop(&self) {
99        debug!(target: "net::refine_session", "Stopping refinery process");
100        self.refinery.clone().stop().await;
101
102        if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
103            match self.p2p().hosts().container.save_all(hostlist) {
104                Ok(()) => {
105                    debug!(target: "net::refine_session::stop()", "Save hosts successful!");
106                }
107                Err(e) => {
108                    warn!(target: "net::refine_session::stop()", "Error saving hosts {}", e);
109                }
110            }
111        }
112    }
113
114    /// Globally accessible function to perform a version exchange with a
115    /// given address.  Returns `true` if an address is accessible, false
116    /// otherwise.  
117    pub async fn handshake_node(self: Arc<Self>, addr: Url, p2p: P2pPtr) -> bool {
118        let self_ = Arc::downgrade(&self);
119        let connector = Connector::new(self.p2p().settings(), self_);
120
121        debug!(target: "net::refinery::handshake_node()", "Attempting to connect to {}", addr);
122        match connector.connect(&addr).await {
123            Ok((url, channel)) => {
124                debug!(target: "net::refinery::handshake_node()", "Successfully created a channel with {}", url);
125                // First initialize the version protocol and its Version, Verack subscriptions.
126                let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
127
128                debug!(target: "net::refinery::handshake_node()", "Performing handshake protocols with {}", url);
129                // Then run the version exchange, store the channel and subscribe to a stop signal.
130                let handshake =
131                    self.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());
132
133                debug!(target: "net::refinery::handshake_node()", "Starting channel {}", url);
134                channel.clone().start(p2p.executor());
135
136                // Ensure the channel gets stopped by adding a timeout to the handshake. Otherwise if
137                // the handshake does not finish channel.stop() will never get called, resulting in
138                // zombie processes.
139                let timeout = Timer::after(Duration::from_secs(5));
140
141                pin_mut!(timeout);
142                pin_mut!(handshake);
143
144                let result = match select(handshake, timeout).await {
145                    Either::Left((Ok(_), _)) => {
146                        debug!(target: "net::refinery::handshake_node()", "Handshake success!");
147                        true
148                    }
149                    Either::Left((Err(e), _)) => {
150                        debug!(target: "net::refinery::handshake_node()", "Handshake error={}", e);
151                        false
152                    }
153                    Either::Right((_, _)) => {
154                        debug!(target: "net::refinery::handshake_node()", "Handshake timed out");
155                        false
156                    }
157                };
158
159                debug!(target: "net::refinery::handshake_node()", "Stopping channel {}", url);
160                channel.stop().await;
161
162                result
163            }
164
165            Err(e) => {
166                debug!(target: "net::refinery::handshake_node()", "Failed to connect to {}, ({})", addr, e);
167                false
168            }
169        }
170    }
171}
172
173#[async_trait]
174impl Session for RefineSession {
175    fn p2p(&self) -> P2pPtr {
176        self.p2p.upgrade().unwrap()
177    }
178
179    fn type_id(&self) -> SessionBitFlag {
180        SESSION_REFINE
181    }
182}
183
184/// Periodically probes entries in the greylist.
185///
186/// Randomly selects a greylist entry and tries to establish a local
187/// connection to it using the method handshake_node(), which creates a
188/// channel and does a version exchange using `perform_handshake_protocols()`.
189///
190/// If successful, the entry is removed from the greylist and added to the
191/// whitelist with an updated last_seen timestamp. If non-successful, the
192/// entry is removed from the greylist.
193pub struct GreylistRefinery {
194    /// Weak pointer to parent object
195    session: Weak<RefineSession>,
196    process: StoppableTaskPtr,
197}
198
199impl GreylistRefinery {
200    pub fn new(session: Weak<RefineSession>) -> Arc<Self> {
201        Arc::new(Self { session, process: StoppableTask::new() })
202    }
203
204    pub async fn start(self: Arc<Self>) {
205        let ex = self.p2p().executor();
206        self.process.clone().start(
207            async move {
208                self.run().await;
209                unreachable!();
210            },
211            // Ignore stop handler
212            |_| async {},
213            Error::NetworkServiceStopped,
214            ex,
215        );
216    }
217
218    pub async fn stop(self: Arc<Self>) {
219        self.process.stop().await;
220    }
221
222    // Randomly select a peer on the greylist and probe it. This method will remove from the
223    // greylist and store on the whitelist providing the peer is responsive.
224    async fn run(self: Arc<Self>) {
225        let hosts = self.p2p().hosts();
226
227        loop {
228            // Acquire read lock on P2P settings and load necessary settings
229            let settings = self.p2p().settings().read_arc().await;
230            let greylist_refinery_interval = settings.greylist_refinery_interval;
231            let time_with_no_connections = settings.time_with_no_connections;
232            let allowed_transports = settings.allowed_transports.clone();
233            drop(settings);
234
235            sleep(greylist_refinery_interval).await;
236
237            if hosts.container.is_empty(HostColor::Grey) {
238                debug!(target: "net::refinery",
239                "Greylist is empty! Cannot start refinery process");
240
241                continue
242            }
243
244            // Pause the refinery if we've had zero connections for longer than the configured
245            // limit.
246            let offline_limit = Duration::from_secs(time_with_no_connections);
247
248            let offline_timer =
249                { Instant::now().duration_since(*hosts.last_connection.lock().unwrap()) };
250
251            if !self.p2p().is_connected() && offline_timer >= offline_limit {
252                warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.",
253                          offline_timer.as_secs());
254
255                // It is neccessary to Free suspended hosts at this point, otherwise these
256                // hosts cannot be connected to in Outbound Session. Failure to do this could
257                // result in the refinery being paused forver (since connections could never be
258                // made).
259                let suspended_hosts = hosts.suspended();
260                for host in suspended_hosts {
261                    hosts.unregister(&host);
262                }
263
264                continue
265            }
266
267            // Only attempt to refine peers that match our transports.
268            match hosts.container.fetch_random_with_schemes(HostColor::Grey, &allowed_transports) {
269                Some((entry, _)) => {
270                    let url = &entry.0;
271
272                    if let Err(e) = hosts.try_register(url.clone(), HostState::Refine) {
273                        debug!(target: "net::refinery", "Unable to refine addr={}, err={}",
274                               url.clone(), e);
275                        continue
276                    }
277
278                    if !self.session().handshake_node(url.clone(), self.p2p().clone()).await {
279                        hosts.container.remove_if_exists(HostColor::Grey, url);
280
281                        debug!(
282                            target: "net::refinery",
283                            "Peer {} handshake failed. Removed from greylist", url,
284                        );
285
286                        // Free up this addr for future operations.
287                        hosts.unregister(url);
288
289                        continue
290                    }
291                    debug!(
292                        target: "net::refinery",
293                        "Peer {} handshake successful. Adding to whitelist", url,
294                    );
295                    let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
296
297                    hosts.whitelist_host(url, last_seen).unwrap();
298
299                    debug!(target: "net::refinery", "GreylistRefinery complete!");
300
301                    continue
302                }
303                None => {
304                    debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery");
305
306                    continue
307                }
308            }
309        }
310    }
311
312    fn session(&self) -> RefineSessionPtr {
313        self.session.upgrade().unwrap()
314    }
315
316    fn p2p(&self) -> P2pPtr {
317        self.session().p2p()
318    }
319}