use std::{
fmt::{self, Debug, Formatter},
fs::remove_dir_all,
io::{self, ErrorKind},
pin::Pin,
sync::Arc,
time::Duration,
};
use arti_client::{
config::{onion_service::OnionServiceConfigBuilder, BoolOrAuto, TorClientConfigBuilder},
DataStream, StreamPrefs, TorClient,
};
use async_trait::async_trait;
use futures::{
future::{select, Either},
pin_mut,
stream::StreamExt,
Stream,
};
use log::{debug, error, info, warn};
use smol::{
lock::{Mutex as AsyncMutex, OnceCell},
Timer,
};
use tor_cell::relaycell::msg::Connected;
use tor_error::ErrorReport;
use tor_hsservice::{HsNickname, RendRequest, RunningOnionService};
use tor_proto::stream::IncomingStreamRequest;
use tor_rtcompat::PreferredRuntime;
use url::Url;
use super::{PtListener, PtStream};
use crate::util::path::expand_path;
/// Tor client shared by the dialer and the listener in this module.
///
/// The cell starts empty and is filled on first use through
/// `get_or_try_init` in `TorDialer::new` or `TorListener::do_listen`;
/// whichever of the two runs first performs the bootstrap, and later
/// callers reuse the stored client.
static TOR_CLIENT: OnceCell<TorClient<PreferredRuntime>> = OnceCell::new();
/// Tor dialer used for outbound connections.
///
/// Wraps the `TorClient` handle that `do_dial` connects through. `new`
/// obtains that handle by calling `isolated_client()` on the shared
/// `TOR_CLIENT`.
#[derive(Clone)]
pub struct TorDialer {
/// Client handle used by `do_dial` to open streams.
client: TorClient<PreferredRuntime>,
}
impl Debug for TorDialer {
    /// Writes a fixed placeholder instead of the inner `TorClient`.
    ///
    /// No trailing newline is emitted, so the output composes correctly
    /// when embedded in log lines or in the `Debug` output of a containing
    /// type.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("TorDialer { TorClient }")
    }
}
impl TorDialer {
pub(crate) async fn new(datastore: Option<String>) -> io::Result<Self> {
let client = match TOR_CLIENT
.get_or_try_init(|| async {
debug!(target: "net::tor::TorDialer", "Bootstrapping...");
if let Some(datadir) = &datastore {
let datadir = expand_path(datadir).unwrap();
let arti_data = datadir.join("arti-data");
let arti_cache = datadir.join("arti-cache");
if arti_data.exists() {
remove_dir_all(&arti_data).unwrap();
}
if arti_cache.exists() {
remove_dir_all(&arti_cache).unwrap();
}
let config = TorClientConfigBuilder::from_directories(arti_data, arti_cache)
.build()
.unwrap();
TorClient::create_bootstrapped(config).await
} else {
TorClient::builder().create_bootstrapped().await
}
})
.await
{
Ok(client) => client.isolated_client(),
Err(e) => {
warn!(target: "net::tor::TorDialer", "{}", e.report());
return Err(io::Error::other("Internal Tor error, see logged warning"))
}
};
Ok(Self { client })
}
pub(crate) async fn do_dial(
&self,
host: &str,
port: u16,
conn_timeout: Option<Duration>,
) -> io::Result<DataStream> {
debug!(target: "net::tor::do_dial", "Dialing {}:{} with Tor...", host, port);
let mut stream_prefs = StreamPrefs::new();
stream_prefs.connect_to_onion_services(BoolOrAuto::Explicit(true));
let connect = self.client.connect_with_prefs((host, port), &stream_prefs);
match conn_timeout {
Some(t) => {
let timeout = Timer::after(t);
pin_mut!(timeout);
pin_mut!(connect);
match select(connect, timeout).await {
Either::Left((Ok(stream), _)) => Ok(stream),
Either::Left((Err(e), _)) => {
warn!(target: "net::tor::do_dial", "{}", e.report());
Err(io::Error::other("Internal Tor error, see logged warning"))
}
Either::Right((_, _)) => Err(io::ErrorKind::TimedOut.into()),
}
}
None => {
match connect.await {
Ok(stream) => Ok(stream),
Err(e) => {
warn!(target: "net::tor::do_dial", "{}", e.report());
Err(io::Error::other("Internal Tor error, see logged warning"))
}
}
}
}
}
}
/// Tor onion-service listener.
///
/// Creating the value does no Tor work; `do_listen` launches the onion
/// service and fills in `endpoint`.
#[derive(Clone, Debug)]
pub struct TorListener {
/// Optional base directory handed to arti when `do_listen` is the one
/// that bootstraps the shared client.
datastore: Option<String>,
/// `tor://<onion name>:<port>` URL of the launched listener, set once by
/// `do_listen`. The cell is behind an `Arc`, so clones of the listener
/// share it.
pub endpoint: Arc<OnceCell<Url>>,
}
impl TorListener {
pub async fn new(datastore: Option<String>) -> io::Result<Self> {
Ok(Self { datastore, endpoint: Arc::new(OnceCell::new()) })
}
pub(crate) async fn do_listen(&self, port: u16) -> io::Result<TorListenerIntern> {
let client = match TOR_CLIENT
.get_or_try_init(|| async {
debug!(target: "net::tor::do_listen", "Bootstrapping...");
if let Some(datadir) = &self.datastore {
let datadir = expand_path(datadir).unwrap();
let config = TorClientConfigBuilder::from_directories(datadir.clone(), datadir)
.build()
.unwrap();
TorClient::create_bootstrapped(config).await
} else {
TorClient::builder().create_bootstrapped().await
}
})
.await
{
Ok(client) => client,
Err(e) => {
warn!(target: "net::tor::do_listen", "{}", e.report());
return Err(io::Error::other("Internal Tor error, see logged warning"))
}
};
let hs_nick = HsNickname::new("darkfi_tor".to_string()).unwrap();
let hs_config = match OnionServiceConfigBuilder::default().nickname(hs_nick).build() {
Ok(v) => v,
Err(e) => {
error!(
target: "net::tor::do_listen",
"[P2P] Failed to create OnionServiceConfig: {}", e,
);
return Err(io::Error::other("Internal Tor error"))
}
};
let (onion_service, rendreq_stream) = match client.launch_onion_service(hs_config) {
Ok(v) => v,
Err(e) => {
error!(
target: "net::tor::do_listen",
"[P2P] Failed to launch Onion Service: {}", e,
);
return Err(io::Error::other("Internal Tor error"))
}
};
info!(
target: "net::tor::do_listen",
"[P2P] Established Tor listener on tor://{}:{}",
onion_service.onion_name().unwrap(), port,
);
let endpoint =
Url::parse(&format!("tor://{}:{}", onion_service.onion_name().unwrap(), port)).unwrap();
self.endpoint.set(endpoint).await.expect("fatal endpoint already set for TorListener");
Ok(TorListenerIntern {
port,
_onion_service: onion_service,
rendreq_stream: AsyncMutex::new(Box::pin(rendreq_stream)),
})
}
}
/// Running onion-service listener, as returned by `TorListener::do_listen`.
pub struct TorListenerIntern {
/// Port that an incoming `Begin` request must target to be accepted.
port: u16,
/// Handle to the launched onion service. It is never read in this file;
/// presumably it is held so the service stays alive for the lifetime of
/// the listener (TODO confirm against tor-hsservice docs).
_onion_service: Arc<RunningOnionService>,
/// Stream of incoming rendezvous requests. It sits behind an async mutex
/// because `next` only has `&self` but needs mutable access to poll it.
rendreq_stream: AsyncMutex<Pin<Box<dyn Stream<Item = RendRequest> + Send>>>,
}
// SAFETY: NOTE(review): no justification is recorded for this impl.
// `port` is a plain `u16` and `rendreq_stream` is only reachable through an
// async mutex, so soundness presumably hinges on `Arc<RunningOnionService>`
// being safe to share across threads. Confirm that, and drop this impl if
// the compiler can derive `Sync` on its own.
unsafe impl Sync for TorListenerIntern {}
#[async_trait]
impl PtListener for TorListenerIntern {
    /// Wait for the next inbound Tor connection and accept it.
    ///
    /// One rendezvous request is taken from the onion service, accepted, and
    /// the first stream request it yields is inspected. That request must be
    /// a `Begin` targeting the listener's port, otherwise the connection is
    /// refused.
    ///
    /// On success returns the accepted stream together with a
    /// `tor://127.0.0.1:<port>` URL built from the listener's own port.
    ///
    /// # Errors
    /// Returns `ErrorKind::ConnectionAborted` when either stream has ended,
    /// the rendezvous request cannot be accepted, or the stream request is
    /// not a `Begin` for the expected port. A failure while accepting the
    /// stream request itself is reported as an opaque `io::Error`.
    async fn next(&self) -> io::Result<(Box<dyn PtStream>, Url)> {
        // Error used for every refused or vanished incoming request
        fn aborted() -> io::Error {
            io::Error::new(ErrorKind::ConnectionAborted, "Connection Aborted")
        }

        // Keep the lock only for as long as it takes to pull one request
        let mut incoming = self.rendreq_stream.lock().await;
        let pending = incoming.next().await;
        drop(incoming);
        let rend_request = pending.ok_or_else(aborted)?;

        let mut stream_requests = rend_request.accept().await.map_err(|e| {
            error!(
                target: "net::tor::PtListener::next",
                "[P2P] Failed accepting Tor RendRequest: {}", e,
            );
            aborted()
        })?;

        let stream_request = stream_requests.next().await.ok_or_else(aborted)?;

        // Only `Begin` requests aimed at our port are served
        let wanted = matches!(
            stream_request.request(),
            IncomingStreamRequest::Begin(begin) if begin.port() == self.port
        );
        if !wanted {
            return Err(aborted())
        }

        let stream = stream_request.accept(Connected::new_empty()).await.map_err(|e| {
            error!(
                target: "net::tor::PtListener::next",
                "[P2P] Failed accepting Tor StreamRequest: {}", e,
            );
            io::Error::other("Internal Tor error")
        })?;

        let peer_url = Url::parse(&format!("tor://127.0.0.1:{}", self.port)).unwrap();
        Ok((Box::new(stream), peer_url))
    }
}