fud/download.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{
    collections::HashSet,
    path::{Path, PathBuf},
    time::Instant,
};

use futures::{future::FutureExt, pin_mut, select};
use rand::{
    prelude::{IteratorRandom, SliceRandom},
    rngs::OsRng,
};
use tracing::{error, info, warn};

use darkfi::{
    dht::{event::DhtEvent, DhtHandler, DhtNode},
    geode::{hash_to_string, ChunkedStorage},
    net::ChannelPtr,
    system::Subscription,
    Error, Result,
};
use darkfi_serial::serialize_async;

use crate::{
    event::{self, notify_event, FudEvent},
    proto::{
        FudChunkNotFound, FudChunkReply, FudChunkRequest, FudDirectoryReply, FudFileReply,
        FudMetadataNotFound, FudMetadataRequest,
    },
    util::{create_all_files, receive_resource_msg},
    Fud, FudSeeder, ResourceStatus, ResourceType, Scrap,
};

type FudDhtEvent = DhtEvent<<Fud as DhtHandler>::Node, <Fud as DhtHandler>::Value>;

/// Receive seeders from a DHT event subscription and execute an async
/// expression once per deduplicated seeder (seeder order is random).
/// It keeps going until the expression returns `Ok(())` or there are
/// no more seeders.
/// The optional `favored_seeder` argument, if specified, is tried first.
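///
/// A minimal usage sketch (mirroring `fetch_metadata` below; `hash`, `fud`
/// and `dht_sub` are assumed to be in scope):
/// ```ignore
/// seeders_loop!(hash, fud, dht_sub, async |seeder: FudSeeder| -> Result<()> {
///     // Try this seeder; `Ok(())` stops the loop, `Err(_)` moves on to the next one.
///     Ok(())
/// });
/// ```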
macro_rules! seeders_loop {
    ($key:expr, $fud:expr, $dht_sub:expr, $favored_seeder:expr, $code:expr) => {
        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
        let mut is_done = false;

        // Try favored seeder
        let favored_seeder: Option<FudSeeder> = $favored_seeder;
        if let Some(seeder) = favored_seeder {
            queried_seeders.insert(seeder.node.id());
            if $code(seeder).await.is_ok() {
                is_done = true;
            }
        }

        // Try other seeders using the DHT subscription
        while !is_done {
            let event = $dht_sub.receive().await;
            if event.key() != Some($key) {
                continue // Ignore this event if it's not about the right key
            }
            if let DhtEvent::ValueLookupCompleted { .. } = event {
                break // Lookup is done
            }
            if !matches!(event, DhtEvent::ValueFound { .. }) {
                continue // Ignore this event as it's not a ValueFound
            }
            let seeders = event.into_value().unwrap();
            let mut shuffled_seeders = {
                let mut vec: Vec<_> = seeders.iter().cloned().collect();
                vec.shuffle(&mut OsRng);
                vec
            };
            // Loop over seeders
            while let Some(seeder) = shuffled_seeders.pop() {
                // Only use a seeder once
                if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
                    continue
                }
                queried_seeders.insert(seeder.node.id());

                if $code(seeder).await.is_err() {
                    continue
                }

                is_done = true;
                break
            }
        }
    };
    ($key:expr, $fud:expr, $dht_sub:expr, $code:expr) => {
        seeders_loop!($key, $fud, $dht_sub, None, $code)
    };
}

/// What `fetch_chunk` decides should happen next.
enum ChunkFetchControl {
    /// Keep querying the same seeder for another chunk
    NextChunk,
    /// Give up on this seeder and move on to the next one
    NextSeeder,
    /// Stop fetching entirely (e.g. the resource was removed)
    Abort,
}

/// State shared across `fetch_chunk` calls while downloading a single resource.
struct ChunkFetchContext<'a> {
    fud: &'a Fud,
    hash: &'a blake3::Hash,
    chunked: &'a mut ChunkedStorage,
    chunks: &'a mut HashSet<blake3::Hash>,
}

/// Fetch `chunks` for `chunked` (file or directory) from seeders received on `dht_sub`.
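/// Chunks are removed from `chunks` as they are fetched and written, so any
/// hashes still present when this returns were not downloaded (no seeder
/// provided them, or the resource was removed mid-download).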
pub async fn fetch_chunks(
    fud: &Fud,
    hash: &blake3::Hash,
    chunked: &mut ChunkedStorage,
    dht_sub: &Subscription<FudDhtEvent>,
    favored_seeder: Option<FudSeeder>,
    chunks: &mut HashSet<blake3::Hash>,
) -> Result<()> {
    let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks };

    seeders_loop!(hash, fud, dht_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> {
        let (channel, _) = match fud.dht.get_channel(&seeder.node).await {
            Ok(channel) => channel,
            Err(e) => {
                warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
                return Err(e)
            }
        };
        let mut chunks_to_query = ctx.chunks.clone();
        info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));

        loop {
            // Loop over chunks
            match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await {
                ChunkFetchControl::NextChunk => continue,
                ChunkFetchControl::NextSeeder => break,
                ChunkFetchControl::Abort => {
                    fud.dht.cleanup_channel(channel).await;
                    return Ok(())
                }
            };
        }

        fud.dht.cleanup_channel(channel).await;

        // Stop when there are no missing chunks
        if ctx.chunks.is_empty() {
            return Ok(())
        }

        Err(().into())
    });

    Ok(())
}

/// Fetch a single chunk and return what should be done next
async fn fetch_chunk(
    ctx: &mut ChunkFetchContext<'_>,
    channel: &ChannelPtr,
    seeder: &FudSeeder,
    chunks_to_query: &mut HashSet<blake3::Hash>,
) -> ChunkFetchControl {
    // Select a chunk to request
    let mut chunk = None;
    if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
        chunk = Some(*random_chunk);
    }

    if chunk.is_none() {
        // No more chunks to request from this seeder
        return ChunkFetchControl::NextSeeder;
    }

    let chunk_hash = chunk.unwrap();
    chunks_to_query.remove(&chunk_hash);

    let start_time = Instant::now();
    let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
    let msg_subscriber_notfound = channel.subscribe_msg::<FudChunkNotFound>().await.unwrap();

    let send_res = channel.send(&FudChunkRequest { resource: *ctx.hash, chunk: chunk_hash }).await;
    if let Err(e) = send_res {
        warn!(target: "fud::download::fetch_chunk()", "Error while sending FudChunkRequest: {e}");
        return ChunkFetchControl::NextSeeder;
    }

    let chunk_recv =
        receive_resource_msg(&msg_subscriber_chunk, *ctx.hash, ctx.fud.chunk_timeout).fuse();
    let notfound_recv =
        receive_resource_msg(&msg_subscriber_notfound, *ctx.hash, ctx.fud.chunk_timeout).fuse();

    pin_mut!(chunk_recv, notfound_recv);

    // Wait for a FudChunkReply or FudChunkNotFound
    select! {
        chunk_reply = chunk_recv => {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            if let Err(e) = chunk_reply {
                warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}");
                return ChunkFetchControl::NextSeeder;
            }
            let reply = chunk_reply.unwrap();
            handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await
        }
        notfound_reply = notfound_recv => {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            if let Err(e) = notfound_reply {
                warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}");
                return ChunkFetchControl::NextSeeder;
            }
            info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
            notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash });
            ChunkFetchControl::NextChunk
        }
    }
}

/// Process an incoming chunk reply and update the resource state accordingly
async fn handle_chunk_reply(
    ctx: &mut ChunkFetchContext<'_>,
    chunk_hash: &blake3::Hash,
    reply: &FudChunkReply,
    seeder: &FudSeeder,
    start_time: &Instant,
) -> ChunkFetchControl {
    let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await;
    if let Err(e) = write_res {
        error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash));
        return ChunkFetchControl::NextChunk;
    }
    let (inserted_hash, bytes_written) = write_res.unwrap();
    if inserted_hash != *chunk_hash {
        warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk");
        return ChunkFetchControl::NextChunk;
    }

    info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id()));

    // If we did not write the whole chunk to the filesystem,
    // save the chunk in the scraps.
    if bytes_written < reply.chunk.len() {
        info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash));
        let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await;
        if let Err(e) = chunk_written {
            error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}");
            return ChunkFetchControl::NextChunk;
        }
        let scrap = Scrap {
            chunk: reply.chunk.clone(),
            hash_written: blake3::hash(&chunk_written.unwrap()),
        };
        if let Err(e) =
            ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await)
        {
            error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash));
            return ChunkFetchControl::NextChunk;
        }
    }

    // Update the resource
    let mut resources_write = ctx.fud.resources.write().await;
    let resource = resources_write.get_mut(ctx.hash);
    if resource.is_none() {
        return ChunkFetchControl::Abort // Resource was removed
    }
    let resource = resource.unwrap();
    resource.status = ResourceStatus::Downloading;
    resource.total_chunks_downloaded += 1;
    resource.target_chunks_downloaded += 1;

    resource.total_bytes_downloaded += reply.chunk.len() as u64;
    resource.target_bytes_downloaded +=
        resource.get_selected_bytes(ctx.chunked, &reply.chunk) as u64;
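    // Record this chunk's download speed in bytes per second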
    resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
    if resource.speeds.len() > 12 {
        resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last few speeds
    }

    // If we just fetched the last chunk of a file, compute
    // `total_bytes_size` (and `target_bytes_size`) again,
    // as `geode.write_chunk()` updated the FileSequence
    // to the exact file size.
    if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() {
        if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash {
            resource.total_bytes_size = ctx.chunked.get_fileseq().len();
            resource.target_bytes_size = resource.total_bytes_size;
        }
    }
    let resource = resource.clone();
    drop(resources_write);

    notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource });
    ctx.chunks.remove(chunk_hash);
    ChunkFetchControl::NextChunk
}

/// Reply received while fetching the metadata of a resource.
enum MetadataFetchReply {
    /// Directory metadata
    Directory(FudDirectoryReply),
    /// File metadata
    File(FudFileReply),
    /// A single chunk, sent directly when the whole file fits in one chunk
    Chunk(FudChunkReply),
}

/// Fetch the metadata of a single resource from seeders received on `dht_sub`.
/// If the resource is a file smaller than a single chunk, the seeder may send the
/// chunk directly, and we create the file from it at `path`.
/// 1. Wait for seeders from the subscription
/// 2. Request the metadata from the seeders
/// 3. Insert the metadata into geode using the reply
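///
/// Returns the seeder that provided the metadata, which can then be used as
/// the `favored_seeder` when fetching chunks. A minimal sketch (assuming
/// `fud`, `hash`, `path`, `dht_sub`, and mutable `chunked`/`chunks` are in scope):
/// ```ignore
/// let seeder = fetch_metadata(fud, hash, path, dht_sub).await?;
/// fetch_chunks(fud, hash, &mut chunked, dht_sub, Some(seeder), &mut chunks).await?;
/// ```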
pub async fn fetch_metadata(
    fud: &Fud,
    hash: &blake3::Hash,
    path: &Path,
    dht_sub: &Subscription<FudDhtEvent>,
) -> Result<FudSeeder> {
    let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;

    seeders_loop!(hash, fud, dht_sub, async |seeder: FudSeeder| -> Result<()> {
        let (channel, _) = fud.dht.get_channel(&seeder.node).await?;
        let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
        let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
        let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
        let msg_subscriber_notfound = channel.subscribe_msg::<FudMetadataNotFound>().await.unwrap();

        let send_res = channel.send(&FudMetadataRequest { resource: *hash }).await;
        if let Err(e) = send_res {
            warn!(target: "fud::download::fetch_metadata()", "Error while sending FudMetadataRequest: {e}");
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
            return Err(e)
        }

        let chunk_recv =
            receive_resource_msg(&msg_subscriber_chunk, *hash, fud.chunk_timeout).fuse();
        let file_recv = receive_resource_msg(&msg_subscriber_file, *hash, fud.chunk_timeout).fuse();
        let dir_recv = receive_resource_msg(&msg_subscriber_dir, *hash, fud.chunk_timeout).fuse();
        let notfound_recv =
            receive_resource_msg(&msg_subscriber_notfound, *hash, fud.chunk_timeout).fuse();

        pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);

        let cleanup = async || {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
        };

        // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudMetadataNotFound
        select! {
            // Receiving a chunk while requesting metadata is allowed: it
            // optimizes fetching files smaller than a single chunk
            chunk_reply = chunk_recv => {
                cleanup().await;
                if let Err(e) = chunk_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
                    return Err(e)
                }
                let reply = chunk_reply.unwrap();
                let chunk_hash = blake3::hash(&reply.chunk);
                // Check that this is the only chunk in the file
                if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
                Ok(())
            }
            file_reply = file_recv => {
                cleanup().await;
                if let Err(e) = file_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
                    return Err(e)
                }
                let reply = file_reply.unwrap();
                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
                Ok(())
            }
            dir_reply = dir_recv => {
                cleanup().await;
                if let Err(e) = dir_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
                    return Err(e)
                }
                let reply = dir_reply.unwrap();

                // Convert all file paths from String to PathBuf
                let files: Vec<_> = reply.files.clone().into_iter()
                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
                    .collect();

                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
                Ok(())
            }
            notfound_reply = notfound_recv => {
                cleanup().await;
                if let Err(e) = notfound_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
                    return Err(e)
                }
                info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                Err(().into())
            }
        }
    });

    // We did not find the resource
    if result.is_none() {
        return Err(Error::GeodeFileRouteNotFound)
    }

    // Insert the metadata into geode using the reply
    // At this point the reply content has already been verified
    let (seeder, reply) = result.unwrap();
    match reply {
        MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes, .. }) => {
            // Convert all file paths from String to PathBuf
            let mut files: Vec<_> =
                files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();

            fud.geode.sort_files(&mut files);
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        MetadataFetchReply::File(FudFileReply { chunk_hashes, .. }) => {
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        // Looked for a file but got a chunk: the entire file fits in a single chunk
        MetadataFetchReply::Chunk(FudChunkReply { chunk, .. }) => {
            info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
            let chunk_hash = blake3::hash(&chunk);
            if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
                return Err(e)
            }
            create_all_files(&[path.to_path_buf()]).await?;
            let mut chunked_file = ChunkedStorage::new(
                &[chunk_hash],
                &[(path.to_path_buf(), chunk.len() as u64)],
                false,
            );
            if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
                return Err(e)
            };
        }
    };

    Ok(seeder)
}