// fud/lib.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{HashMap, HashSet},
21    io::ErrorKind,
22    path::{Path, PathBuf},
23    sync::Arc,
24    time::Instant,
25};
26
27use async_trait::async_trait;
28use futures::{future::FutureExt, pin_mut, select};
29use log::{debug, error, info, warn};
30use num_bigint::BigUint;
31use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, Rng};
32use sled_overlay::sled;
33use smol::{
34    channel,
35    fs::{self, File, OpenOptions},
36    lock::RwLock,
37};
38use url::Url;
39
40use darkfi::{
41    dht::{
42        impl_dht_node_defaults, tasks as dht_tasks, Dht, DhtHandler, DhtNode, DhtRouterItem,
43        DhtRouterPtr, DhtSettings,
44    },
45    geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
46    net::{ChannelPtr, P2pPtr},
47    system::{ExecutorPtr, PublisherPtr, StoppableTask},
48    util::path::expand_path,
49    Error, Result,
50};
51use darkfi_sdk::crypto::{schnorr::SchnorrPublic, SecretKey};
52use darkfi_serial::{deserialize_async, serialize_async, SerialDecodable, SerialEncodable};
53
54/// P2P protocols
55pub mod proto;
56use proto::{
57    FudAnnounce, FudChunkReply, FudDirectoryReply, FudFileReply, FudFindNodesReply,
58    FudFindNodesRequest, FudFindRequest, FudFindSeedersReply, FudFindSeedersRequest, FudNotFound,
59    FudPingReply, FudPingRequest,
60};
61
62/// FudEvent
63pub mod event;
64use event::{notify_event, FudEvent};
65
66/// Resource definition
67pub mod resource;
68use resource::{Resource, ResourceStatus, ResourceType};
69
70/// Scrap definition
71pub mod scrap;
72use scrap::Scrap;
73
74/// JSON-RPC related methods
75pub mod rpc;
76
77/// Background tasks
78pub mod tasks;
79use tasks::{start_task, FetchReply};
80
81/// Bitcoin
82pub mod bitcoin;
83
84/// PoW
85pub mod pow;
86use pow::{FudPow, VerifiableNodeData};
87
88/// Equi-X
89pub mod equix;
90
91/// Settings and args
92pub mod settings;
93use settings::Args;
94
95/// Utils
96pub mod util;
97use util::{get_all_files, FileSelection};
98
/// Sled tree name for "resource hash -> path on the filesystem" entries
const SLED_PATH_TREE: &[u8] = b"_fud_paths";
/// Sled tree name for "resource hash -> file selection" entries
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
/// Sled tree name for "chunk hash -> scrap content" entries
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";
102
/// A node in the fud DHT: a PoW-verifiable identity plus the addresses
/// the node can be reached on.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNode {
    /// PoW-verifiable node data; the node ID is derived from it and it
    /// carries the node's public key
    data: VerifiableNodeData,
    /// External addresses of this node
    addresses: Vec<Url>,
}
impl_dht_node_defaults!(FudNode);
109
110impl DhtNode for FudNode {
111    fn id(&self) -> blake3::Hash {
112        self.data.id()
113    }
114    fn addresses(&self) -> Vec<Url> {
115        self.addresses.clone()
116    }
117}
118
pub struct Fud {
    /// Our own [`VerifiableNodeData`]
    pub node_data: Arc<RwLock<VerifiableNodeData>>,

    /// Our secret key (the public key is in `node_data`)
    pub secret_key: Arc<RwLock<SecretKey>>,

    /// Key -> Seeders
    pub seeders_router: DhtRouterPtr<FudNode>,

    /// Pointer to the P2P network instance
    p2p: P2pPtr,

    /// The Geode instance
    geode: Geode,

    /// Default download directory
    downloads_path: PathBuf,

    /// Chunk transfer timeout in seconds
    chunk_timeout: u64,

    /// The [`FudPow`] instance
    pub pow: Arc<RwLock<FudPow>>,

    /// The DHT instance
    dht: Arc<Dht<FudNode>>,

    /// Resources (current status of all downloads/seeds)
    resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,

    /// Sled tree containing "resource hash -> path on the filesystem"
    path_tree: sled::Tree,

    /// Sled tree containing "resource hash -> file selection". If the file
    /// selection is all files of the resource (or if the resource is not a
    /// directory), the resource does not store its file selection in the tree.
    file_selection_tree: sled::Tree,

    /// Sled tree containing scraps which are chunks containing data the user
    /// did not want to save to files. They also contain data the user wanted
    /// otherwise we would not have downloaded the chunk at all.
    /// "chunk/scrap hash -> chunk content"
    scrap_tree: sled::Tree,

    // Queue of download requests: (resource hash, target path, file selection),
    // consumed by the `get` background task (see `start_tasks`)
    get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
    get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,

    // Queue of paths to insert/seed, consumed by the `put` background task
    put_tx: channel::Sender<PathBuf>,
    put_rx: channel::Receiver<PathBuf>,

    /// Currently active downloading tasks (running the `fud.fetch_resource()` method)
    fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,

    /// Currently active put tasks (running the `fud.insert_resource()` method)
    put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,

    /// Currently active tasks (defined in `tasks`, started with the `start_task` macro)
    tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,

    /// Used to send events to fud clients
    event_publisher: PublisherPtr<FudEvent>,

    /// Global multithreaded executor reference
    pub executor: ExecutorPtr,
}
185
#[async_trait]
impl DhtHandler<FudNode> for Fud {
    /// Access the underlying DHT instance.
    fn dht(&self) -> Arc<Dht<FudNode>> {
        self.dht.clone()
    }

    /// Build a snapshot of our own node: current `node_data` plus our
    /// external addresses. Unspecified `[::]` addresses are filtered out.
    async fn node(&self) -> FudNode {
        FudNode {
            data: self.node_data.read().await.clone(),
            addresses: self
                .p2p
                .clone()
                .hosts()
                .external_addrs()
                .await
                .iter()
                .filter(|addr| !addr.to_string().contains("[::]"))
                .cloned()
                .collect(),
        }
    }

    /// Ping a peer over `channel` and return its node on success.
    ///
    /// Sends a `FudPingRequest` carrying a random value, then verifies that
    /// the reply's signature covers that value and that the node's PoW is
    /// valid. On either failure the peer is banned and an error is returned.
    async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
        debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<FudPingReply>().await;
        let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();

        // Send `FudPingRequest`
        let mut rng = OsRng;
        let request = FudPingRequest { random: rng.gen() };
        channel.send(&request).await?;

        // Wait for `FudPingReply`
        let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await?;
        msg_subscriber.unsubscribe().await;

