1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2024 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

//! `RefineSession` manages the `GreylistRefinery`, which randomly selects
//! entries on the greylist and moves them to the whitelist if they are active.
//!
//! `GreylistRefinery` makes use of a `RefineSession` method called
//! `handshake_node()`, which uses a `Connector` to establish a `Channel` with
//! a provided address, and then does a version exchange across the channel
//! (`perform_handshake_protocols`). `handshake_node()` can either succeed,
//! fail, or time out.

use futures::{
    future::{select, Either},
    pin_mut,
};
use smol::Timer;
use std::{
    sync::{Arc, Weak},
    time::{Duration, Instant, UNIX_EPOCH},
};

use async_trait::async_trait;
use log::{debug, warn};
use url::Url;

use super::super::p2p::{P2p, P2pPtr};

use crate::{
    net::{
        connector::Connector,
        hosts::{HostColor, HostState},
        protocol::ProtocolVersion,
        session::{Session, SessionBitFlag, SESSION_REFINE},
    },
    system::{sleep, StoppableTask, StoppableTaskPtr},
    Error,
};

/// Shared, reference-counted handle to a [`RefineSession`].
pub type RefineSessionPtr = Arc<RefineSession>;

/// Session that owns the [`GreylistRefinery`] and provides `handshake_node()`,
/// the probe the refinery uses to test greylist entries.
pub struct RefineSession {
    /// Weak pointer to parent p2p object. It is upgraded on demand by
    /// `Session::p2p()`, which panics if the parent has been dropped.
    pub(in crate::net) p2p: Weak<P2p>,

    /// Task that periodically checks entries in the greylist.
    pub(in crate::net) refinery: Arc<GreylistRefinery>,
}

impl RefineSession {
    /// Create a new refine session attached to the given parent `p2p`.
    ///
    /// The session is built cyclically so that its `GreylistRefinery` can hold
    /// a weak back-pointer to the session that owns it.
    pub fn new(p2p: Weak<P2p>) -> RefineSessionPtr {
        Arc::new_cyclic(|weak_self| {
            let refinery = GreylistRefinery::new(weak_self.clone());
            Self { p2p, refinery }
        })
    }

    /// Start the refinery and self handshake processes.
    pub(crate) async fn start(self: Arc<Self>) {
        if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
            match self.p2p().hosts().container.load_all(hostlist) {
                Ok(()) => {
                    debug!(target: "net::refine_session::start", "Load hosts successful!");
                }
                Err(e) => {
                    warn!(target: "net::refine_session::start", "Error loading hosts {}", e);
                }
            }
        }

        match self.p2p().hosts().import_blacklist().await {
            Ok(()) => {
                debug!(target: "net::refine_session::start", "Import blacklist successful!");
            }
            Err(e) => {
                warn!(target: "net::refine_session::start",
                    "Error importing blacklist from config file {}", e);
            }
        }

        debug!(target: "net::refine_session", "Starting greylist refinery process");
        self.refinery.clone().start().await;
    }

    /// Stop the refinery and self handshake processes.
    pub(crate) async fn stop(&self) {
        debug!(target: "net::refine_session", "Stopping refinery process");
        self.refinery.clone().stop().await;

        if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
            match self.p2p().hosts().container.save_all(hostlist) {
                Ok(()) => {
                    debug!(target: "net::refine_session::stop()", "Save hosts successful!");
                }
                Err(e) => {
                    warn!(target: "net::refine_session::stop()", "Error saving hosts {}", e);
                }
            }
        }
    }

    /// Probe `addr` by connecting to it and performing a version exchange.
    ///
    /// A `Connector` opens a channel to `addr`, the version protocol is set up
    /// on that channel, and the handshake is raced against a five second timer.
    /// Once a channel exists it is always stopped before this method returns.
    ///
    /// Returns `true` only if the handshake completes successfully. A failed
    /// connection, a handshake error and a timeout all return `false`.
    pub async fn handshake_node(self: Arc<Self>, addr: Url, p2p: P2pPtr) -> bool {
        // Upper bound, in seconds, on how long the version exchange may take.
        const HANDSHAKE_TIMEOUT_SECS: u64 = 5;

        let weak_self = Arc::downgrade(&self);
        let connector = Connector::new(self.p2p().settings(), weak_self);

        debug!(target: "net::refinery::handshake_node()", "Attempting to connect to {}", addr);
        let (url, channel) = match connector.connect(&addr).await {
            Ok(connection) => connection,
            Err(e) => {
                debug!(target: "net::refinery::handshake_node()", "Failed to connect to {}, ({})", addr, e);
                return false
            }
        };
        debug!(target: "net::refinery::handshake_node()", "Successfully created a channel with {}", url);

        // The version protocol (and its Version/Verack subscriptions) must exist
        // before the channel is started.
        let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;

        debug!(target: "net::refinery::handshake_node()", "Performing handshake protocols with {}", url);
        // This only builds the handshake future; it is driven by `select` below.
        let handshake =
            self.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());

        debug!(target: "net::refinery::handshake_node()", "Starting channel {}", url);
        channel.clone().start(p2p.executor());

        // Race the handshake against a timer. Without it, a handshake that never
        // completes would mean channel.stop() is never called, leaving zombie
        // processes behind.
        let timeout = Timer::after(Duration::from_secs(HANDSHAKE_TIMEOUT_SECS));
        pin_mut!(timeout, handshake);

        let reachable = match select(handshake, timeout).await {
            // The handshake finished first.
            Either::Left((outcome, _)) => match outcome {
                Ok(_) => {
                    debug!(target: "net::refinery::handshake_node()", "Handshake success!");
                    true
                }
                Err(e) => {
                    debug!(target: "net::refinery::handshake_node()", "Handshake error={}", e);
                    false
                }
            },
            // The timer fired first.
            Either::Right(_) => {
                debug!(target: "net::refinery::handshake_node()", "Handshake timed out");
                false
            }
        };

        debug!(target: "net::refinery::handshake_node()", "Stopping channel {}", url);
        channel.stop().await;

        reachable
    }
}

