fud/
proto.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use async_trait::async_trait;
20use smol::Executor;
21use std::{path::StripPrefixError, sync::Arc};
22use tracing::{debug, error, info, warn};
23
24use darkfi::{
25    dht::{event::DhtEvent, DhtHandler},
26    geode::hash_to_string,
27    impl_p2p_message,
28    net::{
29        metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30        session::SESSION_INBOUND,
31        ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
32        ProtocolJobsManager, ProtocolJobsManagerPtr,
33    },
34    Error, Result,
35};
36use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
37use darkfi_serial::{SerialDecodable, SerialEncodable};
38
39use crate::{
40    dht::{FudNode, FudSeeder},
41    Fud,
42};
43
/// Trait for resource-specific messages.
/// Adds a method to get the resource's hash from the message.
pub trait ResourceMessage {
    /// Returns the hash of the resource this message is about.
    fn resource_hash(&self) -> blake3::Hash;
}
/// Implements [`ResourceMessage`] for a message type by returning one of its
/// fields.
///
/// * `impl_resource_msg!(Type, field)` makes `resource_hash()` return
///   `self.field`.
/// * `impl_resource_msg!(Type)` is shorthand for
///   `impl_resource_msg!(Type, resource)`.
macro_rules! impl_resource_msg {
    // Explicit field name
    ($msg:ty, $field:ident) => {
        impl ResourceMessage for $msg {
            fn resource_hash(&self) -> blake3::Hash {
                self.$field
            }
        }
    };
    // Default to the field named `resource`
    ($msg:ty) => {
        impl_resource_msg!($msg, resource);
    };
}
61
/// Message representing a file reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileReply {
    /// Hash of the requested resource
    pub resource: blake3::Hash,
    /// Hashes of the chunks that make up the file
    pub chunk_hashes: Vec<blake3::Hash>,
}
impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudFileReply);
70
/// Message representing a directory reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudDirectoryReply {
    /// Hash of the requested resource
    pub resource: blake3::Hash,
    /// Hashes of the chunks of the directory
    pub chunk_hashes: Vec<blake3::Hash>,
    /// Files contained in the directory, as (file path, file size) pairs.
    /// The sender strips its local resource path from each file path.
    pub files: Vec<(String, u64)>, // Vec of (file path, file size)
}
impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudDirectoryReply);
80
/// Message representing a node announcing a key on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudAnnounce {
    /// Key being announced
    pub key: blake3::Hash,
    /// Seeders announced for `key`. The receiver validates each seeder before
    /// storing it.
    pub seeders: Vec<FudSeeder>,
}
impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
88
/// Message representing a chunk reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkReply {
    /// Hash of the resource the chunk belongs to
    pub resource: blake3::Hash,
    /// Raw bytes of the chunk
    // TODO: This should be a chunk-sized array, but then we need padding?
    pub chunk: Vec<u8>,
}
impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudChunkReply);
98
/// Message representing a reply when the metadata of a resource is not found
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudMetadataNotFound {
    /// Hash of the resource whose metadata was requested
    pub resource: blake3::Hash,
}
impl_p2p_message!(FudMetadataNotFound, "FudMetadataNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudMetadataNotFound);
106
/// Message representing a reply when a chunk is not found
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkNotFound {
    /// Hash of the resource the chunk was requested for
    pub resource: blake3::Hash,
    /// Hash of the chunk that could not be provided
    pub chunk: blake3::Hash,
}
impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudChunkNotFound);
115
/// Message representing a ping request on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingRequest {
    /// Value the replying node echoes back and signs in its ping reply
    pub random: u64,
}
impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
122
/// Message representing a ping reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingReply {
    /// The node that is replying
    pub node: FudNode,
    /// The random u64 copied from the ping request
    pub random: u64,
    /// Signature of the random u64 from the ping request
    pub sig: Signature,
}
impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
132
/// Message representing a find file/directory request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudMetadataRequest {
    /// Hash of the resource whose metadata is requested
    pub resource: blake3::Hash,
}
impl_p2p_message!(FudMetadataRequest, "FudMetadataRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudMetadataRequest);
140
/// Message representing a find chunk request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkRequest {
    /// Hash of the resource the chunk belongs to
    pub resource: blake3::Hash,
    /// Hash of the requested chunk
    pub chunk: blake3::Hash,
}
impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
impl_resource_msg!(FudChunkRequest);
149
/// Message representing a find nodes request on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNodesRequest {
    /// Key for which neighboring nodes are requested
    pub key: blake3::Hash,
}
impl_p2p_message!(FudNodesRequest, "FudNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
156
/// Message representing a find nodes reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNodesReply {
    /// Key copied from the request
    pub key: blake3::Hash,
    /// Nodes returned by the replying node's neighbor lookup for `key`
    pub nodes: Vec<FudNode>,
}
impl_p2p_message!(FudNodesReply, "FudNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
// The "resource hash" of this message is its `key`
impl_resource_msg!(FudNodesReply, key);
165
/// Message representing a find seeders request on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudSeedersRequest {
    /// Key for which seeders are requested
    pub key: blake3::Hash,
}
impl_p2p_message!(FudSeedersRequest, "FudSeedersRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
172
/// Message representing a find seeders reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudSeedersReply {
    /// Key copied from the request
    pub key: blake3::Hash,
    /// Seeders the replying node has stored for `key` (empty if it has none)
    pub seeders: Vec<FudSeeder>,
    /// Nodes returned by the replying node's neighbor lookup for `key`
    pub nodes: Vec<FudNode>,
}
impl_p2p_message!(FudSeedersReply, "FudSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
// The "resource hash" of this message is its `key`
impl_resource_msg!(FudSeedersReply, key);
182
/// P2P protocol implementation for fud.
///
/// One instance is created per channel. It holds a subscription for each
/// request/announce message it serves, and a jobs manager on which the
/// handler tasks are spawned.
pub struct ProtocolFud {
    /// Channel this protocol instance is attached to
    channel: ChannelPtr,
    /// Incoming ping requests
    ping_request_sub: MessageSubscription<FudPingRequest>,
    /// Incoming file/directory metadata requests
    find_metadata_request_sub: MessageSubscription<FudMetadataRequest>,
    /// Incoming chunk requests
    find_chunk_request_sub: MessageSubscription<FudChunkRequest>,
    /// Incoming find nodes requests
    find_nodes_request_sub: MessageSubscription<FudNodesRequest>,
    /// Incoming find seeders requests
    find_seeders_request_sub: MessageSubscription<FudSeedersRequest>,
    /// Incoming announcements
    announce_sub: MessageSubscription<FudAnnounce>,
    /// Handle to the fud instance the handlers operate on
    fud: Arc<Fud>,
    /// Manager of the handler tasks spawned in `start()`
    jobsman: ProtocolJobsManagerPtr,
}
195
196impl ProtocolFud {
197    pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
198        debug!(
199            target: "fud::proto::ProtocolFud::init()",
200            "Adding ProtocolFud to the protocol registry"
201        );
202
203        let msg_subsystem = channel.message_subsystem();
204        msg_subsystem.add_dispatch::<FudPingRequest>().await;
205        msg_subsystem.add_dispatch::<FudPingReply>().await;
206        msg_subsystem.add_dispatch::<FudMetadataRequest>().await;
207        msg_subsystem.add_dispatch::<FudChunkRequest>().await;
208        msg_subsystem.add_dispatch::<FudChunkReply>().await;
209        msg_subsystem.add_dispatch::<FudChunkNotFound>().await;
210        msg_subsystem.add_dispatch::<FudFileReply>().await;
211        msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
212        msg_subsystem.add_dispatch::<FudMetadataNotFound>().await;
213        msg_subsystem.add_dispatch::<FudNodesRequest>().await;
214        msg_subsystem.add_dispatch::<FudNodesReply>().await;
215        msg_subsystem.add_dispatch::<FudSeedersRequest>().await;
216        msg_subsystem.add_dispatch::<FudSeedersReply>().await;
217        msg_subsystem.add_dispatch::<FudAnnounce>().await;
218
219        let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
220        let find_metadata_request_sub = channel.subscribe_msg::<FudMetadataRequest>().await?;
221        let find_chunk_request_sub = channel.subscribe_msg::<FudChunkRequest>().await?;
222        let find_nodes_request_sub = channel.subscribe_msg::<FudNodesRequest>().await?;
223        let find_seeders_request_sub = channel.subscribe_msg::<FudSeedersRequest>().await?;
224        let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
225
226        Ok(Arc::new(Self {
227            channel: channel.clone(),
228            ping_request_sub,
229            find_metadata_request_sub,
230            find_chunk_request_sub,
231            find_nodes_request_sub,
232            find_seeders_request_sub,
233            announce_sub,
234            fud,
235            jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
236        }))
237    }
238
239    async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
240        debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
241
242        loop {
243            let ping_req = match self.ping_request_sub.receive().await {
244                Ok(v) => v,
245                Err(Error::ChannelStopped) => continue,
246                Err(_) => continue,
247            };
248            info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST from {}", self.channel.display_address());
249            self.fud.dht.update_channel(self.channel.info.id).await;
250
251            let self_node = self.fud.node().await;
252            if self_node.is_err() {
253                self.channel.stop().await;
254                continue
255            }
256            let state = self.fud.state.read().await;
257            if state.is_none() {
258                self.channel.stop().await;
259                continue
260            }
261
262            let reply = FudPingReply {
263                node: self_node.unwrap(),
264                random: ping_req.random,
265                sig: state.clone().unwrap().secret_key.sign(&ping_req.random.to_be_bytes()),
266            };
267            drop(state);
268
269            if let Err(e) = self.channel.send(&reply).await {
270                self.fud
271                    .dht
272                    .event_publisher
273                    .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Err(e) })
274                    .await;
275                continue;
276            }
277            self.fud
278                .dht
279                .event_publisher
280                .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Ok(()) })
281                .await;
282
283            // Ping the peer if this is an inbound connection
284            if self.channel.session_type_id() & SESSION_INBOUND != 0 {
285                let _ = self.fud.ping(self.channel.clone()).await;
286            }
287        }
288    }
289
290    async fn handle_fud_metadata_request(self: Arc<Self>) -> Result<()> {
291        debug!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "START");
292
293        loop {
294            let request = match self.find_metadata_request_sub.receive().await {
295                Ok(v) => v,
296                Err(Error::ChannelStopped) => continue,
297                Err(_) => continue,
298            };
299            info!(target: "fud::ProtocolFud::handle_fud_request()", "Received METADATA REQUEST for {}", hash_to_string(&request.resource));
300            self.fud.dht.update_channel(self.channel.info.id).await;
301
302            let notfound = async || {
303                let reply = FudMetadataNotFound { resource: request.resource };
304                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "We do not have the metadata of {}", hash_to_string(&request.resource));
305                let _ = self.channel.send(&reply).await;
306            };
307
308            let path = self.fud.hash_to_path(&request.resource).ok().flatten();
309            if path.is_none() {
310                notfound().await;
311                continue
312            }
313            let path = path.unwrap();
314
315            let chunked_file = self.fud.geode.get(&request.resource, &path).await.ok();
316            if chunked_file.is_none() {
317                notfound().await;
318                continue
319            }
320            let mut chunked_file = chunked_file.unwrap();
321
322            // If it's a file with a single chunk, just reply with the chunk
323            if chunked_file.len() == 1 && !chunked_file.is_dir() {
324                let chunk_hash = chunked_file.get_chunks()[0].0;
325                let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
326                if let Ok(chunk) = chunk {
327                    if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.resource {
328                        // TODO: Run geode GC
329                        notfound().await;
330                        continue
331                    }
332                    let reply = FudChunkReply { resource: request.resource, chunk };
333                    info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
334                    let _ = self.channel.send(&reply).await;
335                    continue
336                }
337                // We don't have the chunk, but we can still reply with the metadata
338            }
339
340            // Reply with the metadata
341            match chunked_file.is_dir() {
342                false => {
343                    let reply = FudFileReply {
344                        resource: request.resource,
345                        chunk_hashes: chunked_file
346                            .get_chunks()
347                            .iter()
348                            .map(|(chunk, _)| *chunk)
349                            .collect(),
350                    };
351                    info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.resource));
352                    let _ = self.channel.send(&reply).await;
353                }
354                true => {
355                    let files = chunked_file
356                        .get_files()
357                        .iter()
358                        .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
359                            Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
360                            Err(e) => Err(e),
361                        })
362                        .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
363                    if let Err(e) = files {
364                        error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
365                        notfound().await;
366                        continue
367                    }
368                    let reply = FudDirectoryReply {
369                        resource: request.resource,
370                        chunk_hashes: chunked_file
371                            .get_chunks()
372                            .iter()
373                            .map(|(chunk, _)| *chunk)
374                            .collect(),
375                        files: files.unwrap(),
376                    };
377                    info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.resource));
378                    let _ = self.channel.send(&reply).await;
379                }
380            };
381        }
382    }
383
384    async fn handle_fud_chunk_request(self: Arc<Self>) -> Result<()> {
385        debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START");
386
387        loop {
388            let request = match self.find_chunk_request_sub.receive().await {
389                Ok(v) => v,
390                Err(Error::ChannelStopped) => continue,
391                Err(_) => continue,
392            };
393            info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Received CHUNK REQUEST for {}", hash_to_string(&request.resource));
394            self.fud.dht.update_channel(self.channel.info.id).await;
395
396            let notfound = async || {
397                let reply = FudChunkNotFound { resource: request.resource, chunk: request.chunk };
398                info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "We do not have chunk {} of resource {}", hash_to_string(&request.resource), hash_to_string(&request.chunk));
399                let _ = self.channel.send(&reply).await;
400            };
401
402            let path = self.fud.hash_to_path(&request.resource).ok().flatten();
403            if path.is_none() {
404                notfound().await;
405                continue
406            }
407            let path = path.unwrap();
408
409            let chunked = self.fud.geode.get(&request.resource, &path).await;
410            if chunked.is_err() {
411                notfound().await;
412                continue
413            }
414
415            let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.chunk).await;
416            if let Ok(chunk) = chunk {
417                if !self.fud.geode.verify_chunk(&request.chunk, &chunk) {
418                    // TODO: Run geode GC
419                    notfound().await;
420                    continue
421                }
422                let reply = FudChunkReply { resource: request.resource, chunk };
423                info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.chunk));
424                let _ = self.channel.send(&reply).await;
425                continue
426            }
427
428            notfound().await;
429        }
430    }
431
432    async fn handle_fud_nodes_request(self: Arc<Self>) -> Result<()> {
433        debug!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "START");
434
435        loop {
436            let request = match self.find_nodes_request_sub.receive().await {
437                Ok(v) => v,
438                Err(Error::ChannelStopped) => continue,
439                Err(_) => continue,
440            };
441            info!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
442            self.fud.dht.update_channel(self.channel.info.id).await;
443
444            let reply = FudNodesReply {
445                key: request.key,
446                nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
447            };
448            match self.channel.send(&reply).await {
449                Ok(()) => continue,
450                Err(_e) => continue,
451            }
452        }
453    }
454
455    async fn handle_fud_seeders_request(self: Arc<Self>) -> Result<()> {
456        debug!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "START");
457
458        loop {
459            let request = match self.find_seeders_request_sub.receive().await {
460                Ok(v) => v,
461                Err(Error::ChannelStopped) => continue,
462                Err(_) => continue,
463            };
464            info!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "Received FIND SEEDERS for {} from {:?}", hash_to_string(&request.key), self.channel);
465            self.fud.dht.update_channel(self.channel.info.id).await;
466
467            let router = self.fud.dht.hash_table.read().await;
468            let peers = router.get(&request.key);
469
470            match peers {
471                Some(seeders) => {
472                    let _ = self
473                        .channel
474                        .send(&FudSeedersReply {
475                            key: request.key,
476                            seeders: seeders.to_vec(),
477                            nodes: self
478                                .fud
479                                .dht()
480                                .find_neighbors(&request.key, self.fud.dht().settings.k)
481                                .await,
482                        })
483                        .await;
484                }
485                None => {
486                    let _ = self
487                        .channel
488                        .send(&FudSeedersReply {
489                            key: request.key,
490                            seeders: vec![],
491                            nodes: self
492                                .fud
493                                .dht()
494                                .find_neighbors(&request.key, self.fud.dht().settings.k)
495                                .await,
496                        })
497                        .await;
498                }
499            };
500        }
501    }
502
503    async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
504        debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
505
506        loop {
507            let request = match self.announce_sub.receive().await {
508                Ok(v) => v,
509                Err(Error::ChannelStopped) => continue,
510                Err(_) => continue,
511            };
512            info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
513            self.fud.dht.update_channel(self.channel.info.id).await;
514
515            let mut seeders = vec![];
516
517            for seeder in request.seeders.clone() {
518                if seeder.node.addresses.is_empty() {
519                    continue
520                }
521                if let Err(e) = self.fud.pow.write().await.verify_node(&seeder.node.data).await {
522                    warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid PoW: {e}");
523                    continue
524                }
525                if !seeder.verify_signature().await {
526                    warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid signature");
527                    continue
528                }
529
530                // TODO: Limit the number of addresses
531                // TODO: Verify each address
532                seeders.push(seeder);
533            }
534
535            self.fud.add_value(&request.key, &seeders).await;
536        }
537    }
538}
539
540#[async_trait]
541impl ProtocolBase for ProtocolFud {
542    async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
543        debug!(target: "fud::ProtocolFud::start()", "START");
544        self.jobsman.clone().start(executor.clone());
545        self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
546        self.jobsman
547            .clone()
548            .spawn(self.clone().handle_fud_metadata_request(), executor.clone())
549            .await;
550        self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await;
551        self.jobsman.clone().spawn(self.clone().handle_fud_nodes_request(), executor.clone()).await;
552        self.jobsman
553            .clone()
554            .spawn(self.clone().handle_fud_seeders_request(), executor.clone())
555            .await;
556        self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
557        debug!(target: "fud::ProtocolFud::start()", "END");
558        Ok(())
559    }
560
561    fn name(&self) -> &'static str {
562        "ProtocolFud"
563    }
564}