1use std::{
20 collections::HashSet,
21 path::{Path, PathBuf},
22 time::Instant,
23};
24
25use futures::{future::FutureExt, pin_mut, select};
26use rand::{
27 prelude::{IteratorRandom, SliceRandom},
28 rngs::OsRng,
29};
30use tracing::{error, info, warn};
31
32use darkfi::{
33 dht::{event::DhtEvent, DhtHandler, DhtNode},
34 geode::{hash_to_string, ChunkedStorage},
35 net::ChannelPtr,
36 system::Subscription,
37 Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41use crate::{
42 event::{self, notify_event, FudEvent},
43 proto::{
44 FudChunkNotFound, FudChunkReply, FudChunkRequest, FudDirectoryReply, FudFileReply,
45 FudMetadataNotFound, FudMetadataRequest,
46 },
47 util::{create_all_files, receive_resource_msg},
48 Fud, FudSeeder, ResourceStatus, ResourceType, Scrap,
49};
50
/// [`DhtEvent`] specialized to the node and value types that `Fud` declares
/// through its [`DhtHandler`] implementation.
type FudDhtEvent = DhtEvent<<Fud as DhtHandler>::Node, <Fud as DhtHandler>::Value>;
52
/// Runs an async callable against candidate seeders of `$key`, one at a time,
/// until one call returns `Ok` or the DHT lookup for `$key` completes.
///
/// Arguments:
/// * `$key` - key the received DHT events must be about; events for any other
///   key are skipped.
/// * `$fud` - accepted so call sites stay uniform; the expansion does not use it.
/// * `$dht_sub` - subscription the DHT events are received from.
/// * `$favored_seeder` - (optional arm) an `Option<FudSeeder>` that is tried
///   first, before any DHT event is read.
/// * `$code` - async callable taking a `FudSeeder` and returning a `Result`.
///
/// Every seeder, identified by `node.id()`, is tried at most once. The seeders
/// carried by a `ValueFound` event are tried in random order. The loop stops
/// without success when a `ValueLookupCompleted` event for `$key` is received.
///
/// The macro expands to statements and must be used inside an `async` context.
macro_rules! seeders_loop {
    ($key:expr, $fud:expr, $dht_sub:expr, $favored_seeder:expr, $code:expr) => {
        // IDs of the seeders that were already tried
        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
        let mut is_done = false;

        // Try the favored seeder first
        let favored_seeder: Option<FudSeeder> = $favored_seeder;
        if let Some(seeder) = favored_seeder {
            queried_seeders.insert(seeder.node.id());
            is_done = $code(seeder).await.is_ok();
        }

        // Then try the seeders announced on the DHT subscription
        while !is_done {
            let event = $dht_sub.receive().await;
            if event.key() != Some($key) {
                // Not about the key we are interested in
                continue
            }
            if let DhtEvent::ValueLookupCompleted { .. } = event {
                // No more seeders will be announced for this key
                break
            }
            if !matches!(event, DhtEvent::ValueFound { .. }) {
                continue
            }

            // The event was just checked to be a `ValueFound`
            let seeders = event.into_value().unwrap();
            let mut shuffled_seeders: Vec<_> = seeders.iter().cloned().collect();
            shuffled_seeders.shuffle(&mut OsRng);

            while let Some(seeder) = shuffled_seeders.pop() {
                // `insert` returns `false` when the ID is already in the set,
                // i.e. this seeder was tried before
                if !queried_seeders.insert(seeder.node.id()) {
                    continue
                }

                if $code(seeder).await.is_ok() {
                    is_done = true;
                    break
                }
            }
        }
    };
    ($key:expr, $fud:expr, $dht_sub:expr, $code:expr) => {
        seeders_loop!($key, $fud, $dht_sub, None, $code)
    };
}
112
/// Tells the per-seeder loop in `fetch_chunks` what to do after one
/// `fetch_chunk` call.
enum ChunkFetchControl {
    /// Call `fetch_chunk` again with the same seeder.
    NextChunk,
    /// Stop querying the current seeder; `fetch_chunks` cleans up the channel
    /// and reports success only if no chunk is left to download.
    NextSeeder,
    /// Stop the download; `fetch_chunks` cleans up the channel and returns
    /// `Ok(())` from the seeder callback, which ends the seeders loop.
    Abort,
}
118
/// State shared by `fetch_chunk` and `handle_chunk_reply` while chunks of one
/// resource are being downloaded.
struct ChunkFetchContext<'a> {
    /// The fud instance (geode, resources, scrap tree, timeouts).
    fud: &'a Fud,
    /// Hash of the resource the chunks belong to; sent as `resource` in every
    /// `FudChunkRequest`.
    hash: &'a blake3::Hash,
    /// Storage the received chunks are written to.
    chunked: &'a mut ChunkedStorage,
    /// Hashes of the chunks that still have to be downloaded; a hash is
    /// removed once its chunk was received and recorded.
    chunks: &'a mut HashSet<blake3::Hash>,
}
125
126pub async fn fetch_chunks(
128 fud: &Fud,
129 hash: &blake3::Hash,
130 chunked: &mut ChunkedStorage,
131 dht_sub: &Subscription<FudDhtEvent>,
132 favored_seeder: Option<FudSeeder>,
133 chunks: &mut HashSet<blake3::Hash>,
134) -> Result<()> {
135 let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks };
136
137 seeders_loop!(hash, fud, dht_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> {
138 let (channel, _) = match fud.dht.get_channel(&seeder.node).await {
139 Ok(channel) => channel,
140 Err(e) => {
141 warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
142 return Err(e)
143 }
144 };
145 let mut chunks_to_query = ctx.chunks.clone();
146 info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
147
148 loop {
149 match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await {
151 ChunkFetchControl::NextChunk => continue,
152 ChunkFetchControl::NextSeeder => break,
153 ChunkFetchControl::Abort => {
154 fud.dht.cleanup_channel(channel).await;
155 return Ok(())
156 }
157 };
158 }
159
160 fud.dht.cleanup_channel(channel).await;
161
162 if ctx.chunks.is_empty() {
164 return Ok(())
165 }
166
167 Err(().into())
168 });
169
170 Ok(())
171}
172
173async fn fetch_chunk(
175 ctx: &mut ChunkFetchContext<'_>,
176 channel: &ChannelPtr,
177 seeder: &FudSeeder,
178 chunks_to_query: &mut HashSet<blake3::Hash>,
179) -> ChunkFetchControl {
180 let mut chunk = None;
182 if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
183 chunk = Some(*random_chunk);
184 }
185
186 if chunk.is_none() {
187 return ChunkFetchControl::NextSeeder;
189 }
190
191 let chunk_hash = chunk.unwrap();
192 chunks_to_query.remove(&chunk_hash);
193
194 let start_time = Instant::now();
195 let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
196 let msg_subscriber_notfound = channel.subscribe_msg::<FudChunkNotFound>().await.unwrap();
197
198 let send_res = channel.send(&FudChunkRequest { resource: *ctx.hash, chunk: chunk_hash }).await;
199 if let Err(e) = send_res {
200 warn!(target: "fud::download::fetch_chunk()", "Error while sending FudChunkRequest: {e}");
201 return ChunkFetchControl::NextSeeder;
202 }
203
204 let chunk_recv =
205 receive_resource_msg(&msg_subscriber_chunk, *ctx.hash, ctx.fud.chunk_timeout).fuse();
206 let notfound_recv =
207 receive_resource_msg(&msg_subscriber_notfound, *ctx.hash, ctx.fud.chunk_timeout).fuse();
208
209 pin_mut!(chunk_recv, notfound_recv);
210
211 select! {
213 chunk_reply = chunk_recv => {
214 msg_subscriber_chunk.unsubscribe().await;
215 msg_subscriber_notfound.unsubscribe().await;
216 if let Err(e) = chunk_reply {
217 warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}");
218 return ChunkFetchControl::NextSeeder;
219 }
220 let reply = chunk_reply.unwrap();
221 handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await
222 }
223 notfound_reply = notfound_recv => {
224 msg_subscriber_chunk.unsubscribe().await;
225 msg_subscriber_notfound.unsubscribe().await;
226 if let Err(e) = notfound_reply {
227 warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}");
228 return ChunkFetchControl::NextSeeder;
229 }
230 info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
231 notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash });
232 ChunkFetchControl::NextChunk
233 }
234 }
235}
236
237async fn handle_chunk_reply(
239 ctx: &mut ChunkFetchContext<'_>,
240 chunk_hash: &blake3::Hash,
241 reply: &FudChunkReply,
242 seeder: &FudSeeder,
243 start_time: &Instant,
244) -> ChunkFetchControl {
245 let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await;
246 if let Err(e) = write_res {
247 error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash));
248 return ChunkFetchControl::NextChunk;
249 }
250 let (inserted_hash, bytes_written) = write_res.unwrap();
251 if inserted_hash != *chunk_hash {
252 warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk");
253 return ChunkFetchControl::NextChunk;
254 }
255
256 info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id()));
257
258 if bytes_written < reply.chunk.len() {
261 info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash));
262 let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await;
263 if let Err(e) = chunk_written {
264 error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}");
265 return ChunkFetchControl::NextChunk;
266 }
267 let scrap = Scrap {
268 chunk: reply.chunk.clone(),
269 hash_written: blake3::hash(&chunk_written.unwrap()),
270 };
271 if let Err(e) =
272 ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await)
273 {
274 error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash));
275 return ChunkFetchControl::NextChunk;
276 }
277 }
278
279 let mut resources_write = ctx.fud.resources.write().await;
281 let resource = resources_write.get_mut(ctx.hash);
282 if resource.is_none() {
283 return ChunkFetchControl::Abort }
285 let resource = resource.unwrap();
286 resource.status = ResourceStatus::Downloading;
287 resource.total_chunks_downloaded += 1;
288 resource.target_chunks_downloaded += 1;
289
290 resource.total_bytes_downloaded += reply.chunk.len() as u64;
291 resource.target_bytes_downloaded +=
292 resource.get_selected_bytes(ctx.chunked, &reply.chunk) as u64;
293 resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
294 if resource.speeds.len() > 12 {
295 resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); }
297
298 if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() {
303 if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash {
304 resource.total_bytes_size = ctx.chunked.get_fileseq().len();
305 resource.target_bytes_size = resource.total_bytes_size;
306 }
307 }
308 let resource = resource.clone();
309 drop(resources_write);
310
311 notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource });
312 ctx.chunks.remove(chunk_hash);
313 ChunkFetchControl::NextChunk
314}
315
/// The kind of reply a seeder sent in response to a `FudMetadataRequest`,
/// as collected by `fetch_metadata`.
enum MetadataFetchReply {
    /// The resource is a directory.
    Directory(FudDirectoryReply),
    /// The resource is a file described by a list of chunk hashes.
    File(FudFileReply),
    /// The seeder sent a chunk directly; `fetch_metadata` treats it as a file
    /// that fits in a single chunk.
    Chunk(FudChunkReply),
}
321
322pub async fn fetch_metadata(
329 fud: &Fud,
330 hash: &blake3::Hash,
331 path: &Path,
332 dht_sub: &Subscription<FudDhtEvent>,
333) -> Result<FudSeeder> {
334 let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;
335
336 seeders_loop!(hash, fud, dht_sub, async |seeder: FudSeeder| -> Result<()> {
337 let (channel, _) = fud.dht.get_channel(&seeder.node).await?;
338 let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
339 let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
340 let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
341 let msg_subscriber_notfound = channel.subscribe_msg::<FudMetadataNotFound>().await.unwrap();
342
343 let send_res = channel.send(&FudMetadataRequest { resource: *hash }).await;
344 if let Err(e) = send_res {
345 warn!(target: "fud::download::fetch_metadata()", "Error while sending FudMetadataRequest: {e}");
346 msg_subscriber_chunk.unsubscribe().await;
347 msg_subscriber_file.unsubscribe().await;
348 msg_subscriber_dir.unsubscribe().await;
349 msg_subscriber_notfound.unsubscribe().await;
350 fud.dht.cleanup_channel(channel).await;
351 return Err(e)
352 }
353
354 let chunk_recv =
355 receive_resource_msg(&msg_subscriber_chunk, *hash, fud.chunk_timeout).fuse();
356 let file_recv = receive_resource_msg(&msg_subscriber_file, *hash, fud.chunk_timeout).fuse();
357 let dir_recv = receive_resource_msg(&msg_subscriber_dir, *hash, fud.chunk_timeout).fuse();
358 let notfound_recv =
359 receive_resource_msg(&msg_subscriber_notfound, *hash, fud.chunk_timeout).fuse();
360
361 pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);
362
363 let cleanup = async || {
364 msg_subscriber_chunk.unsubscribe().await;
365 msg_subscriber_file.unsubscribe().await;
366 msg_subscriber_dir.unsubscribe().await;
367 msg_subscriber_notfound.unsubscribe().await;
368 fud.dht.cleanup_channel(channel).await;
369 };
370
371 select! {
373 chunk_reply = chunk_recv => {
376 cleanup().await;
377 if let Err(e) = chunk_reply {
378 warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
379 return Err(e)
380 }
381 let reply = chunk_reply.unwrap();
382 let chunk_hash = blake3::hash(&reply.chunk);
383 if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
385 warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
386 return Err(().into())
387 }
388 info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
389 result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
390 Ok(())
391 }
392 file_reply = file_recv => {
393 cleanup().await;
394 if let Err(e) = file_reply {
395 warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
396 return Err(e)
397 }
398 let reply = file_reply.unwrap();
399 if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
400 warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
401 return Err(().into())
402 }
403 info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
404 result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
405 Ok(())
406 }
407 dir_reply = dir_recv => {
408 cleanup().await;
409 if let Err(e) = dir_reply {
410 warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
411 return Err(e)
412 }
413 let reply = dir_reply.unwrap();
414
415 let files: Vec<_> = reply.files.clone().into_iter()
417 .map(|(path_str, size)| (PathBuf::from(path_str), size))
418 .collect();
419
420 if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
421 warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
422 return Err(().into())
423 }
424 info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
425 result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
426 Ok(())
427 }
428 notfound_reply = notfound_recv => {
429 cleanup().await;
430 if let Err(e) = notfound_reply {
431 warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
432 return Err(e)
433 }
434 info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
435 Err(().into())
436 }
437 }
438 });
439
440 if result.is_none() {
442 return Err(Error::GeodeFileRouteNotFound)
443 }
444
445 let (seeder, reply) = result.unwrap();
448 match reply {
449 MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes, .. }) => {
450 let mut files: Vec<_> =
452 files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();
453
454 fud.geode.sort_files(&mut files);
455 if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
456 error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
457 return Err(e)
458 }
459 }
460 MetadataFetchReply::File(FudFileReply { chunk_hashes, .. }) => {
461 if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
462 error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
463 return Err(e)
464 }
465 }
466 MetadataFetchReply::Chunk(FudChunkReply { chunk, .. }) => {
468 info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
469 let chunk_hash = blake3::hash(&chunk);
470 if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
471 error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
472 return Err(e)
473 }
474 create_all_files(&[path.to_path_buf()]).await?;
475 let mut chunked_file = ChunkedStorage::new(
476 &[chunk_hash],
477 &[(path.to_path_buf(), chunk.len() as u64)],
478 false,
479 );
480 if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
481 error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
482 return Err(e)
483 };
484 }
485 };
486
487 Ok(seeder)
488}