1use std::{
20 sync::{atomic::Ordering, Arc},
21 time::Duration,
22};
23
24use futures::{
25 future::{select, Either},
26 pin_mut,
27};
28use log::warn;
29use smol::lock::RwLock as AsyncRwLock;
30use url::Url;
31
32use super::{
33 channel::{Channel, ChannelPtr},
34 hosts::HostColor,
35 session::SessionWeakPtr,
36 settings::Settings,
37 transport::Dialer,
38};
39use crate::{system::CondVar, Error, Result};
40
41pub struct Connector {
43 settings: Arc<AsyncRwLock<Settings>>,
45 pub session: SessionWeakPtr,
47 stop_signal: CondVar,
49}
50
51impl Connector {
52 pub fn new(settings: Arc<AsyncRwLock<Settings>>, session: SessionWeakPtr) -> Self {
54 Self { settings, session, stop_signal: CondVar::new() }
55 }
56
57 pub async fn connect(&self, url: &Url) -> Result<(Url, ChannelPtr)> {
59 let hosts = self.session.upgrade().unwrap().p2p().hosts();
60 if hosts.container.contains(HostColor::Black as usize, url) || hosts.block_all_ports(url) {
61 warn!(target: "net::connector::connect", "Peer {url} is blacklisted");
62 return Err(Error::ConnectFailed)
63 }
64
65 let settings = self.settings.read().await;
66 let transports = settings.allowed_transports.clone();
67 let mixed_transports = settings.mixed_transports.clone();
68 let datastore = settings.p2p_datastore.clone();
69 let outbound_connect_timeout = settings.outbound_connect_timeout;
70 let i2p_socks5_proxy = settings.i2p_socks5_proxy.clone();
71 let tor_socks5_proxy = settings.tor_socks5_proxy.clone();
72 let nym_socks5_proxy = settings.nym_socks5_proxy.clone();
73 drop(settings);
74
75 let mut endpoint = url.clone();
76 let scheme = endpoint.scheme();
77
78 if mixed_transports.contains(&scheme.to_string()) {
79 if transports.contains(&"socks5".to_string()) && (scheme == "tcp" || scheme == "tor") {
80 if scheme == "tcp" && nym_socks5_proxy.is_some() {
82 endpoint = nym_socks5_proxy.unwrap();
83 } else if tor_socks5_proxy.is_some() {
84 endpoint = tor_socks5_proxy.unwrap();
85 } else {
86 warn!(target: "net::connector::connect", "Transport mixing is enabled but tor_socks5_proxy is not set");
87 return Err(Error::ConnectFailed)
88 }
89
90 endpoint.set_path(&format!(
91 "{}:{}",
92 endpoint.host().unwrap(),
93 endpoint.port().unwrap()
94 ));
95 endpoint.set_scheme("socks5")?;
96 } else if transports.contains(&"socks5+tls".to_string()) &&
97 (scheme == "tcp+tls" || scheme == "tor+tls")
98 {
99 if scheme == "tcp+tls" && nym_socks5_proxy.is_some() {
101 endpoint = nym_socks5_proxy.unwrap();
102 } else if tor_socks5_proxy.is_some() {
103 endpoint = tor_socks5_proxy.unwrap();
104 } else {
105 warn!(target: "net::connector::connect", "Transport mixing is enabled but tor_socks5_proxy is not set");
106 return Err(Error::ConnectFailed)
107 }
108
109 endpoint.set_path(&format!(
110 "{}:{}",
111 endpoint.host().unwrap(),
112 endpoint.port().unwrap()
113 ));
114 endpoint.set_scheme("socks5+tls")?;
115 } else if transports.contains(&"tor".to_string()) && scheme == "tcp" {
116 endpoint.set_scheme("tor")?;
117 } else if transports.contains(&"tor+tls".to_string()) && scheme == "tcp+tls" {
118 endpoint.set_scheme("tor+tls")?;
119 } else if transports.contains(&"nym".to_string()) && scheme == "tcp" {
120 endpoint.set_scheme("nym")?;
121 } else if transports.contains(&"nym+tls".to_string()) && scheme == "tcp+tls" {
122 endpoint.set_scheme("nym+tls")?;
123 }
124 }
125
126 let dialer = Dialer::new(endpoint.clone(), datastore, Some(i2p_socks5_proxy)).await?;
127 let timeout = Duration::from_secs(outbound_connect_timeout);
128
129 let stop_fut = async {
130 self.stop_signal.wait().await;
131 };
132 let dial_fut = async { dialer.dial(Some(timeout)).await };
133
134 pin_mut!(stop_fut);
135 pin_mut!(dial_fut);
136
137 match select(dial_fut, stop_fut).await {
138 Either::Left((Ok(ptstream), _)) => {
139 let channel = Channel::new(
140 ptstream,
141 Some(endpoint.clone()),
142 url.clone(),
143 self.session.clone(),
144 )
145 .await;
146 Ok((endpoint, channel))
147 }
148
149 Either::Left((Err(e), _)) => {
150 if e.raw_os_error() == Some(libc::ENETUNREACH) {
152 self.session
153 .upgrade()
154 .unwrap()
155 .p2p()
156 .hosts()
157 .ipv6_available
158 .store(false, Ordering::SeqCst);
159 }
160 Err(e.into())
161 }
162
163 Either::Right((_, _)) => Err(Error::ConnectorStopped),
164 }
165 }
166
167 pub(crate) fn stop(&self) {
168 self.stop_signal.notify()
169 }
170}