fud/
tasks.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{collections::HashMap, sync::Arc};
20
21use log::{error, info, warn};
22
23use darkfi::{
24    dht::{DhtHandler, DhtNode},
25    geode::hash_to_string,
26    system::{sleep, StoppableTask},
27    Error, Result,
28};
29
30use crate::{
31    event,
32    event::notify_event,
33    proto::{FudAnnounce, FudChunkReply, FudDirectoryReply, FudFileReply},
34    Fud, FudEvent,
35};
36
37pub enum FetchReply {
38    Directory(FudDirectoryReply),
39    File(FudFileReply),
40    Chunk(FudChunkReply),
41}
42
43/// Triggered when calling the `fud.get()` method.
44/// It creates a new StoppableTask (running `fud.fetch_resource()`) and inserts
45/// it into the `fud.fetch_tasks` hashmap. When the task is stopped it's
46/// removed from the hashmap.
47pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
48    loop {
49        let (hash, path, files) = fud.get_rx.recv().await.unwrap();
50
51        // Create the new task
52        let mut fetch_tasks = fud.fetch_tasks.write().await;
53        let task = StoppableTask::new();
54        fetch_tasks.insert(hash, task.clone());
55        drop(fetch_tasks);
56
57        // Start the new task
58        let fud_1 = fud.clone();
59        let fud_2 = fud.clone();
60        task.start(
61            async move { fud_1.fetch_resource(&hash, &path, &files).await },
62            move |res| async move {
63                // Remove the task from the `fud.fetch_tasks` hashmap once it is
64                // stopped (error, manually, or just done).
65                let mut fetch_tasks = fud_2.fetch_tasks.write().await;
66                fetch_tasks.remove(&hash);
67                match res {
68                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
69                    Err(e) => {
70                        error!(target: "fud::get_task()", "Error while fetching resource: {e}");
71
72                        // Send a DownloadError for any error that stopped the fetch task
73                        notify_event!(fud_2, DownloadError, {
74                            hash,
75                            error: e.to_string(),
76                        });
77                    }
78                }
79            },
80            Error::DetachedTaskStopped,
81            fud.executor.clone(),
82        );
83    }
84}
85
86/// Triggered when calling the `fud.put()` method.
87pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
88    loop {
89        let path = fud.put_rx.recv().await.unwrap();
90
91        // Create the new task
92        let mut put_tasks = fud.put_tasks.write().await;
93        let task = StoppableTask::new();
94        put_tasks.insert(path.clone(), task.clone());
95        drop(put_tasks);
96
97        // Start the new task
98        let fud_1 = fud.clone();
99        let fud_2 = fud.clone();
100        let path_ = path.clone();
101        task.start(
102            async move { fud_1.insert_resource(&path_).await },
103            move |res| async move {
104                // Remove the task from the `fud.put_tasks` hashmap once it is
105                // stopped (error, manually, or just done).
106                let mut put_tasks = fud_2.put_tasks.write().await;
107                put_tasks.remove(&path);
108                match res {
109                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
110                    Err(e) => {
111                        error!(target: "fud::put_task()", "Error while inserting resource: {e}");
112
113                        // Send a InsertError for any error that stopped the fetch task
114                        notify_event!(fud_2, InsertError, {
115                            path,
116                            error: e.to_string(),
117                        });
118                    }
119                }
120            },
121            Error::DetachedTaskStopped,
122            fud.executor.clone(),
123        );
124    }
125}
126
127/// Background task that announces our files once every hour.
128/// Also removes seeders that did not announce for too long.
129pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
130    let interval = 3600; // TODO: Make a setting
131
132    loop {
133        sleep(interval).await;
134
135        let seeders = vec![fud.node().await.into()];
136
137        info!(target: "fud::announce_seed_task()", "Verifying seeds...");
138        let seeding_resources = match fud.verify_resources(None).await {
139            Ok(resources) => resources,
140            Err(e) => {
141                error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
142                continue;
143            }
144        };
145
146        info!(target: "fud::announce_seed_task()", "Announcing files...");
147        for resource in seeding_resources {
148            let _ = fud
149                .announce(
150                    &resource.hash,
151                    &FudAnnounce { key: resource.hash, seeders: seeders.clone() },
152                    fud.seeders_router.clone(),
153                )
154                .await;
155        }
156
157        info!(target: "fud::announce_seed_task()", "Pruning seeders...");
158        fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await;
159    }
160}
161
162/// Background task that:
163/// 1. Updates the [`crate::bitcoin::BitcoinHashCache`]
164/// 2. Removes old nodes from the DHT
165/// 3. Removes old nodes from the seeders router
166/// 4. If the Bitcoin block hash we currently use in our `fud.node_data` is too old, we update it and reset our DHT
167pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
168    let interval = 600; // TODO: Make a setting
169
170    loop {
171        sleep(interval).await;
172
173        let mut pow = fud.pow.write().await;
174        let btc = &mut pow.bitcoin_hash_cache;
175
176        if btc.update().await.is_err() {
177            continue
178        }
179
180        let block = fud.node_data.read().await.btc_block_hash;
181        let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
182            Some(i) => i < 6,
183            None => true,
184        };
185
186        if !needs_dht_reset {
187            // Removes nodes in the DHT with unknown BTC block hashes.
188            let dht = fud.dht();
189            let mut buckets = dht.buckets.write().await;
190            for bucket in buckets.iter_mut() {
191                for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
192                    // If this node's BTC block hash is unknown, remove it from the bucket
193                    if !btc.block_hashes.contains(&node.data.btc_block_hash) {
194                        bucket.nodes.remove(i);
195                        info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
196                    }
197                }
198            }
199            drop(buckets);
200
201            // Removes nodes in the seeders router with unknown BTC block hashes
202            let mut seeders_router = fud.seeders_router.write().await;
203            for (key, seeders) in seeders_router.iter_mut() {
204                for seeder in seeders.clone().iter() {
205                    if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
206                        seeders.remove(seeder);
207                        info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
208                    }
209                }
210            }
211
212            continue
213        }
214
215        info!(target: "fud::node_id_task()", "Creating a new node id...");
216        let (node_data, secret_key) = match pow.generate_node().await {
217            Ok(res) => res,
218            Err(e) => {
219                warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
220                continue
221            }
222        };
223        drop(pow);
224        info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
225
226        // Close all channels
227        let dht = fud.dht();
228        let mut channel_cache = dht.channel_cache.write().await;
229        for channel in dht.p2p.hosts().channels().clone() {
230            channel.stop().await;
231            channel_cache.remove(&channel.info.id);
232        }
233        drop(channel_cache);
234
235        // Reset the DHT
236        dht.reset().await;
237
238        // Reset the seeders router
239        *fud.seeders_router.write().await = HashMap::new();
240
241        // Update our node data and our secret key
242        *fud.node_data.write().await = node_data;
243        *fud.secret_key.write().await = secret_key;
244
245        // DHT will be bootstrapped on the next channel connection
246    }
247}
248
249macro_rules! start_task {
250    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
251        info!(target: "fud", "Starting {} task", $task_name);
252        let task = StoppableTask::new();
253        let fud_ = $fud.clone();
254        task.clone().start(
255            async move { $task_fn(fud_).await },
256            |res| async {
257                match res {
258                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
259                    Err(e) => error!(target: "fud", "Failed starting {} task: {e}", $task_name),
260                }
261            },
262            Error::DetachedTaskStopped,
263            $fud.executor.clone(),
264        );
265        $tasks.insert($task_name.to_string(), task);
266    }};
267}
268pub(crate) use start_task;