fud/
download.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::HashSet,
21    path::{Path, PathBuf},
22    time::Instant,
23};
24
25use futures::{future::FutureExt, pin_mut, select};
26use log::{error, info, warn};
27use rand::{
28    prelude::{IteratorRandom, SliceRandom},
29    rngs::OsRng,
30};
31
32use darkfi::{
33    dht::DhtNode,
34    geode::{hash_to_string, ChunkedStorage},
35    net::ChannelPtr,
36    system::Subscription,
37    Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41use crate::{
42    event::{self, notify_event, FudEvent},
43    proto::{FudChunkReply, FudDirectoryReply, FudFileReply, FudFindRequest, FudNotFound},
44    util::create_all_files,
45    Fud, FudSeeder, ResourceStatus, ResourceType, Scrap,
46};
47
/// Receive seeders from a subscription, and execute an async expression for
/// each deduplicated seeder once (seeder order is random).
/// It will keep going until the expression returns `Ok(())`, or there are
/// no more seeders.
/// It has an optional `favored_seeder` argument that will be tried first if
/// specified.
///
/// `$code` is an async closure `FudSeeder -> Result<()>`; returning `Err`
/// makes the loop try the next seeder.
macro_rules! seeders_loop {
    ($seeders_sub:expr, $favored_seeder:expr, $code:expr) => {
        // Node ids already queried; guarantees each seeder is tried at most once
        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
        let mut is_done = false;

        // Try favored seeder
        let favored_seeder: Option<FudSeeder> = $favored_seeder;
        if let Some(seeder) = favored_seeder {
            queried_seeders.insert(seeder.node.id());
            if $code(seeder).await.is_ok() {
                is_done = true;
            }
        }

        // Try other seeders using the subscription
        while !is_done {
            let rep = $seeders_sub.receive().await;
            if rep.is_none() {
                break; // None means the lookup is done
            }
            let seeders = rep.unwrap().clone();
            // Randomize the order in which this batch of seeders is tried
            let mut shuffled_seeders = {
                let mut vec: Vec<_> = seeders.iter().cloned().collect();
                vec.shuffle(&mut OsRng);
                vec
            };
            // Loop over seeders
            while let Some(seeder) = shuffled_seeders.pop() {
                // Only use a seeder once
                if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
                    continue;
                }
                queried_seeders.insert(seeder.node.id());

                // Stop at the first seeder for which `$code` succeeds
                if $code(seeder).await.is_err() {
                    continue;
                }

                is_done = true;
                break;
            }
        }
    };
    // Variant without a favored seeder
    ($seeders_sub:expr, $code:expr) => {
        seeders_loop!($seeders_sub, None, $code)
    };
}
101
/// What the chunk-fetching loop should do after a single chunk fetch attempt.
enum ChunkFetchControl {
    /// Request the next chunk from the same seeder
    NextChunk,
    /// Stop querying this seeder and move on to the next one
    NextSeeder,
    /// Stop fetching entirely (e.g. the resource was removed)
    Abort,
}
107
/// State shared across the chunk fetching helpers.
struct ChunkFetchContext<'a> {
    /// Fud instance
    fud: &'a Fud,
    /// Hash of the resource (file or directory) the chunks belong to
    hash: &'a blake3::Hash,
    /// Chunked storage (file or directory) the fetched chunks are written to
    chunked: &'a mut ChunkedStorage,
    /// Chunks still missing; successfully fetched chunks are removed from this set
    chunks: &'a mut HashSet<blake3::Hash>,
}
114
/// Fetch `chunks` for `chunked` (file or directory) from seeders in `seeders_sub`.
///
/// Seeders are tried one at a time (`favored_seeder` first, if given). For each
/// seeder a DHT channel is opened and chunks are requested until the seeder has
/// nothing left to offer, every chunk was fetched, or the download is aborted.
pub async fn fetch_chunks(
    fud: &Fud,
    hash: &blake3::Hash,
    chunked: &mut ChunkedStorage,
    seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
    favored_seeder: Option<FudSeeder>,
    chunks: &mut HashSet<blake3::Hash>,
) -> Result<()> {
    let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks };

    seeders_loop!(seeders_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> {
        // Open a channel to the seeder node
        let channel = match fud.dht.get_channel(&seeder.node, Some(*hash)).await {
            Ok(channel) => channel,
            Err(e) => {
                warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
                return Err(e)
            }
        };
        // Chunks we have not yet requested from this particular seeder
        let mut chunks_to_query = ctx.chunks.clone();
        info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));

        loop {
            // Loop over chunks
            match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await {
                ChunkFetchControl::NextChunk => continue,
                ChunkFetchControl::NextSeeder => break,
                ChunkFetchControl::Abort => {
                    fud.dht.cleanup_channel(channel).await;
                    return Ok(())
                }
            };
        }

        fud.dht.cleanup_channel(channel).await;

        // Stop when there are no missing chunks
        if ctx.chunks.is_empty() {
            return Ok(())
        }

        // Err makes `seeders_loop!` move on to the next seeder
        Err(().into())
    });

    Ok(())
}
161
162/// Fetch a single chunk and return what should be done next
163async fn fetch_chunk(
164    ctx: &mut ChunkFetchContext<'_>,
165    channel: &ChannelPtr,
166    seeder: &FudSeeder,
167    chunks_to_query: &mut HashSet<blake3::Hash>,
168) -> ChunkFetchControl {
169    // Select a chunk to request
170    let mut chunk = None;
171    if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
172        chunk = Some(*random_chunk);
173    }
174
175    if chunk.is_none() {
176        // No more chunks to request from this seeder
177        return ChunkFetchControl::NextSeeder;
178    }
179
180    let chunk_hash = chunk.unwrap();
181    chunks_to_query.remove(&chunk_hash);
182
183    let start_time = Instant::now();
184    let msg_subsystem = channel.message_subsystem();
185    msg_subsystem.add_dispatch::<FudChunkReply>().await;
186    msg_subsystem.add_dispatch::<FudNotFound>().await;
187    let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
188    let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
189
190    let send_res = channel.send(&FudFindRequest { info: Some(*ctx.hash), key: chunk_hash }).await;
191    if let Err(e) = send_res {
192        warn!(target: "fud::download::fetch_chunk()", "Error while sending FudFindRequest: {e}");
193        return ChunkFetchControl::NextSeeder;
194    }
195
196    let chunk_recv = msg_subscriber_chunk.receive_with_timeout(ctx.fud.chunk_timeout).fuse();
197    let notfound_recv = msg_subscriber_notfound.receive_with_timeout(ctx.fud.chunk_timeout).fuse();
198
199    pin_mut!(chunk_recv, notfound_recv);
200
201    // Wait for a FudChunkReply or FudNotFound
202    select! {
203        chunk_reply = chunk_recv => {
204            msg_subscriber_chunk.unsubscribe().await;
205            msg_subscriber_notfound.unsubscribe().await;
206            if let Err(e) = chunk_reply {
207                warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}");
208                return ChunkFetchControl::NextSeeder;
209            }
210            let reply = chunk_reply.unwrap();
211            handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await
212        }
213        notfound_reply = notfound_recv => {
214            msg_subscriber_chunk.unsubscribe().await;
215            msg_subscriber_notfound.unsubscribe().await;
216            if let Err(e) = notfound_reply {
217                warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}");
218                return ChunkFetchControl::NextSeeder;
219            }
220            info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
221            notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash });
222            ChunkFetchControl::NextChunk
223        }
224    }
225}
226
227/// Processes an incoming chunk
228async fn handle_chunk_reply(
229    ctx: &mut ChunkFetchContext<'_>,
230    chunk_hash: &blake3::Hash,
231    reply: &FudChunkReply,
232    seeder: &FudSeeder,
233    start_time: &Instant,
234) -> ChunkFetchControl {
235    let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await;
236    if let Err(e) = write_res {
237        error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash));
238        return ChunkFetchControl::NextChunk;
239    }
240    let (inserted_hash, bytes_written) = write_res.unwrap();
241    if inserted_hash != *chunk_hash {
242        warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk");
243        return ChunkFetchControl::NextChunk;
244    }
245
246    info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id()));
247
248    // If we did not write the whole chunk to the filesystem,
249    // save the chunk in the scraps.
250    if bytes_written < reply.chunk.len() {
251        info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash));
252        let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await;
253        if let Err(e) = chunk_written {
254            error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}");
255            return ChunkFetchControl::NextChunk;
256        }
257        let scrap = Scrap {
258            chunk: reply.chunk.clone(),
259            hash_written: blake3::hash(&chunk_written.unwrap()),
260        };
261        if let Err(e) =
262            ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await)
263        {
264            error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash));
265            return ChunkFetchControl::NextChunk;
266        }
267    }
268
269    // Update the resource
270    let mut resources_write = ctx.fud.resources.write().await;
271    let resource = resources_write.get_mut(ctx.hash);
272    if resource.is_none() {
273        return ChunkFetchControl::Abort // Resource was removed
274    }
275    let resource = resource.unwrap();
276    resource.status = ResourceStatus::Downloading;
277    resource.total_chunks_downloaded += 1;
278    resource.target_chunks_downloaded += 1;
279
280    resource.total_bytes_downloaded += reply.chunk.len() as u64;
281    resource.target_bytes_downloaded +=
282        resource.get_selected_bytes(ctx.chunked, &reply.chunk) as u64;
283    resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
284    if resource.speeds.len() > 12 {
285        resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last few speeds
286    }
287
288    // If we just fetched the last chunk of a file, compute
289    // `total_bytes_size` (and `target_bytes_size`) again,
290    // as `geode.write_chunk()` updated the FileSequence
291    // to the exact file size.
292    if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() {
293        if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash {
294            resource.total_bytes_size = ctx.chunked.get_fileseq().len();
295            resource.target_bytes_size = resource.total_bytes_size;
296        }
297    }
298    let resource = resource.clone();
299    drop(resources_write);
300
301    notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource });
302    ctx.chunks.remove(chunk_hash);
303    ChunkFetchControl::NextChunk
304}
305
/// Possible replies received while fetching resource metadata.
enum MetadataFetchReply {
    /// Directory metadata (file list and chunk hashes)
    Directory(FudDirectoryReply),
    /// File metadata (chunk hashes)
    File(FudFileReply),
    /// A single chunk, sent directly when the whole file fits in one chunk
    Chunk(FudChunkReply),
}
311
/// Fetch a single resource metadata from seeders received from `seeders_sub`.
/// If the resource is a file smaller than a single chunk then seeder can send the
/// chunk directly, and we will create the file from it on path `path`.
/// 1. Wait for seeders from the subscription
/// 2. Request the metadata from the seeders
/// 3. Insert metadata to geode using the reply
///
/// Returns the seeder that provided the (verified) metadata, or
/// `Error::GeodeFileRouteNotFound` if no seeder could.
pub async fn fetch_metadata(
    fud: &Fud,
    hash: &blake3::Hash,
    seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
    path: &Path,
) -> Result<FudSeeder> {
    // Set by the seeders loop below once a verified reply was received
    let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;

    seeders_loop!(seeders_sub, async |seeder: FudSeeder| -> Result<()> {
        // Open a channel and subscribe to every possible reply type
        let channel = fud.dht.get_channel(&seeder.node, Some(*hash)).await?;
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<FudChunkReply>().await;
        msg_subsystem.add_dispatch::<FudFileReply>().await;
        msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
        msg_subsystem.add_dispatch::<FudNotFound>().await;
        let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
        let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
        let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
        let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();

        // Request the resource metadata from the seeder
        let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
        if let Err(e) = send_res {
            warn!(target: "fud::download::fetch_metadata()", "Error while sending FudFindRequest: {e}");
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
            return Err(e)
        }

        let chunk_recv = msg_subscriber_chunk.receive_with_timeout(fud.chunk_timeout).fuse();
        let file_recv = msg_subscriber_file.receive_with_timeout(fud.chunk_timeout).fuse();
        let dir_recv = msg_subscriber_dir.receive_with_timeout(fud.chunk_timeout).fuse();
        let notfound_recv = msg_subscriber_notfound.receive_with_timeout(fud.chunk_timeout).fuse();

        pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);

        // Unsubscribe from everything and release the channel; called on
        // every exit path of the select below
        let cleanup = async || {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
        };

        // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
        select! {
            // Received a chunk while requesting metadata, this is allowed to
            // optimize fetching files smaller than a single chunk
            chunk_reply = chunk_recv => {
                cleanup().await;
                if let Err(e) = chunk_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
                    return Err(e)
                }
                let reply = chunk_reply.unwrap();
                let chunk_hash = blake3::hash(&reply.chunk);
                // Check that this is the only chunk in the file
                if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
                Ok(())
            }
            file_reply = file_recv => {
                cleanup().await;
                if let Err(e) = file_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
                    return Err(e)
                }
                let reply = file_reply.unwrap();
                // Verify the chunk hashes match the file hash
                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
                Ok(())
            }
            dir_reply = dir_recv => {
                cleanup().await;
                if let Err(e) = dir_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
                    return Err(e)
                }
                let reply = dir_reply.unwrap();

                // Convert all file paths from String to PathBuf
                let files: Vec<_> = reply.files.clone().into_iter()
                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
                    .collect();

                // Verify the chunk hashes and file list match the directory hash
                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
                Ok(())
            }
            notfound_reply = notfound_recv => {
                cleanup().await;
                if let Err(e) = notfound_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
                    return Err(e)
                }
                info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                Err(().into())
            }
        }
    });

    // We did not find the resource
    if result.is_none() {
        return Err(Error::GeodeFileRouteNotFound)
    }

    // Insert metadata to geode using the reply
    // At this point the reply content is already verified
    let (seeder, reply) = result.unwrap();
    match reply {
        MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
            // Convert all file paths from String to PathBuf
            let mut files: Vec<_> =
                files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();

            fud.geode.sort_files(&mut files);
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        MetadataFetchReply::File(FudFileReply { chunk_hashes }) => {
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        // Looked for a file but got a chunk: the entire file fits in a single chunk
        MetadataFetchReply::Chunk(FudChunkReply { chunk }) => {
            info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
            let chunk_hash = blake3::hash(&chunk);
            if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
                return Err(e)
            }
            // Create the file on disk and write the single chunk to it directly
            create_all_files(&[path.to_path_buf()]).await?;
            let mut chunked_file = ChunkedStorage::new(
                &[chunk_hash],
                &[(path.to_path_buf(), chunk.len() as u64)],
                false,
            );
            if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
                return Err(e)
            };
        }
    };

    Ok(seeder)
}