darkfi/net/transport/
mod.rsuse std::{io, time::Duration};
use async_trait::async_trait;
use log::error;
use smol::io::{AsyncRead, AsyncWrite};
use url::Url;
#[cfg(feature = "p2p-unix")]
use std::io::ErrorKind;
pub(crate) mod tls;
pub mod socks5;
pub(crate) mod tcp;
#[cfg(feature = "p2p-tor")]
pub(crate) mod tor;
#[cfg(feature = "p2p-nym")]
pub(crate) mod nym;
#[cfg(feature = "p2p-unix")]
pub(crate) mod unix;
#[derive(Debug, Clone)]
pub enum DialerVariant {
Tcp(tcp::TcpDialer),
TcpTls(tcp::TcpDialer),
#[cfg(feature = "p2p-tor")]
Tor(tor::TorDialer),
#[cfg(feature = "p2p-tor")]
TorTls(tor::TorDialer),
#[cfg(feature = "p2p-nym")]
Nym(nym::NymDialer),
#[cfg(feature = "p2p-nym")]
NymTls(nym::NymDialer),
#[cfg(feature = "p2p-unix")]
Unix(unix::UnixDialer),
Socks5(socks5::Socks5Dialer),
Socks5Tls(socks5::Socks5Dialer),
}
#[derive(Debug, Clone)]
pub enum ListenerVariant {
Tcp(tcp::TcpListener),
TcpTls(tcp::TcpListener),
#[cfg(feature = "p2p-tor")]
Tor(tor::TorListener),
#[cfg(feature = "p2p-unix")]
Unix(unix::UnixListener),
}
pub struct Dialer {
endpoint: Url,
variant: DialerVariant,
}
macro_rules! enforce_hostport {
($endpoint:ident) => {
if $endpoint.host_str().is_none() || $endpoint.port().is_none() {
return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
}
};
}
#[cfg(feature = "p2p-unix")]
macro_rules! enforce_abspath {
($endpoint:ident) => {
if $endpoint.host_str().is_some() || $endpoint.port().is_some() {
return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
}
if $endpoint.to_file_path().is_err() {
return Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
}
};
}
impl Dialer {
pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
match endpoint.scheme().to_lowercase().as_str() {
"tcp" => {
enforce_hostport!(endpoint);
let variant = tcp::TcpDialer::new(None).await?;
let variant = DialerVariant::Tcp(variant);
Ok(Self { endpoint, variant })
}
"tcp+tls" => {
enforce_hostport!(endpoint);
let variant = tcp::TcpDialer::new(None).await?;
let variant = DialerVariant::TcpTls(variant);
Ok(Self { endpoint, variant })
}
#[cfg(feature = "p2p-tor")]
"tor" => {
enforce_hostport!(endpoint);
let variant = tor::TorDialer::new(datastore).await?;
let variant = DialerVariant::Tor(variant);
Ok(Self { endpoint, variant })
}
#[cfg(feature = "p2p-tor")]
"tor+tls" => {
enforce_hostport!(endpoint);
let variant = tor::TorDialer::new(datastore).await?;
let variant = DialerVariant::TorTls(variant);
Ok(Self { endpoint, variant })
}
#[cfg(feature = "p2p-nym")]
"nym" => {
enforce_hostport!(endpoint);
let variant = nym::NymDialer::new().await?;
let variant = DialerVariant::Nym(variant);
Ok(Self { endpoint, variant })
}
#[cfg(feature = "p2p-nym")]
"nym+tls" => {
enforce_hostport!(endpoint);
let variant = nym::NymDialer::new().await?;
let variant = DialerVariant::NymTls(variant);
Ok(Self { endpoint, variant })
}
#[cfg(feature = "p2p-unix")]
"unix" => {
enforce_abspath!(endpoint);
let variant = unix::UnixDialer::new().await?;
let variant = DialerVariant::Unix(variant);
Ok(Self { endpoint, variant })
}
"socks5" => {
enforce_hostport!(endpoint);
let variant = socks5::Socks5Dialer::new(&endpoint).await?;
let variant = DialerVariant::Socks5(variant);
Ok(Self { endpoint, variant })
}
"socks5+tls" => {
enforce_hostport!(endpoint);
let variant = socks5::Socks5Dialer::new(&endpoint).await?;
let variant = DialerVariant::Socks5Tls(variant);
Ok(Self { endpoint, variant })
}
x => {
error!("[P2P] Requested unsupported transport: {}", x);
Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
}
}
}
pub async fn dial(&self, timeout: Option<Duration>) -> io::Result<Box<dyn PtStream>> {
match &self.variant {
DialerVariant::Tcp(dialer) => {
let sockaddr = self.endpoint.socket_addrs(|| None)?;
let stream = dialer.do_dial(sockaddr[0], timeout).await?;
Ok(Box::new(stream))
}
DialerVariant::TcpTls(dialer) => {
let sockaddr = self.endpoint.socket_addrs(|| None)?;
let stream = dialer.do_dial(sockaddr[0], timeout).await?;
let tlsupgrade = tls::TlsUpgrade::new().await;
let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
Ok(Box::new(stream))
}
#[cfg(feature = "p2p-tor")]
DialerVariant::Tor(dialer) => {
let host = self.endpoint.host_str().unwrap();
let port = self.endpoint.port().unwrap();
let stream = dialer.do_dial(host, port, timeout).await?;
Ok(Box::new(stream))
}
#[cfg(feature = "p2p-tor")]
DialerVariant::TorTls(dialer) => {
let host = self.endpoint.host_str().unwrap();
let port = self.endpoint.port().unwrap();
let stream = dialer.do_dial(host, port, timeout).await?;
let tlsupgrade = tls::TlsUpgrade::new().await;
let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
Ok(Box::new(stream))
}
#[cfg(feature = "p2p-nym")]
DialerVariant::Nym(_dialer) => {
todo!();
}
#[cfg(feature = "p2p-nym")]
DialerVariant::NymTls(_dialer) => {
todo!();
}
#[cfg(feature = "p2p-unix")]
DialerVariant::Unix(dialer) => {
let path = match self.endpoint.to_file_path() {
Ok(v) => v,
Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
};
let stream = dialer.do_dial(path).await?;
Ok(Box::new(stream))
}
DialerVariant::Socks5(dialer) => {
let stream = dialer.do_dial().await?;
Ok(Box::new(stream))
}
DialerVariant::Socks5Tls(dialer) => {
let stream = dialer.do_dial().await?;
let tlsupgrade = tls::TlsUpgrade::new().await;
let stream = tlsupgrade.upgrade_dialer_tls(stream).await?;
Ok(Box::new(stream))
}
}
}
pub fn endpoint(&self) -> &Url {
&self.endpoint
}
}
pub struct Listener {
endpoint: Url,
variant: ListenerVariant,
}
impl Listener {
pub async fn new(endpoint: Url, datastore: Option<String>) -> io::Result<Self> {
match endpoint.scheme().to_lowercase().as_str() {
"tcp" => {
enforce_hostport!(endpoint);
let variant = tcp::TcpListener::new(1024).await?;
let variant = ListenerVariant::Tcp(variant);
Ok(Self { endpoint, variant })
}
"tcp+tls" => {
enforce_hostport!(endpoint);
let variant = tcp::TcpListener::new(1024).await?;
let variant = ListenerVariant::TcpTls(variant);
Ok(Self { endpoint, variant })
}
#[cfg(feature = "p2p-tor")]
"tor" => {
enforce_hostport!(endpoint);
let variant = tor::TorListener::new(datastore).await?;
let variant = ListenerVariant::Tor(variant);
Ok(Self { endpoint, variant })
}
#[cfg(feature = "p2p-unix")]
"unix" => {
enforce_abspath!(endpoint);
let variant = unix::UnixListener::new().await?;
let variant = ListenerVariant::Unix(variant);
Ok(Self { endpoint, variant })
}
x => {
error!("[P2P] Requested unsupported transport: {}", x);
Err(io::Error::from_raw_os_error(libc::ENETUNREACH))
}
}
}
pub async fn listen(&self) -> io::Result<Box<dyn PtListener>> {
match &self.variant {
ListenerVariant::Tcp(listener) => {
let sockaddr = self.endpoint.socket_addrs(|| None)?;
let l = listener.do_listen(sockaddr[0]).await?;
Ok(Box::new(l))
}
ListenerVariant::TcpTls(listener) => {
let sockaddr = self.endpoint.socket_addrs(|| None)?;
let l = listener.do_listen(sockaddr[0]).await?;
let tlsupgrade = tls::TlsUpgrade::new().await;
let l = tlsupgrade.upgrade_listener_tcp_tls(l).await?;
Ok(Box::new(l))
}
#[cfg(feature = "p2p-tor")]
ListenerVariant::Tor(listener) => {
let port = self.endpoint.port().unwrap();
let l = listener.do_listen(port).await?;
Ok(Box::new(l))
}
#[cfg(feature = "p2p-unix")]
ListenerVariant::Unix(listener) => {
let path = match self.endpoint.to_file_path() {
Ok(v) => v,
Err(_) => return Err(io::Error::new(ErrorKind::Unsupported, "Invalid path")),
};
let l = listener.do_listen(&path).await?;
Ok(Box::new(l))
}
}
}
pub async fn endpoint(&self) -> Url {
match &self.variant {
ListenerVariant::Tcp(listener) | ListenerVariant::TcpTls(listener) => {
let mut endpoint = self.endpoint.clone();
let port = self.endpoint.port().unwrap();
if port == 0 {
if let Some(actual_port) = listener.port.get() {
endpoint.set_port(Some(*actual_port)).unwrap();
}
}
endpoint
}
#[cfg(feature = "p2p-tor")]
ListenerVariant::Tor(listener) => listener.endpoint.get().unwrap().clone(),
#[allow(unreachable_patterns)]
_ => self.endpoint.clone(),
}
}
}
pub trait PtStream: AsyncRead + AsyncWrite + Unpin + Send {}
impl PtStream for smol::net::TcpStream {}
impl PtStream for futures_rustls::TlsStream<smol::net::TcpStream> {}
#[cfg(feature = "p2p-tor")]
impl PtStream for arti_client::DataStream {}
#[cfg(feature = "p2p-tor")]
impl PtStream for futures_rustls::TlsStream<arti_client::DataStream> {}
#[cfg(feature = "p2p-unix")]
impl PtStream for smol::net::unix::UnixStream {}
#[async_trait]
pub trait PtListener: Send + Unpin {
async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)>;
}