        // Verify the signature (over the random value we just sent)
        if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
            channel.ban().await;
            return Err(Error::InvalidSignature)
        }

        // Verify PoW
        if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await {
            channel.ban().await;
            return Err(e)
        }

        Ok(reply.node.clone())
    }

    // TODO: Optimize this
    /// Called when a new node is discovered: bootstrap if needed, then send
    /// the new node every seeders-router key that is at least as close to it
    /// as it is to us.
    async fn on_new_node(&self, node: &FudNode) -> Result<()> {
        debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id()));

        // If this is the first node we know about, then bootstrap and announce our files
        if !self.dht().is_bootstrapped().await {
            let _ = self.init().await;
        }

        // Send keys that are closer to this node than we are
        let self_id = self.node_data.read().await.id();
        let channel = self.get_channel(node, None).await?;
        for (key, seeders) in self.seeders_router.read().await.iter() {
            let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id()));
            let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
            if node_distance <= self_distance {
                // Best-effort: a failed announce for one key is ignored
                let _ = channel
                    .send(&FudAnnounce {
                        key: *key,
                        seeders: seeders.clone().into_iter().collect(),
                    })
                    .await;
            }
        }
        self.cleanup_channel(channel).await;

        Ok(())
    }

    /// Ask `node` for the nodes it knows that are close to `key`
    /// (Kademlia-style FIND_NODE).
    async fn fetch_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> {
        debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id()));

        let channel = self.get_channel(node, None).await?;
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
        let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();

        let request = FudFindNodesRequest { key: *key };
        channel.send(&request).await?;

        let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await;

        // Always unsubscribe and clean up the channel before propagating
        // a possible receive error.
        msg_subscriber_nodes.unsubscribe().await;
        self.cleanup_channel(channel).await;

        Ok(reply?.nodes.clone())
    }
}
286
287impl Fud {
288    pub async fn new(
289        settings: Args,
290        p2p: P2pPtr,
291        sled_db: &sled::Db,
292        event_publisher: PublisherPtr<FudEvent>,
293        executor: ExecutorPtr,
294    ) -> Result<Self> {
295        let basedir = expand_path(&settings.base_dir)?;
296        let downloads_path = match settings.downloads_path {
297            Some(downloads_path) => expand_path(&downloads_path)?,
298            None => basedir.join("downloads"),
299        };
300
301        // Run the PoW and generate a `VerifiableNodeData`
302        let mut pow = FudPow::new(settings.pow.into(), executor.clone());
303        pow.bitcoin_hash_cache.update().await?; // Fetch BTC block hashes
304        let (node_data, secret_key) = pow.generate_node().await?;
305        info!(target: "fud", "Your node ID: {}", hash_to_string(&node_data.id()));
306
307        // Geode
308        info!("Instantiating Geode instance");
309        let geode = Geode::new(&basedir).await?;
310
311        // DHT
312        let dht_settings: DhtSettings = settings.dht.into();
313        let dht: Arc<Dht<FudNode>> =
314            Arc::new(Dht::<FudNode>::new(&dht_settings, p2p.clone(), executor.clone()).await);
315
316        let (get_tx, get_rx) = smol::channel::unbounded();
317        let (put_tx, put_rx) = smol::channel::unbounded();
318        let fud = Self {
319            node_data: Arc::new(RwLock::new(node_data)),
320            secret_key: Arc::new(RwLock::new(secret_key)),
321            seeders_router: Arc::new(RwLock::new(HashMap::new())),
322            p2p,
323            geode,
324            downloads_path,
325            chunk_timeout: settings.chunk_timeout,
326            pow: Arc::new(RwLock::new(pow)),
327            dht,
328            path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
329            file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
330            scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
331            resources: Arc::new(RwLock::new(HashMap::new())),
332            get_tx,
333            get_rx,
334            put_tx,
335            put_rx,
336            fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
337            put_tasks: Arc::new(RwLock::new(HashMap::new())),
338            tasks: Arc::new(RwLock::new(HashMap::new())),
339            event_publisher,
340            executor,
341        };
342
343        Ok(fud)
344    }
345
    /// Spawn the background tasks (get/put queue consumers, DHT channel
    /// handler, seed announcer and node ID task) and register them in
    /// `self.tasks` under their names.
    pub async fn start_tasks(self: &Arc<Self>) {
        let mut tasks = self.tasks.write().await;
        start_task!(self, "get", tasks::get_task, tasks);
        start_task!(self, "put", tasks::put_task, tasks);
        start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud, FudNode>, tasks);
        start_task!(self, "announce", tasks::announce_seed_task, tasks);
        start_task!(self, "node ID", tasks::node_id_task, tasks);
    }
354
    /// Bootstrap the DHT, verify our resources, add ourselves to
    /// `seeders_router` for the resources we already have, announce our files.
    ///
    /// Entries in `path_tree` that fail to parse (bad hash length, non-UTF-8
    /// or non-expandable path) are silently skipped. If we have no external
    /// address we stop after verification, since we cannot act as a seeder.
    async fn init(&self) -> Result<()> {
        info!(target: "fud::init()", "Bootstrapping the DHT...");
        self.bootstrap().await;

        info!(target: "fud::init()", "Finding resources...");
        let mut resources_write = self.resources.write().await;
        for result in self.path_tree.iter() {
            if result.is_err() {
                continue;
            }

            // Parse hash
            let (hash, path) = result.unwrap();
            let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
                Ok(v) => v,
                Err(_) => continue,
            };
            let hash = blake3::Hash::from_bytes(hash_bytes);

            // Parse path
            let path_bytes = path.to_vec();
            let path_str = match std::str::from_utf8(&path_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let path: PathBuf = match expand_path(path_str) {
                Ok(v) => v,
                Err(_) => continue,
            };

            // Get the file selection from sled, fallback on FileSelection::All
            let mut file_selection = FileSelection::All;
            if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
                if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
                    file_selection = FileSelection::Set(
                        path_list
                            .into_iter()
                            .filter_map(|bytes| {
                                std::str::from_utf8(&bytes)
                                    .ok()
                                    .and_then(|path_str| expand_path(path_str).ok())
                            })
                            .collect(),
                    );
                }
            }

            // Add resource (status/type refined below by `verify_resources`)
            resources_write.insert(
                hash,
                Resource::new(
                    hash,
                    ResourceType::Unknown,
                    &path,
                    ResourceStatus::Incomplete,
                    file_selection,
                ),
            );
        }
        // Release the write lock: `verify_resources` acquires it again.
        drop(resources_write);

        info!(target: "fud::init()", "Verifying resources...");
        let resources = self.verify_resources(None).await?;

        let self_node = self.node().await;

        // Stop here if we have no external address
        if self_node.addresses.is_empty() {
            return Ok(());
        }

        // Add our own node as a seeder for the resources we are seeding
        let self_router_items: Vec<DhtRouterItem<FudNode>> = vec![self_node.into()];
        for resource in &resources {
            self.add_to_router(
                self.seeders_router.clone(),
                &resource.hash,
                self_router_items.clone(),
            )
            .await;
        }

        info!(target: "fud::init()", "Announcing resources...");
        let seeders = vec![self.node().await.into()];
        for resource in resources {
            // Best-effort: announce failures for a single resource are ignored
            let _ = self
                .announce(
                    &resource.hash,
                    &FudAnnounce { key: resource.hash, seeders: seeders.clone() },
                    self.seeders_router.clone(),
                )
                .await;
        }

        Ok(())
    }
