fud/
tasks.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::sync::Arc;
20
21use log::{error, info, warn};
22
23use darkfi::{
24    dht::{DhtHandler, DhtNode},
25    geode::hash_to_string,
26    system::{sleep, StoppableTask},
27    Error, Result,
28};
29
30use crate::{event, event::notify_event, proto::FudAnnounce, Fud, FudEvent};
31
32/// Triggered when calling the `fud.get()` method.
33/// It creates a new StoppableTask (running `fud.fetch_resource()`) and inserts
34/// it into the `fud.fetch_tasks` hashmap. When the task is stopped it's
35/// removed from the hashmap.
36pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
37    loop {
38        let (hash, path, files) = fud.get_rx.recv().await.unwrap();
39
40        // Create the new task
41        let mut fetch_tasks = fud.fetch_tasks.write().await;
42        let task = StoppableTask::new();
43        fetch_tasks.insert(hash, task.clone());
44        drop(fetch_tasks);
45
46        // Start the new task
47        let fud_1 = fud.clone();
48        let fud_2 = fud.clone();
49        task.start(
50            async move { fud_1.fetch_resource(&hash, &path, &files).await },
51            move |res| async move {
52                // Remove the task from the `fud.fetch_tasks` hashmap once it is
53                // stopped (error, manually, or just done).
54                let mut fetch_tasks = fud_2.fetch_tasks.write().await;
55                fetch_tasks.remove(&hash);
56
57                // If there is still a lookup task for this hash, stop it
58                let lookup_tasks = fud_2.lookup_tasks.read().await;
59                if let Some(lookup_task) = lookup_tasks.get(&hash) {
60                    lookup_task.stop().await;
61                }
62
63                match res {
64                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
65                    Err(e) => {
66                        error!(target: "fud::get_task()", "Error while fetching resource: {e}");
67
68                        // Send a DownloadError for any error that stopped the fetch task
69                        notify_event!(fud_2, DownloadError, {
70                            hash,
71                            error: e.to_string(),
72                        });
73                    }
74                }
75            },
76            Error::DetachedTaskStopped,
77            fud.executor.clone(),
78        );
79    }
80}
81
82/// Triggered when calling the `fud.put()` method.
83pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
84    loop {
85        let path = fud.put_rx.recv().await.unwrap();
86
87        // Create the new task
88        let mut put_tasks = fud.put_tasks.write().await;
89        let task = StoppableTask::new();
90        put_tasks.insert(path.clone(), task.clone());
91        drop(put_tasks);
92
93        // Start the new task
94        let fud_1 = fud.clone();
95        let fud_2 = fud.clone();
96        let path_ = path.clone();
97        task.start(
98            async move { fud_1.insert_resource(&path_).await },
99            move |res| async move {
100                // Remove the task from the `fud.put_tasks` hashmap once it is
101                // stopped (error, manually, or just done).
102                let mut put_tasks = fud_2.put_tasks.write().await;
103                put_tasks.remove(&path);
104                match res {
105                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
106                    Err(e) => {
107                        error!(target: "fud::put_task()", "Error while inserting resource: {e}");
108
109                        // Send a InsertError for any error that stopped the fetch task
110                        notify_event!(fud_2, InsertError, {
111                            path,
112                            error: e.to_string(),
113                        });
114                    }
115                }
116            },
117            Error::DetachedTaskStopped,
118            fud.executor.clone(),
119        );
120    }
121}
122
123/// Triggered when you need to lookup seeders for a resource.
124pub async fn lookup_task(fud: Arc<Fud>) -> Result<()> {
125    loop {
126        let (key, seeders_pub) = fud.lookup_rx.recv().await.unwrap();
127
128        let mut lookup_tasks = fud.lookup_tasks.write().await;
129        let task = StoppableTask::new();
130        lookup_tasks.insert(key, task.clone());
131        drop(lookup_tasks);
132
133        let fud_1 = fud.clone();
134        let fud_2 = fud.clone();
135        task.start(
136            async move {
137                fud_1.dht.lookup_value(&key, seeders_pub).await?;
138                Ok(())
139            },
140            move |res| async move {
141                // Remove the task from the `fud.lookup_tasks` hashmap once it is
142                // stopped (error, manually, or just done).
143                let mut lookup_tasks = fud_2.lookup_tasks.write().await;
144                lookup_tasks.remove(&key);
145                match res {
146                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
147                    Err(e) => {
148                        error!(target: "dht::lookup_task()", "Error in DHT lookup task: {e}");
149                    }
150                }
151            },
152            Error::DetachedTaskStopped,
153            fud.executor.clone(),
154        );
155    }
156}
157
158/// Background task that announces our files once every hour.
159/// Also removes seeders that did not announce for too long.
160pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
161    let interval = 3600; // TODO: Make a setting
162
163    loop {
164        sleep(interval).await;
165
166        info!(target: "fud::announce_seed_task()", "Verifying seeds...");
167        let seeding_resources = match fud.verify_resources(None).await {
168            Ok(resources) => resources,
169            Err(e) => {
170                error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
171                continue;
172            }
173        };
174
175        info!(target: "fud::announce_seed_task()", "Announcing files...");
176        for resource in seeding_resources {
177            let seeders = vec![fud.new_seeder(&resource.hash).await];
178            let _ = fud
179                .dht
180                .announce(
181                    &resource.hash,
182                    &seeders.clone(),
183                    &FudAnnounce { key: resource.hash, seeders },
184                )
185                .await;
186        }
187
188        info!(target: "fud::announce_seed_task()", "Pruning seeders...");
189        fud.prune_seeders(interval.try_into().unwrap()).await;
190    }
191}
192
193/// Background task that:
194/// 1. Updates the [`crate::bitcoin::BitcoinHashCache`]
195/// 2. Removes old nodes from the DHT
196/// 3. Removes old nodes from the seeders router
197/// 4. If the Bitcoin block hash we currently use in our `fud.node_data` is too old, we update it and reset our DHT
198pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
199    let interval = 600; // TODO: Make a setting
200
201    loop {
202        sleep(interval).await;
203
204        let mut pow = fud.pow.write().await;
205        let btc = &mut pow.bitcoin_hash_cache;
206
207        if btc.update().await.is_err() {
208            continue
209        }
210
211        let block = fud.node_data.read().await.btc_block_hash;
212        let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
213            Some(i) => i < 6,
214            None => true,
215        };
216
217        if !needs_dht_reset {
218            // Removes nodes in the DHT with unknown BTC block hashes.
219            let dht = fud.dht();
220            let mut buckets = dht.buckets.write().await;
221            for bucket in buckets.iter_mut() {
222                for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
223                    // If this node's BTC block hash is unknown, remove it from the bucket
224                    if !btc.block_hashes.contains(&node.data.btc_block_hash) {
225                        bucket.nodes.remove(i);
226                        info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
227                    }
228                }
229            }
230            drop(buckets);
231
232            // Removes nodes in the seeders router with unknown BTC block hashes
233            let mut seeders_table = fud.dht.hash_table.write().await;
234            for (key, seeders) in seeders_table.iter_mut() {
235                for (i, seeder) in seeders.clone().iter().enumerate().rev() {
236                    if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
237                        seeders.remove(i);
238                        info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
239                    }
240                }
241            }
242
243            continue
244        }
245
246        info!(target: "fud::node_id_task()", "Creating a new node id...");
247        let (node_data, secret_key) = match pow.generate_node().await {
248            Ok(res) => res,
249            Err(e) => {
250                warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
251                continue
252            }
253        };
254        drop(pow);
255        info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
256
257        // Close all channels
258        let dht = fud.dht();
259        let mut channel_cache = dht.channel_cache.write().await;
260        for channel in dht.p2p.hosts().channels().clone() {
261            channel.stop().await;
262            channel_cache.remove(&channel.info.id);
263        }
264        drop(channel_cache);
265
266        // Reset the DHT: removes known nodes and seeders
267        dht.reset().await;
268
269        // Update our node data and our secret key
270        *fud.node_data.write().await = node_data;
271        *fud.secret_key.write().await = secret_key;
272
273        // DHT will be bootstrapped on the next channel connection
274    }
275}
276
277macro_rules! start_task {
278    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
279        info!(target: "fud", "Starting {} task", $task_name);
280        let task = StoppableTask::new();
281        let fud_ = $fud.clone();
282        task.clone().start(
283            async move { $task_fn(fud_).await },
284            |res| async {
285                match res {
286                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
287                    Err(e) => error!(target: "fud", "Failed starting {} task: {e}", $task_name),
288                }
289            },
290            Error::DetachedTaskStopped,
291            $fud.executor.clone(),
292        );
293        $tasks.insert($task_name.to_string(), task);
294    }};
295}
296pub(crate) use start_task;