use std::{
    collections::{HashMap, HashSet},
    io::ErrorKind,
    path::{Path, PathBuf},
    sync::Arc,
};

use log::{error, info, warn};
use sled_overlay::sled;
use smol::{
    channel,
    fs::{self, OpenOptions},
    lock::RwLock,
};

use darkfi::{
    dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
    geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
    net::P2pPtr,
    system::{ExecutorPtr, Publisher, PublisherPtr, StoppableTask},
    util::{path::expand_path, time::Timestamp},
    Error, Result,
};
use darkfi_sdk::crypto::SecretKey;
use darkfi_serial::{deserialize_async, serialize_async};

pub mod proto;
use proto::FudAnnounce;

pub mod event;
use event::{notify_event, FudEvent};

pub mod resource;
use resource::{Resource, ResourceStatus, ResourceType};

pub mod scrap;
use scrap::Scrap;

pub mod rpc;

pub mod tasks;
use tasks::start_task;

pub mod bitcoin;

pub mod pow;
use pow::{FudPow, VerifiableNodeData};

pub mod equix;

pub mod settings;
use settings::Args;

pub mod util;
use util::{create_all_files, get_all_files, FileSelection};

mod download;
use download::{fetch_chunks, fetch_metadata};

pub mod dht;
use dht::FudSeeder;

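// Names of the sled trees holding persistent state: resource paths,
// per-resource file selections, and scraps (chunk data kept in sled,
// see `write_scraps`)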
const SLED_PATH_TREE: &[u8] = b"_fud_paths";
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";

pub struct Fud {
    /// Proof-of-work verifiable data defining this node's ID
    pub node_data: Arc<RwLock<VerifiableNodeData>>,
    /// Secret key of this node
    pub secret_key: Arc<RwLock<SecretKey>>,
    /// Geode instance handling chunked storage on the filesystem
    geode: Geode,
    /// Default directory where downloaded resources are saved
    downloads_path: PathBuf,
    /// Timeout when fetching a single chunk, taken from the settings
    chunk_timeout: u64,
    /// Proof-of-work state used to generate node IDs
    pub pow: Arc<RwLock<FudPow>>,
    /// DHT used to find seeders for resources
    dht: Arc<Dht<Fud>>,
    /// Resources known to this node, indexed by hash
    resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
    /// Sled tree mapping resource hashes to filesystem paths
    path_tree: sled::Tree,
    /// Sled tree mapping resource hashes to their file selections
    file_selection_tree: sled::Tree,
    /// Sled tree holding scraps (chunk data kept in sled)
    scrap_tree: sled::Tree,
    /// Channel feeding download requests to the `get` task
    get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
    get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
    /// Channel feeding insert requests to the `put` task
    put_tx: channel::Sender<PathBuf>,
    put_rx: channel::Receiver<PathBuf>,
    /// Channel feeding seeder lookups to the `lookup` task
    lookup_tx: channel::Sender<(blake3::Hash, PublisherPtr<Option<Vec<FudSeeder>>>)>,
    lookup_rx: channel::Receiver<(blake3::Hash, PublisherPtr<Option<Vec<FudSeeder>>>)>,
    /// Currently running fetch tasks, indexed by resource hash
    fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Currently running put tasks, indexed by path
    put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,
    /// Currently running lookup tasks, indexed by resource hash
    lookup_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Long-running daemon tasks, indexed by name
    tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,
    /// Publisher used to notify clients of `FudEvent`s
    event_publisher: PublisherPtr<FudEvent>,
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    /// Global async executor
    pub executor: ExecutorPtr,
}

impl Fud {
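    /// Create a new `Fud` instance: generates our node ID via proof-of-work,
    /// instantiates Geode and the DHT, and opens the sled trees.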
    pub async fn new(
        settings: Args,
        p2p: P2pPtr,
        sled_db: &sled::Db,
        event_publisher: PublisherPtr<FudEvent>,
        executor: ExecutorPtr,
    ) -> Result<Arc<Self>> {
        let basedir = expand_path(&settings.base_dir)?;
        let downloads_path = match settings.downloads_path {
            Some(downloads_path) => expand_path(&downloads_path)?,
            None => basedir.join("downloads"),
        };

        // Update the Bitcoin hash cache, then generate our node ID using PoW
        let mut pow = FudPow::new(settings.pow.into(), executor.clone());
        pow.bitcoin_hash_cache.update().await?;
        let (node_data, secret_key) = pow.generate_node().await?;
        info!(target: "fud::new()", "Your node ID: {}", hash_to_string(&node_data.id()));

        info!(target: "fud::new()", "Instantiating Geode instance");
        let geode = Geode::new(&basedir).await?;

        let dht_settings: DhtSettings = settings.dht.into();
        let dht: Arc<Dht<Fud>> =
            Arc::new(Dht::<Fud>::new(&dht_settings, p2p.clone(), executor.clone()).await);

        let (get_tx, get_rx) = smol::channel::unbounded();
        let (put_tx, put_rx) = smol::channel::unbounded();
        let (lookup_tx, lookup_rx) = smol::channel::unbounded();
        let fud = Arc::new(Self {
            node_data: Arc::new(RwLock::new(node_data)),
            secret_key: Arc::new(RwLock::new(secret_key)),
            geode,
            downloads_path,
            chunk_timeout: settings.chunk_timeout,
            pow: Arc::new(RwLock::new(pow)),
            dht: dht.clone(),
            path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
            file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
            scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
            resources: Arc::new(RwLock::new(HashMap::new())),
            get_tx,
            get_rx,
            put_tx,
            put_rx,
            lookup_tx,
            lookup_rx,
            fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
            put_tasks: Arc::new(RwLock::new(HashMap::new())),
            lookup_tasks: Arc::new(RwLock::new(HashMap::new())),
            tasks: Arc::new(RwLock::new(HashMap::new())),
            event_publisher,
            p2p,
            executor,
        });
        *dht.handler.write().await = Arc::downgrade(&fud);

        Ok(fud)
    }

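    /// Start the long-running daemon tasks (get, put, DHT channel, lookup,
    /// seed announcement, node ID refresh).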
    pub async fn start_tasks(self: &Arc<Self>) {
        let mut tasks = self.tasks.write().await;
        start_task!(self, "get", tasks::get_task, tasks);
        start_task!(self, "put", tasks::put_task, tasks);
        start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud>, tasks);
        start_task!(self, "lookup", tasks::lookup_task, tasks);
        start_task!(self, "announce", tasks::announce_seed_task, tasks);
        start_task!(self, "node ID", tasks::node_id_task, tasks);
    }

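    /// Initialize the node: bootstrap the DHT, load known resources from
    /// sled, verify them, then register ourselves as a seeder and announce
    /// every resource we can seed.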
    async fn init(&self) -> Result<()> {
        info!(target: "fud::init()", "Bootstrapping the DHT...");
        self.dht.bootstrap().await;

        info!(target: "fud::init()", "Finding resources...");
        let mut resources_write = self.resources.write().await;
        for result in self.path_tree.iter() {
            if result.is_err() {
                continue;
            }

            // Parse the resource hash
            let (hash, path) = result.unwrap();
            let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
                Ok(v) => v,
                Err(_) => continue,
            };
            let hash = blake3::Hash::from_bytes(hash_bytes);

            // Parse the resource path
            let path_bytes = path.to_vec();
            let path_str = match std::str::from_utf8(&path_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let path: PathBuf = match expand_path(path_str) {
                Ok(v) => v,
                Err(_) => continue,
            };

            // Restore the file selection, defaulting to all files
            let mut file_selection = FileSelection::All;
            if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
                if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
                    file_selection = FileSelection::Set(
                        path_list
                            .into_iter()
                            .filter_map(|bytes| {
                                std::str::from_utf8(&bytes)
                                    .ok()
                                    .and_then(|path_str| expand_path(path_str).ok())
                            })
                            .collect(),
                    );
                }
            }

            resources_write.insert(
                hash,
                Resource::new(
                    hash,
                    ResourceType::Unknown,
                    &path,
                    ResourceStatus::Incomplete,
                    file_selection,
                ),
            );
        }
        drop(resources_write);

        info!(target: "fud::init()", "Verifying resources...");
        let resources = self.verify_resources(None).await?;

        let self_node = self.node().await;

        // Without any external address we cannot act as a seeder
        if self_node.addresses.is_empty() {
            return Ok(());
        }

        // Register ourselves as a seeder for every verified resource
        for resource in &resources {
            let self_router_items = vec![self.new_seeder(&resource.hash).await];
            self.add_value(&resource.hash, &self_router_items).await;
        }

        info!(target: "fud::init()", "Announcing resources...");
        for resource in resources {
            let seeders = vec![self.new_seeder(&resource.hash).await];
            let _ = self
                .dht
                .announce(
                    &resource.hash,
                    &seeders.clone(),
                    &FudAnnounce { key: resource.hash, seeders },
                )
                .await;
        }

        Ok(())
    }

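    /// Look up the filesystem path for a resource hash in the sled path tree.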
    pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
        if let Some(value) = self.path_tree.get(hash.as_bytes())? {
            let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
            return Ok(Some(path));
        }

        Ok(None)
    }

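    /// Reverse lookup: scan the sled path tree for the resource hash stored
    /// under the given filesystem path.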
    pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();
        for path_item in self.path_tree.iter() {
            let (key, value) = path_item?;
            if value == path_bytes {
                let bytes: &[u8] = &key;
                if bytes.len() != 32 {
                    return Err(Error::Custom(format!(
                        "Expected a 32-byte BLAKE3 hash, got {} bytes",
                        bytes.len()
                    )));
                }

                let array: [u8; 32] = bytes.try_into().unwrap();
                return Ok(Some(array.into()))
            }
        }

        Ok(None)
    }

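    /// Create a `FudSeeder` for the given key, pointing to our own node and
    /// timestamped with the current time.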
    pub async fn new_seeder(&self, key: &blake3::Hash) -> FudSeeder {
        FudSeeder {
            key: *key,
            node: self.node().await,
            timestamp: Timestamp::current_time().inner(),
        }
    }

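    /// Verify the chunks of the given resources (all known resources if
    /// `hashes` is `None`), update their status and counters, and send a
    /// `ResourceUpdated` event for each of them. Only resources that are
    /// currently `Seeding` or `Incomplete` are considered. Returns the list
    /// of resources that are complete and can be seeded.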
    pub async fn verify_resources(
        &self,
        hashes: Option<Vec<blake3::Hash>>,
    ) -> Result<Vec<Resource>> {
        let mut resources_write = self.resources.write().await;

        // Apply a new status to a resource, recompute its chunk and byte
        // counters from the chunked storage, and notify subscribers
        let update_resource = async |resource: &mut Resource,
                                     status: ResourceStatus,
                                     chunked: Option<&ChunkedStorage>,
                                     total_bytes_downloaded: u64,
                                     target_bytes_downloaded: u64| {
            let files = match chunked {
                Some(chunked) => resource.get_selected_files(chunked),
                None => vec![],
            };
            let chunk_hashes = match chunked {
                Some(chunked) => resource.get_selected_chunks(chunked),
                None => HashSet::new(),
            };

            if let Some(chunked) = chunked {
                resource.rtype = match chunked.is_dir() {
                    false => ResourceType::File,
                    true => ResourceType::Directory,
                };
            }

            resource.status = status;
            resource.total_chunks_count = match chunked {
                Some(chunked) => chunked.len() as u64,
                None => 0,
            };
            resource.target_chunks_count = chunk_hashes.len() as u64;
            resource.total_chunks_downloaded = match chunked {
                Some(chunked) => chunked.local_chunks() as u64,
                None => 0,
            };
            resource.target_chunks_downloaded = match chunked {
                Some(chunked) => chunked
                    .iter()
                    .filter(|(hash, available)| chunk_hashes.contains(hash) && *available)
                    .count() as u64,
                None => 0,
            };

            resource.total_bytes_size = match chunked {
                Some(chunked) => chunked.get_fileseq().len(),
                None => 0,
            };
            resource.target_bytes_size = match chunked {
                Some(chunked) => chunked
                    .get_files()
                    .iter()
                    .filter(|(path, _)| files.contains(path))
                    .map(|(_, size)| size)
                    .sum(),
                None => 0,
            };

            resource.total_bytes_downloaded = total_bytes_downloaded;
            resource.target_bytes_downloaded = target_bytes_downloaded;

            notify_event!(self, ResourceUpdated, resource);
        };

        let mut seeding_resources: Vec<Resource> = vec![];
        for (_, mut resource) in resources_write.iter_mut() {
            if let Some(ref hashes_list) = hashes {
                if !hashes_list.contains(&resource.hash) {
                    continue;
                }
            }

            // Only verify resources that are not currently being worked on
            match resource.status {
                ResourceStatus::Seeding => {}
                ResourceStatus::Incomplete => {}
                _ => continue,
            };

            // A resource with no known path, or one Geode cannot open, is
            // marked incomplete
            let resource_path = match self.hash_to_path(&resource.hash) {
                Ok(Some(v)) => v,
                Ok(None) | Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
                Ok(v) => v,
                Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let verify_res = self.verify_chunks(resource, &mut chunked).await;
            if let Err(e) = verify_res {
                error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
                update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                continue;
            }
            let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

            if !chunked.is_complete() {
                update_resource(
                    &mut resource,
                    ResourceStatus::Incomplete,
                    Some(&chunked),
                    total_bytes_downloaded,
                    target_bytes_downloaded,
                )
                .await;
                continue;
            }

            update_resource(
                &mut resource,
                ResourceStatus::Seeding,
                Some(&chunked),
                total_bytes_downloaded,
                target_bytes_downloaded,
            )
            .await;
            seeding_resources.push(resource.clone());
        }

        Ok(seeding_resources)
    }

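    /// Queue a download of the resource `hash` into `path`, restricted to
    /// `files`. Fails if the resource is already being downloaded.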
    pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
        let fetch_tasks = self.fetch_tasks.read().await;
        if fetch_tasks.contains_key(hash) {
            return Err(Error::Custom(format!(
                "Resource {} is already being downloaded",
                hash_to_string(hash)
            )))
        }
        drop(fetch_tasks);

        self.get_tx.send((*hash, path.to_path_buf(), files)).await?;

        Ok(())
    }

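    /// Get the `ChunkedStorage` for a resource from Geode, fetching the
    /// metadata from seeders if it is not known locally. Also returns the
    /// seeder the metadata was fetched from, if any. `seeders_pub` is used
    /// to publish seeder lookup results while fetching.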
    pub async fn get_metadata(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        seeders_pub: PublisherPtr<Option<Vec<FudSeeder>>>,
    ) -> Result<(ChunkedStorage, Option<FudSeeder>)> {
        match self.geode.get(hash, path).await {
            // The metadata is already known locally
            Ok(v) => Ok((v, None)),
            Err(Error::GeodeNeedsGc) => todo!(),
            Err(Error::GeodeFileNotFound) => {
                info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
                // Start a seeder lookup and fetch the metadata from the
                // seeders it finds
                let metadata_sub = seeders_pub.clone().subscribe().await;
                self.lookup_tx.send((*hash, seeders_pub.clone())).await?;

                let fetch_res = fetch_metadata(self, hash, &metadata_sub, path).await;
                metadata_sub.unsubscribe().await;
                let seeder = fetch_res?;
                Ok((self.geode.get(hash, path).await?, Some(seeder)))
            }
            Err(e) => Err(e),
        }
    }

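    /// Download a resource: fetch its metadata, write any scraps we already
    /// have, verify the chunks on disk, then fetch the missing chunks from
    /// seeders. Events are sent at each stage so clients can track progress.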
    pub async fn fetch_resource(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        files: &FileSelection,
    ) -> Result<()> {
        let hash_bytes = hash.as_bytes();
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();

        // Update fields of the resource and return a clone of it, or return
        // from `fetch_resource` early if the resource was removed
        macro_rules! update_resource {
            ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
                let mut resources_write = self.resources.write().await;
                let resource = match resources_write.get_mut($hash) {
                    Some(resource) => {
                        $(resource.$field = $value;)*
                        resource.clone()
                    }
                    None => return Ok(()),
                };
                resource
            }};
        }

        // Make sure the path is not already taken by another resource
        if let Ok(Some(hash_found)) = self.path_to_hash(path) {
            if *hash != hash_found {
                return Err(Error::Custom(format!(
                    "There is already another resource on path {path_string}"
                )))
            }
        }

        self.path_tree.insert(hash_bytes, path_bytes)?;

        // Persist the file selection
        if let FileSelection::Set(selected_files) = files {
            let paths: Vec<Vec<u8>> = selected_files
                .iter()
                .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
                .collect();
            let serialized_paths = serialize_async(&paths).await;
            if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
                return Err(Error::SledError(e))
            }
        }

        let resource = Resource::new(
            *hash,
            ResourceType::Unknown,
            path,
            ResourceStatus::Discovering,
            files.clone(),
        );
        let mut resources_write = self.resources.write().await;
        resources_write.insert(*hash, resource.clone());
        drop(resources_write);

        notify_event!(self, DownloadStarted, resource);

        let seeders_pub = Publisher::new();
        let seeders_sub = seeders_pub.clone().subscribe().await;

        // Get the resource metadata, fetching it from seeders if needed
        let metadata_result = self.get_metadata(hash, path, seeders_pub.clone()).await;

        if let Err(e) = metadata_result {
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
            notify_event!(self, MetadataNotFound, resource);
            return Err(e)
        }
        let (mut chunked, metadata_seeder) = metadata_result.unwrap();

        let resources_read = self.resources.read().await;
        let resource = match resources_read.get(hash) {
            Some(resource) => resource,
            None => return Ok(()),
        };
        let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked);
        drop(resources_read);

        // Create all selected files on the filesystem
        create_all_files(&files_vec).await?;

        let resource = update_resource!(hash, {
            status = ResourceStatus::Verifying,
            total_chunks_count = chunked.len() as u64,
            total_bytes_size = chunked.get_fileseq().len(),
            rtype = match chunked.is_dir() {
                false => ResourceType::File,
                true => ResourceType::Directory,
            },
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        let chunk_hashes = resource.get_selected_chunks(&chunked);

        // Write any scraps we kept for this resource's chunks
        self.write_scraps(&mut chunked, &chunk_hashes).await?;

        // Verify the chunks currently on the filesystem
        let verify_res = self.verify_chunks(&resource, &mut chunked).await;
        if let Err(e) = verify_res {
            error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
            return Err(e);
        }
        let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

        if let ResourceType::File = resource.rtype {
            update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
            notify_event!(self, ResourceUpdated, resource);
        }

        // If a single file is larger than the maximum size its chunks can
        // hold, truncate it to that bound
        if !chunked.is_dir() {
            let fs_metadata = fs::metadata(&path).await?;
            if fs_metadata.len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
                if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
                    let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
                }
            }
        }

        // The chunks we want, and whether each is available locally
        let chunks: HashSet<(blake3::Hash, bool)> =
            chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();

        // Subset of the wanted chunks that we do not have yet
        let mut missing_chunks: HashSet<blake3::Hash> =
            chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();

        update_resource!(hash, {
            target_chunks_count = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
            target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,
            target_bytes_size =
                chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
            total_bytes_downloaded = total_bytes_downloaded,
            target_bytes_downloaded = target_bytes_downloaded,
        });

        // Mark the resource as complete, and announce it if we have all of
        // its chunks
        let download_completed = async |chunked: &ChunkedStorage| -> Result<()> {
            let resource = update_resource!(hash, {
                status = match chunked.is_complete() {
                    true => ResourceStatus::Seeding,
                    false => ResourceStatus::Incomplete,
                },
                target_chunks_downloaded = chunks.len() as u64,
                total_chunks_downloaded = chunked.local_chunks() as u64,
            });

            if chunked.is_complete() {
                let seeders = vec![self.new_seeder(hash).await];
                let self_announce = FudAnnounce { key: *hash, seeders: seeders.clone() };
                let _ = self.dht.announce(hash, &seeders, &self_announce).await;
            }

            notify_event!(self, DownloadCompleted, resource);

            Ok(())
        };

        // Nothing to fetch
        if missing_chunks.is_empty() {
            return download_completed(&chunked).await;
        }

        let resource = update_resource!(hash, {
            status = ResourceStatus::Downloading,
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // If the metadata did not come from a seeder, we still need a lookup
        if metadata_seeder.is_none() {
            self.lookup_tx.send((*hash, seeders_pub)).await?;
        }

        // Fetch the missing chunks from seeders
        let _ = fetch_chunks(
            self,
            hash,
            &mut chunked,
            &seeders_sub,
            metadata_seeder,
            &mut missing_chunks,
        )
        .await;

        // Reload the chunked storage to get a fresh view of the chunks
        let mut chunked = self.geode.get(hash, path).await?;

        let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
        notify_event!(self, ResourceUpdated, resource);

        // Verify everything we just downloaded
        self.verify_chunks(&resource, &mut chunked).await?;

        let is_complete = chunked
            .iter()
            .filter(|(hash, _)| chunk_hashes.contains(hash))
            .all(|(_, available)| *available);

        if !is_complete {
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
            notify_event!(self, MissingChunks, resource);
            return Ok(());
        }

        download_completed(&chunked).await
    }

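    /// Write the scraps (chunk data kept in sled) matching `chunk_hashes`
    /// into the chunked storage on the filesystem. A scrap that is fully
    /// written is removed from sled; otherwise it is kept, updated with the
    /// hash of the bytes that actually landed on disk.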
    async fn write_scraps(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hashes: &HashSet<blake3::Hash>,
    ) -> Result<()> {
        // Collect the scraps we have for the requested chunks
        let mut scraps = HashMap::new();
        for chunk_hash in chunk_hashes {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();

            scraps.insert(chunk_hash, scrap);
        }

        if !scraps.is_empty() {
            info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
        }
        for (scrap_hash, mut scrap) in scraps {
            let len = scrap.chunk.len();
            let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
            if let Err(e) = write_res {
                error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            let (_, chunk_bytes_written) = write_res.unwrap();

            // The entire chunk was written: the scrap is no longer needed
            if chunk_bytes_written == len {
                self.scrap_tree.remove(scrap_hash.as_bytes())?;
                continue;
            }

            // Only part of the chunk was written: keep the scrap and record
            // the hash of the bytes that were actually written to disk
            let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
            if let Err(e) = chunk_res {
                error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            scrap.hash_written = blake3::hash(&chunk_res.unwrap());
            if let Err(e) =
                self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
            {
                error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
            }
        }

        Ok(())
    }

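    /// Verify all chunks of a resource against the data on the filesystem,
    /// marking every chunk whose hash matches as available. Chunks missing
    /// on disk are also checked against our scraps. Returns a
    /// `(total_bytes_downloaded, target_bytes_downloaded)` tuple, where
    /// "target" only counts bytes belonging to the selected files.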
    pub async fn verify_chunks(
        &self,
        resource: &Resource,
        chunked: &mut ChunkedStorage,
    ) -> Result<(u64, u64)> {
        // Clone the chunk list so we can mutate `chunked` while iterating
        let chunks = chunked.get_chunks().clone();
        let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();

        // Verify the chunks on the filesystem
        for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
            let chunk =
                match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
                    Ok(c) => c,
                    Err(Error::Io(ErrorKind::NotFound)) => continue,
                    Err(e) => {
                        warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
                        break
                    }
                };

            if self.geode.verify_chunk(chunk_hash, &chunk) {
                chunked.get_chunk_mut(chunk_index).1 = true;
                bytes.insert(
                    *chunk_hash,
                    (chunk.len(), resource.get_selected_bytes(chunked, &chunk)),
                );
            }
        }

        // Chunks that did not verify on the filesystem may still have a scrap
        let chunks = chunked.get_chunks().clone();
        let missing_on_fs: Vec<_> =
            chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect();

        for (chunk_index, (chunk_hash, _)) in missing_on_fs {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();
            if blake3::hash(&scrap.chunk) != *chunk_hash {
                continue;
            }

            // Read the chunk from disk and make sure it matches what we
            // wrote when the scrap was saved
            let scrap_chunk =
                self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await;
            if scrap_chunk.is_err() {
                continue;
            }
            let scrap_chunk = scrap_chunk.unwrap();

            if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
                continue;
            }

            chunked.get_chunk_mut(chunk_index).1 = true;

            bytes.insert(
                *chunk_hash,
                (scrap.chunk.len(), resource.get_selected_bytes(chunked, &scrap.chunk)),
            );
        }

        // If the last chunk of a single file is available, we can compute
        // the file's exact size, since only the last chunk may be smaller
        // than MAX_CHUNK_SIZE
        if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() {
            if !chunked.is_dir() && *last_chunk_available {
                if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) {
                    let exact_file_size =
                        chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
                    chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
                }
            }
        }

        let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
        let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;

        Ok((total_bytes_downloaded, target_bytes_downloaded))
    }

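    /// Queue a resource insertion for the file or directory at `path`.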
    pub async fn put(&self, path: &Path) -> Result<()> {
        let put_tasks = self.put_tasks.read().await;
        drop(put_tasks);

        self.put_tx.send(path.to_path_buf()).await?;

        Ok(())
    }

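    /// Insert a file or directory as a resource: chunk it, store the
    /// metadata in Geode and the path in sled, mark it as seeding, announce
    /// it on the DHT, and send an `InsertCompleted` event.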
    pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
        let self_node = self.node().await;

        if self_node.addresses.is_empty() {
            return Err(Error::Custom(
                "Cannot put resource, you don't have any external address".to_string(),
            ))
        }

        let metadata = fs::metadata(path).await?;

        // Collect the files to chunk
        let (files, resource_type) = if metadata.is_file() {
            (vec![(path.clone(), metadata.len())], ResourceType::File)
        } else if metadata.is_dir() {
            let mut files = get_all_files(path).await?;
            self.geode.sort_files(&mut files);
            (files, ResourceType::Directory)
        } else {
            return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
        };

        // Chunk the file contents
        let stream = FileSequence::new(&files, false);
        let total_size = stream.len();
        let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;

        // For directories, file paths are stored relative to the resource
        // root and hashed into the resource hash
        let relative_files = if let ResourceType::Directory = resource_type {
            let relative_files = files
                .into_iter()
                .map(|(file_path, size)| match file_path.strip_prefix(path) {
                    Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
                    Err(_) => Err(Error::Custom("Invalid file path".to_string())),
                })
                .collect::<Result<Vec<_>>>()?;

            self.geode.hash_files_metadata(&mut hasher, &relative_files);

            relative_files
        } else {
            vec![]
        };

        let hash = hasher.finalize();

        if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
            error!(target: "fud::insert_resource()", "Failed inserting {path:?} to geode: {e}");
            return Err(e)
        }

        if let Err(e) =
            self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
        {
            error!(target: "fud::insert_resource()", "Failed inserting new resource into sled: {e}");
            return Err(e.into())
        }

        // The resource is complete, so all counters are at their maximum
        let mut resources_write = self.resources.write().await;
        resources_write.insert(
            hash,
            Resource {
                hash,
                rtype: resource_type,
                path: path.to_path_buf(),
                status: ResourceStatus::Seeding,
                file_selection: FileSelection::All,
                total_chunks_count: chunk_hashes.len() as u64,
                target_chunks_count: chunk_hashes.len() as u64,
                total_chunks_downloaded: chunk_hashes.len() as u64,
                target_chunks_downloaded: chunk_hashes.len() as u64,
                total_bytes_size: total_size,
                target_bytes_size: total_size,
                total_bytes_downloaded: total_size,
                target_bytes_downloaded: total_size,
                speeds: vec![],
            },
        );
        drop(resources_write);

        // Announce ourselves as a seeder of the new resource
        let seeders = vec![self.new_seeder(&hash).await];
        let fud_announce = FudAnnounce { key: hash, seeders: seeders.clone() };
        let _ = self.dht.announce(&hash, &seeders, &fud_announce).await;

        notify_event!(self, InsertCompleted, {
            hash,
            path: path.to_path_buf()
        });

        Ok(())
    }

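    /// Remove a resource from this node: drop it from the resource list,
    /// remove all of its scraps, delete its metadata from Geode, and clear
    /// its entries in the path and file selection trees. The downloaded
    /// content at the resource's path is not deleted. Sends a
    /// `ResourceRemoved` event when done.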
    pub async fn remove(&self, hash: &blake3::Hash) {
        let mut resources_write = self.resources.write().await;
        resources_write.remove(hash);
        drop(resources_write);

        // Remove all scraps belonging to this resource
        if let Ok(Some(path)) = self.hash_to_path(hash) {
            let chunked = self.geode.get(hash, &path).await;

            if let Ok(chunked) = chunked {
                for (chunk_hash, _) in chunked.iter() {
                    let _ = self.scrap_tree.remove(chunk_hash.as_bytes());
                }
            }
        }

        // Remove the resource's metadata from Geode
        let hash_str = hash_to_string(hash);
        let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
        let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;

        // Remove the resource from the sled trees
        let _ = self.path_tree.remove(hash.as_bytes());
        let _ = self.file_selection_tree.remove(hash.as_bytes());

        notify_event!(self, ResourceRemoved, { hash: *hash });
    }

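    /// Remove from the DHT hash table all seeders whose timestamp is older
    /// than `expiry_secs` seconds.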
    pub async fn prune_seeders(&self, expiry_secs: u32) {
        let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
        let mut seeders_write = self.dht.hash_table.write().await;

        let keys: Vec<_> = seeders_write.keys().cloned().collect();

        for key in keys {
            let items = seeders_write.get_mut(&key).unwrap();
            items.retain(|item| item.timestamp > expiry_timestamp);
            if items.is_empty() {
                seeders_write.remove(&key);
            }
        }
    }

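    /// Stop all running fetch, put, lookup, and daemon tasks.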
    pub async fn stop(&self) {
        info!("Stopping fetch tasks...");
        // Clone the task maps so the locks are not held while stopping tasks
        let fetch_tasks = self.fetch_tasks.read().await;
        let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(fetch_tasks);

        for task in cloned_fetch_tasks.values() {
            task.stop().await;
        }

        info!("Stopping put tasks...");
        let put_tasks = self.put_tasks.read().await;
        let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
            put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
        drop(put_tasks);

        for task in cloned_put_tasks.values() {
            task.stop().await;
        }

        info!("Stopping lookup tasks...");
        let lookup_tasks = self.lookup_tasks.read().await;
        let cloned_lookup_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            lookup_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(lookup_tasks);

        for task in cloned_lookup_tasks.values() {
            task.stop().await;
        }

        // Stop the long-running daemon tasks
        let mut tasks = self.tasks.write().await;
        for (name, task) in tasks.clone() {
            info!("Stopping {name} task...");
            task.stop().await;
        }
        *tasks = HashMap::new();
    }
}