use std::{
    collections::HashSet,
    path::{Path, PathBuf},
    time::Instant,
};

use futures::{future::FutureExt, pin_mut, select};
use log::{error, info, warn};
use rand::{
    prelude::{IteratorRandom, SliceRandom},
    rngs::OsRng,
};

use darkfi::{
    dht::DhtNode,
    geode::{hash_to_string, ChunkedStorage},
    net::ChannelPtr,
    system::Subscription,
    Error, Result,
};
use darkfi_serial::serialize_async;

use crate::{
    event::{self, notify_event, FudEvent},
    proto::{FudChunkReply, FudDirectoryReply, FudFileReply, FudFindRequest, FudNotFound},
    util::create_all_files,
    Fud, FudSeeder, ResourceStatus, ResourceType, Scrap,
};

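/// Loop over the seeders received on `$seeders_sub`, calling `$code` on each one
/// until a call returns `Ok`. An optional favored seeder is tried first, and every
/// seeder is queried at most once.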
macro_rules! seeders_loop {
    ($seeders_sub:expr, $favored_seeder:expr, $code:expr) => {
        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
        let mut is_done = false;

        let favored_seeder: Option<FudSeeder> = $favored_seeder;
        // Try the favored seeder first, if any
        if let Some(seeder) = favored_seeder {
            queried_seeders.insert(seeder.node.id());
            if $code(seeder).await.is_ok() {
                is_done = true;
            }
        }

        while !is_done {
            let rep = $seeders_sub.receive().await;
            if rep.is_none() {
                break;
            }
            let seeders = rep.unwrap().clone();
            let mut shuffled_seeders = {
                let mut vec: Vec<_> = seeders.iter().cloned().collect();
                vec.shuffle(&mut OsRng);
                vec
            };
            while let Some(seeder) = shuffled_seeders.pop() {
                // Skip seeders we already queried
                if queried_seeders.contains(&seeder.node.id()) {
                    continue;
                }
                queried_seeders.insert(seeder.node.id());

                if $code(seeder).await.is_err() {
                    continue;
                }

                is_done = true;
                break;
            }
        }
    };
    ($seeders_sub:expr, $code:expr) => {
        seeders_loop!($seeders_sub, None, $code)
    };
}

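/// Control flow returned by `fetch_chunk()` to the seeder loop in `fetch_chunks()`.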
enum ChunkFetchControl {
    /// Try the next chunk with the same seeder
    NextChunk,
    /// Give up on the current seeder and move to the next one
    NextSeeder,
    /// Stop fetching chunks for this resource
    Abort,
}

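/// State shared by `fetch_chunks()`, `fetch_chunk()` and `handle_chunk_reply()`.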
struct ChunkFetchContext<'a> {
    fud: &'a Fud,
    hash: &'a blake3::Hash,
    chunked: &'a mut ChunkedStorage,
    chunks: &'a mut HashSet<blake3::Hash>,
}

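/// Fetch the chunks listed in `chunks` for the resource `hash` from the seeders
/// received on `seeders_sub`, writing them into `chunked`. If a `favored_seeder`
/// is given, it is tried before any seeder from the subscription.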
pub async fn fetch_chunks(
    fud: &Fud,
    hash: &blake3::Hash,
    chunked: &mut ChunkedStorage,
    seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
    favored_seeder: Option<FudSeeder>,
    chunks: &mut HashSet<blake3::Hash>,
) -> Result<()> {
    let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks };

    seeders_loop!(seeders_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> {
        let channel = match fud.dht.get_channel(&seeder.node, Some(*hash)).await {
            Ok(channel) => channel,
            Err(e) => {
                warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
                return Err(e)
            }
        };
        let mut chunks_to_query = ctx.chunks.clone();
        info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));

        loop {
            match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await {
                ChunkFetchControl::NextChunk => continue,
                ChunkFetchControl::NextSeeder => break,
                ChunkFetchControl::Abort => {
                    fud.dht.cleanup_channel(channel).await;
                    return Ok(())
                }
            };
        }

        fud.dht.cleanup_channel(channel).await;

        if ctx.chunks.is_empty() {
            return Ok(())
        }

        Err(().into())
    });

    Ok(())
}

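/// Request a single randomly chosen chunk from `chunks_to_query` over `channel`,
/// then wait for either a chunk reply or a NOTFOUND reply from the seeder.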
async fn fetch_chunk(
    ctx: &mut ChunkFetchContext<'_>,
    channel: &ChannelPtr,
    seeder: &FudSeeder,
    chunks_to_query: &mut HashSet<blake3::Hash>,
) -> ChunkFetchControl {
    // Pick a random chunk we still need from this seeder
    let Some(chunk_hash) = chunks_to_query.iter().choose(&mut OsRng).copied() else {
        return ChunkFetchControl::NextSeeder;
    };
    chunks_to_query.remove(&chunk_hash);

    let start_time = Instant::now();
    let msg_subsystem = channel.message_subsystem();
    msg_subsystem.add_dispatch::<FudChunkReply>().await;
    msg_subsystem.add_dispatch::<FudNotFound>().await;
    let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
    let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();

    let send_res = channel.send(&FudFindRequest { info: Some(*ctx.hash), key: chunk_hash }).await;
    if let Err(e) = send_res {
        warn!(target: "fud::download::fetch_chunk()", "Error while sending FudFindRequest: {e}");
        return ChunkFetchControl::NextSeeder;
    }

    let chunk_recv = msg_subscriber_chunk.receive_with_timeout(ctx.fud.chunk_timeout).fuse();
    let notfound_recv = msg_subscriber_notfound.receive_with_timeout(ctx.fud.chunk_timeout).fuse();

    pin_mut!(chunk_recv, notfound_recv);

    select! {
        chunk_reply = chunk_recv => {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            if let Err(e) = chunk_reply {
                warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}");
                return ChunkFetchControl::NextSeeder;
            }
            let reply = chunk_reply.unwrap();
            handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await
        }
        notfound_reply = notfound_recv => {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            if let Err(e) = notfound_reply {
                warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}");
                return ChunkFetchControl::NextSeeder;
            }
            info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
            notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash });
            ChunkFetchControl::NextChunk
        }
    }
}

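/// Write a received chunk to disk, update the resource's download stats, and
/// notify subscribers. Returns the next step for `fetch_chunk()` to take.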
async fn handle_chunk_reply(
    ctx: &mut ChunkFetchContext<'_>,
    chunk_hash: &blake3::Hash,
    reply: &FudChunkReply,
    seeder: &FudSeeder,
    start_time: &Instant,
) -> ChunkFetchControl {
    let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await;
    if let Err(e) = write_res {
        error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash));
        return ChunkFetchControl::NextChunk;
    }
    let (inserted_hash, bytes_written) = write_res.unwrap();
    if inserted_hash != *chunk_hash {
        warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk");
        return ChunkFetchControl::NextChunk;
    }

    info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id()));

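    // Only part of the chunk could be written to disk, so keep the full chunk as
    // a scrap, together with the hash of the bytes that were actually written.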
    if bytes_written < reply.chunk.len() {
        info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash));
        let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await;
        if let Err(e) = chunk_written {
            error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}");
            return ChunkFetchControl::NextChunk;
        }
        let scrap = Scrap {
            chunk: reply.chunk.clone(),
            hash_written: blake3::hash(&chunk_written.unwrap()),
        };
        if let Err(e) =
            ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await)
        {
            error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash));
            return ChunkFetchControl::NextChunk;
        }
    }

    let mut resources_write = ctx.fud.resources.write().await;
    let resource = resources_write.get_mut(ctx.hash);
    if resource.is_none() {
        return ChunkFetchControl::Abort
    }
    let resource = resource.unwrap();
    resource.status = ResourceStatus::Downloading;
    resource.total_chunks_downloaded += 1;
    resource.target_chunks_downloaded += 1;

    resource.total_bytes_downloaded += reply.chunk.len() as u64;
    resource.target_bytes_downloaded +=
        resource.get_selected_bytes(ctx.chunked, &reply.chunk) as u64;
    resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
    if resource.speeds.len() > 12 {
        resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12);
    }

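    // If this was the last chunk of a file, the exact size is now known, so update
    // the resource's size fields from the underlying file sequence.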
    if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() {
        if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash {
            resource.total_bytes_size = ctx.chunked.get_fileseq().len();
            resource.target_bytes_size = resource.total_bytes_size;
        }
    }
    let resource = resource.clone();
    drop(resources_write);

    notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource });
    ctx.chunks.remove(chunk_hash);
    ChunkFetchControl::NextChunk
}

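/// The possible replies a seeder can send when we ask for a resource's metadata.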
enum MetadataFetchReply {
    Directory(FudDirectoryReply),
    File(FudFileReply),
    Chunk(FudChunkReply),
}

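/// Fetch the metadata of resource `hash` from the seeders received on `seeders_sub`
/// and insert it into Geode. If the resource fits in a single chunk, the chunk is
/// written directly to `path`. Returns the seeder that provided the metadata.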
pub async fn fetch_metadata(
    fud: &Fud,
    hash: &blake3::Hash,
    seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
    path: &Path,
) -> Result<FudSeeder> {
    let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;

    seeders_loop!(seeders_sub, async |seeder: FudSeeder| -> Result<()> {
        let channel = fud.dht.get_channel(&seeder.node, Some(*hash)).await?;
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<FudChunkReply>().await;
        msg_subsystem.add_dispatch::<FudFileReply>().await;
        msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
        msg_subsystem.add_dispatch::<FudNotFound>().await;
        let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
        let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
        let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
        let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();

        let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
        if let Err(e) = send_res {
            warn!(target: "fud::download::fetch_metadata()", "Error while sending FudFindRequest: {e}");
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
            return Err(e)
        }

        let chunk_recv = msg_subscriber_chunk.receive_with_timeout(fud.chunk_timeout).fuse();
        let file_recv = msg_subscriber_file.receive_with_timeout(fud.chunk_timeout).fuse();
        let dir_recv = msg_subscriber_dir.receive_with_timeout(fud.chunk_timeout).fuse();
        let notfound_recv = msg_subscriber_notfound.receive_with_timeout(fud.chunk_timeout).fuse();

        pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);

        let cleanup = async || {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
        };

        select! {
            // The seeder sent the content directly: the resource fits in a single chunk
            chunk_reply = chunk_recv => {
                cleanup().await;
                if let Err(e) = chunk_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
                    return Err(e)
                }
                let reply = chunk_reply.unwrap();
                let chunk_hash = blake3::hash(&reply.chunk);
                if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
                Ok(())
            }
            // The seeder sent file metadata (the list of chunk hashes)
            file_reply = file_recv => {
                cleanup().await;
                if let Err(e) = file_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
                    return Err(e)
                }
                let reply = file_reply.unwrap();
                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
                Ok(())
            }
            // The seeder sent directory metadata (file list and chunk hashes)
            dir_reply = dir_recv => {
                cleanup().await;
                if let Err(e) = dir_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
                    return Err(e)
                }
                let reply = dir_reply.unwrap();

                let files: Vec<_> = reply.files.clone().into_iter()
                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
                    .collect();

                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
                Ok(())
            }
            // The seeder does not have the resource
            notfound_reply = notfound_recv => {
                cleanup().await;
                if let Err(e) = notfound_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
                    return Err(e)
                }
                info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                Err(().into())
            }
        }
    });

    if result.is_none() {
        return Err(Error::GeodeFileRouteNotFound)
    }

    let (seeder, reply) = result.unwrap();
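    // Insert the received metadata into Geode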
    match reply {
        MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
            let mut files: Vec<_> =
                files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();

            fud.geode.sort_files(&mut files);
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        MetadataFetchReply::File(FudFileReply { chunk_hashes }) => {
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        MetadataFetchReply::Chunk(FudChunkReply { chunk }) => {
            info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
            let chunk_hash = blake3::hash(&chunk);
            if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
                return Err(e)
            }
            create_all_files(&[path.to_path_buf()]).await?;
            let mut chunked_file = ChunkedStorage::new(
                &[chunk_hash],
                &[(path.to_path_buf(), chunk.len() as u64)],
                false,
            );
            if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
                return Err(e)
            };
        }
    };

    Ok(seeder)
}