#[async_trait]
impl Session for RefineSession {
    /// Return a strong pointer to the parent p2p object.
    ///
    /// # Panics
    ///
    /// Panics if the parent `P2p` has already been dropped.
    fn p2p(&self) -> P2pPtr {
        let parent = Weak::upgrade(&self.p2p);
        parent.unwrap()
    }

    /// Return the bit flag identifying this session as the refine session.
    fn type_id(&self) -> SessionBitFlag {
        SESSION_REFINE
    }
}

/// Periodically probes entries in the greylist.
///
/// Randomly selects a greylist entry and tries to establish a local
/// connection to it using the method `handshake_node()`, which creates a
/// channel and does a version exchange using `perform_handshake_protocols()`.
///
/// If successful, the entry is removed from the greylist and added to the
/// whitelist with an updated last_seen timestamp. If unsuccessful, the
/// entry is removed from the greylist.
pub struct GreylistRefinery {
    /// Weak pointer to parent object
    session: Weak<RefineSession>,
    /// Handle to the background task that executes `run()`.
    process: StoppableTaskPtr,
}

impl GreylistRefinery {
    /// Create a refinery bound to `session`, with a fresh `StoppableTask` handle.
    pub fn new(session: Weak<RefineSession>) -> Arc<Self> {
        let process = StoppableTask::new();
        Arc::new(Self { session, process })
    }

    pub async fn start(self: Arc<Self>) {
        let ex = self.p2p().executor();
        self.process.clone().start(
            async move {
                self.run().await;
                unreachable!();
            },
            // Ignore stop handler
            |_| async {},
            Error::NetworkServiceStopped,
            ex,
        );
    }

    /// Stop the background refinery task.
    pub async fn stop(self: Arc<Self>) {
        let task = &self.process;
        task.stop().await;
    }

    /// Endless refinery loop: after each `greylist_refinery_interval`, pick a random
    /// greylist peer that matches our allowed transports and probe it.
    ///
    /// A responsive peer is moved to the whitelist with a fresh `last_seen`
    /// timestamp. An unresponsive peer is removed from the greylist.
    async fn run(self: Arc<Self>) {
        let hosts = self.p2p().hosts();

        loop {
            // Copy out the settings we need, then release the read lock before sleeping.
            let settings = self.p2p().settings().read_arc().await;
            let interval = settings.greylist_refinery_interval;
            let offline_limit = Duration::from_secs(settings.time_with_no_connections);
            let transports = settings.allowed_transports.clone();
            drop(settings);

            sleep(interval).await;

            if hosts.container.is_empty(HostColor::Grey) {
                debug!(target: "net::refinery",
                "Greylist is empty! Cannot start refinery process");
                continue
            }

            // Pause the refinery if we've had zero connections for longer than the
            // configured limit. The mutex guard is a temporary of this statement and
            // is released before the next await.
            let idle = Instant::now().duration_since(*hosts.last_connection.lock().unwrap());

            if !self.p2p().is_connected() && idle >= offline_limit {
                warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.",
                          idle.as_secs());

                // It is necessary to free suspended hosts at this point, otherwise these
                // hosts cannot be connected to in Outbound Session. Failure to do this
                // could result in the refinery being paused forever (since connections
                // could never be made).
                for host in hosts.suspended() {
                    hosts.unregister(&host);
                }

                continue
            }

            // Only attempt to refine peers that match our transports.
            let picked = hosts.container.fetch_random_with_schemes(HostColor::Grey, &transports);
            let entry = match picked {
                Some((entry, _)) => entry,
                None => {
                    debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery");
                    continue
                }
            };
            let url = &entry.0;

            // Skip this round if the host cannot be registered for refining.
            if let Err(e) = hosts.try_register(url.clone(), HostState::Refine) {
                debug!(target: "net::refinery", "Unable to refine addr={}, err={}",
                       url, e);
                continue
            }

            let reachable = self.session().handshake_node(url.clone(), self.p2p()).await;

            if reachable {
                debug!(
                    target: "net::refinery",
                    "Peer {} handshake successful. Adding to whitelist", url,
                );
                let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();

                // NOTE(review): this unwrap panics inside the refinery task if
                // `whitelist_host` returns an error. Confirm that is intended.
                hosts.whitelist_host(url, last_seen).unwrap();

                debug!(target: "net::refinery", "GreylistRefinery complete!");
            } else {
                hosts.container.remove_if_exists(HostColor::Grey, url);

                debug!(
                    target: "net::refinery",
                    "Peer {} handshake failed. Removed from greylist", url,
                );

                // Free up this addr for future operations.
                hosts.unregister(url);
            }
        }
    }

    /// Return a strong pointer to the owning `RefineSession`.
    ///
    /// Panics if the session has already been dropped.
    fn session(&self) -> RefineSessionPtr {
        let owner = Weak::upgrade(&self.session);
        owner.unwrap()
    }

    /// Return the parent p2p object, reached through the owning session.
    fn p2p(&self) -> P2pPtr {
        let session = self.session();
        session.p2p()
    }
}