453
454    /// Get resource path from hash using the sled db
455    pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
456        if let Some(value) = self.path_tree.get(hash.as_bytes())? {
457            let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
458            return Ok(Some(path));
459        }
460
461        Ok(None)
462    }
463
464    /// Get resource hash from path using the sled db
465    pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
466        let path_string = path.to_string_lossy().to_string();
467        let path_bytes = path_string.as_bytes();
468        for path_item in self.path_tree.iter() {
469            let (key, value) = path_item?;
470            if value == path_bytes {
471                let bytes: &[u8] = &key;
472                if bytes.len() != 32 {
473                    return Err(Error::Custom(format!(
474                        "Expected a 32-byte BLAKE3, got {} bytes",
475                        bytes.len()
476                    )));
477                }
478
479                let array: [u8; 32] = bytes.try_into().unwrap();
480                return Ok(Some(array.into()))
481            }
482        }
483
484        Ok(None)
485    }
486
    /// Verify if resources are complete and uncorrupted.
    /// If a resource is incomplete or corrupted, its status is changed to Incomplete.
    /// If a resource is complete, its status is changed to Seeding.
    /// Takes an optional list of resource hashes.
    /// If no hash is given (None), it verifies all resources.
    /// Returns the list of verified and uncorrupted/complete seeding resources.
    ///
    /// Only resources currently in the `Seeding` or `Incomplete` state are
    /// examined; others (e.g. actively downloading ones) are left untouched.
    /// A `ResourceUpdated` event is emitted for every examined resource.
    pub async fn verify_resources(
        &self,
        hashes: Option<Vec<blake3::Hash>>,
    ) -> Result<Vec<Resource>> {
        // Held for the whole verification pass
        let mut resources_write = self.resources.write().await;

        // Helper: refresh all counters of `resource` from `chunked` (if any),
        // set its status, and notify clients. `chunked = None` means the
        // resource content is missing/corrupted, so counters reset to zero.
        let update_resource = async |resource: &mut Resource,
                                     status: ResourceStatus,
                                     chunked: Option<&ChunkedStorage>,
                                     total_bytes_downloaded: u64,
                                     target_bytes_downloaded: u64| {
            // Files and chunks restricted to the resource's file selection
            let files = match chunked {
                Some(chunked) => resource.get_selected_files(chunked),
                None => vec![],
            };
            let chunk_hashes = match chunked {
                Some(chunked) => resource.get_selected_chunks(chunked),
                None => HashSet::new(),
            };

            if let Some(chunked) = chunked {
                resource.rtype = match chunked.is_dir() {
                    false => ResourceType::File,
                    true => ResourceType::Directory,
                };
            }

            resource.status = status;
            // "total" = whole resource, "target" = selected files only
            resource.total_chunks_count = match chunked {
                Some(chunked) => chunked.len() as u64,
                None => 0,
            };
            resource.target_chunks_count = chunk_hashes.len() as u64;
            resource.total_chunks_downloaded = match chunked {
                Some(chunked) => chunked.local_chunks() as u64,
                None => 0,
            };
            resource.target_chunks_downloaded = match chunked {
                Some(chunked) => chunked
                    .iter()
                    .filter(|(hash, available)| chunk_hashes.contains(hash) && *available)
                    .count() as u64,
                None => 0,
            };

            resource.total_bytes_size = match chunked {
                Some(chunked) => chunked.get_fileseq().len(),
                None => 0,
            };
            resource.target_bytes_size = match chunked {
                Some(chunked) => chunked
                    .get_files()
                    .iter()
                    .filter(|(path, _)| files.contains(path))
                    .map(|(_, size)| size)
                    .sum(),
                None => 0,
            };

            resource.total_bytes_downloaded = total_bytes_downloaded;
            resource.target_bytes_downloaded = target_bytes_downloaded;

            notify_event!(self, ResourceUpdated, resource);
        };

        let mut seeding_resources: Vec<Resource> = vec![];
        for (_, mut resource) in resources_write.iter_mut() {
            // When a hash list was given, skip resources not in it
            if let Some(ref hashes_list) = hashes {
                if !hashes_list.contains(&resource.hash) {
                    continue;
                }
            }

            match resource.status {
                ResourceStatus::Seeding => {}
                ResourceStatus::Incomplete => {}
                _ => continue,
            };

            // Make sure the resource is not corrupted or incomplete
            let resource_path = match self.hash_to_path(&resource.hash) {
                Ok(Some(v)) => v,
                Ok(None) | Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
                Ok(v) => v,
                Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let verify_res = self.verify_chunks(resource, &mut chunked).await;
            if let Err(e) = verify_res {
                error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
                update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                continue;
            }
            let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

            if !chunked.is_complete() {
                update_resource(
                    &mut resource,
                    ResourceStatus::Incomplete,
                    Some(&chunked),
                    total_bytes_downloaded,
                    target_bytes_downloaded,
                )
                .await;
                continue;
            }

            // Resource is complete and valid: mark it as seeding
            update_resource(
                &mut resource,
                ResourceStatus::Seeding,
                Some(&chunked),
                total_bytes_downloaded,
                target_bytes_downloaded,
            )
            .await;
            seeding_resources.push(resource.clone());
        }

        Ok(seeding_resources)
    }
620
621    /// Query `nodes` to find the seeders for `key`
622    async fn fetch_seeders(
623        &self,
624        nodes: &Vec<FudNode>,
625        key: &blake3::Hash,
626    ) -> HashSet<DhtRouterItem<FudNode>> {
627        let self_node = self.node().await;
628        let mut seeders: HashSet<DhtRouterItem<FudNode>> = HashSet::new();
629
630        for node in nodes {
631            let channel = match self.get_channel(node, None).await {
632                Ok(channel) => channel,
633                Err(e) => {
634                    warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
635                    continue;
636                }
637            };
638            let msg_subsystem = channel.message_subsystem();
639            msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
640
641            let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
642                Ok(msg_subscriber) => msg_subscriber,
643                Err(e) => {
644                    warn!(target: "fud::fetch_seeders()", "Error subscribing to msg: {e}");
645                    self.cleanup_channel(channel).await;
646                    continue;
647                }
648            };
649
650            let send_res = channel.send(&FudFindSeedersRequest { key: *key }).await;
651            if let Err(e) = send_res {
652                warn!(target: "fud::fetch_seeders()", "Error while sending FudFindSeedersRequest: {e}");
653                msg_subscriber.unsubscribe().await;
654                self.cleanup_channel(channel).await;
655                continue;
656            }
657
658            let reply = match msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await
659            {
660                Ok(reply) => reply,
661                Err(e) => {
662                    warn!(target: "fud::fetch_seeders()", "Error waiting for reply: {e}");
663                    msg_subscriber.unsubscribe().await;
664                    self.cleanup_channel(channel).await;
665                    continue;
666                }
667            };
668
669            msg_subscriber.unsubscribe().await;
670            self.cleanup_channel(channel).await;
671
672            seeders.extend(reply.seeders.clone());
673        }
674
675        seeders =
676            seeders.iter().filter(|seeder| seeder.node.id() != self_node.id()).cloned().collect();
677
678        info!(target: "fud::fetch_seeders()", "Found {} seeders for {}", seeders.len(), hash_to_string(key));
679        seeders
680    }
681
682    /// Fetch `chunks` for `chunked` (file or directory) from `seeders`.
683    async fn fetch_chunks(
684        &self,
685        hash: &blake3::Hash,
686        chunked: &mut ChunkedStorage,
687        seeders: &HashSet<DhtRouterItem<FudNode>>,
688        chunks: &HashSet<blake3::Hash>,
689    ) -> Result<()> {
690        let mut remaining_chunks = chunks.clone();
691        let mut shuffled_seeders = {
692            let mut vec: Vec<_> = seeders.iter().cloned().collect();
693            vec.shuffle(&mut OsRng);
694            vec
695        };
696
697        while let Some(seeder) = shuffled_seeders.pop() {
698            let channel = match self.get_channel(&seeder.node, Some(*hash)).await {
699                Ok(channel) => channel,
700                Err(e) => {
701                    warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
702                    continue;
703                }
704            };
705            let mut chunks_to_query = remaining_chunks.clone();
706            info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
707            loop {
708                let start_time = Instant::now();
709                let msg_subsystem = channel.message_subsystem();
710                msg_subsystem.add_dispatch::<FudChunkReply>().await;
711                msg_subsystem.add_dispatch::<FudNotFound>().await;
712                let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
713                let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
714
715                // Select a chunk to request
716                let mut chunk = None;
717                if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
718                    chunk = Some(*random_chunk);
719                }
720
721                if chunk.is_none() {
722                    // No more chunks to request from this seeder
723                    break; // Switch to another seeder
724                }
725                let chunk_hash = chunk.unwrap();
726                chunks_to_query.remove(&chunk_hash);
727
728                let send_res =
729                    channel.send(&FudFindRequest { info: Some(*hash), key: chunk_hash }).await;
730                if let Err(e) = send_res {
731                    warn!(target: "fud::fetch_chunks()", "Error while sending FudFindRequest: {e}");
732                    break; // Switch to another seeder
733                }
734
735                let chunk_recv =
736                    msg_subscriber_chunk.receive_with_timeout(self.chunk_timeout).fuse();
737                let notfound_recv =
738                    msg_subscriber_notfound.receive_with_timeout(self.chunk_timeout).fuse();
739
740                pin_mut!(chunk_recv, notfound_recv);
741
742                // Wait for a FudChunkReply or FudNotFound
743                select! {
744                    chunk_reply = chunk_recv => {
745                        if let Err(e) = chunk_reply {
746                            warn!(target: "fud::fetch_chunks()", "Error waiting for chunk reply: {e}");
747                            break; // Switch to another seeder
748                        }
749                        let reply = chunk_reply.unwrap();
750
751                        match self.geode.write_chunk(chunked, &reply.chunk).await {
752                            Ok((inserted_hash, bytes_written)) => {
753                                if inserted_hash != chunk_hash {
754                                    warn!(target: "fud::fetch_chunks()", "Received chunk does not match requested chunk");
755                                    msg_subscriber_chunk.unsubscribe().await;
756                                    msg_subscriber_notfound.unsubscribe().await;
757                                    continue; // Skip to next chunk, will retry this chunk later
758                                }
759
760                                info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
761
762                                // If we did not write the whole chunk to the filesystem,
763                                // save the chunk in the scraps.
764                                if bytes_written < reply.chunk.len() {
765                                    info!(target: "fud::fetch_chunks()", "Saving chunk {} as a scrap", hash_to_string(&chunk_hash));
766                                    let chunk_written = self.geode.get_chunk(chunked, &chunk_hash).await?;
767                                    if let Err(e) = self.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&Scrap {
768                                        chunk: reply.chunk.clone(),
769                                        hash_written: blake3::hash(&chunk_written),
770                                    }).await) {
771                                        error!(target: "fud::fetch_chunks()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(&chunk_hash))
772                                    }
773                                }
774
775                                // Update resource `chunks_downloaded` and `bytes_downloaded`
776                                let mut resources_write = self.resources.write().await;
777                                let resource = match resources_write.get_mut(hash) {
778                                    Some(resource) => {
779                                        resource.status = ResourceStatus::Downloading;
780                                        resource.total_chunks_downloaded += 1;
781                                        resource.target_chunks_downloaded += 1;
782
783                                        resource.total_bytes_downloaded += reply.chunk.len() as u64;
784                                        resource.target_bytes_downloaded += resource.get_selected_bytes(chunked, &reply.chunk) as u64;
785                                        resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
786                                        if resource.speeds.len() > 12 {
787                                            resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last 6 speeds
788                                        }
789
790                                        // If we just fetched the last chunk of a file, compute
791                                        // `total_bytes_size` (and `target_bytes_size`) again,
792                                        // as `geode.write_chunk()` updated the FileSequence
793                                        // to the exact file size.
794                                        if let Some((last_chunk_hash, _)) = chunked.iter().last() {
795                                            if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == chunk_hash {
796                                                resource.total_bytes_size = chunked.get_fileseq().len();
797                                                resource.target_bytes_size = resource.total_bytes_size;
798                                            }
799                                        }
800                                        resource.clone()
801                                    }
802                                    None => return Ok(()) // Resource was removed, abort
803                                };
804                                drop(resources_write);
805
806                                notify_event!(self, ChunkDownloadCompleted, { hash: *hash, chunk_hash, resource });
807                                remaining_chunks.remove(&chunk_hash);
808                            }
809                            Err(e) => {
810                                error!(target: "fud::fetch_chunks()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
811                            }
812                        };
813                    }
814                    notfound_reply = notfound_recv => {
815                        if let Err(e) = notfound_reply {
816                            warn!(target: "fud::fetch_chunks()", "Error waiting for NOTFOUND reply: {e}");
817                            msg_subscriber_chunk.unsubscribe().await;
818                            msg_subscriber_notfound.unsubscribe().await;
819                            break; // Switch to another seeder
820                        }
821                        info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
822                        notify_event!(self, ChunkNotFound, { hash: *hash, chunk_hash });
823                    }
824                };
825
826                msg_subscriber_chunk.unsubscribe().await;
827                msg_subscriber_notfound.unsubscribe().await;
828            }
829
830            self.cleanup_channel(channel).await;
831
832            // Stop when there are no missing chunks
833            if remaining_chunks.is_empty() {
834                break;
835            }
836        }
837
838        Ok(())
839    }
840
841    /// Fetch a single resource metadata from `nodes`.
842    /// If the resource is a file smaller than a single chunk then seeder can send the
843    /// chunk directly, and we will create the file from it on path `path`.
844    /// 1. Request seeders from those nodes
845    /// 2. Request the metadata from the seeders
846    /// 3. Insert metadata to geode using the reply
847    pub async fn fetch_metadata(
848        &self,
849        hash: &blake3::Hash,
850        nodes: &Vec<FudNode>,
851        path: &Path,
852    ) -> Result<()> {
853        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
854        let mut result: Option<FetchReply> = None;
855
856        for node in nodes {
857            // 1. Request list of seeders
858            let channel = match self.get_channel(node, Some(*hash)).await {
859                Ok(channel) => channel,
860                Err(e) => {
861                    warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
862                    continue;
863                }
864            };
865            let msg_subsystem = channel.message_subsystem();
866            msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
867
868            let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
869                Ok(msg_subscriber) => msg_subscriber,
870                Err(e) => {
871                    warn!(target: "fud::fetch_metadata()", "Error subscribing to msg: {e}");
872                    continue;
873                }
874            };
875
876            let send_res = channel.send(&FudFindSeedersRequest { key: *hash }).await;
877            if let Err(e) = send_res {
878                warn!(target: "fud::fetch_metadata()", "Error while sending FudFindSeedersRequest: {e}");
879                msg_subscriber.unsubscribe().await;
880                self.cleanup_channel(channel).await;
881                continue;
882            }
883
884            let reply = match msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await
885            {
886                Ok(reply) => reply,
887                Err(e) => {
888                    warn!(target: "fud::fetch_metadata()", "Error waiting for reply: {e}");
889                    msg_subscriber.unsubscribe().await;
890                    self.cleanup_channel(channel).await;
891                    continue;
892                }
893            };
894
895            let mut seeders = reply.seeders.clone();
896            info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id()));
897
898            msg_subscriber.unsubscribe().await;
899            self.cleanup_channel(channel).await;
900
901            // 2. Request the file/chunk from the seeders
902            while let Some(seeder) = seeders.pop() {
903                // Only query a seeder once
904                if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
905                    continue;
906                }
907                queried_seeders.insert(seeder.node.id());
908
909                let channel = self.get_channel(&seeder.node, Some(*hash)).await;
910                if let Ok(channel) = channel {
911                    let msg_subsystem = channel.message_subsystem();
912                    msg_subsystem.add_dispatch::<FudChunkReply>().await;
913                    msg_subsystem.add_dispatch::<FudFileReply>().await;
914                    msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
915                    msg_subsystem.add_dispatch::<FudNotFound>().await;
916                    let msg_subscriber_chunk =
917                        channel.subscribe_msg::<FudChunkReply>().await.unwrap();
918                    let msg_subscriber_file =
919                        channel.subscribe_msg::<FudFileReply>().await.unwrap();
920                    let msg_subscriber_dir =
921                        channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
922                    let msg_subscriber_notfound =
923                        channel.subscribe_msg::<FudNotFound>().await.unwrap();
924
925                    let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
926                    if let Err(e) = send_res {
927                        warn!(target: "fud::fetch_metadata()", "Error while sending FudFindRequest: {e}");
928                        msg_subscriber_chunk.unsubscribe().await;
929                        msg_subscriber_file.unsubscribe().await;
930                        msg_subscriber_dir.unsubscribe().await;
931                        msg_subscriber_notfound.unsubscribe().await;
932                        self.cleanup_channel(channel).await;
933                        continue;
934                    }
935
936                    let chunk_recv =
937                        msg_subscriber_chunk.receive_with_timeout(self.chunk_timeout).fuse();
938                    let file_recv =
939                        msg_subscriber_file.receive_with_timeout(self.chunk_timeout).fuse();
940                    let dir_recv =
941                        msg_subscriber_dir.receive_with_timeout(self.chunk_timeout).fuse();
942                    let notfound_recv =
943                        msg_subscriber_notfound.receive_with_timeout(self.chunk_timeout).fuse();
944
945                    pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);
946
947                    let cleanup = async || {
948                        msg_subscriber_chunk.unsubscribe().await;
949                        msg_subscriber_file.unsubscribe().await;
950                        msg_subscriber_dir.unsubscribe().await;
951                        msg_subscriber_notfound.unsubscribe().await;
952                        self.cleanup_channel(channel).await;
953                    };
954
955                    // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
956                    select! {
957                        // Received a chunk while requesting metadata, this is allowed to
958                        // optimize fetching files smaller than a single chunk
959                        chunk_reply = chunk_recv => {
960                            cleanup().await;
961                            if let Err(e) = chunk_reply {
962                                warn!(target: "fud::fetch_metadata()", "Error waiting for chunk reply: {e}");
963                                continue;
964                            }
965                            let reply = chunk_reply.unwrap();
966                            let chunk_hash = blake3::hash(&reply.chunk);
967                            // Check that this is the only chunk in the file
968                            if !self.geode.verify_metadata(hash, &[chunk_hash], &[]) {
969                                warn!(target: "fud::fetch_metadata()", "Received a chunk while fetching a file, the chunk did not match the file hash");
970                                continue;
971                            }
972                            info!(target: "fud::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
973                            result = Some(FetchReply::Chunk((*reply).clone()));
974                            break;
975                        }
976                        file_reply = file_recv => {
977                            cleanup().await;
978                            if let Err(e) = file_reply {
979                                warn!(target: "fud::fetch_metadata()", "Error waiting for file reply: {e}");
980                                continue;
981                            }
982                            let reply = file_reply.unwrap();
983                            if !self.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
984                                warn!(target: "fud::fetch_metadata()", "Received invalid file metadata");
985                                continue;
986                            }
987                            info!(target: "fud::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
988                            result = Some(FetchReply::File((*reply).clone()));
989                            break;
990                        }
991                        dir_reply = dir_recv => {
992                            cleanup().await;
993                            if let Err(e) = dir_reply {
994                                warn!(target: "fud::fetch_metadata()", "Error waiting for directory reply: {e}");
995                                continue;
996                            }
997                            let reply = dir_reply.unwrap();
998
999                            // Convert all file paths from String to PathBuf
1000                            let files: Vec<_> = reply.files.clone().into_iter()
1001                                .map(|(path_str, size)| (PathBuf::from(path_str), size))
1002                                .collect();
1003
1004                            if !self.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
1005                                warn!(target: "fud::fetch_metadata()", "Received invalid directory metadata");
1006                                continue;
1007                            }
1008                            info!(target: "fud::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
1009                            result = Some(FetchReply::Directory((*reply).clone()));
1010                            break;
1011                        }
1012                        notfound_reply = notfound_recv => {
1013                            cleanup().await;
1014                            if let Err(e) = notfound_reply {
1015                                warn!(target: "fud::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
1016                                continue;
1017                            }
1018                            info!(target: "fud::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
1019                        }
1020                    };
1021                }
1022            }
1023
1024            if result.is_some() {
1025                break;
1026            }
1027        }
1028
1029        // We did not find the resource
1030        if result.is_none() {
1031            return Err(Error::GeodeFileRouteNotFound)
1032        }
1033
1034        // 3. Insert metadata to geode using the reply
1035        // At this point the reply content is already verified
1036        match result.unwrap() {
1037            FetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
1038                // Convert all file paths from String to PathBuf
1039                let mut files: Vec<_> = files
1040                    .into_iter()
1041                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
1042                    .collect();
1043
1044                self.geode.sort_files(&mut files);
1045                if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &files).await {
1046                    error!(target: "fud::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
1047                    return Err(e)
1048                }
1049            }
1050            FetchReply::File(FudFileReply { chunk_hashes }) => {
1051                if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
1052                    error!(target: "fud::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
1053                    return Err(e)
1054                }
1055            }
1056            // Looked for a file but got a chunk: the entire file fits in a single chunk
1057            FetchReply::Chunk(FudChunkReply { chunk }) => {
1058                info!(target: "fud::fetch_metadata()", "File fits in a single chunk");
1059                let chunk_hash = blake3::hash(&chunk);
1060                let _ = self.geode.insert_metadata(hash, &[chunk_hash], &[]).await;
1061                let mut chunked_file = ChunkedStorage::new(
1062                    &[chunk_hash],
1063                    &[(path.to_path_buf(), chunk.len() as u64)],
1064                    false,
1065                );
1066                if let Err(e) = self.geode.write_chunk(&mut chunked_file, &chunk).await {
1067                    error!(target: "fud::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
1068                    return Err(e)
1069                };
1070            }
1071        };
1072
1073        Ok(())
1074    }
1075
1076    /// Start downloading a file or directory from the network to `path`.
1077    /// This creates a new task in `fetch_tasks` calling `fetch_resource()`.
1078    /// `files` is the list of files (relative paths) you want to download
1079    /// (if the resource is a directory), None means you want all files.
1080    pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
1081        let fetch_tasks = self.fetch_tasks.read().await;
1082        if fetch_tasks.contains_key(hash) {
1083            return Err(Error::Custom(format!(
1084                "Resource {} is already being downloaded",
1085                hash_to_string(hash)
1086            )))
1087        }
1088        drop(fetch_tasks);
1089
1090        self.get_tx.send((*hash, path.to_path_buf(), files)).await?;
1091
1092        Ok(())
1093    }
1094
1095    /// Try to get the chunked file or directory from geode, if we don't have it
1096    /// then it is fetched from the network using `fetch_metadata()`.
1097    pub async fn get_metadata(
1098        &self,
1099        hash: &blake3::Hash,
1100        path: &Path,
1101    ) -> Result<(ChunkedStorage, Vec<FudNode>)> {
1102        match self.geode.get(hash, path).await {
1103            // We already know the metadata
1104            Ok(v) => Ok((v, vec![])),
1105            // The metadata in geode is invalid or corrupted
1106            Err(Error::GeodeNeedsGc) => todo!(),
1107            // If we could not find the metadata in geode, get it from the network
1108            Err(Error::GeodeFileNotFound) => {
1109                // Find nodes close to the file hash
1110                info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
1111                let closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
1112
1113                // Fetch file or directory metadata
1114                match self.fetch_metadata(hash, &closest_nodes, path).await {
1115                    // The file metadata was found and inserted into geode
1116                    Ok(()) => Ok((self.geode.get(hash, path).await?, closest_nodes)),
1117                    // We could not find the metadata, or any other error occured
1118                    Err(e) => Err(e),
1119                }
1120            }
1121
1122            Err(e) => {
1123                error!(target: "fud::get_metadata()", "{e}");
1124                Err(e)
1125            }
1126        }
1127    }
1128
    /// Download a file or directory from the network to `path`.
    /// Called when `get()` creates a new fetch task.
    ///
    /// High-level flow:
    /// 1. Record path + file selection in sled, register the resource in
    ///    `self.resources` and send `DownloadStarted`.
    /// 2. Get metadata (locally or from the network) via `get_metadata()`.
    /// 3. Create the target files, replay scraps, verify which chunks are
    ///    already available locally.
    /// 4. Fetch the missing chunks from seeders, re-verify, and finish with
    ///    either `DownloadCompleted` or `MissingChunks`.
    ///
    /// Returns early with `Ok(())` whenever the resource disappears from
    /// `self.resources` (it was removed by the user while downloading).
    pub async fn fetch_resource(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        files: &FileSelection,
    ) -> Result<()> {
        let self_node = self.node().await;

        let hash_bytes = hash.as_bytes();
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();

        // Macro that acquires a write lock on `self.resources`, updates a
        // resource, and returns the resource (dropping the write lock)
        macro_rules! update_resource {
            ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
                let mut resources_write = self.resources.write().await;
                let resource = match resources_write.get_mut($hash) {
                    Some(resource) => {
                        $(resource.$field = $value;)* // Apply the field assignments
                        resource.clone()
                    }
                    None => return Ok(()), // Resource was removed, abort
                };
                resource
            }};
        }

        // Make sure we don't already have another resource on that path
        if let Ok(Some(hash_found)) = self.path_to_hash(path) {
            if *hash != hash_found {
                return Err(Error::Custom(format!(
                    "There is already another resource on path {path_string}"
                )))
            }
        }

        // Add path to the sled db
        self.path_tree.insert(hash_bytes, path_bytes)?;

        // Add file selection to the sled db
        if let FileSelection::Set(selected_files) = files {
            let paths: Vec<Vec<u8>> = selected_files
                .iter()
                .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
                .collect();
            let serialized_paths = serialize_async(&paths).await;
            // Abort if the file selection cannot be inserted into sled
            if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
                return Err(Error::SledError(e))
            }
        }

        // Add resource to `self.resources`
        let resource = Resource::new(
            *hash,
            ResourceType::Unknown,
            path,
            ResourceStatus::Discovering,
            files.clone(),
        );
        let mut resources_write = self.resources.write().await;
        resources_write.insert(*hash, resource.clone());
        drop(resources_write);

        // Send a DownloadStarted event
        notify_event!(self, DownloadStarted, resource);

        // Try to get the chunked file or directory from geode or the network
        let (mut chunked, mut closest_nodes) = match self.get_metadata(hash, path).await {
            Ok(chunked) => chunked,
            Err(e) => {
                // Set resource status to `Incomplete` and send a `MetadataNotFound` event
                let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
                notify_event!(self, MetadataNotFound, resource);
                return Err(e);
            }
        };

        // Get a list of all file paths the user wants to fetch
        let resources_read = self.resources.read().await;
        let resource = match resources_read.get(hash) {
            Some(resource) => resource,
            None => return Ok(()), // Resource was removed, abort
        };
        let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked);
        drop(resources_read);

        // Create all files (and all necessary directories)
        for file_path in files_vec.iter() {
            if !file_path.exists() {
                if let Some(dir) = file_path.parent() {
                    fs::create_dir_all(dir).await?;
                }
                File::create(&file_path).await?;
            }
        }

        // Set resource status to `Verifying` and send a `MetadataDownloadCompleted` event
        let resource = update_resource!(hash, {
            status = ResourceStatus::Verifying,
            total_chunks_count = chunked.len() as u64,
            total_bytes_size = chunked.get_fileseq().len(),
            rtype = match chunked.is_dir() {
                false => ResourceType::File,
                true => ResourceType::Directory,
            },
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Set of all chunks we need locally (including the ones we already have)
        let chunk_hashes = resource.get_selected_chunks(&chunked);

        // Write all scraps to make sure the data on the filesystem is correct
        self.write_scraps(&mut chunked, &chunk_hashes).await?;

        // Mark locally available chunks as such
        let verify_res = self.verify_chunks(&resource, &mut chunked).await;
        if let Err(e) = verify_res {
            error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
            return Err(e);
        }
        let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

        // Update `total_bytes_size` if the resource is a file
        // NOTE(review): the updated clone returned by `update_resource!` is
        // discarded here, so the `ResourceUpdated` event below carries the
        // `resource` captured before this update (stale `total_bytes_size`).
        // Looks like it should be `let resource = update_resource!(...)` — confirm.
        if let ResourceType::File = resource.rtype {
            update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
            notify_event!(self, ResourceUpdated, resource);
        }

        // If `chunked` is a file that is bigger than the all its chunks,
        // truncate the file to the chunks.
        // This fixes two edge-cases: a file that exactly ends at the end of
        // a chunk, and a file with no chunk.
        if !chunked.is_dir() {
            let fs_metadata = fs::metadata(&path).await?;
            if fs_metadata.len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
                if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
                    let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
                }
            }
        }

        // Set of all chunks we need locally and their current availability
        let chunks: HashSet<(blake3::Hash, bool)> =
            chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();

        // Set of the chunks we need to download
        let missing_chunks: HashSet<blake3::Hash> =
            chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();

        // Update the resource with the chunks/bytes counts
        update_resource!(hash, {
            target_chunks_count = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
            target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,

            target_bytes_size =
                chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
            total_bytes_downloaded = total_bytes_downloaded,
            target_bytes_downloaded = target_bytes_downloaded,
        });

        // If we don't need to download any chunk
        if missing_chunks.is_empty() {
            // Set resource status to `Seeding` or `Incomplete`
            let resource = update_resource!(hash, {
                status = match chunked.is_complete() {
                    true => ResourceStatus::Seeding,
                    false => ResourceStatus::Incomplete,
                }
            });

            // Announce the resource if we have all chunks
            if chunked.is_complete() {
                let self_announce =
                    FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] };
                let _ = self.announce(hash, &self_announce, self.seeders_router.clone()).await;
            }

            // Send a DownloadCompleted event
            notify_event!(self, DownloadCompleted, resource);

            return Ok(());
        }

        // Set resource status to `Downloading` and send a MetadataDownloadCompleted event
        // NOTE(review): `MetadataDownloadCompleted` was already sent above after
        // the metadata fetch; if a status-change event (e.g. ResourceUpdated)
        // is intended here instead, confirm with the event consumers.
        let resource = update_resource!(hash, {
            status = ResourceStatus::Downloading,
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Find nodes close to the file hash if we didn't previously fetched them
        if closest_nodes.is_empty() {
            closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
        }

        // Find seeders and remove ourselves from the result
        let seeders = self.fetch_seeders(&closest_nodes, hash).await;

        // Fetch missing chunks from seeders
        self.fetch_chunks(hash, &mut chunked, &seeders, &missing_chunks).await?;

        // Get chunked file from geode
        let mut chunked = match self.geode.get(hash, path).await {
            Ok(v) => v,
            Err(e) => {
                error!(target: "fud::fetch_resource()", "{e}");
                return Err(e);
            }
        };

        // Set resource status to `Verifying` and send FudEvent::ResourceUpdated
        let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
        notify_event!(self, ResourceUpdated, resource);

        // Verify all chunks
        self.verify_chunks(&resource, &mut chunked).await?;

        // A selected chunk set is "complete" when every chunk we targeted is
        // now locally available
        let is_complete = chunked
            .iter()
            .filter(|(hash, _)| chunk_hashes.contains(hash))
            .all(|(_, available)| *available);

        // We fetched all chunks, but the resource is not complete
        // (some chunks were missing from all seeders)
        if !is_complete {
            // Set resource status to `Incomplete`
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });

            // Send a MissingChunks event
            notify_event!(self, MissingChunks, resource);

            return Ok(());
        }

        // Set resource status to `Seeding` or `Incomplete`
        let resource = update_resource!(hash, {
            status = match chunked.is_complete() {
                true => ResourceStatus::Seeding,
                false => ResourceStatus::Incomplete,
            },
            target_chunks_downloaded = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
        });

        // Announce the resource if we have all chunks
        if chunked.is_complete() {
            let self_announce = FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] };
            let _ = self.announce(hash, &self_announce, self.seeders_router.clone()).await;
        }

        // Send a DownloadCompleted event
        notify_event!(self, DownloadCompleted, resource);

        Ok(())
    }
