use std::{
    collections::{HashMap, HashSet},
    io::ErrorKind,
    path::{Path, PathBuf},
    sync::Arc,
};

use sled_overlay::sled;
use smol::{
    channel,
    fs::{self, OpenOptions},
    lock::RwLock,
};
use tracing::{error, info, warn};

use darkfi::{
    dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
    geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
    net::P2pPtr,
    system::{ExecutorPtr, PublisherPtr, StoppableTask},
    util::{path::expand_path, time::Timestamp},
    Error, Result,
};
use darkfi_sdk::crypto::{schnorr::SchnorrSecret, SecretKey};
use darkfi_serial::{deserialize_async, serialize_async};

/// P2P protocol messages
pub mod proto;
use proto::FudAnnounce;

/// Events emitted to event subscribers
pub mod event;
use event::{notify_event, FudEvent};

/// Resource definitions
pub mod resource;
use resource::{Resource, ResourceStatus, ResourceType};

/// Scraps: chunk data kept in sled while it cannot be fully written to disk
pub mod scrap;
use scrap::Scrap;

/// JSON-RPC interface
pub mod rpc;

/// Background tasks
pub mod tasks;
use tasks::start_task;

/// Bitcoin-related helpers for the proof of work
pub mod bitcoin;

/// Proof of work used to generate node identities
pub mod pow;
use pow::{FudPow, VerifiableNodeData};

/// EquiX proof-of-work algorithm
pub mod equix;

/// Settings and configuration
pub mod settings;
use settings::Args;

/// Miscellaneous helpers
pub mod util;
use util::{create_all_files, get_all_files, FileSelection};

/// Download logic for metadata and chunks
mod download;
use download::{fetch_chunks, fetch_metadata};

/// DHT handler implementation
pub mod dht;
use dht::FudSeeder;

use crate::{dht::FudNode, pow::PowSettings};

const SLED_PATH_TREE: &[u8] = b"_fud_paths";
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";
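// Layout of the sled trees above:
//   _fud_paths:           resource hash (32 bytes) -> UTF-8 path on disk
//   _fud_file_selections: resource hash (32 bytes) -> serialized list of selected paths
//   _fud_scraps:          chunk hash (32 bytes)    -> serialized `Scrap`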

/// Node identity generated by the proof of work.
#[derive(Clone, Debug)]
pub struct FudState {
    /// Our verifiable node data (contains the node ID)
    node_data: VerifiableNodeData,
    /// Secret key used to sign our seeder announcements
    secret_key: SecretKey,
}

pub struct Fud {
    /// Node identity, set by [`Fud::start`] once the proof of work has run
    state: Arc<RwLock<Option<FudState>>>,
    /// Geode instance (chunk-based file storage)
    geode: Geode,
    /// Default directory where downloaded resources are saved
    downloads_path: PathBuf,
    /// Timeout for fetching a single chunk
    chunk_timeout: u64,
    /// Proof of work used to generate our node identity
    pub pow: Arc<RwLock<FudPow>>,
    /// The DHT this node is part of
    dht: Arc<Dht<Fud>>,
    /// Known resources, keyed by their hash
    resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
    /// Sled tree: resource hash -> local path
    path_tree: sled::Tree,
    /// Sled tree: resource hash -> serialized file selection
    file_selection_tree: sled::Tree,
    /// Sled tree: chunk hash -> serialized [`Scrap`]
    scrap_tree: sled::Tree,
    /// Sender used to queue download requests
    get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
    /// Receiver side of `get_tx`
    get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
    /// Sender used to queue insert requests
    put_tx: channel::Sender<PathBuf>,
    /// Receiver side of `put_tx`
    put_rx: channel::Receiver<PathBuf>,
    /// Sender used to queue seeder lookups
    lookup_tx: channel::Sender<blake3::Hash>,
    /// Receiver side of `lookup_tx`
    lookup_rx: channel::Receiver<blake3::Hash>,
    /// Sender used to queue nodes for verification
    verify_node_tx: channel::Sender<FudNode>,
    /// Receiver side of `verify_node_tx`
    verify_node_rx: channel::Receiver<FudNode>,
    /// Running download tasks, keyed by resource hash
    fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Running insert tasks, keyed by path
    put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,
    /// Running lookup tasks, keyed by resource hash
    lookup_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
    /// Named long-running background tasks
    tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,
    /// Publisher for [`FudEvent`] notifications
    event_publisher: PublisherPtr<FudEvent>,
    /// Pointer to the P2P network instance
    p2p: P2pPtr,
    pub executor: ExecutorPtr,
}
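// Download, insert, and lookup requests are queued on the channels above and
// handled by the corresponding background tasks spawned in `start_tasks()`
// (`tasks::get_task`, `tasks::put_task`, `tasks::lookup_task`, ...).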

impl Fud {
    pub async fn new(
        settings: Args,
        p2p: P2pPtr,
        sled_db: &sled::Db,
        event_publisher: PublisherPtr<FudEvent>,
        executor: ExecutorPtr,
    ) -> Result<Arc<Self>> {
        let dht_settings: DhtSettings = settings.dht.into();

        // Disable automatic outbound connections, and default `getaddrs_max`
        // to the DHT parameter `k` if it was not set explicitly.
        let net_settings_lock = p2p.settings();
        let mut net_settings = net_settings_lock.write().await;
        net_settings.outbound_connections = 0;
        net_settings.getaddrs_max =
            Some(net_settings.getaddrs_max.unwrap_or(dht_settings.k.min(u32::MAX as usize) as u32));
        drop(net_settings);

        let basedir = expand_path(&settings.base_dir)?;
        let downloads_path = match settings.downloads_path {
            Some(downloads_path) => expand_path(&downloads_path)?,
            None => basedir.join("downloads"),
        };

        let pow_settings: PowSettings = settings.pow.into();
        let pow = FudPow::new(pow_settings, executor.clone());

        info!(target: "fud::new()", "Instantiating Geode instance");
        let geode = Geode::new(&basedir).await?;

        let dht: Arc<Dht<Fud>> =
            Arc::new(Dht::<Fud>::new(&dht_settings, p2p.clone(), executor.clone()).await);

        let (get_tx, get_rx) = smol::channel::unbounded();
        let (put_tx, put_rx) = smol::channel::unbounded();
        let (lookup_tx, lookup_rx) = smol::channel::unbounded();
        let (verify_node_tx, verify_node_rx) = smol::channel::unbounded();
        let fud = Arc::new(Self {
            state: Arc::new(RwLock::new(None)),
            geode,
            downloads_path,
            chunk_timeout: settings.chunk_timeout,
            pow: Arc::new(RwLock::new(pow)),
            dht: dht.clone(),
            path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
            file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
            scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
            resources: Arc::new(RwLock::new(HashMap::new())),
            get_tx,
            get_rx,
            put_tx,
            put_rx,
            lookup_tx,
            lookup_rx,
            verify_node_tx,
            verify_node_rx,
            fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
            put_tasks: Arc::new(RwLock::new(HashMap::new())),
            lookup_tasks: Arc::new(RwLock::new(HashMap::new())),
            tasks: Arc::new(RwLock::new(HashMap::new())),
            event_publisher,
            p2p,
            executor,
        });

        // The DHT keeps a weak back-reference to its handler, avoiding a
        // reference cycle between `Fud` and `Dht`.
        *dht.handler.write().await = Arc::downgrade(&fud);

        Ok(fud)
    }

    /// Generate our node identity and start the background tasks.
    pub async fn start(self: &Arc<Self>) -> Result<()> {
        let mut pow = self.pow.write().await;
        if pow.settings.read().await.btc_enabled {
            pow.bitcoin_hash_cache.update().await?;
        }
        let (node_data, secret_key) = pow.generate_node().await?;
        info!(target: "fud::start()", "Your node ID: {}", hash_to_string(&node_data.id()));
        let mut state = self.state.write().await;
        *state = Some(FudState { node_data, secret_key });
        drop(state);
        drop(pow);

        self.start_tasks().await;

        Ok(())
    }

    /// Spawn all long-running tasks and register them in `self.tasks`.
    async fn start_tasks(self: &Arc<Self>) {
        let mut tasks = self.tasks.write().await;
        start_task!(self, "get", tasks::get_task, tasks);
        start_task!(self, "put", tasks::put_task, tasks);
        start_task!(self, "events", tasks::handle_dht_events, tasks);
        start_task!(self, "DHT events", dht_tasks::events_task::<Fud>, tasks);
        start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud>, tasks);
        start_task!(self, "DHT cleanup channels", dht_tasks::cleanup_channels_task::<Fud>, tasks);
        start_task!(self, "DHT add node", dht_tasks::add_node_task::<Fud>, tasks);
        start_task!(self, "DHT refinery", dht_tasks::dht_refinery_task::<Fud>, tasks);
        start_task!(
            self,
            "DHT disconnect inbounds",
            dht_tasks::disconnect_inbounds_task::<Fud>,
            tasks
        );
        start_task!(self, "lookup", tasks::lookup_task, tasks);
        start_task!(self, "verify node", tasks::verify_node_task, tasks);
        start_task!(self, "announce", tasks::announce_seed_task, tasks);
        start_task!(self, "node ID", tasks::node_id_task, tasks);
    }

    /// Load the resources we know about from sled, verify them, and announce
    /// the ones we are able to seed.
    async fn init(&self) -> Result<()> {
        info!(target: "fud::init()", "Finding resources...");
        let mut resources_write = self.resources.write().await;
        for result in self.path_tree.iter() {
            let Ok((hash, path)) = result else { continue };

            // Key: 32-byte BLAKE3 hash of the resource
            let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
                Ok(v) => v,
                Err(_) => continue,
            };
            let hash = blake3::Hash::from_bytes(hash_bytes);

            // Value: UTF-8 path of the resource on disk
            let path_bytes = path.to_vec();
            let path_str = match std::str::from_utf8(&path_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let path: PathBuf = match expand_path(path_str) {
                Ok(v) => v,
                Err(_) => continue,
            };

            // Restore the file selection, defaulting to all files
            let mut file_selection = FileSelection::All;
            if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
                if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
                    file_selection = FileSelection::Set(
                        path_list
                            .into_iter()
                            .filter_map(|bytes| {
                                std::str::from_utf8(&bytes)
                                    .ok()
                                    .and_then(|path_str| expand_path(path_str).ok())
                            })
                            .collect(),
                    );
                }
            }

            resources_write.insert(
                hash,
                Resource::new(
                    hash,
                    ResourceType::Unknown,
                    &path,
                    ResourceStatus::Incomplete,
                    file_selection,
                ),
            );
        }
        drop(resources_write);

        info!(target: "fud::init()", "Verifying resources...");
        let resources = self.verify_resources(None).await?;

        let self_node = self.node().await?;

        // Without an external address we cannot seed anything
        if self_node.addresses.is_empty() {
            return Ok(());
        }

        // Add ourselves to the local seeder table for every seedable resource
        for resource in &resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let self_router_items = vec![seeder];
                self.add_value(&resource.hash, &self_router_items).await;
            }
        }

        info!(target: "fud::init()", "Announcing resources...");
        for resource in resources {
            if let Ok(seeder) = self.new_seeder(&resource.hash).await {
                let seeders = vec![seeder];
                let announce = FudAnnounce { key: resource.hash, seeders: seeders.clone() };
                let _ = self.dht.announce(&resource.hash, &seeders, &announce).await;
            }
        }

        Ok(())
    }

    /// Return a snapshot of all currently known resources.
    pub async fn resources(&self) -> HashMap<blake3::Hash, Resource> {
        let resources = self.resources.read().await;
        resources.clone()
    }

    /// Get the local path of a resource from its hash.
    pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
        if let Some(value) = self.path_tree.get(hash.as_bytes())? {
            let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
            return Ok(Some(path));
        }

        Ok(None)
    }

    /// Get the hash of a resource from its local path.
    pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();
        for path_item in self.path_tree.iter() {
            let (key, value) = path_item?;
            if value == path_bytes {
                let bytes: &[u8] = &key;
                if bytes.len() != 32 {
                    return Err(Error::Custom(format!(
                        "Expected a 32-byte BLAKE3 hash, got {} bytes",
                        bytes.len()
                    )));
                }

                let array: [u8; 32] = bytes.try_into().unwrap();
                return Ok(Some(array.into()))
            }
        }

        Ok(None)
    }

    /// Create a signed seeder item announcing that our own node seeds `key`.
    pub async fn new_seeder(&self, key: &blake3::Hash) -> Result<FudSeeder> {
        let state = self.state.read().await;
        let Some(state_) = state.clone() else {
            return Err(Error::Custom("Fud is not ready yet".to_string()))
        };
        drop(state);
        let node = self.node().await?;

        Ok(FudSeeder {
            key: *key,
            node: node.clone(),
            // The signature commits to `key || serialize(node)`, so peers can
            // check that a seeder item was produced by the node it references.
            sig: state_
                .secret_key
                .sign(&[key.as_bytes().to_vec(), serialize_async(&node).await].concat()),
            timestamp: Timestamp::current_time().inner(),
        })
    }

    /// Verify the chunks of the given resources (or all known resources if
    /// `hashes` is `None`) against the data on disk, update their status and
    /// counters, and return the resources that are complete and seedable.
    pub async fn verify_resources(
        &self,
        hashes: Option<Vec<blake3::Hash>>,
    ) -> Result<Vec<Resource>> {
        let mut resources_write = self.resources.write().await;

        // Update a resource's counters from its chunked storage (if any) and
        // notify subscribers about the change.
        let update_resource = async |resource: &mut Resource,
                                     status: ResourceStatus,
                                     chunked: Option<&ChunkedStorage>,
                                     total_bytes_downloaded: u64,
                                     target_bytes_downloaded: u64| {
            let files = match chunked {
                Some(chunked) => resource.get_selected_files(chunked),
                None => vec![],
            };
            let chunk_hashes = match chunked {
                Some(chunked) => resource.get_selected_chunks(chunked),
                None => HashSet::new(),
            };

            if let Some(chunked) = chunked {
                resource.rtype = match chunked.is_dir() {
                    false => ResourceType::File,
                    true => ResourceType::Directory,
                };
            }

            resource.status = status;
            resource.total_chunks_count = match chunked {
                Some(chunked) => chunked.len() as u64,
                None => 0,
            };
            resource.target_chunks_count = chunk_hashes.len() as u64;
            resource.total_chunks_downloaded = match chunked {
                Some(chunked) => chunked.local_chunks() as u64,
                None => 0,
            };
            resource.target_chunks_downloaded = match chunked {
                Some(chunked) => chunked
                    .iter()
                    .filter(|(hash, available)| chunk_hashes.contains(hash) && *available)
                    .count() as u64,
                None => 0,
            };

            resource.total_bytes_size = match chunked {
                Some(chunked) => chunked.get_fileseq().len(),
                None => 0,
            };
            resource.target_bytes_size = match chunked {
                Some(chunked) => chunked
                    .get_files()
                    .iter()
                    .filter(|(path, _)| files.contains(path))
                    .map(|(_, size)| size)
                    .sum(),
                None => 0,
            };

            resource.total_bytes_downloaded = total_bytes_downloaded;
            resource.target_bytes_downloaded = target_bytes_downloaded;

            notify_event!(self, ResourceUpdated, resource);
        };
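        // In the counters above, "total_*" covers the whole resource, while
        // "target_*" only covers the chunks/files in the resource's file
        // selection.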

        let mut seeding_resources: Vec<Resource> = vec![];
        for (_, mut resource) in resources_write.iter_mut() {
            if let Some(ref hashes_list) = hashes {
                if !hashes_list.contains(&resource.hash) {
                    continue;
                }
            }

            // Only seeding or incomplete resources are (re)verified
            match resource.status {
                ResourceStatus::Seeding => {}
                ResourceStatus::Incomplete => {}
                _ => continue,
            };

            let resource_path = match self.hash_to_path(&resource.hash) {
                Ok(Some(v)) => v,
                Ok(None) | Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
                Ok(v) => v,
                Err(_) => {
                    update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                    continue;
                }
            };
            let verify_res = self.verify_chunks(resource, &mut chunked).await;
            if let Err(e) = verify_res {
                error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
                update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
                continue;
            }
            let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

            if !chunked.is_complete() {
                update_resource(
                    &mut resource,
                    ResourceStatus::Incomplete,
                    Some(&chunked),
                    total_bytes_downloaded,
                    target_bytes_downloaded,
                )
                .await;
                continue;
            }

            update_resource(
                &mut resource,
                ResourceStatus::Seeding,
                Some(&chunked),
                total_bytes_downloaded,
                target_bytes_downloaded,
            )
            .await;
            seeding_resources.push(resource.clone());
        }

        Ok(seeding_resources)
    }

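    /// Queue a download of resource `hash` into `path`, restricted to `files`.
    /// The request is picked up by the `get` background task.
    ///
    /// Minimal usage sketch (hypothetical; assumes an `Arc<Fud>` in an async
    /// context and a hex-encoded hash string):
    ///
    /// ```ignore
    /// let hash = blake3::Hash::from_hex(&hash_hex)?;
    /// fud.get(&hash, &target_path, FileSelection::All).await?;
    /// ```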
    pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
        let fetch_tasks = self.fetch_tasks.read().await;
        if fetch_tasks.contains_key(hash) {
            return Err(Error::Custom(format!(
                "Resource {} is already being downloaded",
                hash_to_string(hash)
            )))
        }
        drop(fetch_tasks);

        self.get_tx.send((*hash, path.to_path_buf(), files)).await?;

        Ok(())
    }

    /// Get the [`ChunkedStorage`] of a resource from Geode, fetching the
    /// metadata from the network if we don't have it locally. Also returns
    /// the seeder the metadata was fetched from, if any.
    pub async fn get_metadata(
        &self,
        hash: &blake3::Hash,
        path: &Path,
    ) -> Result<(ChunkedStorage, Option<FudSeeder>)> {
        match self.geode.get(hash, path).await {
            // Metadata is already available locally
            Ok(v) => Ok((v, None)),
            Err(Error::GeodeNeedsGc) => todo!(),
            Err(Error::GeodeFileNotFound) => {
                info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));

                // Trigger a seeder lookup, then fetch the metadata from them
                let dht_sub = self.dht.subscribe().await;
                if let Err(e) = self.lookup_tx.send(*hash).await {
                    dht_sub.unsubscribe().await;
                    return Err(e.into())
                }

                let fetch_res = fetch_metadata(self, hash, path, &dht_sub).await;
                dht_sub.unsubscribe().await;
                let seeder = fetch_res?;
                Ok((self.geode.get(hash, path).await?, Some(seeder)))
            }
            Err(e) => Err(e),
        }
    }

    /// Fetch a resource from the network: download the metadata if needed,
    /// then download all missing chunks in the file selection.
    pub async fn fetch_resource(
        &self,
        hash: &blake3::Hash,
        path: &Path,
        files: &FileSelection,
    ) -> Result<()> {
        let hash_bytes = hash.as_bytes();
        let path_string = path.to_string_lossy().to_string();
        let path_bytes = path_string.as_bytes();

        // Update fields of the resource in `self.resources` and evaluate to a
        // clone of the updated resource. Returns early (from the surrounding
        // function) if the resource no longer exists, e.g. after a removal.
        macro_rules! update_resource {
            ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
                let mut resources_write = self.resources.write().await;
                let resource = match resources_write.get_mut($hash) {
                    Some(resource) => {
                        $(resource.$field = $value;)*
                        resource.clone()
                    }
                    None => return Ok(()),
                };
                resource
            }};
        }
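        // For example, `update_resource!(hash, { status = ResourceStatus::Verifying })`
        // locks the resources table, sets `status` on the entry for `hash`, and
        // yields a clone of the updated resource.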

        // Refuse to reuse a path that belongs to a different resource
        if let Ok(Some(hash_found)) = self.path_to_hash(path) {
            if *hash != hash_found {
                return Err(Error::Custom(format!(
                    "There is already another resource on path {path_string}"
                )))
            }
        }

        self.path_tree.insert(hash_bytes, path_bytes)?;

        // Persist the file selection, unless all files are selected
        if let FileSelection::Set(selected_files) = files {
            let paths: Vec<Vec<u8>> = selected_files
                .iter()
                .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
                .collect();
            let serialized_paths = serialize_async(&paths).await;
            if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
                return Err(Error::SledError(e))
            }
        }

        let resource = Resource::new(
            *hash,
            ResourceType::Unknown,
            path,
            ResourceStatus::Discovering,
            files.clone(),
        );
        let mut resources_write = self.resources.write().await;
        resources_write.insert(*hash, resource.clone());
        drop(resources_write);

        let dht_sub = self.dht.subscribe().await;

        notify_event!(self, DownloadStarted, resource);

        // Get the resource's metadata, from Geode or from the network
        let metadata_result = self.get_metadata(hash, path).await;

        if let Err(e) = metadata_result {
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
            notify_event!(self, MetadataNotFound, resource);
            dht_sub.unsubscribe().await;
            return Err(e)
        }
        let (mut chunked, metadata_seeder) = metadata_result.unwrap();

        let resources_read = self.resources.read().await;
        let resource = match resources_read.get(hash) {
            Some(resource) => resource,
            None => {
                // The resource was removed while we fetched the metadata
                dht_sub.unsubscribe().await;
                return Ok(())
            }
        };
        let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked);
        drop(resources_read);

        // Make sure all selected files exist on disk
        if let Err(e) = create_all_files(&files_vec).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        let resource = update_resource!(hash, {
            status = ResourceStatus::Verifying,
            total_chunks_count = chunked.len() as u64,
            total_bytes_size = chunked.get_fileseq().len(),
            rtype = match chunked.is_dir() {
                false => ResourceType::File,
                true => ResourceType::Directory,
            },
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        let chunk_hashes = resource.get_selected_chunks(&chunked);

        // Write any scraps we kept for this resource's chunks
        if let Err(e) = self.write_scraps(&mut chunked, &chunk_hashes).await {
            dht_sub.unsubscribe().await;
            return Err(e)
        }

        // Verify what is already on disk
        let verify_res = self.verify_chunks(&resource, &mut chunked).await;
        if let Err(e) = verify_res {
            dht_sub.unsubscribe().await;
            error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
            return Err(e);
        }
        let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();

        if let ResourceType::File = resource.rtype {
            update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
            notify_event!(self, ResourceUpdated, resource);
        }

        // A single file can never be larger than `chunk count * MAX_CHUNK_SIZE`:
        // if the file on disk is bigger than that, truncate the excess.
        if !chunked.is_dir() {
            let fs_metadata = fs::metadata(&path).await;
            if let Err(e) = fs_metadata {
                dht_sub.unsubscribe().await;
                return Err(e.into());
            }
            if fs_metadata.unwrap().len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
                if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
                    let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
                }
            }
        }

        // Chunks in the file selection, and whether they are available locally
        let chunks: HashSet<(blake3::Hash, bool)> =
            chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();

        // Selected chunks that are not available locally yet
        let mut missing_chunks: HashSet<blake3::Hash> =
            chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();

        update_resource!(hash, {
            target_chunks_count = chunks.len() as u64,
            total_chunks_downloaded = chunked.local_chunks() as u64,
            target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,
            target_bytes_size =
                chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
            total_bytes_downloaded = total_bytes_downloaded,
            target_bytes_downloaded = target_bytes_downloaded,
        });

        // Mark the download as finished and, if the resource is now complete,
        // announce ourselves as a seeder for it.
        let download_completed = async |chunked: &ChunkedStorage| -> Result<()> {
            let resource = update_resource!(hash, {
                status = match chunked.is_complete() {
                    true => ResourceStatus::Seeding,
                    false => ResourceStatus::Incomplete,
                },
                target_chunks_downloaded = chunks.len() as u64,
                total_chunks_downloaded = chunked.local_chunks() as u64,
            });

            if chunked.is_complete() {
                if let Ok(seeder) = self.new_seeder(hash).await {
                    let seeders = vec![seeder];
                    let self_announce = FudAnnounce { key: *hash, seeders: seeders.clone() };
                    let _ = self.dht.announce(hash, &seeders, &self_announce).await;
                }
            }

            notify_event!(self, DownloadCompleted, resource);

            Ok(())
        };
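        // `download_completed` runs both when nothing was missing in the first
        // place and after `fetch_chunks()` below has finished.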

        // Nothing to download
        if missing_chunks.is_empty() {
            dht_sub.unsubscribe().await;
            return download_completed(&chunked).await;
        }

        let resource = update_resource!(hash, {
            status = ResourceStatus::Downloading,
        });
        notify_event!(self, MetadataDownloadCompleted, resource);

        // If the metadata did not come from a seeder, look for seeders now
        if metadata_seeder.is_none() {
            if let Err(e) = self.lookup_tx.send(*hash).await {
                dht_sub.unsubscribe().await;
                return Err(e.into())
            }
        }

        // Download the missing chunks
        let _ =
            fetch_chunks(self, hash, &mut chunked, &dht_sub, metadata_seeder, &mut missing_chunks)
                .await;

        dht_sub.unsubscribe().await;

        // Reload the chunked storage and verify everything we now have
        let mut chunked = self.geode.get(hash, path).await?;

        let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
        notify_event!(self, ResourceUpdated, resource);

        self.verify_chunks(&resource, &mut chunked).await?;

        // The download is complete if all selected chunks are available
        let is_complete = chunked
            .iter()
            .filter(|(hash, _)| chunk_hashes.contains(hash))
            .all(|(_, available)| *available);

        if !is_complete {
            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });

            notify_event!(self, MissingChunks, resource);

            return Ok(());
        }

        download_completed(&chunked).await
    }

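    /// Write pending scraps for the given chunks into the chunked storage.
    /// A scrap holds the full data of a chunk that could previously only be
    /// partially written to disk (for example when the chunk overlaps files
    /// outside of the file selection); `hash_written` records the hash of the
    /// bytes that actually reached the disk. Scraps that can now be written
    /// in full are removed from sled, the others are updated with the new
    /// `hash_written`.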
    async fn write_scraps(
        &self,
        chunked: &mut ChunkedStorage,
        chunk_hashes: &HashSet<blake3::Hash>,
    ) -> Result<()> {
        // Collect the scraps we have for the requested chunks
        let mut scraps = HashMap::new();
        for chunk_hash in chunk_hashes {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();

            scraps.insert(chunk_hash, scrap);
        }

        if !scraps.is_empty() {
            info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
        }
        for (scrap_hash, mut scrap) in scraps {
            let len = scrap.chunk.len();
            let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
            if let Err(e) = write_res {
                error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            let (_, chunk_bytes_written) = write_res.unwrap();

            // The entire chunk was written: the scrap is no longer needed
            if chunk_bytes_written == len {
                self.scrap_tree.remove(scrap_hash.as_bytes())?;
                continue;
            }

            // The chunk was only partially written: update the scrap with the
            // hash of the bytes that are now on disk
            let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
            if let Err(e) = chunk_res {
                error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
                continue;
            }
            scrap.hash_written = blake3::hash(&chunk_res.unwrap());
            if let Err(e) =
                self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
            {
                error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
            }
        }

        Ok(())
    }

    /// Verify which chunks of `chunked` are available: read each chunk from
    /// disk and check its hash, then try to recover chunks that are missing
    /// on disk from their scraps. Returns the pair
    /// `(total_bytes_downloaded, target_bytes_downloaded)`, where the target
    /// value only counts bytes inside the resource's file selection.
    pub async fn verify_chunks(
        &self,
        resource: &Resource,
        chunked: &mut ChunkedStorage,
    ) -> Result<(u64, u64)> {
        let chunks = chunked.get_chunks().clone();
        // Chunk hash -> (chunk size, size within the file selection)
        let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();

        // First pass: verify the chunks as they are on disk
        for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
            let chunk =
                match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
                    Ok(c) => c,
                    Err(Error::Io(ErrorKind::NotFound)) => continue,
                    Err(e) => {
                        warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
                        break
                    }
                };

            if self.geode.verify_chunk(chunk_hash, &chunk) {
                chunked.get_chunk_mut(chunk_index).1 = true;
                bytes.insert(
                    *chunk_hash,
                    (chunk.len(), resource.get_selected_bytes(chunked, &chunk)),
                );
            }
        }

        // Second pass: try to recover chunks that are missing on disk from
        // their scraps
        let chunks = chunked.get_chunks().clone();
        let missing_on_fs: Vec<_> =
            chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect();

        for (chunk_index, (chunk_hash, _)) in missing_on_fs {
            let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
            if scrap.is_none() {
                continue;
            }

            let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
            if scrap.is_err() {
                continue;
            }
            let scrap: Scrap = scrap.unwrap();
            // The scrap must hold the chunk's exact data
            if blake3::hash(&scrap.chunk) != *chunk_hash {
                continue;
            }

            let scrap_chunk =
                self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await;
            if scrap_chunk.is_err() {
                continue;
            }
            let scrap_chunk = scrap_chunk.unwrap();

            // The bytes on disk must match what was written when the scrap
            // was created
            if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
                continue;
            }

            // The chunk is available through its scrap
            chunked.get_chunk_mut(chunk_index).1 = true;

            bytes.insert(
                *chunk_hash,
                (scrap.chunk.len(), resource.get_selected_bytes(chunked, &scrap.chunk)),
            );
        }

        // For a single file, the exact size is only known once the last chunk
        // is available: every chunk is MAX_CHUNK_SIZE except possibly the last
        if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() {
            if !chunked.is_dir() && *last_chunk_available {
                if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) {
                    let exact_file_size =
                        chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
                    chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
                }
            }
        }

        let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
        let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;

        Ok((total_bytes_downloaded, target_bytes_downloaded))
    }

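    /// Queue a local file or directory for insertion. The request is picked
    /// up by the `put` background task.
    ///
    /// Minimal usage sketch (hypothetical; assumes an `Arc<Fud>` in an async
    /// context):
    ///
    /// ```ignore
    /// fud.put(Path::new("/path/to/share")).await?;
    /// ```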
    pub async fn put(&self, path: &Path) -> Result<()> {
        let put_tasks = self.put_tasks.read().await;
        if put_tasks.contains_key(path) {
            return Err(Error::Custom(format!(
                "Path {} is already being inserted",
                path.to_string_lossy()
            )))
        }
        drop(put_tasks);

        self.put_tx.send(path.to_path_buf()).await?;

        Ok(())
    }

    /// Chunk a local file or directory, insert it into Geode, and announce
    /// ourselves as a seeder for the resulting resource.
    pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
        let self_node = self.node().await?;

        if self_node.addresses.is_empty() {
            return Err(Error::Custom(
                "Cannot put resource, you don't have any external address".to_string(),
            ))
        }

        let metadata = fs::metadata(path).await?;

        // Gather the list of files and their sizes
        let (files, resource_type) = if metadata.is_file() {
            (vec![(path.clone(), metadata.len())], ResourceType::File)
        } else if metadata.is_dir() {
            let mut files = get_all_files(path).await?;
            self.geode.sort_files(&mut files);
            (files, ResourceType::Directory)
        } else {
            return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
        };

        // Chunk the file sequence and hash the chunks
        let stream = FileSequence::new(&files, false);
        let total_size = stream.len();
        let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;

        // For directories, the file metadata (relative paths and sizes) is
        // also part of the resource hash
        let relative_files = if let ResourceType::Directory = resource_type {
            let relative_files = files
                .into_iter()
                .map(|(file_path, size)| match file_path.strip_prefix(path) {
                    Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
                    Err(_) => Err(Error::Custom("Invalid file path".to_string())),
                })
                .collect::<Result<Vec<_>>>()?;

            self.geode.hash_files_metadata(&mut hasher, &relative_files);

            relative_files
        } else {
            vec![]
        };

        let hash = hasher.finalize();

        if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
            error!(target: "fud::insert_resource()", "Failed inserting {path:?} to geode: {e}");
            return Err(e)
        }

        if let Err(e) =
            self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
        {
            error!(target: "fud::insert_resource()", "Failed inserting new resource into sled: {e}");
            return Err(e.into())
        }

        // The resource is complete by construction, so we can seed it
        let mut resources_write = self.resources.write().await;
        resources_write.insert(
            hash,
            Resource {
                hash,
                rtype: resource_type,
                path: path.to_path_buf(),
                status: ResourceStatus::Seeding,
                file_selection: FileSelection::All,
                total_chunks_count: chunk_hashes.len() as u64,
                target_chunks_count: chunk_hashes.len() as u64,
                total_chunks_downloaded: chunk_hashes.len() as u64,
                target_chunks_downloaded: chunk_hashes.len() as u64,
                total_bytes_size: total_size,
                target_bytes_size: total_size,
                total_bytes_downloaded: total_size,
                target_bytes_downloaded: total_size,
                speeds: vec![],
            },
        );
        drop(resources_write);

        // Announce ourselves as a seeder
        if let Ok(seeder) = self.new_seeder(&hash).await {
            let seeders = vec![seeder];
            let fud_announce = FudAnnounce { key: hash, seeders: seeders.clone() };
            let _ = self.dht.announce(&hash, &seeders, &fud_announce).await;
        }

        notify_event!(self, InsertCompleted, {
            hash,
            path: path.to_path_buf()
        });

        Ok(())
    }

    /// Remove a resource: drop it from the resources table, remove its scraps
    /// and its Geode metadata, and clear its sled entries. The downloaded
    /// files themselves are left on disk.
    pub async fn remove(&self, hash: &blake3::Hash) {
        let mut resources_write = self.resources.write().await;
        resources_write.remove(hash);
        drop(resources_write);

        // Remove the scraps of all chunks belonging to this resource
        if let Ok(Some(path)) = self.hash_to_path(hash) {
            let chunked = self.geode.get(hash, &path).await;

            if let Ok(chunked) = chunked {
                for (chunk_hash, _) in chunked.iter() {
                    let _ = self.scrap_tree.remove(chunk_hash.as_bytes());
                }
            }
        }

        // Remove the Geode metadata files
        let hash_str = hash_to_string(hash);
        let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
        let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;

        let _ = self.path_tree.remove(hash.as_bytes());
        let _ = self.file_selection_tree.remove(hash.as_bytes());

        notify_event!(self, ResourceRemoved, { hash: *hash });
    }

    /// Remove all seeders older than `expiry_secs` seconds from the DHT hash
    /// table.
    pub async fn prune_seeders(&self, expiry_secs: u32) {
        let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
        let mut seeders_write = self.dht.hash_table.write().await;

        let keys: Vec<_> = seeders_write.keys().cloned().collect();

        for key in keys {
            let items = seeders_write.get_mut(&key).unwrap();
            items.retain(|item| item.timestamp > expiry_timestamp);
            if items.is_empty() {
                seeders_write.remove(&key);
            }
        }
    }

    /// Stop all running tasks: fetch, put, and lookup tasks first, then the
    /// named background tasks.
    pub async fn stop(&self) {
        info!("Stopping fetch tasks...");
        // Clone the task handles so the lock is released before stopping:
        // a stopping task may itself need the map to deregister.
        let fetch_tasks = self.fetch_tasks.read().await;
        let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(fetch_tasks);

        for task in cloned_fetch_tasks.values() {
            task.stop().await;
        }

        info!("Stopping put tasks...");
        let put_tasks = self.put_tasks.read().await;
        let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
            put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
        drop(put_tasks);

        for task in cloned_put_tasks.values() {
            task.stop().await;
        }

        info!("Stopping lookup tasks...");
        let lookup_tasks = self.lookup_tasks.read().await;
        let cloned_lookup_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
            lookup_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
        drop(lookup_tasks);

        for task in cloned_lookup_tasks.values() {
            task.stop().await;
        }

        let mut tasks = self.tasks.write().await;
        for (name, task) in tasks.clone() {
            info!("Stopping {name} task...");
            task.stop().await;
        }
        *tasks = HashMap::new();
    }
}