1use futures::{
29 future::{select, Either},
30 pin_mut,
31};
32use smol::Timer;
33use std::{
34 sync::{Arc, Weak},
35 time::{Duration, Instant, UNIX_EPOCH},
36};
37
38use async_trait::async_trait;
39use log::{debug, warn};
40use url::Url;
41
42use super::super::p2p::{P2p, P2pPtr};
43
44use crate::{
45 net::{
46 connector::Connector,
47 hosts::{HostColor, HostState},
48 protocol::ProtocolVersion,
49 session::{Session, SessionBitFlag, SESSION_REFINE},
50 },
51 system::{sleep, StoppableTask, StoppableTaskPtr},
52 Error,
53};
54
55pub type RefineSessionPtr = Arc<RefineSession>;
56
57pub struct RefineSession {
58 pub(in crate::net) p2p: Weak<P2p>,
60
61 pub(in crate::net) refinery: Arc<GreylistRefinery>,
63}
64
65impl RefineSession {
66 pub fn new(p2p: Weak<P2p>) -> RefineSessionPtr {
67 Arc::new_cyclic(|session| Self { p2p, refinery: GreylistRefinery::new(session.clone()) })
68 }
69
70 pub(crate) async fn start(self: Arc<Self>) {
72 if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
73 match self.p2p().hosts().container.load_all(hostlist) {
74 Ok(()) => {
75 debug!(target: "net::refine_session::start", "Load hosts successful!");
76 }
77 Err(e) => {
78 warn!(target: "net::refine_session::start", "Error loading hosts {}", e);
79 }
80 }
81 }
82
83 match self.p2p().hosts().import_blacklist().await {
84 Ok(()) => {
85 debug!(target: "net::refine_session::start", "Import blacklist successful!");
86 }
87 Err(e) => {
88 warn!(target: "net::refine_session::start",
89 "Error importing blacklist from config file {}", e);
90 }
91 }
92
93 debug!(target: "net::refine_session", "Starting greylist refinery process");
94 self.refinery.clone().start().await;
95 }
96
97 pub(crate) async fn stop(&self) {
99 debug!(target: "net::refine_session", "Stopping refinery process");
100 self.refinery.clone().stop().await;
101
102 if let Some(ref hostlist) = self.p2p().settings().read().await.hostlist {
103 match self.p2p().hosts().container.save_all(hostlist) {
104 Ok(()) => {
105 debug!(target: "net::refine_session::stop()", "Save hosts successful!");
106 }
107 Err(e) => {
108 warn!(target: "net::refine_session::stop()", "Error saving hosts {}", e);
109 }
110 }
111 }
112 }
113
114 pub async fn handshake_node(self: Arc<Self>, addr: Url, p2p: P2pPtr) -> bool {
118 let self_ = Arc::downgrade(&self);
119 let connector = Connector::new(self.p2p().settings(), self_);
120
121 debug!(target: "net::refinery::handshake_node()", "Attempting to connect to {}", addr);
122 match connector.connect(&addr).await {
123 Ok((url, channel)) => {
124 debug!(target: "net::refinery::handshake_node()", "Successfully created a channel with {}", url);
125 let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
127
128 debug!(target: "net::refinery::handshake_node()", "Performing handshake protocols with {}", url);
129 let handshake =
131 self.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());
132
133 debug!(target: "net::refinery::handshake_node()", "Starting channel {}", url);
134 channel.clone().start(p2p.executor());
135
136 let timeout = Timer::after(Duration::from_secs(5));
140
141 pin_mut!(timeout);
142 pin_mut!(handshake);
143
144 let result = match select(handshake, timeout).await {
145 Either::Left((Ok(_), _)) => {
146 debug!(target: "net::refinery::handshake_node()", "Handshake success!");
147 true
148 }
149 Either::Left((Err(e), _)) => {
150 debug!(target: "net::refinery::handshake_node()", "Handshake error={}", e);
151 false
152 }
153 Either::Right((_, _)) => {
154 debug!(target: "net::refinery::handshake_node()", "Handshake timed out");
155 false
156 }
157 };
158
159 debug!(target: "net::refinery::handshake_node()", "Stopping channel {}", url);
160 channel.stop().await;
161
162 result
163 }
164
165 Err(e) => {
166 debug!(target: "net::refinery::handshake_node()", "Failed to connect to {}, ({})", addr, e);
167 false
168 }
169 }
170 }
171}
172
173#[async_trait]
174impl Session for RefineSession {
175 fn p2p(&self) -> P2pPtr {
176 self.p2p.upgrade().unwrap()
177 }
178
179 fn type_id(&self) -> SessionBitFlag {
180 SESSION_REFINE
181 }
182}
183
184pub struct GreylistRefinery {
194 session: Weak<RefineSession>,
196 process: StoppableTaskPtr,
197}
198
199impl GreylistRefinery {
200 pub fn new(session: Weak<RefineSession>) -> Arc<Self> {
201 Arc::new(Self { session, process: StoppableTask::new() })
202 }
203
204 pub async fn start(self: Arc<Self>) {
205 let ex = self.p2p().executor();
206 self.process.clone().start(
207 async move {
208 self.run().await;
209 unreachable!();
210 },
211 |_| async {},
213 Error::NetworkServiceStopped,
214 ex,
215 );
216 }
217
218 pub async fn stop(self: Arc<Self>) {
219 self.process.stop().await;
220 }
221
222 async fn run(self: Arc<Self>) {
225 let hosts = self.p2p().hosts();
226
227 loop {
228 let settings = self.p2p().settings().read_arc().await;
230 let greylist_refinery_interval = settings.greylist_refinery_interval;
231 let time_with_no_connections = settings.time_with_no_connections;
232 let allowed_transports = settings.allowed_transports.clone();
233 drop(settings);
234
235 sleep(greylist_refinery_interval).await;
236
237 if hosts.container.is_empty(HostColor::Grey) {
238 debug!(target: "net::refinery",
239 "Greylist is empty! Cannot start refinery process");
240
241 continue
242 }
243
244 let offline_limit = Duration::from_secs(time_with_no_connections);
247
248 let offline_timer =
249 { Instant::now().duration_since(*hosts.last_connection.lock().unwrap()) };
250
251 if !self.p2p().is_connected() && offline_timer >= offline_limit {
252 warn!(target: "net::refinery", "No connections for {}s. GreylistRefinery paused.",
253 offline_timer.as_secs());
254
255 let suspended_hosts = hosts.suspended();
260 for host in suspended_hosts {
261 hosts.unregister(&host);
262 }
263
264 continue
265 }
266
267 match hosts.container.fetch_random_with_schemes(HostColor::Grey, &allowed_transports) {
269 Some((entry, _)) => {
270 let url = &entry.0;
271
272 if let Err(e) = hosts.try_register(url.clone(), HostState::Refine) {
273 debug!(target: "net::refinery", "Unable to refine addr={}, err={}",
274 url.clone(), e);
275 continue
276 }
277
278 if !self.session().handshake_node(url.clone(), self.p2p().clone()).await {
279 hosts.container.remove_if_exists(HostColor::Grey, url);
280
281 debug!(
282 target: "net::refinery",
283 "Peer {} handshake failed. Removed from greylist", url,
284 );
285
286 hosts.unregister(url);
288
289 continue
290 }
291 debug!(
292 target: "net::refinery",
293 "Peer {} handshake successful. Adding to whitelist", url,
294 );
295 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
296
297 hosts.whitelist_host(url, last_seen).unwrap();
298
299 debug!(target: "net::refinery", "GreylistRefinery complete!");
300
301 continue
302 }
303 None => {
304 debug!(target: "net::refinery", "No matching greylist entries found. Cannot proceed with refinery");
305
306 continue
307 }
308 }
309 }
310 }
311
312 fn session(&self) -> RefineSessionPtr {
313 self.session.upgrade().unwrap()
314 }
315
316 fn p2p(&self) -> P2pPtr {
317 self.session().p2p()
318 }
319}