1388
1389    async fn write_scraps(
1390        &self,
1391        chunked: &mut ChunkedStorage,
1392        chunk_hashes: &HashSet<blake3::Hash>,
1393    ) -> Result<()> {
1394        // Get all scraps
1395        let mut scraps = HashMap::new();
1396        // TODO: This can be improved to not loop over all chunks
1397        for chunk_hash in chunk_hashes {
1398            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
1399            if scrap.is_none() {
1400                continue;
1401            }
1402
1403            // Verify the scrap we found
1404            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
1405            if scrap.is_err() {
1406                continue;
1407            }
1408            let scrap: Scrap = scrap.unwrap();
1409
1410            // Add the scrap to the HashMap
1411            scraps.insert(chunk_hash, scrap);
1412        }
1413
1414        // Write all scraps
1415        if !scraps.is_empty() {
1416            info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
1417        }
1418        for (scrap_hash, mut scrap) in scraps {
1419            let len = scrap.chunk.len();
1420            let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
1421            if let Err(e) = write_res {
1422                error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
1423                continue;
1424            }
1425            let (_, chunk_bytes_written) = write_res.unwrap();
1426
1427            // If the whole scrap was written, we can remove it from sled
1428            if chunk_bytes_written == len {
1429                self.scrap_tree.remove(scrap_hash.as_bytes())?;
1430                continue;
1431            }
1432            // Otherwise update the scrap in sled
1433            let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
1434            if let Err(e) = chunk_res {
1435                error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
1436                continue;
1437            }
1438            scrap.hash_written = blake3::hash(&chunk_res.unwrap());
1439            if let Err(e) =
1440                self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
1441            {
1442                error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
1443            }
1444        }
1445
1446        Ok(())
1447    }
1448
    /// Iterate over all chunks of `chunked` and mark which ones are available
    /// locally, either in the filesystem (verified against their chunk hash)
    /// or in scraps stored in sled (verified against both the chunk hash and
    /// the on-disk `hash_written`).
    /// If the resource is a single file and its last chunk is available, the
    /// backing `FileSequence` file size is also set to the exact file size.
    /// Return the size in bytes of locally available data as the tuple
    /// `(total_bytes_downloaded, target_bytes_downloaded)`.
    pub async fn verify_chunks(
        &self,
        resource: &Resource,
        chunked: &mut ChunkedStorage,
    ) -> Result<(u64, u64)> {
        let chunks = chunked.get_chunks().clone();
        // chunk hash -> (chunk size on disk, bytes of that chunk counted
        // toward the target — presumably the selected files; see
        // `Resource::get_selected_bytes()`)
        let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();

        // Gather all available chunks
        for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
            // Read the chunk using the `FileSequence`
            let chunk =
                match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
                    Ok(c) => c,
                    // Missing on disk: not an error, just unavailable locally
                    Err(Error::Io(ErrorKind::NotFound)) => continue,
                    Err(e) => {
                        warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
                        break
                    }
                };

            // Perform chunk consistency check
            if self.geode.verify_chunk(chunk_hash, &chunk) {
                chunked.get_chunk_mut(chunk_index).1 = true;
                bytes.insert(
                    *chunk_hash,
                    (chunk.len(), resource.get_selected_bytes(chunked, &chunk)),
                );
            }
        }

        // Look for the chunks that are not on the filesystem
        let chunks = chunked.get_chunks().clone();
        let missing_on_fs: Vec<_> =
            chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect();

        // Look for scraps
        for (chunk_index, (chunk_hash, _)) in missing_on_fs {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            // Verify the scrap we found
            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();
            // The scrap's chunk data must match the chunk hash we expect
            if blake3::hash(&scrap.chunk) != *chunk_hash {
                continue;
            }

            // Check if the scrap is still written on the filesystem
            let scrap_chunk =
                self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await;
            if scrap_chunk.is_err() {
                continue;
            }
            let scrap_chunk = scrap_chunk.unwrap();

            // The scrap is not available if the chunk on the disk changed
            if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
                continue;
            }

            // Mark the chunk as available
            chunked.get_chunk_mut(chunk_index).1 = true;

            // Update the sums of locally available data
            bytes.insert(
                *chunk_hash,
                (scrap.chunk.len(), resource.get_selected_bytes(chunked, &scrap.chunk)),
            );
        }

        // If the resource is a file: make the `FileSequence`'s file the
        // exact file size if we know the last chunk's size. This is not
        // needed for directories.
        if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() {
            if !chunked.is_dir() && *last_chunk_available {
                if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) {
                    // All chunks are MAX_CHUNK_SIZE except possibly the last one
                    let exact_file_size =
                        chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
                    chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
                }
            }
        }

        // Sum both counters over every locally available chunk
        let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
        let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;

        Ok((total_bytes_downloaded, target_bytes_downloaded))
    }
