1use std::{
20 collections::{HashMap, HashSet},
21 io::ErrorKind,
22 path::{Path, PathBuf},
23 sync::Arc,
24 time::Instant,
25};
26
27use async_trait::async_trait;
28use futures::{future::FutureExt, pin_mut, select};
29use log::{debug, error, info, warn};
30use num_bigint::BigUint;
31use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, Rng};
32use sled_overlay::sled;
33use smol::{
34 channel,
35 fs::{self, File, OpenOptions},
36 lock::RwLock,
37};
38use url::Url;
39
40use darkfi::{
41 dht::{
42 impl_dht_node_defaults, tasks as dht_tasks, Dht, DhtHandler, DhtNode, DhtRouterItem,
43 DhtRouterPtr, DhtSettings,
44 },
45 geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
46 net::{ChannelPtr, P2pPtr},
47 system::{ExecutorPtr, PublisherPtr, StoppableTask},
48 util::path::expand_path,
49 Error, Result,
50};
51use darkfi_sdk::crypto::{schnorr::SchnorrPublic, SecretKey};
52use darkfi_serial::{deserialize_async, serialize_async, SerialDecodable, SerialEncodable};
53
54pub mod proto;
56use proto::{
57 FudAnnounce, FudChunkReply, FudDirectoryReply, FudFileReply, FudFindNodesReply,
58 FudFindNodesRequest, FudFindRequest, FudFindSeedersReply, FudFindSeedersRequest, FudNotFound,
59 FudPingReply, FudPingRequest,
60};
61
62pub mod event;
64use event::{notify_event, FudEvent};
65
66pub mod resource;
68use resource::{Resource, ResourceStatus, ResourceType};
69
70pub mod scrap;
72use scrap::Scrap;
73
74pub mod rpc;
76
77pub mod tasks;
79use tasks::{start_task, FetchReply};
80
81pub mod bitcoin;
83
84pub mod pow;
86use pow::{FudPow, VerifiableNodeData};
87
88pub mod equix;
90
91pub mod settings;
93use settings::Args;
94
95pub mod util;
97use util::{get_all_files, FileSelection};
98
99const SLED_PATH_TREE: &[u8] = b"_fud_paths";
100const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
101const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";
102
103#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
104pub struct FudNode {
105 data: VerifiableNodeData,
106 addresses: Vec<Url>,
107}
108impl_dht_node_defaults!(FudNode);
109
110impl DhtNode for FudNode {
111 fn id(&self) -> blake3::Hash {
112 self.data.id()
113 }
114 fn addresses(&self) -> Vec<Url> {
115 self.addresses.clone()
116 }
117}
118
119pub struct Fud {
120 pub node_data: Arc<RwLock<VerifiableNodeData>>,
122
123 pub secret_key: Arc<RwLock<SecretKey>>,
125
126 pub seeders_router: DhtRouterPtr<FudNode>,
128
129 p2p: P2pPtr,
131
132 geode: Geode,
134
135 downloads_path: PathBuf,
137
138 chunk_timeout: u64,
140
141 pub pow: Arc<RwLock<FudPow>>,
143
144 dht: Arc<Dht<FudNode>>,
146
147 resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
149
150 path_tree: sled::Tree,
152
153 file_selection_tree: sled::Tree,
157
158 scrap_tree: sled::Tree,
163
164 get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
165 get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
166
167 put_tx: channel::Sender<PathBuf>,
168 put_rx: channel::Receiver<PathBuf>,
169
170 fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
172
173 put_tasks: Arc<RwLock<HashMap<PathBuf, Arc<StoppableTask>>>>,
175
176 tasks: Arc<RwLock<HashMap<String, Arc<StoppableTask>>>>,
178
179 event_publisher: PublisherPtr<FudEvent>,
181
182 pub executor: ExecutorPtr,
184}
185
186#[async_trait]
187impl DhtHandler<FudNode> for Fud {
188 fn dht(&self) -> Arc<Dht<FudNode>> {
189 self.dht.clone()
190 }
191
192 async fn node(&self) -> FudNode {
193 FudNode {
194 data: self.node_data.read().await.clone(),
195 addresses: self
196 .p2p
197 .clone()
198 .hosts()
199 .external_addrs()
200 .await
201 .iter()
202 .filter(|addr| !addr.to_string().contains("[::]"))
203 .cloned()
204 .collect(),
205 }
206 }
207
208 async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
209 debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
210 let msg_subsystem = channel.message_subsystem();
211 msg_subsystem.add_dispatch::<FudPingReply>().await;
212 let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
213
214 let mut rng = OsRng;
216 let request = FudPingRequest { random: rng.gen() };
217 channel.send(&request).await?;
218
219 let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await?;
221 msg_subscriber.unsubscribe().await;
222
223 if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
225 channel.ban().await;
226 return Err(Error::InvalidSignature)
227 }
228
229 if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await {
231 channel.ban().await;
232 return Err(e)
233 }
234
235 Ok(reply.node.clone())
236 }
237
238 async fn on_new_node(&self, node: &FudNode) -> Result<()> {
240 debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id()));
241
242 if !self.dht().is_bootstrapped().await {
244 let _ = self.init().await;
245 }
246
247 let self_id = self.node_data.read().await.id();
249 let channel = self.get_channel(node, None).await?;
250 for (key, seeders) in self.seeders_router.read().await.iter() {
251 let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id()));
252 let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
253 if node_distance <= self_distance {
254 let _ = channel
255 .send(&FudAnnounce {
256 key: *key,
257 seeders: seeders.clone().into_iter().collect(),
258 })
259 .await;
260 }
261 }
262 self.cleanup_channel(channel).await;
263
264 Ok(())
265 }
266
267 async fn fetch_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> {
268 debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
269
270 let channel = self.get_channel(node, None).await?;
271 let msg_subsystem = channel.message_subsystem();
272 msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
273 let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();
274
275 let request = FudFindNodesRequest { key: *key };
276 channel.send(&request).await?;
277
278 let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await;
279
280 msg_subscriber_nodes.unsubscribe().await;
281 self.cleanup_channel(channel).await;
282
283 Ok(reply?.nodes.clone())
284 }
285}
286
287impl Fud {
288 pub async fn new(
289 settings: Args,
290 p2p: P2pPtr,
291 sled_db: &sled::Db,
292 event_publisher: PublisherPtr<FudEvent>,
293 executor: ExecutorPtr,
294 ) -> Result<Self> {
295 let basedir = expand_path(&settings.base_dir)?;
296 let downloads_path = match settings.downloads_path {
297 Some(downloads_path) => expand_path(&downloads_path)?,
298 None => basedir.join("downloads"),
299 };
300
301 let mut pow = FudPow::new(settings.pow.into(), executor.clone());
303 pow.bitcoin_hash_cache.update().await?; let (node_data, secret_key) = pow.generate_node().await?;
305 info!(target: "fud", "Your node ID: {}", hash_to_string(&node_data.id()));
306
307 info!("Instantiating Geode instance");
309 let geode = Geode::new(&basedir).await?;
310
311 let dht_settings: DhtSettings = settings.dht.into();
313 let dht: Arc<Dht<FudNode>> =
314 Arc::new(Dht::<FudNode>::new(&dht_settings, p2p.clone(), executor.clone()).await);
315
316 let (get_tx, get_rx) = smol::channel::unbounded();
317 let (put_tx, put_rx) = smol::channel::unbounded();
318 let fud = Self {
319 node_data: Arc::new(RwLock::new(node_data)),
320 secret_key: Arc::new(RwLock::new(secret_key)),
321 seeders_router: Arc::new(RwLock::new(HashMap::new())),
322 p2p,
323 geode,
324 downloads_path,
325 chunk_timeout: settings.chunk_timeout,
326 pow: Arc::new(RwLock::new(pow)),
327 dht,
328 path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
329 file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
330 scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?,
331 resources: Arc::new(RwLock::new(HashMap::new())),
332 get_tx,
333 get_rx,
334 put_tx,
335 put_rx,
336 fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
337 put_tasks: Arc::new(RwLock::new(HashMap::new())),
338 tasks: Arc::new(RwLock::new(HashMap::new())),
339 event_publisher,
340 executor,
341 };
342
343 Ok(fud)
344 }
345
346 pub async fn start_tasks(self: &Arc<Self>) {
347 let mut tasks = self.tasks.write().await;
348 start_task!(self, "get", tasks::get_task, tasks);
349 start_task!(self, "put", tasks::put_task, tasks);
350 start_task!(self, "DHT channel", dht_tasks::channel_task::<Fud, FudNode>, tasks);
351 start_task!(self, "announce", tasks::announce_seed_task, tasks);
352 start_task!(self, "node ID", tasks::node_id_task, tasks);
353 }
354
355 async fn init(&self) -> Result<()> {
358 info!(target: "fud::init()", "Bootstrapping the DHT...");
359 self.bootstrap().await;
360
361 info!(target: "fud::init()", "Finding resources...");
362 let mut resources_write = self.resources.write().await;
363 for result in self.path_tree.iter() {
364 if result.is_err() {
365 continue;
366 }
367
368 let (hash, path) = result.unwrap();
370 let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
371 Ok(v) => v,
372 Err(_) => continue,
373 };
374 let hash = blake3::Hash::from_bytes(hash_bytes);
375
376 let path_bytes = path.to_vec();
378 let path_str = match std::str::from_utf8(&path_bytes) {
379 Ok(v) => v,
380 Err(_) => continue,
381 };
382 let path: PathBuf = match expand_path(path_str) {
383 Ok(v) => v,
384 Err(_) => continue,
385 };
386
387 let mut file_selection = FileSelection::All;
389 if let Ok(Some(fs)) = self.file_selection_tree.get(hash.as_bytes()) {
390 if let Ok(path_list) = deserialize_async::<Vec<Vec<u8>>>(&fs).await {
391 file_selection = FileSelection::Set(
392 path_list
393 .into_iter()
394 .filter_map(|bytes| {
395 std::str::from_utf8(&bytes)
396 .ok()
397 .and_then(|path_str| expand_path(path_str).ok())
398 })
399 .collect(),
400 );
401 }
402 }
403
404 resources_write.insert(
406 hash,
407 Resource::new(
408 hash,
409 ResourceType::Unknown,
410 &path,
411 ResourceStatus::Incomplete,
412 file_selection,
413 ),
414 );
415 }
416 drop(resources_write);
417
418 info!(target: "fud::init()", "Verifying resources...");
419 let resources = self.verify_resources(None).await?;
420
421 let self_node = self.node().await;
422
423 if self_node.addresses.is_empty() {
425 return Ok(());
426 }
427
428 let self_router_items: Vec<DhtRouterItem<FudNode>> = vec![self_node.into()];
430 for resource in &resources {
431 self.add_to_router(
432 self.seeders_router.clone(),
433 &resource.hash,
434 self_router_items.clone(),
435 )
436 .await;
437 }
438
439 info!(target: "fud::init()", "Announcing resources...");
440 let seeders = vec![self.node().await.into()];
441 for resource in resources {
442 let _ = self
443 .announce(
444 &resource.hash,
445 &FudAnnounce { key: resource.hash, seeders: seeders.clone() },
446 self.seeders_router.clone(),
447 )
448 .await;
449 }
450
451 Ok(())
452 }
453
454 pub fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
456 if let Some(value) = self.path_tree.get(hash.as_bytes())? {
457 let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
458 return Ok(Some(path));
459 }
460
461 Ok(None)
462 }
463
464 pub fn path_to_hash(&self, path: &Path) -> Result<Option<blake3::Hash>> {
466 let path_string = path.to_string_lossy().to_string();
467 let path_bytes = path_string.as_bytes();
468 for path_item in self.path_tree.iter() {
469 let (key, value) = path_item?;
470 if value == path_bytes {
471 let bytes: &[u8] = &key;
472 if bytes.len() != 32 {
473 return Err(Error::Custom(format!(
474 "Expected a 32-byte BLAKE3, got {} bytes",
475 bytes.len()
476 )));
477 }
478
479 let array: [u8; 32] = bytes.try_into().unwrap();
480 return Ok(Some(array.into()))
481 }
482 }
483
484 Ok(None)
485 }
486
487 pub async fn verify_resources(
494 &self,
495 hashes: Option<Vec<blake3::Hash>>,
496 ) -> Result<Vec<Resource>> {
497 let mut resources_write = self.resources.write().await;
498
499 let update_resource = async |resource: &mut Resource,
500 status: ResourceStatus,
501 chunked: Option<&ChunkedStorage>,
502 total_bytes_downloaded: u64,
503 target_bytes_downloaded: u64| {
504 let files = match chunked {
505 Some(chunked) => resource.get_selected_files(chunked),
506 None => vec![],
507 };
508 let chunk_hashes = match chunked {
509 Some(chunked) => resource.get_selected_chunks(chunked),
510 None => HashSet::new(),
511 };
512
513 if let Some(chunked) = chunked {
514 resource.rtype = match chunked.is_dir() {
515 false => ResourceType::File,
516 true => ResourceType::Directory,
517 };
518 }
519
520 resource.status = status;
521 resource.total_chunks_count = match chunked {
522 Some(chunked) => chunked.len() as u64,
523 None => 0,
524 };
525 resource.target_chunks_count = chunk_hashes.len() as u64;
526 resource.total_chunks_downloaded = match chunked {
527 Some(chunked) => chunked.local_chunks() as u64,
528 None => 0,
529 };
530 resource.target_chunks_downloaded = match chunked {
531 Some(chunked) => chunked
532 .iter()
533 .filter(|(hash, available)| chunk_hashes.contains(hash) && *available)
534 .count() as u64,
535 None => 0,
536 };
537
538 resource.total_bytes_size = match chunked {
539 Some(chunked) => chunked.get_fileseq().len(),
540 None => 0,
541 };
542 resource.target_bytes_size = match chunked {
543 Some(chunked) => chunked
544 .get_files()
545 .iter()
546 .filter(|(path, _)| files.contains(path))
547 .map(|(_, size)| size)
548 .sum(),
549 None => 0,
550 };
551
552 resource.total_bytes_downloaded = total_bytes_downloaded;
553 resource.target_bytes_downloaded = target_bytes_downloaded;
554
555 notify_event!(self, ResourceUpdated, resource);
556 };
557
558 let mut seeding_resources: Vec<Resource> = vec![];
559 for (_, mut resource) in resources_write.iter_mut() {
560 if let Some(ref hashes_list) = hashes {
561 if !hashes_list.contains(&resource.hash) {
562 continue;
563 }
564 }
565
566 match resource.status {
567 ResourceStatus::Seeding => {}
568 ResourceStatus::Incomplete => {}
569 _ => continue,
570 };
571
572 let resource_path = match self.hash_to_path(&resource.hash) {
574 Ok(Some(v)) => v,
575 Ok(None) | Err(_) => {
576 update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
577 continue;
578 }
579 };
580 let mut chunked = match self.geode.get(&resource.hash, &resource_path).await {
581 Ok(v) => v,
582 Err(_) => {
583 update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
584 continue;
585 }
586 };
587 let verify_res = self.verify_chunks(resource, &mut chunked).await;
588 if let Err(e) = verify_res {
589 error!(target: "fud::verify_resources()", "Error while verifying chunks of {}: {e}", hash_to_string(&resource.hash));
590 update_resource(&mut resource, ResourceStatus::Incomplete, None, 0, 0).await;
591 continue;
592 }
593 let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();
594
595 if !chunked.is_complete() {
596 update_resource(
597 &mut resource,
598 ResourceStatus::Incomplete,
599 Some(&chunked),
600 total_bytes_downloaded,
601 target_bytes_downloaded,
602 )
603 .await;
604 continue;
605 }
606
607 update_resource(
608 &mut resource,
609 ResourceStatus::Seeding,
610 Some(&chunked),
611 total_bytes_downloaded,
612 target_bytes_downloaded,
613 )
614 .await;
615 seeding_resources.push(resource.clone());
616 }
617
618 Ok(seeding_resources)
619 }
620
621 async fn fetch_seeders(
623 &self,
624 nodes: &Vec<FudNode>,
625 key: &blake3::Hash,
626 ) -> HashSet<DhtRouterItem<FudNode>> {
627 let self_node = self.node().await;
628 let mut seeders: HashSet<DhtRouterItem<FudNode>> = HashSet::new();
629
630 for node in nodes {
631 let channel = match self.get_channel(node, None).await {
632 Ok(channel) => channel,
633 Err(e) => {
634 warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
635 continue;
636 }
637 };
638 let msg_subsystem = channel.message_subsystem();
639 msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
640
641 let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
642 Ok(msg_subscriber) => msg_subscriber,
643 Err(e) => {
644 warn!(target: "fud::fetch_seeders()", "Error subscribing to msg: {e}");
645 self.cleanup_channel(channel).await;
646 continue;
647 }
648 };
649
650 let send_res = channel.send(&FudFindSeedersRequest { key: *key }).await;
651 if let Err(e) = send_res {
652 warn!(target: "fud::fetch_seeders()", "Error while sending FudFindSeedersRequest: {e}");
653 msg_subscriber.unsubscribe().await;
654 self.cleanup_channel(channel).await;
655 continue;
656 }
657
658 let reply = match msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await
659 {
660 Ok(reply) => reply,
661 Err(e) => {
662 warn!(target: "fud::fetch_seeders()", "Error waiting for reply: {e}");
663 msg_subscriber.unsubscribe().await;
664 self.cleanup_channel(channel).await;
665 continue;
666 }
667 };
668
669 msg_subscriber.unsubscribe().await;
670 self.cleanup_channel(channel).await;
671
672 seeders.extend(reply.seeders.clone());
673 }
674
675 seeders =
676 seeders.iter().filter(|seeder| seeder.node.id() != self_node.id()).cloned().collect();
677
678 info!(target: "fud::fetch_seeders()", "Found {} seeders for {}", seeders.len(), hash_to_string(key));
679 seeders
680 }
681
682 async fn fetch_chunks(
684 &self,
685 hash: &blake3::Hash,
686 chunked: &mut ChunkedStorage,
687 seeders: &HashSet<DhtRouterItem<FudNode>>,
688 chunks: &HashSet<blake3::Hash>,
689 ) -> Result<()> {
690 let mut remaining_chunks = chunks.clone();
691 let mut shuffled_seeders = {
692 let mut vec: Vec<_> = seeders.iter().cloned().collect();
693 vec.shuffle(&mut OsRng);
694 vec
695 };
696
697 while let Some(seeder) = shuffled_seeders.pop() {
698 let channel = match self.get_channel(&seeder.node, Some(*hash)).await {
699 Ok(channel) => channel,
700 Err(e) => {
701 warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
702 continue;
703 }
704 };
705 let mut chunks_to_query = remaining_chunks.clone();
706 info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
707 loop {
708 let start_time = Instant::now();
709 let msg_subsystem = channel.message_subsystem();
710 msg_subsystem.add_dispatch::<FudChunkReply>().await;
711 msg_subsystem.add_dispatch::<FudNotFound>().await;
712 let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
713 let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
714
715 let mut chunk = None;
717 if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
718 chunk = Some(*random_chunk);
719 }
720
721 if chunk.is_none() {
722 break; }
725 let chunk_hash = chunk.unwrap();
726 chunks_to_query.remove(&chunk_hash);
727
728 let send_res =
729 channel.send(&FudFindRequest { info: Some(*hash), key: chunk_hash }).await;
730 if let Err(e) = send_res {
731 warn!(target: "fud::fetch_chunks()", "Error while sending FudFindRequest: {e}");
732 break; }
734
735 let chunk_recv =
736 msg_subscriber_chunk.receive_with_timeout(self.chunk_timeout).fuse();
737 let notfound_recv =
738 msg_subscriber_notfound.receive_with_timeout(self.chunk_timeout).fuse();
739
740 pin_mut!(chunk_recv, notfound_recv);
741
742 select! {
744 chunk_reply = chunk_recv => {
745 if let Err(e) = chunk_reply {
746 warn!(target: "fud::fetch_chunks()", "Error waiting for chunk reply: {e}");
747 break; }
749 let reply = chunk_reply.unwrap();
750
751 match self.geode.write_chunk(chunked, &reply.chunk).await {
752 Ok((inserted_hash, bytes_written)) => {
753 if inserted_hash != chunk_hash {
754 warn!(target: "fud::fetch_chunks()", "Received chunk does not match requested chunk");
755 msg_subscriber_chunk.unsubscribe().await;
756 msg_subscriber_notfound.unsubscribe().await;
757 continue; }
759
760 info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
761
762 if bytes_written < reply.chunk.len() {
765 info!(target: "fud::fetch_chunks()", "Saving chunk {} as a scrap", hash_to_string(&chunk_hash));
766 let chunk_written = self.geode.get_chunk(chunked, &chunk_hash).await?;
767 if let Err(e) = self.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&Scrap {
768 chunk: reply.chunk.clone(),
769 hash_written: blake3::hash(&chunk_written),
770 }).await) {
771 error!(target: "fud::fetch_chunks()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(&chunk_hash))
772 }
773 }
774
775 let mut resources_write = self.resources.write().await;
777 let resource = match resources_write.get_mut(hash) {
778 Some(resource) => {
779 resource.status = ResourceStatus::Downloading;
780 resource.total_chunks_downloaded += 1;
781 resource.target_chunks_downloaded += 1;
782
783 resource.total_bytes_downloaded += reply.chunk.len() as u64;
784 resource.target_bytes_downloaded += resource.get_selected_bytes(chunked, &reply.chunk) as u64;
785 resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
786 if resource.speeds.len() > 12 {
787 resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); }
789
790 if let Some((last_chunk_hash, _)) = chunked.iter().last() {
795 if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == chunk_hash {
796 resource.total_bytes_size = chunked.get_fileseq().len();
797 resource.target_bytes_size = resource.total_bytes_size;
798 }
799 }
800 resource.clone()
801 }
802 None => return Ok(()) };
804 drop(resources_write);
805
806 notify_event!(self, ChunkDownloadCompleted, { hash: *hash, chunk_hash, resource });
807 remaining_chunks.remove(&chunk_hash);
808 }
809 Err(e) => {
810 error!(target: "fud::fetch_chunks()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
811 }
812 };
813 }
814 notfound_reply = notfound_recv => {
815 if let Err(e) = notfound_reply {
816 warn!(target: "fud::fetch_chunks()", "Error waiting for NOTFOUND reply: {e}");
817 msg_subscriber_chunk.unsubscribe().await;
818 msg_subscriber_notfound.unsubscribe().await;
819 break; }
821 info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
822 notify_event!(self, ChunkNotFound, { hash: *hash, chunk_hash });
823 }
824 };
825
826 msg_subscriber_chunk.unsubscribe().await;
827 msg_subscriber_notfound.unsubscribe().await;
828 }
829
830 self.cleanup_channel(channel).await;
831
832 if remaining_chunks.is_empty() {
834 break;
835 }
836 }
837
838 Ok(())
839 }
840
841 pub async fn fetch_metadata(
848 &self,
849 hash: &blake3::Hash,
850 nodes: &Vec<FudNode>,
851 path: &Path,
852 ) -> Result<()> {
853 let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
854 let mut result: Option<FetchReply> = None;
855
856 for node in nodes {
857 let channel = match self.get_channel(node, Some(*hash)).await {
859 Ok(channel) => channel,
860 Err(e) => {
861 warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
862 continue;
863 }
864 };
865 let msg_subsystem = channel.message_subsystem();
866 msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
867
868 let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
869 Ok(msg_subscriber) => msg_subscriber,
870 Err(e) => {
871 warn!(target: "fud::fetch_metadata()", "Error subscribing to msg: {e}");
872 continue;
873 }
874 };
875
876 let send_res = channel.send(&FudFindSeedersRequest { key: *hash }).await;
877 if let Err(e) = send_res {
878 warn!(target: "fud::fetch_metadata()", "Error while sending FudFindSeedersRequest: {e}");
879 msg_subscriber.unsubscribe().await;
880 self.cleanup_channel(channel).await;
881 continue;
882 }
883
884 let reply = match msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await
885 {
886 Ok(reply) => reply,
887 Err(e) => {
888 warn!(target: "fud::fetch_metadata()", "Error waiting for reply: {e}");
889 msg_subscriber.unsubscribe().await;
890 self.cleanup_channel(channel).await;
891 continue;
892 }
893 };
894
895 let mut seeders = reply.seeders.clone();
896 info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id()));
897
898 msg_subscriber.unsubscribe().await;
899 self.cleanup_channel(channel).await;
900
901 while let Some(seeder) = seeders.pop() {
903 if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
905 continue;
906 }
907 queried_seeders.insert(seeder.node.id());
908
909 let channel = self.get_channel(&seeder.node, Some(*hash)).await;
910 if let Ok(channel) = channel {
911 let msg_subsystem = channel.message_subsystem();
912 msg_subsystem.add_dispatch::<FudChunkReply>().await;
913 msg_subsystem.add_dispatch::<FudFileReply>().await;
914 msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
915 msg_subsystem.add_dispatch::<FudNotFound>().await;
916 let msg_subscriber_chunk =
917 channel.subscribe_msg::<FudChunkReply>().await.unwrap();
918 let msg_subscriber_file =
919 channel.subscribe_msg::<FudFileReply>().await.unwrap();
920 let msg_subscriber_dir =
921 channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
922 let msg_subscriber_notfound =
923 channel.subscribe_msg::<FudNotFound>().await.unwrap();
924
925 let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
926 if let Err(e) = send_res {
927 warn!(target: "fud::fetch_metadata()", "Error while sending FudFindRequest: {e}");
928 msg_subscriber_chunk.unsubscribe().await;
929 msg_subscriber_file.unsubscribe().await;
930 msg_subscriber_dir.unsubscribe().await;
931 msg_subscriber_notfound.unsubscribe().await;
932 self.cleanup_channel(channel).await;
933 continue;
934 }
935
936 let chunk_recv =
937 msg_subscriber_chunk.receive_with_timeout(self.chunk_timeout).fuse();
938 let file_recv =
939 msg_subscriber_file.receive_with_timeout(self.chunk_timeout).fuse();
940 let dir_recv =
941 msg_subscriber_dir.receive_with_timeout(self.chunk_timeout).fuse();
942 let notfound_recv =
943 msg_subscriber_notfound.receive_with_timeout(self.chunk_timeout).fuse();
944
945 pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);
946
947 let cleanup = async || {
948 msg_subscriber_chunk.unsubscribe().await;
949 msg_subscriber_file.unsubscribe().await;
950 msg_subscriber_dir.unsubscribe().await;
951 msg_subscriber_notfound.unsubscribe().await;
952 self.cleanup_channel(channel).await;
953 };
954
955 select! {
957 chunk_reply = chunk_recv => {
960 cleanup().await;
961 if let Err(e) = chunk_reply {
962 warn!(target: "fud::fetch_metadata()", "Error waiting for chunk reply: {e}");
963 continue;
964 }
965 let reply = chunk_reply.unwrap();
966 let chunk_hash = blake3::hash(&reply.chunk);
967 if !self.geode.verify_metadata(hash, &[chunk_hash], &[]) {
969 warn!(target: "fud::fetch_metadata()", "Received a chunk while fetching a file, the chunk did not match the file hash");
970 continue;
971 }
972 info!(target: "fud::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
973 result = Some(FetchReply::Chunk((*reply).clone()));
974 break;
975 }
976 file_reply = file_recv => {
977 cleanup().await;
978 if let Err(e) = file_reply {
979 warn!(target: "fud::fetch_metadata()", "Error waiting for file reply: {e}");
980 continue;
981 }
982 let reply = file_reply.unwrap();
983 if !self.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
984 warn!(target: "fud::fetch_metadata()", "Received invalid file metadata");
985 continue;
986 }
987 info!(target: "fud::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
988 result = Some(FetchReply::File((*reply).clone()));
989 break;
990 }
991 dir_reply = dir_recv => {
992 cleanup().await;
993 if let Err(e) = dir_reply {
994 warn!(target: "fud::fetch_metadata()", "Error waiting for directory reply: {e}");
995 continue;
996 }
997 let reply = dir_reply.unwrap();
998
999 let files: Vec<_> = reply.files.clone().into_iter()
1001 .map(|(path_str, size)| (PathBuf::from(path_str), size))
1002 .collect();
1003
1004 if !self.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
1005 warn!(target: "fud::fetch_metadata()", "Received invalid directory metadata");
1006 continue;
1007 }
1008 info!(target: "fud::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
1009 result = Some(FetchReply::Directory((*reply).clone()));
1010 break;
1011 }
1012 notfound_reply = notfound_recv => {
1013 cleanup().await;
1014 if let Err(e) = notfound_reply {
1015 warn!(target: "fud::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
1016 continue;
1017 }
1018 info!(target: "fud::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
1019 }
1020 };
1021 }
1022 }
1023
1024 if result.is_some() {
1025 break;
1026 }
1027 }
1028
1029 if result.is_none() {
1031 return Err(Error::GeodeFileRouteNotFound)
1032 }
1033
1034 match result.unwrap() {
1037 FetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
1038 let mut files: Vec<_> = files
1040 .into_iter()
1041 .map(|(path_str, size)| (PathBuf::from(path_str), size))
1042 .collect();
1043
1044 self.geode.sort_files(&mut files);
1045 if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &files).await {
1046 error!(target: "fud::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
1047 return Err(e)
1048 }
1049 }
1050 FetchReply::File(FudFileReply { chunk_hashes }) => {
1051 if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
1052 error!(target: "fud::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
1053 return Err(e)
1054 }
1055 }
1056 FetchReply::Chunk(FudChunkReply { chunk }) => {
1058 info!(target: "fud::fetch_metadata()", "File fits in a single chunk");
1059 let chunk_hash = blake3::hash(&chunk);
1060 let _ = self.geode.insert_metadata(hash, &[chunk_hash], &[]).await;
1061 let mut chunked_file = ChunkedStorage::new(
1062 &[chunk_hash],
1063 &[(path.to_path_buf(), chunk.len() as u64)],
1064 false,
1065 );
1066 if let Err(e) = self.geode.write_chunk(&mut chunked_file, &chunk).await {
1067 error!(target: "fud::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
1068 return Err(e)
1069 };
1070 }
1071 };
1072
1073 Ok(())
1074 }
1075
1076 pub async fn get(&self, hash: &blake3::Hash, path: &Path, files: FileSelection) -> Result<()> {
1081 let fetch_tasks = self.fetch_tasks.read().await;
1082 if fetch_tasks.contains_key(hash) {
1083 return Err(Error::Custom(format!(
1084 "Resource {} is already being downloaded",
1085 hash_to_string(hash)
1086 )))
1087 }
1088 drop(fetch_tasks);
1089
1090 self.get_tx.send((*hash, path.to_path_buf(), files)).await?;
1091
1092 Ok(())
1093 }
1094
1095 pub async fn get_metadata(
1098 &self,
1099 hash: &blake3::Hash,
1100 path: &Path,
1101 ) -> Result<(ChunkedStorage, Vec<FudNode>)> {
1102 match self.geode.get(hash, path).await {
1103 Ok(v) => Ok((v, vec![])),
1105 Err(Error::GeodeNeedsGc) => todo!(),
1107 Err(Error::GeodeFileNotFound) => {
1109 info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
1111 let closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
1112
1113 match self.fetch_metadata(hash, &closest_nodes, path).await {
1115 Ok(()) => Ok((self.geode.get(hash, path).await?, closest_nodes)),
1117 Err(e) => Err(e),
1119 }
1120 }
1121
1122 Err(e) => {
1123 error!(target: "fud::get_metadata()", "{e}");
1124 Err(e)
1125 }
1126 }
1127 }
1128
1129 pub async fn fetch_resource(
1132 &self,
1133 hash: &blake3::Hash,
1134 path: &Path,
1135 files: &FileSelection,
1136 ) -> Result<()> {
1137 let self_node = self.node().await;
1138
1139 let hash_bytes = hash.as_bytes();
1140 let path_string = path.to_string_lossy().to_string();
1141 let path_bytes = path_string.as_bytes();
1142
1143 macro_rules! update_resource {
1146 ($hash:ident, { $($field:ident = $value:expr $(,)?)* }) => {{
1147 let mut resources_write = self.resources.write().await;
1148 let resource = match resources_write.get_mut($hash) {
1149 Some(resource) => {
1150 $(resource.$field = $value;)* resource.clone()
1152 }
1153 None => return Ok(()), };
1155 resource
1156 }};
1157 }
1158
1159 if let Ok(Some(hash_found)) = self.path_to_hash(path) {
1161 if *hash != hash_found {
1162 return Err(Error::Custom(format!(
1163 "There is already another resource on path {path_string}"
1164 )))
1165 }
1166 }
1167
1168 self.path_tree.insert(hash_bytes, path_bytes)?;
1170
1171 if let FileSelection::Set(selected_files) = files {
1173 let paths: Vec<Vec<u8>> = selected_files
1174 .iter()
1175 .map(|f| f.to_string_lossy().to_string().as_bytes().to_vec())
1176 .collect();
1177 let serialized_paths = serialize_async(&paths).await;
1178 if let Err(e) = self.file_selection_tree.insert(hash_bytes, serialized_paths) {
1180 return Err(Error::SledError(e))
1181 }
1182 }
1183
1184 let resource = Resource::new(
1186 *hash,
1187 ResourceType::Unknown,
1188 path,
1189 ResourceStatus::Discovering,
1190 files.clone(),
1191 );
1192 let mut resources_write = self.resources.write().await;
1193 resources_write.insert(*hash, resource.clone());
1194 drop(resources_write);
1195
1196 notify_event!(self, DownloadStarted, resource);
1198
1199 let (mut chunked, mut closest_nodes) = match self.get_metadata(hash, path).await {
1201 Ok(chunked) => chunked,
1202 Err(e) => {
1203 let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
1205 notify_event!(self, MetadataNotFound, resource);
1206 return Err(e);
1207 }
1208 };
1209
1210 let resources_read = self.resources.read().await;
1212 let resource = match resources_read.get(hash) {
1213 Some(resource) => resource,
1214 None => return Ok(()), };
1216 let files_vec: Vec<PathBuf> = resource.get_selected_files(&chunked);
1217 drop(resources_read);
1218
1219 for file_path in files_vec.iter() {
1221 if !file_path.exists() {
1222 if let Some(dir) = file_path.parent() {
1223 fs::create_dir_all(dir).await?;
1224 }
1225 File::create(&file_path).await?;
1226 }
1227 }
1228
1229 let resource = update_resource!(hash, {
1231 status = ResourceStatus::Verifying,
1232 total_chunks_count = chunked.len() as u64,
1233 total_bytes_size = chunked.get_fileseq().len(),
1234 rtype = match chunked.is_dir() {
1235 false => ResourceType::File,
1236 true => ResourceType::Directory,
1237 },
1238 });
1239 notify_event!(self, MetadataDownloadCompleted, resource);
1240
1241 let chunk_hashes = resource.get_selected_chunks(&chunked);
1243
1244 self.write_scraps(&mut chunked, &chunk_hashes).await?;
1246
1247 let verify_res = self.verify_chunks(&resource, &mut chunked).await;
1249 if let Err(e) = verify_res {
1250 error!(target: "fud::fetch_resource()", "Error while verifying chunks: {e}");
1251 return Err(e);
1252 }
1253 let (total_bytes_downloaded, target_bytes_downloaded) = verify_res.unwrap();
1254
1255 if let ResourceType::File = resource.rtype {
1257 update_resource!(hash, { total_bytes_size = chunked.get_fileseq().len() });
1258 notify_event!(self, ResourceUpdated, resource);
1259 }
1260
1261 if !chunked.is_dir() {
1266 let fs_metadata = fs::metadata(&path).await?;
1267 if fs_metadata.len() > (chunked.len() * MAX_CHUNK_SIZE) as u64 {
1268 if let Ok(file) = OpenOptions::new().write(true).create(true).open(path).await {
1269 let _ = file.set_len((chunked.len() * MAX_CHUNK_SIZE) as u64).await;
1270 }
1271 }
1272 }
1273
1274 let chunks: HashSet<(blake3::Hash, bool)> =
1276 chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();
1277
1278 let missing_chunks: HashSet<blake3::Hash> =
1280 chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();
1281
1282 update_resource!(hash, {
1284 target_chunks_count = chunks.len() as u64,
1285 total_chunks_downloaded = chunked.local_chunks() as u64,
1286 target_chunks_downloaded = (chunks.len() - missing_chunks.len()) as u64,
1287
1288 target_bytes_size =
1289 chunked.get_fileseq().subset_len(files_vec.into_iter().collect()),
1290 total_bytes_downloaded = total_bytes_downloaded,
1291 target_bytes_downloaded = target_bytes_downloaded,
1292 });
1293
1294 if missing_chunks.is_empty() {
1296 let resource = update_resource!(hash, {
1298 status = match chunked.is_complete() {
1299 true => ResourceStatus::Seeding,
1300 false => ResourceStatus::Incomplete,
1301 }
1302 });
1303
1304 if chunked.is_complete() {
1306 let self_announce =
1307 FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] };
1308 let _ = self.announce(hash, &self_announce, self.seeders_router.clone()).await;
1309 }
1310
1311 notify_event!(self, DownloadCompleted, resource);
1313
1314 return Ok(());
1315 }
1316
1317 let resource = update_resource!(hash, {
1319 status = ResourceStatus::Downloading,
1320 });
1321 notify_event!(self, MetadataDownloadCompleted, resource);
1322
1323 if closest_nodes.is_empty() {
1325 closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
1326 }
1327
1328 let seeders = self.fetch_seeders(&closest_nodes, hash).await;
1330
1331 self.fetch_chunks(hash, &mut chunked, &seeders, &missing_chunks).await?;
1333
1334 let mut chunked = match self.geode.get(hash, path).await {
1336 Ok(v) => v,
1337 Err(e) => {
1338 error!(target: "fud::fetch_resource()", "{e}");
1339 return Err(e);
1340 }
1341 };
1342
1343 let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
1345 notify_event!(self, ResourceUpdated, resource);
1346
1347 self.verify_chunks(&resource, &mut chunked).await?;
1349
1350 let is_complete = chunked
1351 .iter()
1352 .filter(|(hash, _)| chunk_hashes.contains(hash))
1353 .all(|(_, available)| *available);
1354
1355 if !is_complete {
1358 let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
1360
1361 notify_event!(self, MissingChunks, resource);
1363
1364 return Ok(());
1365 }
1366
1367 let resource = update_resource!(hash, {
1369 status = match chunked.is_complete() {
1370 true => ResourceStatus::Seeding,
1371 false => ResourceStatus::Incomplete,
1372 },
1373 target_chunks_downloaded = chunks.len() as u64,
1374 total_chunks_downloaded = chunked.local_chunks() as u64,
1375 });
1376
1377 if chunked.is_complete() {
1379 let self_announce = FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] };
1380 let _ = self.announce(hash, &self_announce, self.seeders_router.clone()).await;
1381 }
1382
1383 notify_event!(self, DownloadCompleted, resource);
1385
1386 Ok(())
1387 }
1388
1389 async fn write_scraps(
1390 &self,
1391 chunked: &mut ChunkedStorage,
1392 chunk_hashes: &HashSet<blake3::Hash>,
1393 ) -> Result<()> {
1394 let mut scraps = HashMap::new();
1396 for chunk_hash in chunk_hashes {
1398 let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
1399 if scrap.is_none() {
1400 continue;
1401 }
1402
1403 let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
1405 if scrap.is_err() {
1406 continue;
1407 }
1408 let scrap: Scrap = scrap.unwrap();
1409
1410 scraps.insert(chunk_hash, scrap);
1412 }
1413
1414 if !scraps.is_empty() {
1416 info!(target: "fud::write_scraps()", "Writing {} scraps...", scraps.len());
1417 }
1418 for (scrap_hash, mut scrap) in scraps {
1419 let len = scrap.chunk.len();
1420 let write_res = self.geode.write_chunk(chunked, scrap.chunk.clone()).await;
1421 if let Err(e) = write_res {
1422 error!(target: "fud::write_scraps()", "Error rewriting scrap {}: {e}", hash_to_string(scrap_hash));
1423 continue;
1424 }
1425 let (_, chunk_bytes_written) = write_res.unwrap();
1426
1427 if chunk_bytes_written == len {
1429 self.scrap_tree.remove(scrap_hash.as_bytes())?;
1430 continue;
1431 }
1432 let chunk_res = self.geode.get_chunk(chunked, scrap_hash).await;
1434 if let Err(e) = chunk_res {
1435 error!(target: "fud::write_scraps()", "Failed to get scrap {}: {e}", hash_to_string(scrap_hash));
1436 continue;
1437 }
1438 scrap.hash_written = blake3::hash(&chunk_res.unwrap());
1439 if let Err(e) =
1440 self.scrap_tree.insert(scrap_hash.as_bytes(), serialize_async(&scrap).await)
1441 {
1442 error!(target: "fud::write_scraps()", "Failed to save chunk {} as a scrap after rewrite: {e}", hash_to_string(scrap_hash));
1443 }
1444 }
1445
1446 Ok(())
1447 }
1448
1449 pub async fn verify_chunks(
1456 &self,
1457 resource: &Resource,
1458 chunked: &mut ChunkedStorage,
1459 ) -> Result<(u64, u64)> {
1460 let chunks = chunked.get_chunks().clone();
1461 let mut bytes: HashMap<blake3::Hash, (usize, usize)> = HashMap::new();
1462
1463 for (chunk_index, (chunk_hash, _)) in chunks.iter().enumerate() {
1465 let chunk =
1467 match self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await {
1468 Ok(c) => c,
1469 Err(Error::Io(ErrorKind::NotFound)) => continue,
1470 Err(e) => {
1471 warn!(target: "fud::verify_chunks()", "Error while verifying chunks: {e}");
1472 break
1473 }
1474 };
1475
1476 if self.geode.verify_chunk(chunk_hash, &chunk) {
1478 chunked.get_chunk_mut(chunk_index).1 = true;
1479 bytes.insert(
1480 *chunk_hash,
1481 (chunk.len(), resource.get_selected_bytes(chunked, &chunk)),
1482 );
1483 }
1484 }
1485
1486 let chunks = chunked.get_chunks().clone();
1488 let missing_on_fs: Vec<_> =
1489 chunks.iter().enumerate().filter(|(_, (_, available))| !available).collect();
1490
1491 for (chunk_index, (chunk_hash, _)) in missing_on_fs {
1493 let scrap = self.scrap_tree.get(chunk_hash.as_bytes())?;
1494 if scrap.is_none() {
1495 continue;
1496 }
1497
1498 let scrap = deserialize_async(scrap.unwrap().as_ref()).await;
1500 if scrap.is_err() {
1501 continue;
1502 }
1503 let scrap: Scrap = scrap.unwrap();
1504 if blake3::hash(&scrap.chunk) != *chunk_hash {
1505 continue;
1506 }
1507
1508 let scrap_chunk =
1510 self.geode.read_chunk(&mut chunked.get_fileseq_mut(), &chunk_index).await;
1511 if scrap_chunk.is_err() {
1512 continue;
1513 }
1514 let scrap_chunk = scrap_chunk.unwrap();
1515
1516 if !self.geode.verify_chunk(&scrap.hash_written, &scrap_chunk) {
1518 continue;
1519 }
1520
1521 chunked.get_chunk_mut(chunk_index).1 = true;
1523
1524 bytes.insert(
1526 *chunk_hash,
1527 (scrap.chunk.len(), resource.get_selected_bytes(chunked, &scrap.chunk)),
1528 );
1529 }
1530
1531 if let Some((last_chunk_hash, last_chunk_available)) = chunked.iter().last() {
1535 if !chunked.is_dir() && *last_chunk_available {
1536 if let Some((last_chunk_size, _)) = bytes.get(last_chunk_hash) {
1537 let exact_file_size =
1538 chunked.len() * MAX_CHUNK_SIZE - (MAX_CHUNK_SIZE - last_chunk_size);
1539 chunked.get_fileseq_mut().set_file_size(0, exact_file_size as u64);
1540 }
1541 }
1542 }
1543
1544 let total_bytes_downloaded = bytes.iter().map(|(_, (b, _))| b).sum::<usize>() as u64;
1545 let target_bytes_downloaded = bytes.iter().map(|(_, (_, b))| b).sum::<usize>() as u64;
1546
1547 Ok((total_bytes_downloaded, target_bytes_downloaded))
1548 }
1549
1550 pub async fn put(&self, path: &Path) -> Result<()> {
1552 let put_tasks = self.put_tasks.read().await;
1553 drop(put_tasks);
1554
1555 self.put_tx.send(path.to_path_buf()).await?;
1556
1557 Ok(())
1558 }
1559
1560 pub async fn insert_resource(&self, path: &PathBuf) -> Result<()> {
1563 let self_node = self.node().await;
1564
1565 if self_node.addresses.is_empty() {
1566 return Err(Error::Custom(
1567 "Cannot put resource, you don't have any external address".to_string(),
1568 ))
1569 }
1570
1571 let metadata = fs::metadata(path).await?;
1572
1573 let (files, resource_type) = if metadata.is_file() {
1575 (vec![(path.clone(), metadata.len())], ResourceType::File)
1576 } else if metadata.is_dir() {
1577 let mut files = get_all_files(path).await?;
1578 self.geode.sort_files(&mut files);
1579 (files, ResourceType::Directory)
1580 } else {
1581 return Err(Error::Custom(format!("{} is not a valid path", path.to_string_lossy())))
1582 };
1583
1584 let stream = FileSequence::new(&files, false);
1586 let total_size = stream.len();
1587 let (mut hasher, chunk_hashes) = self.geode.chunk_stream(stream).await?;
1588
1589 let relative_files = if let ResourceType::Directory = resource_type {
1591 let relative_files = files
1593 .into_iter()
1594 .map(|(file_path, size)| match file_path.strip_prefix(path) {
1595 Ok(rel_path) => Ok((rel_path.to_path_buf(), size)),
1596 Err(_) => Err(Error::Custom("Invalid file path".to_string())),
1597 })
1598 .collect::<Result<Vec<_>>>()?;
1599
1600 self.geode.hash_files_metadata(&mut hasher, &relative_files);
1602
1603 relative_files
1604 } else {
1605 vec![]
1606 };
1607
1608 let hash = hasher.finalize();
1610
1611 if let Err(e) = self.geode.insert_metadata(&hash, &chunk_hashes, &relative_files).await {
1613 error!(target: "fud::put()", "Failed inserting {path:?} to geode: {e}");
1614 return Err(e)
1615 }
1616
1617 if let Err(e) =
1619 self.path_tree.insert(hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
1620 {
1621 error!(target: "fud::put()", "Failed inserting new resource into sled: {e}");
1622 return Err(e.into())
1623 }
1624
1625 let mut resources_write = self.resources.write().await;
1627 resources_write.insert(
1628 hash,
1629 Resource {
1630 hash,
1631 rtype: resource_type,
1632 path: path.to_path_buf(),
1633 status: ResourceStatus::Seeding,
1634 file_selection: FileSelection::All,
1635 total_chunks_count: chunk_hashes.len() as u64,
1636 target_chunks_count: chunk_hashes.len() as u64,
1637 total_chunks_downloaded: chunk_hashes.len() as u64,
1638 target_chunks_downloaded: chunk_hashes.len() as u64,
1639 total_bytes_size: total_size,
1640 target_bytes_size: total_size,
1641 total_bytes_downloaded: total_size,
1642 target_bytes_downloaded: total_size,
1643 speeds: vec![],
1644 },
1645 );
1646 drop(resources_write);
1647
1648 let fud_announce = FudAnnounce { key: hash, seeders: vec![self_node.into()] };
1650 let _ = self.announce(&hash, &fud_announce, self.seeders_router.clone()).await;
1651
1652 notify_event!(self, InsertCompleted, {
1654 hash,
1655 path: path.to_path_buf()
1656 });
1657
1658 Ok(())
1659 }
1660
1661 pub async fn remove(&self, hash: &blake3::Hash) {
1670 let mut resources_write = self.resources.write().await;
1672 resources_write.remove(hash);
1673 drop(resources_write);
1674
1675 if let Ok(Some(path)) = self.hash_to_path(hash) {
1677 let chunked = self.geode.get(hash, &path).await;
1678
1679 if let Ok(chunked) = chunked {
1680 for (chunk_hash, _) in chunked.iter() {
1681 let _ = self.scrap_tree.remove(chunk_hash.as_bytes());
1682 }
1683 }
1684 }
1685
1686 let hash_str = hash_to_string(hash);
1688 let _ = fs::remove_file(self.geode.files_path.join(&hash_str)).await;
1689 let _ = fs::remove_file(self.geode.dirs_path.join(&hash_str)).await;
1690
1691 let _ = self.path_tree.remove(hash.as_bytes());
1693
1694 let _ = self.file_selection_tree.remove(hash.as_bytes());
1696
1697 notify_event!(self, ResourceRemoved, { hash: *hash });
1699 }
1700
1701 pub async fn stop(&self) {
1703 info!("Stopping fetch tasks...");
1704 let fetch_tasks = self.fetch_tasks.read().await;
1706 let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
1707 fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
1708 drop(fetch_tasks);
1709
1710 for task in cloned_fetch_tasks.values() {
1712 task.stop().await;
1713 }
1714
1715 info!("Stopping put tasks...");
1716 let put_tasks = self.put_tasks.read().await;
1718 let cloned_put_tasks: HashMap<PathBuf, Arc<StoppableTask>> =
1719 put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect();
1720 drop(put_tasks);
1721
1722 for task in cloned_put_tasks.values() {
1724 task.stop().await;
1725 }
1726
1727 let mut tasks = self.tasks.write().await;
1729 for (name, task) in tasks.clone() {
1730 info!("Stopping {name} task...");
1731 task.stop().await;
1732 }
1733 *tasks = HashMap::new();
1734 }
1735}