1use std::sync::Arc;
20
21use tracing::{error, info, warn};
22
23use darkfi::{
24 dht::{event::DhtEvent, DhtHandler, DhtNode},
25 geode::hash_to_string,
26 system::{sleep, StoppableTask},
27 Error, Result,
28};
29
30use crate::{
31 event::{self, notify_event},
32 proto::FudAnnounce,
33 Fud, FudEvent, FudState,
34};
35
36pub async fn handle_dht_events(fud: Arc<Fud>) -> Result<()> {
38 let sub = fud.dht().subscribe().await;
39 loop {
40 let event = sub.receive().await;
41
42 match event {
43 DhtEvent::ValueLookupCompleted { key, values, .. } => {
44 let mut seeders: Vec<_> = values.into_iter().flatten().collect();
45 seeders.dedup_by_key(|seeder| seeder.node.id());
46 notify_event!(fud, SeedersFound, {
47 hash: key,
48 seeders
49 });
50 }
51 DhtEvent::BootstrapCompleted => {
52 let _ = fud.init().await;
53 notify_event!(fud, Ready);
54 }
55 _ => {}
56 }
57 }
58}
59
60pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
65 loop {
66 let (hash, path, files) = fud.get_rx.recv().await.unwrap();
67
68 let mut fetch_tasks = fud.fetch_tasks.write().await;
70 let task = StoppableTask::new();
71 fetch_tasks.insert(hash, task.clone());
72 drop(fetch_tasks);
73
74 let fud_1 = fud.clone();
76 let fud_2 = fud.clone();
77 task.start(
78 async move { fud_1.fetch_resource(&hash, &path, &files).await },
79 move |res| async move {
80 let mut fetch_tasks = fud_2.fetch_tasks.write().await;
83 fetch_tasks.remove(&hash);
84
85 let lookup_tasks = fud_2.lookup_tasks.read().await;
87 if let Some(lookup_task) = lookup_tasks.get(&hash) {
88 lookup_task.stop().await;
89 }
90
91 match res {
92 Ok(()) | Err(Error::DetachedTaskStopped) => { }
93 Err(e) => {
94 error!(target: "fud::get_task()", "Error while fetching resource: {e}");
95
96 notify_event!(fud_2, DownloadError, {
98 hash,
99 error: e.to_string(),
100 });
101 }
102 }
103 },
104 Error::DetachedTaskStopped,
105 fud.executor.clone(),
106 );
107 }
108}
109
110pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
112 loop {
113 let path = fud.put_rx.recv().await.unwrap();
114
115 let mut put_tasks = fud.put_tasks.write().await;
117 let task = StoppableTask::new();
118 put_tasks.insert(path.clone(), task.clone());
119 drop(put_tasks);
120
121 let fud_1 = fud.clone();
123 let fud_2 = fud.clone();
124 let path_ = path.clone();
125 task.start(
126 async move { fud_1.insert_resource(&path_).await },
127 move |res| async move {
128 let mut put_tasks = fud_2.put_tasks.write().await;
131 put_tasks.remove(&path);
132 match res {
133 Ok(()) | Err(Error::DetachedTaskStopped) => { }
134 Err(e) => {
135 error!(target: "fud::put_task()", "Error while inserting resource: {e}");
136
137 notify_event!(fud_2, InsertError, {
139 path,
140 error: e.to_string(),
141 });
142 }
143 }
144 },
145 Error::DetachedTaskStopped,
146 fud.executor.clone(),
147 );
148 }
149}
150
151pub async fn lookup_task(fud: Arc<Fud>) -> Result<()> {
153 loop {
154 let key = fud.lookup_rx.recv().await.unwrap();
155
156 let mut lookup_tasks = fud.lookup_tasks.write().await;
157 let task = StoppableTask::new();
158 lookup_tasks.insert(key, task.clone());
159 drop(lookup_tasks);
160
161 let fud_1 = fud.clone();
162 let fud_2 = fud.clone();
163 task.start(
164 async move {
165 fud_1.dht.lookup_value(&key).await;
166 Ok(())
167 },
168 move |res| async move {
169 let mut lookup_tasks = fud_2.lookup_tasks.write().await;
172 lookup_tasks.remove(&key);
173 match res {
174 Ok(()) | Err(Error::DetachedTaskStopped) => { }
175 Err(e) => {
176 error!(target: "dht::lookup_task()", "Error in DHT lookup task: {e}");
177 }
178 }
179 },
180 Error::DetachedTaskStopped,
181 fud.executor.clone(),
182 );
183 }
184}
185
186pub async fn verify_node_task(fud: Arc<Fud>) -> Result<()> {
190 loop {
191 let node = fud.verify_node_rx.recv().await.unwrap();
192 if let Ok((channel, _)) = fud.dht.create_channel_to_node(&node).await {
193 fud.dht.cleanup_channel(channel).await;
194 }
195 }
196}
197
198pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
201 let interval = 3600; loop {
204 sleep(interval).await;
205
206 info!(target: "fud::announce_seed_task()", "Verifying seeds...");
207 let seeding_resources = match fud.verify_resources(None).await {
208 Ok(resources) => resources,
209 Err(e) => {
210 error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
211 continue;
212 }
213 };
214
215 info!(target: "fud::announce_seed_task()", "Announcing files...");
216 for resource in seeding_resources {
217 if let Ok(seeder) = fud.new_seeder(&resource.hash).await {
218 let seeders = vec![seeder];
219 let _ = fud
220 .dht
221 .announce(
222 &resource.hash,
223 &seeders.clone(),
224 &FudAnnounce { key: resource.hash, seeders },
225 )
226 .await;
227 }
228 }
229
230 info!(target: "fud::announce_seed_task()", "Pruning seeders...");
231 fud.prune_seeders(interval.try_into().unwrap()).await;
232 }
233}
234
235pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
241 let interval = 600; loop {
244 sleep(interval).await;
245
246 let mut pow = fud.pow.write().await;
247 if !pow.settings.read().await.btc_enabled {
248 continue
249 }
250
251 let btc = &mut pow.bitcoin_hash_cache;
252
253 if btc.update().await.is_err() {
254 continue
255 }
256
257 let state = fud.state.read().await;
258 if state.is_none() {
259 continue
260 }
261 let block = state.clone().unwrap().node_data.btc_block_hash;
262 drop(state);
263 let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
264 Some(i) => i < 6,
265 None => true,
266 };
267
268 if !needs_dht_reset {
269 let dht = fud.dht();
271 let mut buckets = dht.buckets.write().await;
272 for bucket in buckets.iter_mut() {
273 for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
274 if !btc.block_hashes.contains(&node.data.btc_block_hash) {
276 bucket.nodes.remove(i);
277 info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
278 }
279 }
280 }
281 drop(buckets);
282
283 let mut seeders_table = fud.dht.hash_table.write().await;
285 for (key, seeders) in seeders_table.iter_mut() {
286 for (i, seeder) in seeders.clone().iter().enumerate().rev() {
287 if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
288 seeders.remove(i);
289 info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
290 }
291 }
292 }
293
294 continue
295 }
296
297 info!(target: "fud::node_id_task()", "Creating a new node id...");
298 let (node_data, secret_key) = match pow.generate_node().await {
299 Ok(res) => res,
300 Err(e) => {
301 warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
302 continue
303 }
304 };
305 drop(pow);
306 info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
307
308 let dht = fud.dht();
310 let mut channel_cache = dht.channel_cache.write().await;
311 for channel in dht.p2p.hosts().channels().clone() {
312 channel.stop().await;
313 channel_cache.remove(&channel.info.id);
314 }
315 drop(channel_cache);
316
317 dht.reset().await;
319
320 let mut state = fud.state.write().await;
322 *state = Some(FudState { node_data, secret_key });
323
324 }
326}
327
/// Starts a background task and records its handle.
///
/// * `$fud`: value providing `.clone()` and an `executor` field; a clone is
///   passed to the task function.
/// * `$task_name`: name used in log messages and, via `.to_string()`, as the
///   key in `$tasks`.
/// * `$task_fn`: async function called with the cloned `$fud`.
/// * `$tasks`: collection the `StoppableTask` handle is inserted into.
///
/// When the task ends, any error other than `Error::DetachedTaskStopped` is
/// logged.
macro_rules! start_task {
    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
        info!(target: "fud", "Starting {} task", $task_name);
        let handle = StoppableTask::new();
        let fud_for_task = $fud.clone();
        handle.clone().start(
            async move { $task_fn(fud_for_task).await },
            |res| async {
                // A manual stop is expected; only real failures are reported.
                if let Err(e) = res {
                    if !matches!(e, Error::DetachedTaskStopped) {
                        error!(target: "fud", "Failed starting {} task: {e}", $task_name)
                    }
                }
            },
            Error::DetachedTaskStopped,
            $fud.executor.clone(),
        );
        $tasks.insert($task_name.to_string(), handle);
    }};
}
347pub(crate) use start_task;