1549
    /// Add a resource from the file system.
    /// Queues `path` on `put_tx`; the receiving end creates a put task which
    /// calls `insert_resource()` (see its doc comment).
    pub async fn put(&self, path: &Path) -> Result<()> {
        // NOTE(review): this read-lock acquire/release appears to only wait
        // for any concurrent writer of `put_tasks` (put task creation) to
        // finish before queueing — confirm this synchronization is intended.
        let put_tasks = self.put_tasks.read().await;
        drop(put_tasks);

        self.put_tx.send(path.to_path_buf()).await?;

        Ok(())
    }
1559
1560    /// Insert a file or directory from the file system.
1561    /// Called when `put()` creates a new put task.
1562    pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
1563        let self_node = self.node().await;
1564
1565        if self_node.addresses.is_empty() {
1566            return Err(Error::Custom(
1567                "Cannot put resource, you don't have any external address".to_string(),
1568            ))
1569        }
1570
1571        let metadata = fs::metadata(path).await?;
1572
1573        // Get the list of files and the resource type (file or directory)
1574        let (files, resource_type) = if metadata.is_file() {
1575            (vec![(path.clone(), metadata.len())], ResourceType::File)
1576        } else if metadata.is_dir() {
1577            let mut files = get_all_files(path).await?;
1578            self.geode.sort_files(&mut files);
1579            (files, ResourceType::Directory)
1580        } else {
1581            return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
1582        };
1583
1584        // Read the file or directory and create the chunks
1585        let stream = FileSequence::new(&files, false);
1586        let total_size = stream.len();
1587        let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;
1588
1589        // Get the relative file paths included in the metadata and hash of directories
1590        let relative_files = if let ResourceType::Directory = resource_type {
1591            // [(absolute file path, file size)] -> [(relative file path, file size)]
1592            let relative_files = files
1593                .into_iter()
1594                .map(|(file_path, size)| match file_path.strip_prefix(path) {
1595                    Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
1596                    Err(_) => Err(Error::Custom("Invalid file path".to_string())),
1597                })
1598                .collect::<Result<Vec<_>>>()?;
1599
1600            // Add the files metadata to the hasher to complete the resource hash
1601            self.geode.hash_files_metadata(&mut hasher, &relative_files);
1602
1603            relative_files
1604        } else {
1605            vec![]
1606        };
1607
1608        // Finalize the resource hash
1609        let hash = hasher.finalize();
1610
1611        // Create the metadata file in geode
1612        if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
1613            error!(target: "fud::put()", "Failed inserting {path:?} to geode: {e}");
1614            return Err(e)
1615        }
1616
1617        // Add path to the sled db
1618        if let Err(e) =
1619            self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
1620        {
1621            error!(target: "fud::put()", "Failed inserting new resource into sled: {e}");
1622            return Err(e.into())
1623        }
1624
1625        // Add resource
1626        let mut resources_write = self.resources.write().await;
1627        resources_write.insert(
1628            hash,
1629            Resource {
1630                hash,
1631                rtype: resource_type,
1632                path: path.to_path_buf(),
1633                status: ResourceStatus::Seeding,
1634                file_selection: FileSelection::All,
1635                total_chunks_count: chunk_hashes.len() as u64,
1636                target_chunks_count: chunk_hashes.len() as u64,
1637                total_chunks_downloaded: chunk_hashes.len() as u64,
1638                target_chunks_downloaded: chunk_hashes.len() as u64,
1639                total_bytes_size: total_size,
1640                target_bytes_size: total_size,
1641                total_bytes_downloaded: total_size,
1642                target_bytes_downloaded: total_size,
1643                speeds: vec![],
1644            },
1645        );
1646        drop(resources_write);
1647
1648        // Announce the new resource
1649        let fud_announce = FudAnnounce { key: hash, seeders: vec![self_node.into()] };
1650        let _ = self.announce(&hash, &fud_announce, self.seeders_router.clone()).await;
1651
1652        // Send InsertCompleted event
1653        notify_event!(self, InsertCompleted, {
1654            hash,
1655            path: path.to_path_buf()
1656        });
1657
1658        Ok(())
1659    }
1660
1661    /// Removes:
1662    /// - a resource
1663    /// - its metadata in geode
1664    /// - its path in the sled path tree
1665    /// - its file selection in the sled file selection tree
1666    /// - and any related scrap in the sled scrap tree,
1667    ///
1668    /// then sends a `ResourceRemoved` fud event.
1669    pub async fn remove(&self, hash: &blake3::Hash) {
1670        // Remove the resource
1671        let mut resources_write = self.resources.write().await;
1672        resources_write.remove(hash);
1673        drop(resources_write);
1674
1675        // Remove the scraps in sled
1676        if let Ok(Some(path)) = self.hash_to_path(hash) {
1677            let chunked = self.geode.get(hash, &path).await;
1678
1679            if let Ok(chunked) = chunked {
1680                for (chunk_hash, _) in chunked.iter() {
1681                    let _ = self.scrap_tree.remove(chunk_hash.as_bytes());
1682                }
1683            }
1684        }
1685
1686        // Remove the metadata in geode
1687        let hash_str = hash_to_string(hash);
1688        let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
1689        let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;
1690
1691        // Remove the path in sled
1692        let _ = self.path_tree.remove(hash.as_bytes());
1693
1694        // Remove the file selection in sled
1695        let _ = self.file_selection_tree.remove(hash.as_bytes());
1696
1697        // Send a `ResourceRemoved` event
1698        notify_event!(self, ResourceRemoved, { hash: *hash });
1699    }
1700
1701    /// Stop all tasks.
1702    pub async fn stop(&self) {
1703        info!("Stopping fetch tasks...");
1704        // Create a clone of fetch_tasks because `task.stop()` needs a write lock
1705        let fetch_tasks = self.fetch_tasks.read().await;
1706        let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
1707            fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
1708        drop(fetch_tasks);
1709
1710        // Stop all fetch tasks
1711        for task in cloned_fetch_tasks.values() {
1712            task.stop().await;
1713        }
1714
1715        info!("Stopping put tasks...");
1716        // Create a clone of put_tasks because `task.stop()` needs a write lock
1717        let put_tasks = self.put_tasks.read().await;
1718        let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
1719            put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
1720        drop(put_tasks);
1721
1722        // Stop all put tasks
1723        for task in cloned_put_tasks.values() {
1724            task.stop().await;
1725        }
1726
1727        // Stop all other tasks
1728        let mut tasks = self.tasks.write().await;
1729        for (name, task) in tasks.clone() {
1730            info!("Stopping {name} task...");
1731            task.stop().await;
1732        }
1733        *tasks = HashMap::new();
1734    }
1735}