fud/
tasks.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use tracing::{error, info, warn};
22
23use darkfi::{
24    dht::{event::DhtEvent, DhtHandler, DhtNode},
25    geode::hash_to_string,
26    system::{sleep, StoppableTask},
27    Error, Result,
28};
29
30use crate::{
31    event::{self, notify_event},
32    proto::FudAnnounce,
33    Fud, FudEvent, FudState,
34};
35
36/// Handle DHT events in fud.
37pub async fn handle_dht_events(fud: Arc<Fud>) -> Result<()> {
38    let sub = fud.dht().subscribe().await;
39    loop {
40        let event = sub.receive().await;
41
42        match event {
43            DhtEvent::ValueLookupCompleted { key, values, .. } => {
44                let mut seeders: Vec<_> = values.into_iter().flatten().collect();
45                seeders.dedup_by_key(|seeder| seeder.node.id());
46                notify_event!(fud, SeedersFound, {
47                    hash: key,
48                    seeders
49                });
50            }
51            DhtEvent::BootstrapCompleted => {
52                let _ = fud.init().await;
53                notify_event!(fud, Ready);
54            }
55            _ => {}
56        }
57    }
58}
59
60/// Triggered when calling the `fud.get()` method.
61/// It creates a new StoppableTask (running `fud.fetch_resource()`) and inserts
62/// it into the `fud.fetch_tasks` hashmap. When the task is stopped it's
63/// removed from the hashmap.
64pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
65    loop {
66        let (hash, path, files) = fud.get_rx.recv().await.unwrap();
67
68        // Create the new task
69        let mut fetch_tasks = fud.fetch_tasks.write().await;
70        let task = StoppableTask::new();
71        fetch_tasks.insert(hash, task.clone());
72        drop(fetch_tasks);
73
74        // Start the new task
75        let fud_1 = fud.clone();
76        let fud_2 = fud.clone();
77        task.start(
78            async move { fud_1.fetch_resource(&hash, &path, &files).await },
79            move |res| async move {
80                // Remove the task from the `fud.fetch_tasks` hashmap once it is
81                // stopped (error, manually, or just done).
82                let mut fetch_tasks = fud_2.fetch_tasks.write().await;
83                fetch_tasks.remove(&hash);
84
85                // If there is still a lookup task for this hash, stop it
86                let lookup_tasks = fud_2.lookup_tasks.read().await;
87                if let Some(lookup_task) = lookup_tasks.get(&hash) {
88                    lookup_task.stop().await;
89                }
90
91                match res {
92                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
93                    Err(e) => {
94                        error!(target: "fud::get_task()", "Error while fetching resource: {e}");
95
96                        // Send a DownloadError for any error that stopped the fetch task
97                        notify_event!(fud_2, DownloadError, {
98                            hash,
99                            error: e.to_string(),
100                        });
101                    }
102                }
103            },
104            Error::DetachedTaskStopped,
105            fud.executor.clone(),
106        );
107    }
108}
109
110/// Triggered when calling the `fud.put()` method.
111pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
112    loop {
113        let path = fud.put_rx.recv().await.unwrap();
114
115        // Create the new task
116        let mut put_tasks = fud.put_tasks.write().await;
117        let task = StoppableTask::new();
118        put_tasks.insert(path.clone(), task.clone());
119        drop(put_tasks);
120
121        // Start the new task
122        let fud_1 = fud.clone();
123        let fud_2 = fud.clone();
124        let path_ = path.clone();
125        task.start(
126            async move { fud_1.insert_resource(&path_).await },
127            move |res| async move {
128                // Remove the task from the `fud.put_tasks` hashmap once it is
129                // stopped (error, manually, or just done).
130                let mut put_tasks = fud_2.put_tasks.write().await;
131                put_tasks.remove(&path);
132                match res {
133                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
134                    Err(e) => {
135                        error!(target: "fud::put_task()", "Error while inserting resource: {e}");
136
137                        // Send a InsertError for any error that stopped the fetch task
138                        notify_event!(fud_2, InsertError, {
139                            path,
140                            error: e.to_string(),
141                        });
142                    }
143                }
144            },
145            Error::DetachedTaskStopped,
146            fud.executor.clone(),
147        );
148    }
149}
150
151/// Triggered when you need to lookup seeders for a resource.
152pub async fn lookup_task(fud: Arc<Fud>) -> Result<()> {
153    loop {
154        let key = fud.lookup_rx.recv().await.unwrap();
155
156        let mut lookup_tasks = fud.lookup_tasks.write().await;
157        let task = StoppableTask::new();
158        lookup_tasks.insert(key, task.clone());
159        drop(lookup_tasks);
160
161        let fud_1 = fud.clone();
162        let fud_2 = fud.clone();
163        task.start(
164            async move {
165                fud_1.dht.lookup_value(&key).await;
166                Ok(())
167            },
168            move |res| async move {
169                // Remove the task from the `fud.lookup_tasks` hashmap once it is
170                // stopped (error, manually, or just done).
171                let mut lookup_tasks = fud_2.lookup_tasks.write().await;
172                lookup_tasks.remove(&key);
173                match res {
174                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
175                    Err(e) => {
176                        error!(target: "dht::lookup_task()", "Error in DHT lookup task: {e}");
177                    }
178                }
179            },
180            Error::DetachedTaskStopped,
181            fud.executor.clone(),
182        );
183    }
184}
185
186/// After pinging an inbound connection, this task is triggered to make sure
187/// that you are able to reach at least one of the node's external address.
188/// [`Fud::ping()`] will take care of adding the node to our buckets.
189pub async fn verify_node_task(fud: Arc<Fud>) -> Result<()> {
190    loop {
191        let node = fud.verify_node_rx.recv().await.unwrap();
192        if let Ok((channel, _)) = fud.dht.create_channel_to_node(&node).await {
193            fud.dht.cleanup_channel(channel).await;
194        }
195    }
196}
197
198/// Background task that announces our files once every hour.
199/// Also removes seeders that did not announce for too long.
200pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
201    let interval = 3600; // TODO: Make a setting
202
203    loop {
204        sleep(interval).await;
205
206        info!(target: "fud::announce_seed_task()", "Verifying seeds...");
207        let seeding_resources = match fud.verify_resources(None).await {
208            Ok(resources) => resources,
209            Err(e) => {
210                error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
211                continue;
212            }
213        };
214
215        info!(target: "fud::announce_seed_task()", "Announcing files...");
216        for resource in seeding_resources {
217            if let Ok(seeder) = fud.new_seeder(&resource.hash).await {
218                let seeders = vec![seeder];
219                let _ = fud
220                    .dht
221                    .announce(
222                        &resource.hash,
223                        &seeders.clone(),
224                        &FudAnnounce { key: resource.hash, seeders },
225                    )
226                    .await;
227            }
228        }
229
230        info!(target: "fud::announce_seed_task()", "Pruning seeders...");
231        fud.prune_seeders(interval.try_into().unwrap()).await;
232    }
233}
234
235/// Background task that:
236/// 1. Updates the [`crate::bitcoin::BitcoinHashCache`]
237/// 2. Removes old nodes from the DHT
238/// 3. Removes old nodes from the seeders router
239/// 4. If the Bitcoin block hash we currently use in our `fud.node_data` is too old, we update it and reset our DHT
240pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
241    let interval = 600; // TODO: Make a setting
242
243    loop {
244        sleep(interval).await;
245
246        let mut pow = fud.pow.write().await;
247        if !pow.settings.read().await.btc_enabled {
248            continue
249        }
250
251        let btc = &mut pow.bitcoin_hash_cache;
252
253        if btc.update().await.is_err() {
254            continue
255        }
256
257        let state = fud.state.read().await;
258        if state.is_none() {
259            continue
260        }
261        let block = state.clone().unwrap().node_data.btc_block_hash;
262        drop(state);
263        let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
264            Some(i) => i < 6,
265            None => true,
266        };
267
268        if !needs_dht_reset {
269            // Removes nodes in the DHT with unknown BTC block hashes.
270            let dht = fud.dht();
271            let mut buckets = dht.buckets.write().await;
272            for bucket in buckets.iter_mut() {
273                for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
274                    // If this node's BTC block hash is unknown, remove it from the bucket
275                    if !btc.block_hashes.contains(&node.data.btc_block_hash) {
276                        bucket.nodes.remove(i);
277                        info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
278                    }
279                }
280            }
281            drop(buckets);
282
283            // Removes nodes in the seeders router with unknown BTC block hashes
284            let mut seeders_table = fud.dht.hash_table.write().await;
285            for (key, seeders) in seeders_table.iter_mut() {
286                for (i, seeder) in seeders.clone().iter().enumerate().rev() {
287                    if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
288                        seeders.remove(i);
289                        info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
290                    }
291                }
292            }
293
294            continue
295        }
296
297        info!(target: "fud::node_id_task()", "Creating a new node id...");
298        let (node_data, secret_key) = match pow.generate_node().await {
299            Ok(res) => res,
300            Err(e) => {
301                warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
302                continue
303            }
304        };
305        drop(pow);
306        info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
307
308        // Close all channels
309        let dht = fud.dht();
310        let mut channel_cache = dht.channel_cache.write().await;
311        for channel in dht.p2p.hosts().channels().clone() {
312            channel.stop().await;
313            channel_cache.remove(&channel.info.id);
314        }
315        drop(channel_cache);
316
317        // Reset the DHT: removes known nodes and seeders
318        dht.reset().await;
319
320        // Update our node data and our secret key
321        let mut state = fud.state.write().await;
322        *state = Some(FudState { node_data, secret_key });
323
324        // DHT will be bootstrapped on the next channel connection
325    }
326}
327
/// Starts one of fud's background tasks and registers its handle.
///
/// Arguments:
/// - `$fud`: the fud handle; it is cloned for the task and its `executor` is
///   used to run it.
/// - `$task_name`: name used in the log messages and, through `to_string()`,
///   as the key under which the handle is stored.
/// - `$task_fn`: async function called with the cloned fud handle.
/// - `$tasks`: map-like collection the task handle is inserted into.
///
/// A task ending with `Ok(())` or `Error::DetachedTaskStopped` is silent; any
/// other error is logged.
macro_rules! start_task {
    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
        info!(target: "fud", "Starting {} task", $task_name);

        let handle = StoppableTask::new();
        let fud_for_task = $fud.clone();

        // A clone of the handle drives the task; the original is kept so it
        // can be stored in `$tasks` below.
        handle.clone().start(
            async move { $task_fn(fud_for_task).await },
            |res| async {
                match res {
                    Err(Error::DetachedTaskStopped) | Ok(()) => { /* Do nothing */ }
                    Err(e) => {
                        error!(target: "fud", "Failed starting {} task: {e}", $task_name)
                    }
                }
            },
            Error::DetachedTaskStopped,
            $fud.executor.clone(),
        );

        $tasks.insert($task_name.to_string(), handle);
    }};
}
347pub(crate) use start_task;