fud/
proto.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use async_trait::async_trait;
20use log::{debug, error, info};
21use smol::Executor;
22use std::{path::StripPrefixError, sync::Arc};
23
24use darkfi::{
25    dht::DhtHandler,
26    geode::hash_to_string,
27    impl_p2p_message,
28    net::{
29        metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30        ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
31        ProtocolJobsManager, ProtocolJobsManagerPtr,
32    },
33    Error, Result,
34};
35use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
36use darkfi_serial::{SerialDecodable, SerialEncodable};
37
38use crate::{
39    dht::{FudNode, FudSeeder},
40    Fud,
41};
42
43/// Message representing a file reply from the network
44#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
45pub struct FudFileReply {
46    pub chunk_hashes: Vec<blake3::Hash>,
47}
48impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
49
50/// Message representing a directory reply from the network
51#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
52pub struct FudDirectoryReply {
53    pub chunk_hashes: Vec<blake3::Hash>,
54    pub files: Vec<(String, u64)>, // Vec of (file path, file size)
55}
56impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
57
58/// Message representing a node announcing a key on the network
59#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
60pub struct FudAnnounce {
61    pub key: blake3::Hash,
62    pub seeders: Vec<FudSeeder>,
63}
64impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
65
66/// Message representing a chunk reply from the network
67#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
68pub struct FudChunkReply {
69    // TODO: This should be a chunk-sized array, but then we need padding?
70    pub chunk: Vec<u8>,
71}
72impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
73
74/// Message representing a chunk reply when a file is not found
75#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
76pub struct FudNotFound;
77impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
78
79/// Message representing a ping request on the network
80#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
81pub struct FudPingRequest {
82    pub random: u64,
83}
84impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
85
86/// Message representing a ping reply on the network
87#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
88pub struct FudPingReply {
89    pub node: FudNode,
90    /// Signature of the random u64 from the ping request
91    pub sig: Signature,
92}
93impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
94
95/// Message representing a find file/chunk request from the network
96#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
97pub struct FudFindRequest {
98    pub info: Option<blake3::Hash>,
99    pub key: blake3::Hash,
100}
101impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
102
103/// Message representing a find nodes request on the network
104#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
105pub struct FudFindNodesRequest {
106    pub key: blake3::Hash,
107}
108impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
109
110/// Message representing a find nodes reply on the network
111#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
112pub struct FudFindNodesReply {
113    pub nodes: Vec<FudNode>,
114}
115impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
116
117/// Message representing a find seeders request on the network
118#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
119pub struct FudFindSeedersRequest {
120    pub key: blake3::Hash,
121}
122impl_p2p_message!(
123    FudFindSeedersRequest,
124    "FudFindSeedersRequest",
125    0,
126    0,
127    DEFAULT_METERING_CONFIGURATION
128);
129
130/// Message representing a find seeders reply on the network
131#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
132pub struct FudFindSeedersReply {
133    pub seeders: Vec<FudSeeder>,
134    pub nodes: Vec<FudNode>,
135}
136impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
137
138/// P2P protocol implementation for fud.
139pub struct ProtocolFud {
140    channel: ChannelPtr,
141    ping_request_sub: MessageSubscription<FudPingRequest>,
142    find_request_sub: MessageSubscription<FudFindRequest>,
143    find_nodes_request_sub: MessageSubscription<FudFindNodesRequest>,
144    find_seeders_request_sub: MessageSubscription<FudFindSeedersRequest>,
145    announce_sub: MessageSubscription<FudAnnounce>,
146    fud: Arc<Fud>,
147    jobsman: ProtocolJobsManagerPtr,
148}
149
150impl ProtocolFud {
151    pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
152        debug!(
153            target: "fud::proto::ProtocolFud::init()",
154            "Adding ProtocolFud to the protocol registry"
155        );
156
157        let msg_subsystem = channel.message_subsystem();
158        msg_subsystem.add_dispatch::<FudPingRequest>().await;
159        msg_subsystem.add_dispatch::<FudFindRequest>().await;
160        msg_subsystem.add_dispatch::<FudFindNodesRequest>().await;
161        msg_subsystem.add_dispatch::<FudFindSeedersRequest>().await;
162        msg_subsystem.add_dispatch::<FudAnnounce>().await;
163
164        let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
165        let find_request_sub = channel.subscribe_msg::<FudFindRequest>().await?;
166        let find_nodes_request_sub = channel.subscribe_msg::<FudFindNodesRequest>().await?;
167        let find_seeders_request_sub = channel.subscribe_msg::<FudFindSeedersRequest>().await?;
168        let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
169
170        Ok(Arc::new(Self {
171            channel: channel.clone(),
172            ping_request_sub,
173            find_request_sub,
174            find_nodes_request_sub,
175            find_seeders_request_sub,
176            announce_sub,
177            fud,
178            jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
179        }))
180    }
181
182    async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
183        debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
184
185        loop {
186            let ping_req = match self.ping_request_sub.receive().await {
187                Ok(v) => v,
188                Err(Error::ChannelStopped) => continue,
189                Err(e) => {
190                    error!("{e}");
191                    continue
192                }
193            };
194            info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST");
195
196            let reply = FudPingReply {
197                node: self.fud.node().await,
198                sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()),
199            };
200            match self.channel.send(&reply).await {
201                Ok(()) => continue,
202                Err(_e) => continue,
203            }
204        }
205    }
206
207    async fn handle_fud_find_request(self: Arc<Self>) -> Result<()> {
208        debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START");
209
210        loop {
211            let request = match self.find_request_sub.receive().await {
212                Ok(v) => v,
213                Err(Error::ChannelStopped) => continue,
214                Err(e) => {
215                    error!("{e}");
216                    continue
217                }
218            };
219            info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND for {}", hash_to_string(&request.key));
220
221            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
222            if let Some(node) = node {
223                self.fud.dht.update_node(&node).await;
224            }
225
226            if self.handle_fud_chunk_request(&request).await {
227                continue;
228            }
229
230            if self.handle_fud_metadata_request(&request).await {
231                continue;
232            }
233
234            // Request did not match anything we have
235            let reply = FudNotFound {};
236            info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key));
237            let _ = self.channel.send(&reply).await;
238        }
239    }
240
241    /// If the FudFindRequest matches a chunk we have, handle it.
242    /// Returns true if the chunk was found.
243    async fn handle_fud_chunk_request(&self, request: &FudFindRequest) -> bool {
244        let hash = request.info;
245        if hash.is_none() {
246            return false;
247        }
248        let hash = hash.unwrap();
249
250        let path = self.fud.hash_to_path(&hash).ok().flatten();
251        if path.is_none() {
252            return false;
253        }
254        let path = path.unwrap();
255
256        let chunked = self.fud.geode.get(&hash, &path).await;
257        if chunked.is_err() {
258            return false;
259        }
260
261        let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key).await;
262        if let Ok(chunk) = chunk {
263            if !self.fud.geode.verify_chunk(&request.key, &chunk) {
264                // TODO: Run geode GC
265                return false;
266            }
267            let reply = FudChunkReply { chunk };
268            info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key));
269            let _ = self.channel.send(&reply).await;
270            return true;
271        }
272
273        false
274    }
275
276    /// If the FudFindRequest matches a file we have, handle it
277    /// Returns true if the file was found.
278    async fn handle_fud_metadata_request(&self, request: &FudFindRequest) -> bool {
279        let path = self.fud.hash_to_path(&request.key).ok().flatten();
280        if path.is_none() {
281            return false;
282        }
283        let path = path.unwrap();
284
285        let chunked_file = self.fud.geode.get(&request.key, &path).await.ok();
286        if chunked_file.is_none() {
287            return false;
288        }
289        let mut chunked_file = chunked_file.unwrap();
290
291        // If it's a file with a single chunk, just reply with the chunk
292        if chunked_file.len() == 1 && !chunked_file.is_dir() {
293            let chunk_hash = chunked_file.get_chunks()[0].0;
294            let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
295            if let Ok(chunk) = chunk {
296                if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.key {
297                    // TODO: Run geode GC
298                    return false;
299                }
300                let reply = FudChunkReply { chunk };
301                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
302                let _ = self.channel.send(&reply).await;
303                return true;
304            }
305            return false;
306        }
307
308        // Otherwise reply with the metadata
309        match chunked_file.is_dir() {
310            false => {
311                let reply = FudFileReply {
312                    chunk_hashes: chunked_file
313                        .get_chunks()
314                        .iter()
315                        .map(|(chunk, _)| *chunk)
316                        .collect(),
317                };
318                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.key));
319                let _ = self.channel.send(&reply).await;
320            }
321            true => {
322                let files = chunked_file
323                    .get_files()
324                    .iter()
325                    .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
326                        Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
327                        Err(e) => Err(e),
328                    })
329                    .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
330                if let Err(e) = files {
331                    error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
332                    return false;
333                }
334                let reply = FudDirectoryReply {
335                    chunk_hashes: chunked_file
336                        .get_chunks()
337                        .iter()
338                        .map(|(chunk, _)| *chunk)
339                        .collect(),
340                    files: files.unwrap(),
341                };
342                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.key));
343                let _ = self.channel.send(&reply).await;
344            }
345        };
346
347        true
348    }
349
350    async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> {
351        debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START");
352
353        loop {
354            let request = match self.find_nodes_request_sub.receive().await {
355                Ok(v) => v,
356                Err(Error::ChannelStopped) => continue,
357                Err(e) => {
358                    error!("{e}");
359                    continue
360                }
361            };
362            info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
363
364            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
365            if let Some(node) = node {
366                self.fud.dht.update_node(&node).await;
367            }
368
369            let reply = FudFindNodesReply {
370                nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
371            };
372            match self.channel.send(&reply).await {
373                Ok(()) => continue,
374                Err(_e) => continue,
375            }
376        }
377    }
378
379    async fn handle_fud_find_seeders_request(self: Arc<Self>) -> Result<()> {
380        debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START");
381
382        loop {
383            let request = match self.find_seeders_request_sub.receive().await {
384                Ok(v) => v,
385                Err(Error::ChannelStopped) => continue,
386                Err(e) => {
387                    error!("{e}");
388                    continue
389                }
390            };
391            info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key));
392
393            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
394            if let Some(node) = node {
395                self.fud.dht.update_node(&node).await;
396            }
397
398            let router = self.fud.dht.hash_table.read().await;
399            let peers = router.get(&request.key);
400
401            match peers {
402                Some(seeders) => {
403                    let _ = self
404                        .channel
405                        .send(&FudFindSeedersReply {
406                            seeders: seeders.to_vec(),
407                            nodes: self
408                                .fud
409                                .dht()
410                                .find_neighbors(&request.key, self.fud.dht().settings.k)
411                                .await,
412                        })
413                        .await;
414                }
415                None => {
416                    let _ = self
417                        .channel
418                        .send(&FudFindSeedersReply {
419                            seeders: vec![],
420                            nodes: self
421                                .fud
422                                .dht()
423                                .find_neighbors(&request.key, self.fud.dht().settings.k)
424                                .await,
425                        })
426                        .await;
427                }
428            };
429        }
430    }
431
432    async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
433        debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
434
435        loop {
436            let request = match self.announce_sub.receive().await {
437                Ok(v) => v,
438                Err(Error::ChannelStopped) => continue,
439                Err(e) => {
440                    error!("{e}");
441                    continue
442                }
443            };
444            info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
445
446            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
447            if let Some(node) = node {
448                self.fud.dht.update_node(&node).await;
449            }
450
451            let mut seeders = vec![];
452
453            for seeder in request.seeders.clone() {
454                if seeder.node.addresses.is_empty() {
455                    continue
456                }
457                // TODO: Verify each address
458                seeders.push(seeder);
459            }
460
461            self.fud.add_value(&request.key, &seeders).await;
462        }
463    }
464}
465
466#[async_trait]
467impl ProtocolBase for ProtocolFud {
468    async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
469        debug!(target: "fud::ProtocolFud::start()", "START");
470        self.jobsman.clone().start(executor.clone());
471        self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
472        self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await;
473        self.jobsman
474            .clone()
475            .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone())
476            .await;
477        self.jobsman
478            .clone()
479            .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone())
480            .await;
481        self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
482        debug!(target: "fud::ProtocolFud::start()", "END");
483        Ok(())
484    }
485
486    fn name(&self) -> &'static str {
487        "ProtocolFud"
488    }
489}