darkfi/net/session/
manual_session.rs1use std::sync::{Arc, Weak};
35
36use async_trait::async_trait;
37use futures::stream::{FuturesUnordered, StreamExt};
38use log::{debug, error, info, warn};
39use smol::lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
40use url::Url;
41
42use super::{
43 super::{
44 connector::Connector,
45 p2p::{P2p, P2pPtr},
46 },
47 Session, SessionBitFlag, SESSION_MANUAL,
48};
49use crate::{
50 net::{hosts::HostState, settings::Settings},
51 system::{sleep, StoppableTask, StoppableTaskPtr},
52 Error, Result,
53};
54
55pub type ManualSessionPtr = Arc<ManualSession>;
56
57pub struct ManualSession {
59 pub(in crate::net) p2p: Weak<P2p>,
60 slots: AsyncMutex<Vec<Arc<Slot>>>,
61}
62
63impl ManualSession {
64 pub fn new(p2p: Weak<P2p>) -> ManualSessionPtr {
66 Arc::new(Self { p2p, slots: AsyncMutex::new(Vec::new()) })
67 }
68
69 pub(crate) async fn start(self: Arc<Self>) {
70 let mut slots = self.slots.lock().await;
72
73 let mut futures = FuturesUnordered::new();
74
75 let self_ = Arc::downgrade(&self);
76
77 for peer in &self.p2p().settings().read().await.peers {
80 let slot = Slot::new(self_.clone(), peer.clone(), self.p2p().settings());
81 futures.push(slot.clone().start());
82 slots.push(slot);
83 }
84
85 while (futures.next().await).is_some() {}
86 }
87
88 pub async fn stop(&self) {
90 let slots = &*self.slots.lock().await;
91 let mut futures = FuturesUnordered::new();
92
93 for slot in slots {
94 futures.push(slot.stop());
95 }
96
97 while (futures.next().await).is_some() {}
98 }
99}
100
101#[async_trait]
102impl Session for ManualSession {
103 fn p2p(&self) -> P2pPtr {
104 self.p2p.upgrade().unwrap()
105 }
106
107 fn type_id(&self) -> SessionBitFlag {
108 SESSION_MANUAL
109 }
110}
111
112struct Slot {
113 addr: Url,
114 process: StoppableTaskPtr,
115 session: Weak<ManualSession>,
116 connector: Connector,
117}
118
119impl Slot {
120 fn new(
121 session: Weak<ManualSession>,
122 addr: Url,
123 settings: Arc<AsyncRwLock<Settings>>,
124 ) -> Arc<Self> {
125 Arc::new(Self {
126 addr,
127 process: StoppableTask::new(),
128 session: session.clone(),
129 connector: Connector::new(settings, session),
130 })
131 }
132
133 async fn start(self: Arc<Self>) {
134 let ex = self.p2p().executor();
135
136 self.process.clone().start(
137 self.run(),
138 |res| async {
139 match res {
140 Ok(()) | Err(Error::NetworkServiceStopped) => {}
141 Err(e) => error!("net::manual_session {}", e),
142 }
143 },
144 Error::NetworkServiceStopped,
145 ex,
146 );
147 }
148
149 async fn run(self: Arc<Self>) -> Result<()> {
151 let ex = self.p2p().executor();
152
153 let mut attempts = 0;
154 loop {
155 attempts += 1;
156
157 info!(
158 target: "net::manual_session",
159 "[P2P] Connecting to manual outbound [{}] (attempt #{})",
160 self.addr, attempts
161 );
162
163 let settings = self.p2p().settings().read_arc().await;
164 let seeds = settings.seeds.clone();
165 let outbound_connect_timeout = settings.outbound_connect_timeout;
166 drop(settings);
167
168 if seeds.contains(&self.addr) {
171 error!(
172 target: "net::manual_session",
173 "[P2P] Suspending manual connection to seed [{}]", self.addr.clone(),
174 );
175 return Ok(())
176 }
177
178 if let Err(e) = self.p2p().hosts().try_register(self.addr.clone(), HostState::Connect) {
179 debug!(target: "net::manual_session",
180 "Cannot connect to manual={}, err={}", &self.addr, e);
181
182 sleep(outbound_connect_timeout).await;
183
184 continue
185 }
186
187 match self.connector.connect(&self.addr).await {
188 Ok((url, channel)) => {
189 info!(
190 target: "net::manual_session",
191 "[P2P] Manual outbound connected [{}]", url,
192 );
193
194 let stop_sub = channel.subscribe_stop().await?;
195
196 match self.session().register_channel(channel.clone(), ex.clone()).await {
200 Ok(()) => {
201 stop_sub.receive().await;
203
204 info!(
205 target: "net::manual_session",
206 "[P2P] Manual outbound disconnected [{}]", url,
207 );
208 }
209 Err(e) => {
210 self.handle_failure(e, &url);
211 }
212 }
213 }
214 Err(e) => {
215 self.handle_failure(e, &self.addr);
216 }
217 }
218
219 info!(
220 target: "net::manual_session",
221 "[P2P] Waiting {} seconds until next manual outbound connection attempt [{}]",
222 outbound_connect_timeout, self.addr,
223 );
224
225 sleep(outbound_connect_timeout).await;
226 }
227 }
228
229 fn handle_failure(&self, error: Error, addr: &Url) {
230 warn!(
231 target: "net::manual_session",
232 "[P2P] Unable to connect to manual outbound [{}]: {}",
233 self.addr, error,
234 );
235
236 self.p2p().hosts().unregister(addr);
238 }
239
240 fn session(&self) -> ManualSessionPtr {
241 self.session.upgrade().unwrap()
242 }
243
244 fn p2p(&self) -> P2pPtr {
245 self.session().p2p()
246 }
247
248 async fn stop(&self) {
249 self.connector.stop();
250 self.process.stop().await;
251 }
252}