fud/lib.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{
    collections::{HashMap, HashSet},
    io::ErrorKind,
    path::{Path, PathBuf},
    sync::Arc,
};

use sled_overlay::sled;
use smol::{
    channel,
    fs::{self, OpenOptions},
    lock::RwLock,
};
use tracing::{error, info, warn};

use darkfi::{
    dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
    geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
    net::P2pPtr,
    system::{ExecutorPtr, PublisherPtr, StoppableTask},
    util::{path::expand_path, time::Timestamp},
    Error, Result,
};
use darkfi_sdk::crypto::{schnorr::SchnorrSecret, SecretKey};
use darkfi_serial::{deserialize_async, serialize_async};

/// P2P protocols
pub mod proto;
use proto::FudAnnounce;

/// FudEvent
pub mod event;
use event::{notify_event, FudEvent};

/// Resource definition
pub mod resource;
use resource::{Resource, ResourceStatus, ResourceType};

/// Scrap definition
pub mod scrap;
use scrap::Scrap;

/// JSON-RPC related methods
pub mod rpc;

/// Background tasks
pub mod tasks;
use tasks::start_task;

/// Bitcoin
pub mod bitcoin;

/// PoW
pub mod pow;
use pow::{FudPow, VerifiableNodeData};

/// Equi-X
pub mod equix;

/// Settings and args
pub mod settings;
use settings::Args;

/// Utils
pub mod util;
use util::{create_all_files, get_all_files, FileSelection};

/// Download methods
mod download;
use download::{fetch_chunks, fetch_metadata};

/// [`DhtHandler`] implementation and fud-specific DHT structs
pub mod dht;
use dht::FudSeeder;

use crate::{dht::FudNode, pow::PowSettings};

const SLED_PATH_TREE: &[u8] = b"_fud_paths";
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";

#[derive(Clone, Debug)]
pub struct FudState {
    /// Our own [`VerifiableNodeData`]
    node_data: VerifiableNodeData,
    /// Our secret key (the public key is in `node_data`)
    secret_key: SecretKey,
}

pub struct Fud {
    state: Arc<RwLock<Option<FudState>>>,
    /// The Geode instance
    geode: Geode,
    /// Default download directory
    downloads_path: PathBuf,
    /// Chunk transfer timeout in seconds
    chunk_timeout: u64,
    /// The [`FudPow`] instance
    pub pow: Arc<RwLock<FudPow>>,
    /// The DHT instance
    dht: Arc<Dht<Fud>>,
    /// Resources (current status of all downloads/seeds)
    resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
    /// Sled tree containing "resource hash -> path on the filesystem"
    path_tree: sled::Tree,
    /// Sled tree containing "resource hash -> file selection". If the file
    /// selection is all files of the resource (or if the resource is not a
    /// directory), the resource does not store its file selection in the tree.
    file_selection_tree: sled::Tree,
    /// Sled tree containing scraps. A scrap is a chunk holding data the user
    /// did not want saved to files; it also holds data the user did want,
    /// otherwise we would not have downloaded the chunk at all.
    /// We keep scraps so that integrity can be verified even when part of the
    /// chunk is not saved to the filesystem in the downloaded files.
    /// "chunk/scrap hash -> chunk content"
    scrap_tree: sled::Tree,
    /// Get requests sender
    get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
    /// Get requests receiver
    get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
    /// Put requests sender
    put_tx: channel::Sender<PathBuf>,
    /// Put requests receiver
    put_rx: channel::Receiver<PathBuf>,
    /// Lookup requests sender
    lookup_tx: channel::Sender<blake3::Hash>,
    /// Lookup requests receiver
    lookup_rx: channel::Receiver<blake3::Hash>,
    /// Verify node requests sender
    verify_node_tx: channel::Sender<FudNode>,
    /// Verify node requests receiver
    verify_node_rx: channel::Receiver<FudNode>,
    /// Currently active downloading tasks (running the `fud.fetch_resource()` method)
    fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Currently active put tasks (running the `fud.insert_resource()` method)
    put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,
    /// Currently active lookup tasks (running the `fud.lookup_value()` method)
    lookup_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Currently active tasks (defined in `tasks`, started with the `start_task` macro)
    tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,
    /// Used to send events to fud clients
    event_publisher: PublisherPtr<FudEvent>,
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    /// Global multithreaded executor reference
    pub executor: ExecutorPtr,
}

impl Fud {
    pub async fn new(
        settings: Args,
        p2p: P2pPtr,
        sled_db: &sled::Db,
        event_publisher: PublisherPtr<FudEvent>,
        executor: ExecutorPtr,
    ) -> Result<Arc<Self>> {
        let dht_settings: DhtSettings = settings.dht.into();
        let net_settings_lock = p2p.settings();
        let mut net_settings = net_settings_lock.write().await;
        // We do not need any outbound slots
        net_settings.outbound_connections = 0;
        // Default GetAddrsMessage's `max` to the DHT's `k`
        net_settings.getaddrs_max =
            Some(net_settings.getaddrs_max.unwrap_or(dht_settings.k.min(u32::MAX as usize) as u32));
        drop(net_settings);

        let basedir = expand_path(&settings.base_dir)?;
        let downloads_path = match settings.downloads_path {
            Some(downloads_path) => expand_path(&downloads_path)?,
            None => basedir.join("downloads"),
        };

        let pow_settings: PowSettings = settings.pow.into();
        let pow = FudPow::new(pow_settings.clone(), executor.clone());

        // Geode
        info!(target: "fud::new()", "Instantiating Geode instance");
        let geode = Geode::new(&basedir).await?;

        // DHT
        let dht: Arc<Dht<Fud>> =
            Arc::new(Dht::<Fud>::new(&dht_settings, p2p.clone(), executor.clone()).await);

        let (get_tx, get_rx) = smol::channel::unbounded();
        let (put_tx, put_rx) = smol::channel::unbounded();
        let (lookup_tx, lookup_rx) = smol::channel::unbounded();
        let (verify_node_tx, verify_node_rx) = smol::channel::unbounded();
        let fud = Arc::new(Self {
            state: Arc::new(RwLock::new(None)),
            geode,
            downloads_path,
            chunk_timeout: settings.chunk_timeout,
            pow: Arc::new(RwLock::new(pow)),
            dht: dht.clone(),
            path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
            file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
            scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
            resources: Arc::new(RwLock::new(HashMap::new())),
            get_tx,
            get_rx,
            put_tx,
            put_rx,
            lookup_tx,
            lookup_rx,
            verify_node_tx,
            verify_node_rx,
            fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
            put_tasks: Arc::new(RwLock::new(HashMap::new())),
            lookup_tasks: Arc::new(RwLock::new(HashMap::new())),
            tasks: Arc::new(RwLock::new(HashMap::new())),
            event_publisher,
            p2p,
            executor,
        });
        *dht.handler.write().await = Arc::downgrade(&fud);

        Ok(fud)
    }

    /// Run the PoW and generate a `VerifiableNodeData`, then start tasks
    pub async fn start(self: &Arc<Self>) -> Result<()> {
        let mut pow = self.pow.write().await;
        if pow.settings.read().await.btc_enabled {
            pow.bitcoin_hash_cache.update().await?; // Fetch BTC block hashes
        }
        let (node_data, secret_key) = pow.generate_node().await?;
        info!(target: "fud::start()", "Your node ID: {}", hash_to_string(&node_data.id()));
        let mut state = self.state.write().await;
        *state = Some(FudState { node_data, secret_key });
        drop(state);
        drop(pow);

        self.start_tasks().await;

        Ok(())
    }

    async fn start_tasks(self: &Arc<Self>) {
        let mut tasks = self.tasks.write().await;
        start_task!(self, "get", tasks::get_task, tasks);
        start_task!(self, "put", tasks::put_task, tasks);
        start_task!(self, "events", tasks::handle_dht_events, tasks);
        start_task!(self, "DHT events", dht_tasks::events_task::<Fud>, tasks);
        start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud>, tasks);
        start_task!(self, "DHT cleanup channels", dht_tasks::cleanup_channels_task::<Fud>, tasks);
        start_task!(self, "DHT add node", dht_tasks::add_node_task::<Fud>, tasks);
        start_task!(self, "DHT refinery", dht_tasks::dht_refinery_task::<Fud>, tasks);
        start_task!(
            self,
            "DHT disconnect inbounds",
            dht_tasks::disconnect_inbounds_task::<Fud>,
            tasks
        );
        start_task!(self, "lookup", tasks::lookup_task, tasks);
        start_task!(self, "verify node", tasks::verify_node_task, tasks);
        start_task!(self, "announce", tasks::announce_seed_task, tasks);
        start_task!(self, "node ID", tasks::node_id_task, tasks);
    }

    /// Verify our resources, add ourselves to the seeders (`dht.hash_table`)
    /// for the resources we already have, announce our resources.
    async fn init(&self) -> Result<()> {
        info!(target: "fud::init()", "Finding resources...");
        let mut resources_write = self.resources.write().await;
        for result in self.path_tree.iter() {
            if result.is_err() {
                continue;
            }

            // Parse hash
            let (hash, path) = result.unwrap();
            let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
                Ok(v) => v,
                Err(_) => continue,
            };
            let hash = blake3::Hash::from_bytes(hash_bytes);

            // Parse path
            let path_bytes = path.to_vec();
            let path_str = match std::str::from_utf8(&path_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let path: PathBuf = match expand_path(path_str) {
                Ok(v) => v,
                Err(_) => continue,
            };

            // Get the file selection from sled, fallback on FileSelection::All
            let mut file_selection = FileSelection::All;
            if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
                if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
                    file_selection = FileSelection::Set(
                        path_list
                            .into_iter()
                            .filter_map(|bytes| {
                                std::str::from_utf8(&bytes)
                                    .ok()
                                    .and_then(|path_str| expand_path(path_str).ok())
                            })
                            .collect(),
                    );
                }
            }

            // Add resource
            resources_write.insert(
                hash,
                Resource::new(
                    hash,
                    ResourceType::Unknown,
                    &path,
                    ResourceStatus::Incomplete,
                    file_selection,
                ),
            );
        }
        drop(resources_write);

        info!(target: "fud::init()", "Verifying resources...");
        let resources = self.verify_resources(None).await?;

        let self_node = self.node().await?;

        // Stop here if we have no external address
        if self_node.addresses.is_empty() {
            return Ok(());
        }

        // Add our own node as a seeder for the resources we are seeding
        for resource in &resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let self_router_items = vec![seeder];
                self.add_value(&resource.hash, &self_router_items).await;
            }
        }

        info!(target: "fud::init()", "Announcing resources...");
        for resource in resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let seeders = vec![seeder];
                let _ = self
                    .dht
                    .announce(
                        &resource.hash,
                        &seeders.clone(),
                        &FudAnnounce { key: resource.hash, seeders },
                    )
                    .await;
            }
        }

        Ok(())
    }

    /// Get a copy of the current resources
    pub async fn resources(&self) -> HashMap<blake3::Hash, Resource> {
        let resources = self.resources.read().await;
        resources.clone()
    }

    /// Get resource path from hash using the sled db
    pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
        if let Some(value) = self.path_tree.get(hash.as_bytes())? {
            let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
            return Ok(Some(path));
        }

        Ok(None)
    }

    /// Get resource hash from path using the sled db
    pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();
        for path_item in self.path_tree.iter() {
            let (key, value) = path_item?;
            if value == path_bytes {
                let bytes: &[u8] = &key;
                if bytes.len() != 32 {
                    return Err(Error::Custom(format!(
                        "Expected a 32-byte BLAKE3 hash, got {} bytes",
                        bytes.len()
                    )));
                }

                let array: [u8; 32] = bytes.try_into().unwrap();
                return Ok(Some(array.into()))
            }
        }

        Ok(None)
    }

    /// Create a new [`dht::FudSeeder`] for our own node
    pub async fn new_seeder(&self, key: &blake3::Hash) -> Result<FudSeeder> {
        let state = self.state.read().await;
        if state.is_none() {
            return Err(Error::Custom("Fud is not ready yet".to_string()))
        }
        let state_ = state.clone().unwrap();
        drop(state);
        let node = self.node().await?;

        Ok(FudSeeder {
            key: *key,
            node: node.clone(),
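            // The signature binds the announced key to our node info, so
            // peers can verify this seeder record was produced by us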
            sig: state_
                .secret_key
                .sign(&[key.as_bytes().to_vec(), serialize_async(&node).await].concat()),
            timestamp: Timestamp::current_time().inner(),
        })
    }

    /// Verify if resources are complete and uncorrupted.
    /// If a resource is incomplete or corrupted, its status is changed to Incomplete.
    /// If a resource is complete, its status is changed to Seeding.
    /// Takes an optional list of resource hashes.
    /// If no hash is given (None), it verifies all resources.
    /// Returns the list of verified and uncorrupted/complete seeding resources.
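    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `fud: Arc<Fud>` is running and `hash` is a
    /// known resource hash; not compiled as a doc-test.
    /// ```ignore
    /// let seeding = fud.verify_resources(Some(vec![hash])).await?;
    /// println!("{} resource(s) are seeding", seeding.len());
    /// ```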
    pub async fn verify_resources(
        &self,
        hashes: Option<Vec<blake3::Hash>>,
    ) -> Result<Vec<Resource>> {
        let mut resources_write = self.resources.write().await;

        let update_resource = async |resource: &mut Resource,
                                     status: ResourceStatus,
                                     chunked: Option<&ChunkedStorage>,
                                     total_bytes_downloaded: u64,
                                     target_bytes_downloaded: u64| {
            let files = match chunked {
                Some(chunked) => resource.get_selected_files(chunked),
                None => vec![],
            };
            let chunk_hashes = match chunked {
                Some(chunked) => resource.get_selected_chunks(chunked),
                None => HashSet::new(),
            };

            if let Some(chunked) = chunked {
                resource.rtype = match chunked.is_dir() {
                    false => ResourceType::File,
                    true => ResourceType::Directory,
                };
            }

            resource.status = status;
            resource.total_chunks_count = match chunked {
                Some(chunked) => chunked.len() as u64,
                None => 0,
            };
            resource.target_chunks_count = chunk_hashes.len() as u64;
            resource.total_chunks_downloaded = match chunked {
                Some(chunked) => chunked.local_chunks() as u64,
                None => 0,
            };
            resource.target_chunks_downloaded = match chunked {
                Some(chunked) => chunked
                    .iter()
                    .filter(|(hash, available)| chunk_hashes.contains(hash) && *available)
                    .count() as u64,
                None => 0,
            };

            resource.total_bytes_size = match chunked {
                Some(chunked) => chunked.get_fileseq().len(),
                None => 0,
            };
            resource.target_bytes_size = match chunked {
                Some(chunked) => chunked
                    .get_files()
                    .iter()
                    .filter(|(path, _)| files.contains(path))
                    .map(|(_, size)| size)
                    .sum(),
                None => 0,
            };

            resource.total_bytes_downloaded = total_bytes_downloaded;
            resource.target_bytes_downloaded = target_bytes_downloaded;

            notify_event!(self, ResourceUpdated, resource);
        };

        let mut seeding_resources: Vec<Resource> = vec![];
        for (_, mut resource) in resources_write.iter_mut() {
            if let Some(ref hashes_list) = hashes {
                if !hashes_list.contains(&resource.hash) {
                    continue;
                }
            }

            match resource.status {
                ResourceStatus::Seeding => {}
                ResourceStatus::Incomplete => {}
                _ => continue,
            };

            // Make sure the resource is not corrupted or incomplete
            let resource_path = match self.hash_to_path(&resource.hash) {
                Ok(Some(v)) => v,
                Ok(None) | Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
                Ok(v) => v,
                Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let verify_res = self.verify_chunks(resource, &mut chunked).await;
            if let Err(e) = verify_res {
                error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
                update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                continue;
            }
            let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

            if !chunked.is_complete() {
                update_resource(
                    &mut resource,
                    ResourceStatus::Incomplete,
                    Some(&chunked),
                    total_bytes_downloaded,
                    target_bytes_downloaded,
                )
                .await;
                continue;
            }

            update_resource(
                &mut resource,
                ResourceStatus::Seeding,
                Some(&chunked),
                total_bytes_downloaded,
                target_bytes_downloaded,
            )
            .await;
            seeding_resources.push(resource.clone());
        }

        Ok(seeding_resources)
    }

    /// Start downloading a file or directory from the network to `path`.
    /// This creates a new task in `fetch_tasks` calling `fetch_resource()`.
    /// `files` is the list of files (relative paths) you want to download
    /// (if the resource is a directory); `FileSelection::All` means you want
    /// all files.
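    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `fud: Arc<Fud>` is running and `hash` is the
    /// hash of a resource announced on the network; not compiled as a doc-test.
    /// ```ignore
    /// fud.get(&hash, Path::new("/tmp/download"), FileSelection::All).await?;
    /// ```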
    pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
        let fetch_tasks = self.fetch_tasks.read().await;
        if fetch_tasks.contains_key(hash) {
            return Err(Error::Custom(format!(
                "Resource {} is already being downloaded",
                hash_to_string(hash)
            )))
        }
        drop(fetch_tasks);

        self.get_tx.send((*hash, path.to_path_buf(), files)).await?;

        Ok(())
    }

    /// Try to get the chunked file or directory from geode; if we don't have
    /// it, it is fetched from the network using `fetch_metadata()`.
    /// If we need to fetch from the network, the seeders we find are sent to
    /// `seeders_pub`.
    /// The seeder in the returned result is only defined if we fetched from
    /// the network.
    pub async fn get_metadata(
        &self,
        hash: &blake3::Hash,
        path: &Path,
    ) -> Result<(ChunkedStorage, Option<FudSeeder>)> {
        match self.geode.get(hash, path).await {
            // We already know the metadata
            Ok(v) => Ok((v, None)),
            // The metadata in geode is invalid or corrupted
            Err(Error::GeodeNeedsGc) => todo!(),
            // If we could not find the metadata in geode, get it from the network
            Err(Error::GeodeFileNotFound) => {
                // Find nodes close to the file hash
                info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
                let dht_sub = self.dht.subscribe().await;
                if let Err(e) = self.lookup_tx.send(*hash).await {
                    dht_sub.unsubscribe().await;
                    return Err(e.into())
                }

                // Fetch resource metadata
                let fetch_res = fetch_metadata(self, hash, path, &dht_sub).await;
                dht_sub.unsubscribe().await;
                let seeder = fetch_res?;
                Ok((self.geode.get(hash, path).await?, Some(seeder)))
            }
            Err(e) => Err(e),
        }
    }

    /// Download a file or directory from the network to `path`.
    /// Called when `get()` creates a new fetch task.
    pub async fn fetch_resource(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        files: &FileSelection,
    ) -> Result<()> {
        let hash_bytes = hash.as_bytes();
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();

        // Macro that acquires a write lock on `self.resources`, updates a
        // resource, and returns the resource (dropping the write lock)
        macro_rules! update_resource {
            ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
                let mut resources_write = self.resources.write().await;
                let resource = match resources_write.get_mut($hash) {
                    Some(resource) => {
                        $(resource.$field = $value;)* // Apply the field assignments
                        resource.clone()
                    }
                    None => return Ok(()), // Resource was removed, abort
                };
                resource
            }};
        }
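
        // Usage sketch: apply field assignments under the write lock and get
        // back the updated clone, e.g.:
        //   let resource = update_resource!(hash, { status = ResourceStatus::Verifying });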

        // Make sure we don't already have another resource on that path
        if let Ok(Some(hash_found)) = self.path_to_hash(path) {
            if *hash != hash_found {
                return Err(Error::Custom(format!(
                    "There is already another resource on path {path_string}"
                )))
            }
        }

        // Add path to the sled db
        self.path_tree.insert(hash_bytes, path_bytes)?;

        // Add file selection to the sled db
        if let FileSelection::Set(selected_files) = files {
            let paths: Vec<Vec<u8>> = selected_files
                .iter()
                .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
                .collect();
            let serialized_paths = serialize_async(&paths).await;
            // Abort if the file selection cannot be inserted into sled
            if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
                return Err(Error::SledError(e))
            }
        }

        // Add resource to `self.resources`
        let resource = Resource::new(
            *hash,
            ResourceType::Unknown,
            path,
            ResourceStatus::Discovering,
            files.clone(),
        );
        let mut resources_write = self.resources.write().await;
        resources_write.insert(*hash, resource.clone());
        drop(resources_write);

        // Subscribe to DHT events early for `fetch_chunks()`
        let dht_sub = self.dht.subscribe().await;

        // Send a DownloadStarted event
        notify_event!(self, DownloadStarted, resource);

        // Try to get the chunked file or directory from geode
        let metadata_result = self.get_metadata(hash, path).await;

        if let Err(e) = metadata_result {
            // Set resource status to `Incomplete` and send a `MetadataNotFound` event
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
            notify_event!(self, MetadataNotFound, resource);
            dht_sub.unsubscribe().await;
            return Err(e)
        }
        let (mut chunked, metadata_seeder) = metadata_result.unwrap();

        // Get a list of all file paths the user wants to fetch
        let resources_read = self.resources.read().await;
        let resource = match resources_read.get(hash) {
            Some(resource) => resource,
            None => {
                // Resource was removed, abort
                dht_sub.unsubscribe().await;
                return Ok(())
            }
        };
        let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked);
        drop(resources_read);

        // Create all files (and all necessary directories)
        if let Err(e) = create_all_files(&files_vec).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        // Set resource status to `Verifying` and send a `MetadataDownloadCompleted` event
        let resource = update_resource!(hash, {
            status = ResourceStatus::Verifying,
            total_chunks_count = chunked.len() as u64,
            total_bytes_size = chunked.get_fileseq().len(),
            rtype = match chunked.is_dir() {
                false => ResourceType::File,
                true => ResourceType::Directory,
            },
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Set of all chunks we need locally (including the ones we already have)
        let chunk_hashes = resource.get_selected_chunks(&chunked);

        // Write all scraps to make sure the data on the filesystem is correct
        if let Err(e) = self.write_scraps(&mut chunked, &chunk_hashes).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        // Mark locally available chunks as such
        let verify_res = self.verify_chunks(&resource, &mut chunked).await;
        if let Err(e) = verify_res {
            dht_sub.unsubscribe().await;
            error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
            return Err(e);
        }
        let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

        // Update `total_bytes_size` if the resource is a file
        if let ResourceType::File = resource.rtype {
            let resource =
                update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
            notify_event!(self, ResourceUpdated, resource);
        }

        // If `chunked` is a file that is bigger than the total size of all its
        // chunks, truncate the file to the size of the chunks.
        // This fixes two edge-cases: a file that exactly ends at the end of
        // a chunk, and a file with no chunk.
        if !chunked.is_dir() {
            let fs_metadata = fs::metadata(&path).await;
            if let Err(e) = fs_metadata {
                dht_sub.unsubscribe().await;
                return Err(e.into());
            }
            if fs_metadata.unwrap().len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
                if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
                    let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
                }
            }
        }

        // Set of all chunks we need locally and their current availability
        let chunks: HashSet<(blake3::Hash, bool)> =
            chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();

        // Set of the chunks we need to download
        let mut missing_chunks: HashSet<blake3::Hash> =
            chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();

        // Update the resource with the chunks/bytes counts
        update_resource!(hash, {
            target_chunks_count = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
            target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,

            target_bytes_size =
                chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
            total_bytes_downloaded = total_bytes_downloaded,
            target_bytes_downloaded = target_bytes_downloaded,
        });

        let download_completed = async |chunked: &ChunkedStorage| -> Result<()> {
            // Set resource status to `Seeding` or `Incomplete`
            let resource = update_resource!(hash, {
                status = match chunked.is_complete() {
                    true => ResourceStatus::Seeding,
                    false => ResourceStatus::Incomplete,
                },
                target_chunks_downloaded = chunks.len() as u64,
                total_chunks_downloaded = chunked.local_chunks() as u64,
            });

            // Announce the resource if we have all chunks
            if chunked.is_complete() {
                if let Ok(seeder) = self.new_seeder(hash).await {
                    let seeders = vec![seeder];
                    let self_announce = FudAnnounce { key: *hash, seeders: seeders.clone() };
                    let _ = self.dht.announce(hash, &seeders, &self_announce).await;
                }
            }

            // Send a DownloadCompleted event
            notify_event!(self, DownloadCompleted, resource);

            Ok(())
        };

        // If we don't need to download any chunk
        if missing_chunks.is_empty() {
            dht_sub.unsubscribe().await;
            return download_completed(&chunked).await;
        }

        // Set resource status to `Downloading` and send a MetadataDownloadCompleted event
        let resource = update_resource!(hash, {
            status = ResourceStatus::Downloading,
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Start looking up seeders if we did not need to do it for the metadata
        if metadata_seeder.is_none() {
            if let Err(e) = self.lookup_tx.send(*hash).await {
                dht_sub.unsubscribe().await;
                return Err(e.into())
            }
        }

        // Fetch missing chunks from seeders
        let _ =
            fetch_chunks(self, hash, &mut chunked, &dht_sub, metadata_seeder, &mut missing_chunks)
                .await;

        // We don't need the DHT events sub anymore
        dht_sub.unsubscribe().await;

        // Get chunked file from geode
        let mut chunked = self.geode.get(hash, path).await?;

        // Set resource status to `Verifying` and send FudEvent::ResourceUpdated
        let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
        notify_event!(self, ResourceUpdated, resource);

        // Verify all chunks
        self.verify_chunks(&resource, &mut chunked).await?;

        let is_complete = chunked
            .iter()
            .filter(|(hash, _)| chunk_hashes.contains(hash))
            .all(|(_, available)| *available);

        // We fetched all chunks, but the resource is not complete
        // (some chunks were missing from all seeders)
        if !is_complete {
            // Set resource status to `Incomplete`
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });

            // Send a MissingChunks event
            notify_event!(self, MissingChunks, resource);

            return Ok(());
        }

        download_completed(&chunked).await
    }

    async fn write_scraps(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hashes: &HashSet<blake3::Hash>,
    ) -> Result<()> {
        // Get all scraps
        let mut scraps = HashMap::new();
        // TODO: This can be improved to not loop over all chunks
        for chunk_hash in chunk_hashes {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            // Verify the scrap we found
            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();

            // Add the scrap to the HashMap
            scraps.insert(chunk_hash, scrap);
        }

        // Write all scraps
        if !scraps.is_empty() {
            info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
        }
        for (scrap_hash, mut scrap) in scraps {
            let len = scrap.chunk.len();
            let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
            if let Err(e) = write_res {
                error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            let (_, chunk_bytes_written) = write_res.unwrap();

            // If the whole scrap was written, we can remove it from sled
            if chunk_bytes_written == len {
                self.scrap_tree.remove(scrap_hash.as_bytes())?;
                continue;
            }
            // Otherwise update the scrap in sled
            let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
            if let Err(e) = chunk_res {
                error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            scrap.hash_written = blake3::hash(&chunk_res.unwrap());
            if let Err(e) =
                self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
            {
                error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
            }
        }

        Ok(())
    }

    /// Iterate over chunks and find which chunks are available locally,
    /// either on the filesystem (using `Geode::verify_chunk()`) or in scraps,
    /// and mark the available chunks as such in `chunked`.
    /// Returns the size in bytes of locally available data: the total, and
    /// the part that falls within the resource's file selection (target).
    pub async fn verify_chunks(
        &self,
        resource: &Resource,
        chunked: &mut ChunkedStorage,
    ) -> Result<(u64, u64)> {
        let chunks = chunked.get_chunks().clone();
        let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();

        // Gather all available chunks
        for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
            // Read the chunk using the `FileSequence`
            let chunk =
                match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
                    Ok(c) => c,
                    Err(Error::Io(ErrorKind::NotFound)) => continue,
                    Err(e) => {
                        warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
                        break
                    }
                };

            // Perform chunk consistency check
            if self.geode.verify_chunk(chunk_hash, &chunk) {
                chunked.get_chunk_mut(chunk_index).1 = true;
                bytes.insert(
                    *chunk_hash,
                    (chunk.len(), resource.get_selected_bytes(chunked, &chunk)),
                );
            }
        }

        // Look for the chunks that are not on the filesystem
        let chunks = chunked.get_chunks().clone();
        let missing_on_fs: Vec<_> =
            chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect();

        // Look for scraps
        for (chunk_index, (chunk_hash, _)) in missing_on_fs {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            // Verify the scrap we found
            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();
            if blake3::hash(&scrap.chunk) != *chunk_hash {
                continue;
            }

            // Check if the scrap is still written on the filesystem
            let scrap_chunk =
                self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await;
            if scrap_chunk.is_err() {
                continue;
            }
            let scrap_chunk = scrap_chunk.unwrap();

            // The scrap is not available if the chunk on the disk changed
            if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
                continue;
            }

            // Mark the chunk as available
            chunked.get_chunk_mut(chunk_index).1 = true;

            // Update the sums of locally available data
            bytes.insert(
                *chunk_hash,
                (scrap.chunk.len(), resource.get_selected_bytes(chunked, &scrap.chunk)),
            );
        }

        // If the resource is a file: set the `FileSequence`'s file size to
        // the exact file size once we know the last chunk's size. This is
        // not needed for directories.
        if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() {
            if !chunked.is_dir() && *last_chunk_available {
                if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) {
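                    // Exact size = (number of chunks - 1) full chunks plus
                    // the last, possibly short, chunk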
                    let exact_file_size =
                        chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
                    chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
                }
            }
        }

        let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
        let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;

        Ok((total_bytes_downloaded, target_bytes_downloaded))
    }

    /// Add a resource from the file system.
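    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `fud: Arc<Fud>` is running and the path
    /// exists locally; not compiled as a doc-test.
    /// ```ignore
    /// fud.put(Path::new("/srv/shared/file.bin")).await?;
    /// ```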
    pub async fn put(&self, path: &Path) -> Result<()> {
        self.put_tx.send(path.to_path_buf()).await?;

        Ok(())
    }

    /// Insert a file or directory from the file system.
    /// Called when `put()` creates a new put task.
    pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
        let self_node = self.node().await?;

        if self_node.addresses.is_empty() {
            return Err(Error::Custom(
                "Cannot put resource, you don't have any external address".to_string(),
            ))
        }

        let metadata = fs::metadata(path).await?;

        // Get the list of files and the resource type (file or directory)
        let (files, resource_type) = if metadata.is_file() {
            (vec![(path.clone(), metadata.len())], ResourceType::File)
        } else if metadata.is_dir() {
            let mut files = get_all_files(path).await?;
            self.geode.sort_files(&mut files);
            (files, ResourceType::Directory)
        } else {
            return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
        };

        // Read the file or directory and create the chunks
        let stream = FileSequence::new(&files, false);
        let total_size = stream.len();
        let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;

        // Get the relative file paths included in the metadata and hash of directories
        let relative_files = if let ResourceType::Directory = resource_type {
            // [(absolute file path, file size)] -> [(relative file path, file size)]
            let relative_files = files
                .into_iter()
                .map(|(file_path, size)| match file_path.strip_prefix(path) {
                    Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
                    Err(_) => Err(Error::Custom("Invalid file path".to_string())),
                })
                .collect::<Result<Vec<_>>>()?;

            // Add the files metadata to the hasher to complete the resource hash
            self.geode.hash_files_metadata(&mut hasher, &relative_files);

            relative_files
        } else {
            vec![]
        };

        // Finalize the resource hash
        let hash = hasher.finalize();

        // Create the metadata file in geode
        if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
            error!(target: "fud::insert_resource()", "Failed inserting {path:?} to geode: {e}");
            return Err(e)
        }

        // Add path to the sled db
        if let Err(e) =
            self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
        {
            error!(target: "fud::insert_resource()", "Failed inserting new resource into sled: {e}");
            return Err(e.into())
        }

        // Add resource
        let mut resources_write = self.resources.write().await;
        resources_write.insert(
            hash,
            Resource {
                hash,
                rtype: resource_type,
                path: path.to_path_buf(),
                status: ResourceStatus::Seeding,
                file_selection: FileSelection::All,
                total_chunks_count: chunk_hashes.len() as u64,
                target_chunks_count: chunk_hashes.len() as u64,
                total_chunks_downloaded: chunk_hashes.len() as u64,
                target_chunks_downloaded: chunk_hashes.len() as u64,
                total_bytes_size: total_size,
                target_bytes_size: total_size,
                total_bytes_downloaded: total_size,
                target_bytes_downloaded: total_size,
                speeds: vec![],
            },
        );
        drop(resources_write);

        // Announce the new resource
        if let Ok(seeder) = self.new_seeder(&hash).await {
            let seeders = vec![seeder];
            let fud_announce = FudAnnounce { key: hash, seeders: seeders.clone() };
            let _ = self.dht.announce(&hash, &seeders, &fud_announce).await;
        }

        // Send InsertCompleted event
        notify_event!(self, InsertCompleted, {
            hash,
            path: path.to_path_buf()
        });

        Ok(())
    }

    /// Removes:
    /// - a resource
    /// - its metadata in geode
    /// - its path in the sled path tree
    /// - its file selection in the sled file selection tree
    /// - and any related scrap in the sled scrap tree,
    ///
    /// then sends a `ResourceRemoved` fud event.
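    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch, assuming `hash` is the hash of a previously added
    /// resource; not compiled as a doc-test.
    /// ```ignore
    /// fud.remove(&hash).await;
    /// ```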
    pub async fn remove(&self, hash: &blake3::Hash) {
        // Remove the resource
        let mut resources_write = self.resources.write().await;
        resources_write.remove(hash);
        drop(resources_write);

        // Remove the scraps in sled
        if let Ok(Some(path)) = self.hash_to_path(hash) {
            let chunked = self.geode.get(hash, &path).await;

            if let Ok(chunked) = chunked {
                for (chunk_hash, _) in chunked.iter() {
                    let _ = self.scrap_tree.remove(chunk_hash.as_bytes());
                }
            }
        }

        // Remove the metadata in geode
        let hash_str = hash_to_string(hash);
        let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
        let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;

        // Remove the path in sled
        let _ = self.path_tree.remove(hash.as_bytes());

        // Remove the file selection in sled
        let _ = self.file_selection_tree.remove(hash.as_bytes());

        // Send a `ResourceRemoved` event
        notify_event!(self, ResourceRemoved, { hash: *hash });
    }

    /// Remove seeders that are older than `expiry_secs`
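    ///
    /// # Example (sketch)
    ///
    /// Drop seeder records older than one hour (3600 seconds); a sketch,
    /// not compiled as a doc-test.
    /// ```ignore
    /// fud.prune_seeders(3600).await;
    /// ```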
    pub async fn prune_seeders(&self, expiry_secs: u32) {
        let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
        let mut seeders_write = self.dht.hash_table.write().await;

        let keys: Vec<_> = seeders_write.keys().cloned().collect();

        for key in keys {
            let items = seeders_write.get_mut(&key).unwrap();
            items.retain(|item| item.timestamp > expiry_timestamp);
            if items.is_empty() {
                seeders_write.remove(&key);
            }
        }
    }

    /// Stop all tasks.
    pub async fn stop(&self) {
        info!("Stopping fetch tasks...");
        // Clone fetch_tasks, because `task.stop()` needs a write lock on it
        let fetch_tasks = self.fetch_tasks.read().await;
        let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(fetch_tasks);

        for task in cloned_fetch_tasks.values() {
            task.stop().await;
        }

        info!("Stopping put tasks...");
        let put_tasks = self.put_tasks.read().await;
        let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
            put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
        drop(put_tasks);

        for task in cloned_put_tasks.values() {
            task.stop().await;
        }

        info!("Stopping lookup tasks...");
        let lookup_tasks = self.lookup_tasks.read().await;
        let cloned_lookup_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            lookup_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(lookup_tasks);

        for task in cloned_lookup_tasks.values() {
            task.stop().await;
        }

        // Stop all other tasks
        let mut tasks = self.tasks.write().await;
        for (name, task) in tasks.clone() {
            info!("Stopping {name} task...");
            task.stop().await;
        }
        *tasks = HashMap::new();
    }
}