fud/lib.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{
    collections::{HashMap, HashSet},
    io::ErrorKind,
    path::{Path, PathBuf},
    sync::Arc,
};

use log::{error, info, warn};
use sled_overlay::sled;
use smol::{
    channel,
    fs::{self, OpenOptions},
    lock::RwLock,
};

use darkfi::{
    dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
    geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
    net::P2pPtr,
    system::{ExecutorPtr, Publisher, PublisherPtr, StoppableTask},
    util::{path::expand_path, time::Timestamp},
    Error, Result,
};
use darkfi_sdk::crypto::SecretKey;
use darkfi_serial::{deserialize_async, serialize_async};

/// P2P protocols
pub mod proto;
use proto::FudAnnounce;

/// FudEvent
pub mod event;
use event::{notify_event, FudEvent};

/// Resource definition
pub mod resource;
use resource::{Resource, ResourceStatus, ResourceType};

/// Scrap definition
pub mod scrap;
use scrap::Scrap;

/// JSON-RPC related methods
pub mod rpc;

/// Background tasks
pub mod tasks;
use tasks::start_task;

/// Bitcoin
pub mod bitcoin;

/// PoW
pub mod pow;
use pow::{FudPow, VerifiableNodeData};

/// Equi-X
pub mod equix;

/// Settings and args
pub mod settings;
use settings::Args;

/// Utils
pub mod util;
use util::{create_all_files, get_all_files, FileSelection};

/// Download methods
mod download;
use download::{fetch_chunks, fetch_metadata};

/// [`DhtHandler`] implementation and fud-specific DHT structs
pub mod dht;
use dht::FudSeeder;

const SLED_PATH_TREE: &[u8] = b"_fud_paths";
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";

pub struct Fud {
    /// Our own [`VerifiableNodeData`]
    pub node_data: Arc<RwLock<VerifiableNodeData>>,
    /// Our secret key (the public key is in `node_data`)
    pub secret_key: Arc<RwLock<SecretKey>>,
    /// The Geode instance
    geode: Geode,
    /// Default download directory
    downloads_path: PathBuf,
    /// Chunk transfer timeout in seconds
    chunk_timeout: u64,
    /// The [`FudPow`] instance
    pub pow: Arc<RwLock<FudPow>>,
    /// The DHT instance
    dht: Arc<Dht<Fud>>,
    /// Resources (current status of all downloads/seeds)
    resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
    /// Sled tree containing "resource hash -> path on the filesystem"
    path_tree: sled::Tree,
    /// Sled tree containing "resource hash -> file selection". If the file
    /// selection covers all files of the resource (or if the resource is not
    /// a directory), no file selection is stored in the tree.
    file_selection_tree: sled::Tree,
    /// Sled tree containing scraps. A scrap is a chunk holding data the user
    /// did not want saved to files, alongside data the user did want
    /// (otherwise we would not have downloaded the chunk at all).
    /// We keep scraps so that we can verify chunk integrity even when part of
    /// a chunk is never written to the downloaded files on the filesystem.
    /// "chunk/scrap hash -> chunk content"
    scrap_tree: sled::Tree,
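    // Illustrative scenario (an assumption for clarity, not taken from the
    // code): a single chunk spans the tail of a selected file A and the head
    // of an unselected file B. Only A's bytes reach the filesystem, so the
    // full chunk is kept as a scrap to allow re-verifying its hash later.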
    /// Get requests sender
    get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
    /// Get requests receiver
    get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
    /// Put requests sender
    put_tx: channel::Sender<PathBuf>,
    /// Put requests receiver
    put_rx: channel::Receiver<PathBuf>,
    /// Lookup requests sender
    lookup_tx: channel::Sender<(blake3::Hash, PublisherPtr<Option<Vec<FudSeeder>>>)>,
    /// Lookup requests receiver
    lookup_rx: channel::Receiver<(blake3::Hash, PublisherPtr<Option<Vec<FudSeeder>>>)>,
    /// Currently active downloading tasks (running the `fud.fetch_resource()` method)
    fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Currently active put tasks (running the `fud.insert_resource()` method)
    put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,
    /// Currently active lookup tasks (running the `fud.lookup_value()` method)
    lookup_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Currently active tasks (defined in `tasks`, started with the `start_task` macro)
    tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,
    /// Used to send events to fud clients
    event_publisher: PublisherPtr<FudEvent>,
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    /// Global multithreaded executor reference
    pub executor: ExecutorPtr,
}

impl Fud {
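    /// Create a new [`Fud`] instance: runs the PoW to derive our node ID,
    /// instantiates Geode, opens the sled trees and wires up the DHT handler.
    ///
    /// Illustrative usage (a hedged sketch, not a doctest; `settings`, `p2p`,
    /// `sled_db`, `event_publisher` and `executor` are assumed to be provided
    /// by the daemon setup):
    /// ```ignore
    /// let fud = Fud::new(settings, p2p, &sled_db, event_publisher, executor).await?;
    /// fud.start_tasks().await;
    /// ```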
    pub async fn new(
        settings: Args,
        p2p: P2pPtr,
        sled_db: &sled::Db,
        event_publisher: PublisherPtr<FudEvent>,
        executor: ExecutorPtr,
    ) -> Result<Arc<Self>> {
        let basedir = expand_path(&settings.base_dir)?;
        let downloads_path = match settings.downloads_path {
            Some(downloads_path) => expand_path(&downloads_path)?,
            None => basedir.join("downloads"),
        };

        // Run the PoW and generate a `VerifiableNodeData`
        let mut pow = FudPow::new(settings.pow.into(), executor.clone());
        pow.bitcoin_hash_cache.update().await?; // Fetch BTC block hashes
        let (node_data, secret_key) = pow.generate_node().await?;
        info!(target: "fud::new()", "Your node ID: {}", hash_to_string(&node_data.id()));

        // Geode
        info!(target: "fud::new()", "Instantiating Geode instance");
        let geode = Geode::new(&basedir).await?;

        // DHT
        let dht_settings: DhtSettings = settings.dht.into();
        let dht: Arc<Dht<Fud>> =
            Arc::new(Dht::<Fud>::new(&dht_settings, p2p.clone(), executor.clone()).await);

        let (get_tx, get_rx) = smol::channel::unbounded();
        let (put_tx, put_rx) = smol::channel::unbounded();
        let (lookup_tx, lookup_rx) = smol::channel::unbounded();
        let fud = Arc::new(Self {
            node_data: Arc::new(RwLock::new(node_data)),
            secret_key: Arc::new(RwLock::new(secret_key)),
            geode,
            downloads_path,
            chunk_timeout: settings.chunk_timeout,
            pow: Arc::new(RwLock::new(pow)),
            dht: dht.clone(),
            path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
            file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
            scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
            resources: Arc::new(RwLock::new(HashMap::new())),
            get_tx,
            get_rx,
            put_tx,
            put_rx,
            lookup_tx,
            lookup_rx,
            fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
            put_tasks: Arc::new(RwLock::new(HashMap::new())),
            lookup_tasks: Arc::new(RwLock::new(HashMap::new())),
            tasks: Arc::new(RwLock::new(HashMap::new())),
            event_publisher,
            p2p,
            executor,
        });
        *dht.handler.write().await = Arc::downgrade(&fud);

        Ok(fud)
    }

    /// Start the background tasks defined in the `tasks` module.
    pub async fn start_tasks(self: &Arc<Self>) {
        let mut tasks = self.tasks.write().await;
        start_task!(self, "get", tasks::get_task, tasks);
        start_task!(self, "put", tasks::put_task, tasks);
        start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud>, tasks);
        start_task!(self, "lookup", tasks::lookup_task, tasks);
        start_task!(self, "announce", tasks::announce_seed_task, tasks);
        start_task!(self, "node ID", tasks::node_id_task, tasks);
    }

    /// Bootstrap the DHT, verify our resources, add ourselves to
    /// the seeders (`dht.hash_table`) for the resources we already have,
    /// and announce our files.
    async fn init(&self) -> Result<()> {
        info!(target: "fud::init()", "Bootstrapping the DHT...");
        self.dht.bootstrap().await;

        info!(target: "fud::init()", "Finding resources...");
        let mut resources_write = self.resources.write().await;
        for result in self.path_tree.iter() {
            if result.is_err() {
                continue;
            }

            // Parse hash
            let (hash, path) = result.unwrap();
            let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
                Ok(v) => v,
                Err(_) => continue,
            };
            let hash = blake3::Hash::from_bytes(hash_bytes);

            // Parse path
            let path_bytes = path.to_vec();
            let path_str = match std::str::from_utf8(&path_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let path: PathBuf = match expand_path(path_str) {
                Ok(v) => v,
                Err(_) => continue,
            };

            // Get the file selection from sled, falling back on FileSelection::All
            let mut file_selection = FileSelection::All;
            if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
                if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
                    file_selection = FileSelection::Set(
                        path_list
                            .into_iter()
                            .filter_map(|bytes| {
                                std::str::from_utf8(&bytes)
                                    .ok()
                                    .and_then(|path_str| expand_path(path_str).ok())
                            })
                            .collect(),
                    );
                }
            }

            // Add resource
            resources_write.insert(
                hash,
                Resource::new(
                    hash,
                    ResourceType::Unknown,
                    &path,
                    ResourceStatus::Incomplete,
                    file_selection,
                ),
            );
        }
        drop(resources_write);

        info!(target: "fud::init()", "Verifying resources...");
        let resources = self.verify_resources(None).await?;

        let self_node = self.node().await;

        // Stop here if we have no external address
        if self_node.addresses.is_empty() {
            return Ok(());
        }

        // Add our own node as a seeder for the resources we are seeding
        for resource in &resources {
            let self_router_items = vec![self.new_seeder(&resource.hash).await];
            self.add_value(&resource.hash, &self_router_items).await;
        }

        info!(target: "fud::init()", "Announcing resources...");
        for resource in resources {
            let seeders = vec![self.new_seeder(&resource.hash).await];
            let _ = self
                .dht
                .announce(
                    &resource.hash,
                    &seeders.clone(),
                    &FudAnnounce { key: resource.hash, seeders },
                )
                .await;
        }

        Ok(())
    }

    /// Get resource path from hash using the sled db
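    ///
    /// Illustrative usage (a hedged sketch, not a doctest; assumes an
    /// existing `fud` instance and a known resource hash):
    /// ```ignore
    /// if let Some(path) = fud.hash_to_path(&hash)? {
    ///     println!("{} is stored at {}", hash_to_string(&hash), path.display());
    /// }
    /// ```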
    pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
        if let Some(value) = self.path_tree.get(hash.as_bytes())? {
            let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
            return Ok(Some(path));
        }

        Ok(None)
    }

    /// Get resource hash from path using the sled db
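    ///
    /// Illustrative usage (a hedged sketch, not a doctest; the path is
    /// hypothetical):
    /// ```ignore
    /// if let Some(hash) = fud.path_to_hash(Path::new("/downloads/file.bin"))? {
    ///     println!("Resource hash: {}", hash_to_string(&hash));
    /// }
    /// ```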
    pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();
        for path_item in self.path_tree.iter() {
            let (key, value) = path_item?;
            if value == path_bytes {
                let bytes: &[u8] = &key;
                if bytes.len() != 32 {
                    return Err(Error::Custom(format!(
                        "Expected a 32-byte BLAKE3 hash, got {} bytes",
                        bytes.len()
                    )));
                }

                let array: [u8; 32] = bytes.try_into().unwrap();
                return Ok(Some(array.into()))
            }
        }

        Ok(None)
    }

    /// Create a new [`dht::FudSeeder`] for our own node
    pub async fn new_seeder(&self, key: &blake3::Hash) -> FudSeeder {
        FudSeeder {
            key: *key,
            node: self.node().await,
            timestamp: Timestamp::current_time().inner(),
        }
    }

    /// Verify whether resources are complete and uncorrupted.
    /// If a resource is incomplete or corrupted, its status is set to `Incomplete`.
    /// If a resource is complete, its status is set to `Seeding`.
    /// Takes an optional list of resource hashes; if `None` is given, all
    /// resources are verified.
    /// Returns the list of verified, complete resources that are now seeding.
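    ///
    /// Illustrative usage (a hedged sketch, not a doctest):
    /// ```ignore
    /// // Re-verify a single resource; passing `None` verifies all of them
    /// let seeding = fud.verify_resources(Some(vec![hash])).await?;
    /// ```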
    pub async fn verify_resources(
        &self,
        hashes: Option<Vec<blake3::Hash>>,
    ) -> Result<Vec<Resource>> {
        let mut resources_write = self.resources.write().await;

        let update_resource = async |resource: &mut Resource,
                                     status: ResourceStatus,
                                     chunked: Option<&ChunkedStorage>,
                                     total_bytes_downloaded: u64,
                                     target_bytes_downloaded: u64| {
            let files = match chunked {
                Some(chunked) => resource.get_selected_files(chunked),
                None => vec![],
            };
            let chunk_hashes = match chunked {
                Some(chunked) => resource.get_selected_chunks(chunked),
                None => HashSet::new(),
            };

            if let Some(chunked) = chunked {
                resource.rtype = match chunked.is_dir() {
                    false => ResourceType::File,
                    true => ResourceType::Directory,
                };
            }

            resource.status = status;
            resource.total_chunks_count = match chunked {
                Some(chunked) => chunked.len() as u64,
                None => 0,
            };
            resource.target_chunks_count = chunk_hashes.len() as u64;
            resource.total_chunks_downloaded = match chunked {
                Some(chunked) => chunked.local_chunks() as u64,
                None => 0,
            };
            resource.target_chunks_downloaded = match chunked {
                Some(chunked) => chunked
                    .iter()
                    .filter(|(hash, available)| chunk_hashes.contains(hash) && *available)
                    .count() as u64,
                None => 0,
            };

            resource.total_bytes_size = match chunked {
                Some(chunked) => chunked.get_fileseq().len(),
                None => 0,
            };
            resource.target_bytes_size = match chunked {
                Some(chunked) => chunked
                    .get_files()
                    .iter()
                    .filter(|(path, _)| files.contains(path))
                    .map(|(_, size)| size)
                    .sum(),
                None => 0,
            };

            resource.total_bytes_downloaded = total_bytes_downloaded;
            resource.target_bytes_downloaded = target_bytes_downloaded;

            notify_event!(self, ResourceUpdated, resource);
        };

        let mut seeding_resources: Vec<Resource> = vec![];
        for (_, mut resource) in resources_write.iter_mut() {
            if let Some(ref hashes_list) = hashes {
                if !hashes_list.contains(&resource.hash) {
                    continue;
                }
            }

            match resource.status {
                ResourceStatus::Seeding => {}
                ResourceStatus::Incomplete => {}
                _ => continue,
            };

            // Make sure the resource is not corrupted or incomplete
            let resource_path = match self.hash_to_path(&resource.hash) {
                Ok(Some(v)) => v,
                Ok(None) | Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
                Ok(v) => v,
                Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let verify_res = self.verify_chunks(resource, &mut chunked).await;
            if let Err(e) = verify_res {
                error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
                update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                continue;
            }
            let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

            if !chunked.is_complete() {
                update_resource(
                    &mut resource,
                    ResourceStatus::Incomplete,
                    Some(&chunked),
                    total_bytes_downloaded,
                    target_bytes_downloaded,
                )
                .await;
                continue;
            }

            update_resource(
                &mut resource,
                ResourceStatus::Seeding,
                Some(&chunked),
                total_bytes_downloaded,
                target_bytes_downloaded,
            )
            .await;
            seeding_resources.push(resource.clone());
        }

        Ok(seeding_resources)
    }

    /// Start downloading a file or directory from the network to `path`.
    /// This creates a new task in `fetch_tasks` calling `fetch_resource()`.
    /// `files` selects which files (relative paths) to download if the
    /// resource is a directory; `FileSelection::All` means all files.
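    ///
    /// Illustrative usage (a hedged sketch, not a doctest; the hash and
    /// paths are hypothetical):
    /// ```ignore
    /// // Download only two files out of a directory resource
    /// let files = FileSelection::Set(
    ///     vec![PathBuf::from("a.txt"), PathBuf::from("img/b.png")].into_iter().collect(),
    /// );
    /// fud.get(&hash, Path::new("/downloads/my_dir"), files).await?;
    /// ```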
    pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
        let fetch_tasks = self.fetch_tasks.read().await;
        if fetch_tasks.contains_key(hash) {
            return Err(Error::Custom(format!(
                "Resource {} is already being downloaded",
                hash_to_string(hash)
            )))
        }
        drop(fetch_tasks);

        self.get_tx.send((*hash, path.to_path_buf(), files)).await?;

        Ok(())
    }

    /// Try to get the chunked file or directory from geode; if we don't have
    /// it, it is fetched from the network using `fetch_metadata()`.
    /// If we need to fetch from the network, the seeders we find are sent to
    /// `seeders_pub`.
    /// The seeder in the returned result is only `Some` if we fetched from
    /// the network.
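    ///
    /// Illustrative usage (a hedged sketch, not a doctest):
    /// ```ignore
    /// let seeders_pub = Publisher::new();
    /// let (chunked, seeder) = fud.get_metadata(&hash, &path, seeders_pub).await?;
    /// ```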
    pub async fn get_metadata(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        seeders_pub: PublisherPtr<Option<Vec<FudSeeder>>>,
    ) -> Result<(ChunkedStorage, Option<FudSeeder>)> {
        match self.geode.get(hash, path).await {
            // We already know the metadata
            Ok(v) => Ok((v, None)),
            // The metadata in geode is invalid or corrupted
            Err(Error::GeodeNeedsGc) => todo!(),
            // If we could not find the metadata in geode, get it from the network
            Err(Error::GeodeFileNotFound) => {
                // Find nodes close to the file hash
                info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
                let metadata_sub = seeders_pub.clone().subscribe().await;
                self.lookup_tx.send((*hash, seeders_pub.clone())).await?;

                // Fetch resource metadata
                let fetch_res = fetch_metadata(self, hash, &metadata_sub, path).await;
                metadata_sub.unsubscribe().await;
                let seeder = fetch_res?;
                Ok((self.geode.get(hash, path).await?, Some(seeder)))
            }
            Err(e) => Err(e),
        }
    }

    /// Download a file or directory from the network to `path`.
    /// Called when `get()` creates a new fetch task.
    pub async fn fetch_resource(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        files: &FileSelection,
    ) -> Result<()> {
        let hash_bytes = hash.as_bytes();
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();

        // Macro that acquires a write lock on `self.resources`, updates a
        // resource, and returns the resource (dropping the write lock)
        macro_rules! update_resource {
            ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
                let mut resources_write = self.resources.write().await;
                let resource = match resources_write.get_mut($hash) {
                    Some(resource) => {
                        $(resource.$field = $value;)* // Apply the field assignments
                        resource.clone()
                    }
                    None => return Ok(()), // Resource was removed, abort
                };
                resource
            }};
        }

        // Make sure we don't already have another resource on that path
        if let Ok(Some(hash_found)) = self.path_to_hash(path) {
            if *hash != hash_found {
                return Err(Error::Custom(format!(
                    "There is already another resource on path {path_string}"
                )))
            }
        }

        // Add path to the sled db
        self.path_tree.insert(hash_bytes, path_bytes)?;

        // Add file selection to the sled db
        if let FileSelection::Set(selected_files) = files {
            let paths: Vec<Vec<u8>> = selected_files
                .iter()
                .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
                .collect();
            let serialized_paths = serialize_async(&paths).await;
            // Abort if the file selection cannot be inserted into sled
            if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
                return Err(Error::SledError(e))
            }
        }

        // Add resource to `self.resources`
        let resource = Resource::new(
            *hash,
            ResourceType::Unknown,
            path,
            ResourceStatus::Discovering,
            files.clone(),
        );
        let mut resources_write = self.resources.write().await;
        resources_write.insert(*hash, resource.clone());
        drop(resources_write);

        // Send a DownloadStarted event
        notify_event!(self, DownloadStarted, resource);

        let seeders_pub = Publisher::new();
        let seeders_sub = seeders_pub.clone().subscribe().await;

        // Try to get the chunked file or directory from geode
        let metadata_result = self.get_metadata(hash, path, seeders_pub.clone()).await;

        if let Err(e) = metadata_result {
            // Set resource status to `Incomplete` and send a `MetadataNotFound` event
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
            notify_event!(self, MetadataNotFound, resource);
            return Err(e)
        }
        let (mut chunked, metadata_seeder) = metadata_result.unwrap();

        // Get a list of all file paths the user wants to fetch
        let resources_read = self.resources.read().await;
        let resource = match resources_read.get(hash) {
            Some(resource) => resource,
            None => return Ok(()), // Resource was removed, abort
        };
        let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked);
        drop(resources_read);

        // Create all files (and all necessary directories)
        create_all_files(&files_vec).await?;

        // Set resource status to `Verifying` and send a `MetadataDownloadCompleted` event
        let resource = update_resource!(hash, {
            status = ResourceStatus::Verifying,
            total_chunks_count = chunked.len() as u64,
            total_bytes_size = chunked.get_fileseq().len(),
            rtype = match chunked.is_dir() {
                false => ResourceType::File,
                true => ResourceType::Directory,
            },
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Set of all chunks we need locally (including the ones we already have)
        let chunk_hashes = resource.get_selected_chunks(&chunked);

        // Write all scraps to make sure the data on the filesystem is correct
        self.write_scraps(&mut chunked, &chunk_hashes).await?;

        // Mark locally available chunks as such
        let verify_res = self.verify_chunks(&resource, &mut chunked).await;
        if let Err(e) = verify_res {
            error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
            return Err(e);
        }
        let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

        // Update `total_bytes_size` if the resource is a file
        if let ResourceType::File = resource.rtype {
            update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
            notify_event!(self, ResourceUpdated, resource);
        }

        // If `chunked` is a file that is bigger than all of its chunks
        // combined, truncate the file to the total chunk size.
        // This fixes two edge cases: a file that ends exactly at the end of
        // a chunk, and a file with no chunks.
        if !chunked.is_dir() {
            let fs_metadata = fs::metadata(&path).await?;
            if fs_metadata.len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
                if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
                    let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
                }
            }
        }

        // Set of all chunks we need locally and their current availability
        let chunks: HashSet<(blake3::Hash, bool)> =
            chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();

        // Set of the chunks we need to download
        let mut missing_chunks: HashSet<blake3::Hash> =
            chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();

        // Update the resource with the chunks/bytes counts
        update_resource!(hash, {
            target_chunks_count = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
            target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,

            target_bytes_size =
                chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
            total_bytes_downloaded = total_bytes_downloaded,
            target_bytes_downloaded = target_bytes_downloaded,
        });

        let download_completed = async |chunked: &ChunkedStorage| -> Result<()> {
            // Set resource status to `Seeding` or `Incomplete`
            let resource = update_resource!(hash, {
                status = match chunked.is_complete() {
                    true => ResourceStatus::Seeding,
                    false => ResourceStatus::Incomplete,
                },
                target_chunks_downloaded = chunks.len() as u64,
                total_chunks_downloaded = chunked.local_chunks() as u64,
            });

            // Announce the resource if we have all chunks
            if chunked.is_complete() {
                let seeders = vec![self.new_seeder(hash).await];
                let self_announce = FudAnnounce { key: *hash, seeders: seeders.clone() };
                let _ = self.dht.announce(hash, &seeders, &self_announce).await;
            }

            // Send a DownloadCompleted event
            notify_event!(self, DownloadCompleted, resource);

            Ok(())
        };

        // If we don't need to download any chunk
        if missing_chunks.is_empty() {
            return download_completed(&chunked).await;
        }

        // Set resource status to `Downloading` and send a MetadataDownloadCompleted event
        let resource = update_resource!(hash, {
            status = ResourceStatus::Downloading,
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // Start looking up seeders if we did not need to do it for the metadata
        if metadata_seeder.is_none() {
            self.lookup_tx.send((*hash, seeders_pub)).await?;
        }

        // Fetch missing chunks from seeders
        let _ = fetch_chunks(
            self,
            hash,
            &mut chunked,
            &seeders_sub,
            metadata_seeder,
            &mut missing_chunks,
        )
        .await;

        // Get chunked file from geode
        let mut chunked = self.geode.get(hash, path).await?;

        // Set resource status to `Verifying` and send FudEvent::ResourceUpdated
        let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
        notify_event!(self, ResourceUpdated, resource);

        // Verify all chunks
        self.verify_chunks(&resource, &mut chunked).await?;

        let is_complete = chunked
            .iter()
            .filter(|(hash, _)| chunk_hashes.contains(hash))
            .all(|(_, available)| *available);

        // We fetched all chunks, but the resource is not complete
        // (some chunks were missing from all seeders)
        if !is_complete {
            // Set resource status to `Incomplete`
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });

            // Send a MissingChunks event
            notify_event!(self, MissingChunks, resource);

            return Ok(());
        }

        download_completed(&chunked).await
    }

    /// Write the scraps of the given chunks back to the filesystem, removing
    /// from sled the scraps that were fully written, and updating the others.
    async fn write_scraps(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hashes: &HashSet<blake3::Hash>,
    ) -> Result<()> {
        // Get all scraps
        let mut scraps = HashMap::new();
        // TODO: This can be improved to not loop over all chunks
        for chunk_hash in chunk_hashes {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            // Verify the scrap we found
            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();

            // Add the scrap to the HashMap
            scraps.insert(chunk_hash, scrap);
        }

        // Write all scraps
        if !scraps.is_empty() {
            info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
        }
        for (scrap_hash, mut scrap) in scraps {
            let len = scrap.chunk.len();
            let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
            if let Err(e) = write_res {
                error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            let (_, chunk_bytes_written) = write_res.unwrap();

            // If the whole scrap was written, we can remove it from sled
            if chunk_bytes_written == len {
                self.scrap_tree.remove(scrap_hash.as_bytes())?;
                continue;
            }
            // Otherwise update the scrap in sled
            let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
            if let Err(e) = chunk_res {
                error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            scrap.hash_written = blake3::hash(&chunk_res.unwrap());
            if let Err(e) =
                self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
            {
                error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
            }
        }

        Ok(())
    }

    /// Iterate over the chunks of `chunked` and find which ones are available
    /// locally, either on the filesystem (using `Geode::verify_chunk()`) or
    /// in scraps, marking them as available in `chunked`.
    /// Only chunks in `resource`'s file selection count towards the target.
    /// Returns the sizes in bytes of locally available data as a
    /// `(total_bytes_downloaded, target_bytes_downloaded)` tuple.
    pub async fn verify_chunks(
        &self,
        resource: &Resource,
        chunked: &mut ChunkedStorage,
    ) -> Result<(u64, u64)> {
        let chunks = chunked.get_chunks().clone();
        let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();

        // Gather all available chunks
        for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
            // Read the chunk using the `FileSequence`
            let chunk =
                match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
                    Ok(c) => c,
                    Err(Error::Io(ErrorKind::NotFound)) => continue,
                    Err(e) => {
                        warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
                        break
                    }
                };

            // Perform chunk consistency check
            if self.geode.verify_chunk(chunk_hash, &chunk) {
                chunked.get_chunk_mut(chunk_index).1 = true;
                bytes.insert(
                    *chunk_hash,
                    (chunk.len(), resource.get_selected_bytes(chunked, &chunk)),
                );
            }
        }

        // Look for the chunks that are not on the filesystem
        let chunks = chunked.get_chunks().clone();
        let missing_on_fs: Vec<_> =
            chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect();

        // Look for scraps
        for (chunk_index, (chunk_hash, _)) in missing_on_fs {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            // Verify the scrap we found
            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();
            if blake3::hash(&scrap.chunk) != *chunk_hash {
                continue;
            }

            // Check if the scrap is still written on the filesystem
            let scrap_chunk =
                self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await;
            if scrap_chunk.is_err() {
                continue;
            }
            let scrap_chunk = scrap_chunk.unwrap();

            // The scrap is not available if the chunk on the disk changed
            if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
                continue;
            }

            // Mark the chunk as available
            chunked.get_chunk_mut(chunk_index).1 = true;

            // Update the sums of locally available data
            bytes.insert(
                *chunk_hash,
                (scrap.chunk.len(), resource.get_selected_bytes(chunked, &scrap.chunk)),
            );
        }

        // If the resource is a file, set the `FileSequence`'s file to its
        // exact size once we know the last chunk's size. This is not needed
        // for directories.
        if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() {
            if !chunked.is_dir() && *last_chunk_available {
                if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) {
                    let exact_file_size =
                        chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
                    chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
                }
            }
        }

        let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
        let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;

        Ok((total_bytes_downloaded, target_bytes_downloaded))
    }

    /// Add a resource from the file system.
    /// This creates a new task in `put_tasks` calling `insert_resource()`.
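    ///
    /// Illustrative usage (a hedged sketch, not a doctest; the path is
    /// hypothetical):
    /// ```ignore
    /// fud.put(Path::new("/srv/shared/dataset")).await?;
    /// ```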
    pub async fn put(&self, path: &Path) -> Result<()> {
        let put_tasks = self.put_tasks.read().await;
        // Abort if this path is already being inserted
        if put_tasks.contains_key(path) {
            return Err(Error::Custom(format!(
                "Resource {} is already being inserted",
                path.to_string_lossy()
            )))
        }
        drop(put_tasks);

        self.put_tx.send(path.to_path_buf()).await?;

        Ok(())
    }

    /// Insert a file or directory from the file system.
    /// Called when `put()` creates a new put task.
    pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
        let self_node = self.node().await;

        if self_node.addresses.is_empty() {
            return Err(Error::Custom(
                "Cannot put resource, you don't have any external address".to_string(),
            ))
        }

        let metadata = fs::metadata(path).await?;

        // Get the list of files and the resource type (file or directory)
        let (files, resource_type) = if metadata.is_file() {
            (vec![(path.clone(), metadata.len())], ResourceType::File)
        } else if metadata.is_dir() {
            let mut files = get_all_files(path).await?;
            self.geode.sort_files(&mut files);
            (files, ResourceType::Directory)
        } else {
            return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
        };

        // Read the file or directory and create the chunks
        let stream = FileSequence::new(&files, false);
        let total_size = stream.len();
        let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;

        // For directories, get the relative file paths that are included in
        // the metadata and in the resource hash
        let relative_files = if let ResourceType::Directory = resource_type {
            // [(absolute file path, file size)] -> [(relative file path, file size)]
            let relative_files = files
                .into_iter()
                .map(|(file_path, size)| match file_path.strip_prefix(path) {
                    Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
                    Err(_) => Err(Error::Custom("Invalid file path".to_string())),
                })
                .collect::<Result<Vec<_>>>()?;

            // Add the files metadata to the hasher to complete the resource hash
            self.geode.hash_files_metadata(&mut hasher, &relative_files);

            relative_files
        } else {
            vec![]
        };

        // Finalize the resource hash
        let hash = hasher.finalize();

        // Create the metadata file in geode
        if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
            error!(target: "fud::insert_resource()", "Failed inserting {path:?} to geode: {e}");
            return Err(e)
        }

        // Add path to the sled db
        if let Err(e) =
            self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
        {
            error!(target: "fud::insert_resource()", "Failed inserting new resource into sled: {e}");
            return Err(e.into())
        }

        // Add resource
        let mut resources_write = self.resources.write().await;
        resources_write.insert(
            hash,
            Resource {
                hash,
                rtype: resource_type,
                path: path.to_path_buf(),
                status: ResourceStatus::Seeding,
                file_selection: FileSelection::All,
                total_chunks_count: chunk_hashes.len() as u64,
                target_chunks_count: chunk_hashes.len() as u64,
                total_chunks_downloaded: chunk_hashes.len() as u64,
                target_chunks_downloaded: chunk_hashes.len() as u64,
                total_bytes_size: total_size,
                target_bytes_size: total_size,
                total_bytes_downloaded: total_size,
                target_bytes_downloaded: total_size,
                speeds: vec![],
            },
        );
        drop(resources_write);

        // Announce the new resource
        let seeders = vec![self.new_seeder(&hash).await];
        let fud_announce = FudAnnounce { key: hash, seeders: seeders.clone() };
        let _ = self.dht.announce(&hash, &seeders, &fud_announce).await;

        // Send InsertCompleted event
        notify_event!(self, InsertCompleted, {
            hash,
            path: path.to_path_buf()
        });

        Ok(())
    }

    /// Removes:
    /// - a resource
    /// - its metadata in geode
    /// - its path in the sled path tree
    /// - its file selection in the sled file selection tree
    /// - and any related scrap in the sled scrap tree,
    ///
    /// then sends a `ResourceRemoved` fud event.
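    ///
    /// Illustrative usage (a hedged sketch, not a doctest):
    /// ```ignore
    /// // Best-effort cleanup; `remove()` is infallible
    /// fud.remove(&hash).await;
    /// ```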
    pub async fn remove(&self, hash: &blake3::Hash) {
        // Remove the resource
        let mut resources_write = self.resources.write().await;
        resources_write.remove(hash);
        drop(resources_write);

        // Remove the scraps in sled
        if let Ok(Some(path)) = self.hash_to_path(hash) {
            let chunked = self.geode.get(hash, &path).await;

            if let Ok(chunked) = chunked {
                for (chunk_hash, _) in chunked.iter() {
                    let _ = self.scrap_tree.remove(chunk_hash.as_bytes());
                }
            }
        }

        // Remove the metadata in geode
        let hash_str = hash_to_string(hash);
        let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
        let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;

        // Remove the path in sled
        let _ = self.path_tree.remove(hash.as_bytes());

        // Remove the file selection in sled
        let _ = self.file_selection_tree.remove(hash.as_bytes());

        // Send a `ResourceRemoved` event
        notify_event!(self, ResourceRemoved, { hash: *hash });
    }

    /// Remove seeders that are older than `expiry_secs`
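    ///
    /// Illustrative usage (a hedged sketch, not a doctest):
    /// ```ignore
    /// // Drop seeder entries not refreshed within the last hour
    /// fud.prune_seeders(3600).await;
    /// ```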
    pub async fn prune_seeders(&self, expiry_secs: u32) {
        let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
        let mut seeders_write = self.dht.hash_table.write().await;

        let keys: Vec<_> = seeders_write.keys().cloned().collect();

        for key in keys {
            let items = seeders_write.get_mut(&key).unwrap();
            items.retain(|item| item.timestamp > expiry_timestamp);
            if items.is_empty() {
                seeders_write.remove(&key);
            }
        }
    }

    /// Stop all tasks.
    pub async fn stop(&self) {
        info!("Stopping fetch tasks...");
        // Create a clone of fetch_tasks because `task.stop()` needs a write lock
        let fetch_tasks = self.fetch_tasks.read().await;
        let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(fetch_tasks);

        for task in cloned_fetch_tasks.values() {
            task.stop().await;
        }

        info!("Stopping put tasks...");
        let put_tasks = self.put_tasks.read().await;
        let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
            put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
        drop(put_tasks);

        for task in cloned_put_tasks.values() {
            task.stop().await;
        }

        info!("Stopping lookup tasks...");
        let lookup_tasks = self.lookup_tasks.read().await;
        let cloned_lookup_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            lookup_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(lookup_tasks);

        for task in cloned_lookup_tasks.values() {
            task.stop().await;
        }

        // Stop all other tasks
        let mut tasks = self.tasks.write().await;
        for (name, task) in tasks.clone() {
            info!("Stopping {name} task...");
            task.stop().await;
        }
        *tasks = HashMap::new();
    }
}
1164}