darkfi/net/session/
manual_session.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19//! Manual connections session. Manages the creation of manual sessions.
20//! Used to create a manual session and to stop and start the session.
21//!
22//! A manual session is a type of outbound session in which we attempt
23//! connection to a predefined set of peers. Manual sessions loop forever
24//! continually trying to connect to a given peer, and sleep
25//! `outbound_connect_timeout` times between each attempt.
26//!
27//! Class consists of a weak pointer to the p2p interface and a vector of
28//! outbound connection slots. Using a weak pointer to p2p allows us to
29//! avoid circular dependencies. The vector of slots is wrapped in a mutex
30//! lock. This is switched on every time we instantiate a connection slot
31//! and insures that no other part of the program uses the slots at the
32//! same time.
33
34use std::sync::{Arc, Weak};
35
36use async_trait::async_trait;
37use futures::stream::{FuturesUnordered, StreamExt};
38use log::{debug, error, info, warn};
39use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
40use url::Url;
41
42use super::{
43    super::{
44        connector::Connector,
45        p2p::{P2p, P2pPtr},
46    },
47    Session, SessionBitFlag, SESSION_MANUAL,
48};
49use crate::{
50    net::{hosts::HostState, settings::Settings},
51    system::{sleep, StoppableTask, StoppableTaskPtr},
52    Error, Result,
53};
54
55pub type ManualSessionPtr = Arc<ManualSession>;
56
57/// Defines manual connections session.
58pub struct ManualSession {
59    pub(in crate::net) p2p: Weak<P2p>,
60    slots: AsyncMutex<Vec<Arc<Slot>>>,
61}
62
63impl ManualSession {
64    /// Create a new manual session.
65    pub fn new(p2p: Weak<P2p>) -> ManualSessionPtr {
66        Arc::new(Self { p2p, slots: AsyncMutex::new(Vec::new()) })
67    }
68
69    pub(crate) async fn start(self: Arc<Self>) {
70        // Activate mutex lock on connection slots.
71        let mut slots = self.slots.lock().await;
72
73        let mut futures = FuturesUnordered::new();
74
75        let self_ = Arc::downgrade(&self);
76
77        // Initialize a slot for each configured peer.
78        // Connections will be started by not yet activated.
79        for peer in &self.p2p().settings().read().await.peers {
80            let slot = Slot::new(self_.clone(), peer.clone(), self.p2p().settings());
81            futures.push(slot.clone().start());
82            slots.push(slot);
83        }
84
85        while (futures.next().await).is_some() {}
86    }
87
88    /// Stops the manual session.
89    pub async fn stop(&self) {
90        let slots = &*self.slots.lock().await;
91        let mut futures = FuturesUnordered::new();
92
93        for slot in slots {
94            futures.push(slot.stop());
95        }
96
97        while (futures.next().await).is_some() {}
98    }
99}
100
101#[async_trait]
102impl Session for ManualSession {
103    fn p2p(&self) -> P2pPtr {
104        self.p2p.upgrade().unwrap()
105    }
106
107    fn type_id(&self) -> SessionBitFlag {
108        SESSION_MANUAL
109    }
110}
111
112struct Slot {
113    addr: Url,
114    process: StoppableTaskPtr,
115    session: Weak<ManualSession>,
116    connector: Connector,
117}
118
119impl Slot {
120    fn new(
121        session: Weak<ManualSession>,
122        addr: Url,
123        settings: Arc<AsyncRwLock<Settings>>,
124    ) -> Arc<Self> {
125        Arc::new(Self {
126            addr,
127            process: StoppableTask::new(),
128            session: session.clone(),
129            connector: Connector::new(settings, session),
130        })
131    }
132
133    async fn start(self: Arc<Self>) {
134        let ex = self.p2p().executor();
135
136        self.process.clone().start(
137            self.run(),
138            |res| async {
139                match res {
140                    Ok(()) | Err(Error::NetworkServiceStopped) => {}
141                    Err(e) => error!("net::manual_session {}", e),
142                }
143            },
144            Error::NetworkServiceStopped,
145            ex,
146        );
147    }
148
149    /// Attempts a connection on the associated Connector object.
150    async fn run(self: Arc<Self>) -> Result<()> {
151        let ex = self.p2p().executor();
152
153        let mut attempts = 0;
154        loop {
155            attempts += 1;
156
157            info!(
158                target: "net::manual_session",
159                "[P2P] Connecting to manual outbound [{}] (attempt #{})",
160                self.addr, attempts
161            );
162
163            let settings = self.p2p().settings().read_arc().await;
164            let seeds = settings.seeds.clone();
165            let outbound_connect_timeout = settings.outbound_connect_timeout;
166            drop(settings);
167
168            // Do not establish a connection to a host that is also configured as a seed.
169            // This indicates a user misconfiguration.
170            if seeds.contains(&self.addr) {
171                error!(
172                    target: "net::manual_session",
173                    "[P2P] Suspending manual connection to seed [{}]", self.addr.clone(),
174                );
175                return Ok(())
176            }
177
178            if let Err(e) = self.p2p().hosts().try_register(self.addr.clone(), HostState::Connect) {
179                debug!(target: "net::manual_session",
180                    "Cannot connect to manual={}, err={}", &self.addr, e);
181
182                sleep(outbound_connect_timeout).await;
183
184                continue
185            }
186
187            match self.connector.connect(&self.addr).await {
188                Ok((url, channel)) => {
189                    info!(
190                        target: "net::manual_session",
191                        "[P2P] Manual outbound connected [{}]", url,
192                    );
193
194                    let stop_sub = channel.subscribe_stop().await?;
195
196                    // Channel is now connected but not yet setup
197
198                    // Register the new channel
199                    match self.session().register_channel(channel.clone(), ex.clone()).await {
200                        Ok(()) => {
201                            // Wait for channel to close
202                            stop_sub.receive().await;
203
204                            info!(
205                                target: "net::manual_session",
206                                "[P2P] Manual outbound disconnected [{}]", url,
207                            );
208                        }
209                        Err(e) => {
210                            self.handle_failure(e, &url);
211                        }
212                    }
213                }
214                Err(e) => {
215                    self.handle_failure(e, &self.addr);
216                }
217            }
218
219            info!(
220                target: "net::manual_session",
221                "[P2P] Waiting {} seconds until next manual outbound connection attempt [{}]",
222                outbound_connect_timeout, self.addr,
223            );
224
225            sleep(outbound_connect_timeout).await;
226        }
227    }
228
229    fn handle_failure(&self, error: Error, addr: &Url) {
230        warn!(
231            target: "net::manual_session",
232            "[P2P] Unable to connect to manual outbound [{}]: {}",
233            self.addr, error,
234        );
235
236        // Free up this addr for future operations.
237        self.p2p().hosts().unregister(addr);
238    }
239
240    fn session(&self) -> ManualSessionPtr {
241        self.session.upgrade().unwrap()
242    }
243
244    fn p2p(&self) -> P2pPtr {
245        self.session().p2p()
246    }
247
248    async fn stop(&self) {
249        self.connector.stop();
250        self.process.stop().await;
251    }
252}