darkfi/net/
connector.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    sync::{atomic::Ordering, Arc},
21    time::Duration,
22};
23
24use futures::{
25    future::{select, Either},
26    pin_mut,
27};
28use log::warn;
29use smol::lock::RwLock as AsyncRwLock;
30use url::Url;
31
32use super::{
33    channel::{Channel, ChannelPtr},
34    hosts::HostColor,
35    session::SessionWeakPtr,
36    settings::Settings,
37    transport::Dialer,
38};
39use crate::{system::CondVar, Error, Result};
40
41/// Create outbound socket connections
42pub struct Connector {
43    /// P2P settings
44    settings: Arc<AsyncRwLock<Settings>>,
45    /// Weak pointer to the session
46    pub session: SessionWeakPtr,
47    /// Stop signal that aborts the connector if received.
48    stop_signal: CondVar,
49}
50
51impl Connector {
52    /// Create a new connector with given network settings
53    pub fn new(settings: Arc<AsyncRwLock<Settings>>, session: SessionWeakPtr) -> Self {
54        Self { settings, session, stop_signal: CondVar::new() }
55    }
56
57    /// Establish an outbound connection
58    pub async fn connect(&self, url: &Url) -> Result<(Url, ChannelPtr)> {
59        let hosts = self.session.upgrade().unwrap().p2p().hosts();
60        if hosts.container.contains(HostColor::Black as usize, url) || hosts.block_all_ports(url) {
61            warn!(target: "net::connector::connect", "Peer {url} is blacklisted");
62            return Err(Error::ConnectFailed)
63        }
64
65        let settings = self.settings.read().await;
66        let transports = settings.allowed_transports.clone();
67        let mixed_transports = settings.mixed_transports.clone();
68        let datastore = settings.p2p_datastore.clone();
69        let outbound_connect_timeout = settings.outbound_connect_timeout;
70        let i2p_socks5_proxy = settings.i2p_socks5_proxy.clone();
71        let tor_socks5_proxy = settings.tor_socks5_proxy.clone();
72        let nym_socks5_proxy = settings.nym_socks5_proxy.clone();
73        drop(settings);
74
75        let mut endpoint = url.clone();
76        let scheme = endpoint.scheme();
77
78        if mixed_transports.contains(&scheme.to_string()) {
79            if transports.contains(&"socks5".to_string()) && (scheme == "tcp" || scheme == "tor") {
80                // Prioritize connection through nym socks5 proxy for tcp endpoint mixing
81                if scheme == "tcp" && nym_socks5_proxy.is_some() {
82                    endpoint = nym_socks5_proxy.unwrap();
83                } else if tor_socks5_proxy.is_some() {
84                    endpoint = tor_socks5_proxy.unwrap();
85                } else {
86                    warn!(target: "net::connector::connect", "Transport mixing is enabled but tor_socks5_proxy is not set");
87                    return Err(Error::ConnectFailed)
88                }
89
90                endpoint.set_path(&format!(
91                    "{}:{}",
92                    endpoint.host().unwrap(),
93                    endpoint.port().unwrap()
94                ));
95                endpoint.set_scheme("socks5")?;
96            } else if transports.contains(&"socks5+tls".to_string()) &&
97                (scheme == "tcp+tls" || scheme == "tor+tls")
98            {
99                // Prioritize connection through nym socks5 proxy for tcp+tls endpoint mixing
100                if scheme == "tcp+tls" && nym_socks5_proxy.is_some() {
101                    endpoint = nym_socks5_proxy.unwrap();
102                } else if tor_socks5_proxy.is_some() {
103                    endpoint = tor_socks5_proxy.unwrap();
104                } else {
105                    warn!(target: "net::connector::connect", "Transport mixing is enabled but tor_socks5_proxy is not set");
106                    return Err(Error::ConnectFailed)
107                }
108
109                endpoint.set_path(&format!(
110                    "{}:{}",
111                    endpoint.host().unwrap(),
112                    endpoint.port().unwrap()
113                ));
114                endpoint.set_scheme("socks5+tls")?;
115            } else if transports.contains(&"tor".to_string()) && scheme == "tcp" {
116                endpoint.set_scheme("tor")?;
117            } else if transports.contains(&"tor+tls".to_string()) && scheme == "tcp+tls" {
118                endpoint.set_scheme("tor+tls")?;
119            } else if transports.contains(&"nym".to_string()) && scheme == "tcp" {
120                endpoint.set_scheme("nym")?;
121            } else if transports.contains(&"nym+tls".to_string()) && scheme == "tcp+tls" {
122                endpoint.set_scheme("nym+tls")?;
123            }
124        }
125
126        let dialer = Dialer::new(endpoint.clone(), datastore, Some(i2p_socks5_proxy)).await?;
127        let timeout = Duration::from_secs(outbound_connect_timeout);
128
129        let stop_fut = async {
130            self.stop_signal.wait().await;
131        };
132        let dial_fut = async { dialer.dial(Some(timeout)).await };
133
134        pin_mut!(stop_fut);
135        pin_mut!(dial_fut);
136
137        match select(dial_fut, stop_fut).await {
138            Either::Left((Ok(ptstream), _)) => {
139                let channel = Channel::new(
140                    ptstream,
141                    Some(endpoint.clone()),
142                    url.clone(),
143                    self.session.clone(),
144                )
145                .await;
146                Ok((endpoint, channel))
147            }
148
149            Either::Left((Err(e), _)) => {
150                // If we get ENETUNREACH, we don't have IPv6 connectivity so note it down.
151                if e.raw_os_error() == Some(libc::ENETUNREACH) {
152                    self.session
153                        .upgrade()
154                        .unwrap()
155                        .p2p()
156                        .hosts()
157                        .ipv6_available
158                        .store(false, Ordering::SeqCst);
159                }
160                Err(e.into())
161            }
162
163            Either::Right((_, _)) => Err(Error::ConnectorStopped),
164        }
165    }
166
167    pub(crate) fn stop(&self) {
168        self.stop_signal.notify()
169    }
170}