fud/
proto.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use async_trait::async_trait;
20use log::{debug, error, info};
21use smol::Executor;
22use std::{path::StripPrefixError, sync::Arc};
23
24use darkfi::{
25    dht::{DhtHandler, DhtRouterItem},
26    geode::hash_to_string,
27    impl_p2p_message,
28    net::{
29        metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30        ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
31        ProtocolJobsManager, ProtocolJobsManagerPtr,
32    },
33    Error, Result,
34};
35use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
36use darkfi_serial::{SerialDecodable, SerialEncodable};
37
38use super::{Fud, FudNode};
39
40/// Message representing a file reply from the network
41#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
42pub struct FudFileReply {
43    pub chunk_hashes: Vec<blake3::Hash>,
44}
45impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
46
47/// Message representing a directory reply from the network
48#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
49pub struct FudDirectoryReply {
50    pub chunk_hashes: Vec<blake3::Hash>,
51    pub files: Vec<(String, u64)>, // Vec of (file path, file size)
52}
53impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
54
55/// Message representing a node announcing a key on the network
56#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
57pub struct FudAnnounce {
58    pub key: blake3::Hash,
59    pub seeders: Vec<DhtRouterItem<FudNode>>,
60}
61impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
62
63/// Message representing a chunk reply from the network
64#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
65pub struct FudChunkReply {
66    // TODO: This should be a chunk-sized array, but then we need padding?
67    pub chunk: Vec<u8>,
68}
69impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
70
71/// Message representing a chunk reply when a file is not found
72#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
73pub struct FudNotFound;
74impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
75
76/// Message representing a ping request on the network
77#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
78pub struct FudPingRequest {
79    pub random: u64,
80}
81impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
82
83/// Message representing a ping reply on the network
84#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
85pub struct FudPingReply {
86    pub node: FudNode,
87    /// Signature of the random u64 from the ping request
88    pub sig: Signature,
89}
90impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
91
92/// Message representing a find file/chunk request from the network
93#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
94pub struct FudFindRequest {
95    pub info: Option<blake3::Hash>,
96    pub key: blake3::Hash,
97}
98impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
99
100/// Message representing a find nodes request on the network
101#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
102pub struct FudFindNodesRequest {
103    pub key: blake3::Hash,
104}
105impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
106
107/// Message representing a find nodes reply on the network
108#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
109pub struct FudFindNodesReply {
110    pub nodes: Vec<FudNode>,
111}
112impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
113
114/// Message representing a find seeders request on the network
115#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
116pub struct FudFindSeedersRequest {
117    pub key: blake3::Hash,
118}
119impl_p2p_message!(
120    FudFindSeedersRequest,
121    "FudFindSeedersRequest",
122    0,
123    0,
124    DEFAULT_METERING_CONFIGURATION
125);
126
127/// Message representing a find seeders reply on the network
128#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
129pub struct FudFindSeedersReply {
130    pub seeders: Vec<DhtRouterItem<FudNode>>,
131}
132impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
133
134/// P2P protocol implementation for fud.
135pub struct ProtocolFud {
136    channel: ChannelPtr,
137    ping_request_sub: MessageSubscription<FudPingRequest>,
138    find_request_sub: MessageSubscription<FudFindRequest>,
139    find_nodes_request_sub: MessageSubscription<FudFindNodesRequest>,
140    find_seeders_request_sub: MessageSubscription<FudFindSeedersRequest>,
141    announce_sub: MessageSubscription<FudAnnounce>,
142    fud: Arc<Fud>,
143    jobsman: ProtocolJobsManagerPtr,
144}
145
146impl ProtocolFud {
147    pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
148        debug!(
149            target: "fud::proto::ProtocolFud::init()",
150            "Adding ProtocolFud to the protocol registry"
151        );
152
153        let msg_subsystem = channel.message_subsystem();
154        msg_subsystem.add_dispatch::<FudPingRequest>().await;
155        msg_subsystem.add_dispatch::<FudFindRequest>().await;
156        msg_subsystem.add_dispatch::<FudFindNodesRequest>().await;
157        msg_subsystem.add_dispatch::<FudFindSeedersRequest>().await;
158        msg_subsystem.add_dispatch::<FudAnnounce>().await;
159
160        let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
161        let find_request_sub = channel.subscribe_msg::<FudFindRequest>().await?;
162        let find_nodes_request_sub = channel.subscribe_msg::<FudFindNodesRequest>().await?;
163        let find_seeders_request_sub = channel.subscribe_msg::<FudFindSeedersRequest>().await?;
164        let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
165
166        Ok(Arc::new(Self {
167            channel: channel.clone(),
168            ping_request_sub,
169            find_request_sub,
170            find_nodes_request_sub,
171            find_seeders_request_sub,
172            announce_sub,
173            fud,
174            jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
175        }))
176    }
177
178    async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
179        debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
180
181        loop {
182            let ping_req = match self.ping_request_sub.receive().await {
183                Ok(v) => v,
184                Err(Error::ChannelStopped) => continue,
185                Err(e) => {
186                    error!("{e}");
187                    continue
188                }
189            };
190            info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST");
191
192            let reply = FudPingReply {
193                node: self.fud.node().await,
194                sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()),
195            };
196            match self.channel.send(&reply).await {
197                Ok(()) => continue,
198                Err(_e) => continue,
199            }
200        }
201    }
202
203    async fn handle_fud_find_request(self: Arc<Self>) -> Result<()> {
204        debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START");
205
206        loop {
207            let request = match self.find_request_sub.receive().await {
208                Ok(v) => v,
209                Err(Error::ChannelStopped) => continue,
210                Err(e) => {
211                    error!("{e}");
212                    continue
213                }
214            };
215            info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND for {}", hash_to_string(&request.key));
216
217            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
218            if let Some(node) = node {
219                self.fud.update_node(&node).await;
220            }
221
222            if self.handle_fud_chunk_request(&request).await {
223                continue;
224            }
225
226            if self.handle_fud_metadata_request(&request).await {
227                continue;
228            }
229
230            // Request did not match anything we have
231            let reply = FudNotFound {};
232            info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key));
233            let _ = self.channel.send(&reply).await;
234        }
235    }
236
237    /// If the FudFindRequest matches a chunk we have, handle it.
238    /// Returns true if the chunk was found.
239    async fn handle_fud_chunk_request(&self, request: &FudFindRequest) -> bool {
240        let hash = request.info;
241        if hash.is_none() {
242            return false;
243        }
244        let hash = hash.unwrap();
245
246        let path = self.fud.hash_to_path(&hash).ok().flatten();
247        if path.is_none() {
248            return false;
249        }
250        let path = path.unwrap();
251
252        let chunked = self.fud.geode.get(&hash, &path).await;
253        if chunked.is_err() {
254            return false;
255        }
256
257        let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key).await;
258        if let Ok(chunk) = chunk {
259            if !self.fud.geode.verify_chunk(&request.key, &chunk) {
260                // TODO: Run geode GC
261                return false;
262            }
263            let reply = FudChunkReply { chunk };
264            info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key));
265            let _ = self.channel.send(&reply).await;
266            return true;
267        }
268
269        false
270    }
271
272    /// If the FudFindRequest matches a file we have, handle it
273    /// Returns true if the file was found.
274    async fn handle_fud_metadata_request(&self, request: &FudFindRequest) -> bool {
275        let path = self.fud.hash_to_path(&request.key).ok().flatten();
276        if path.is_none() {
277            return false;
278        }
279        let path = path.unwrap();
280
281        let chunked_file = self.fud.geode.get(&request.key, &path).await.ok();
282        if chunked_file.is_none() {
283            return false;
284        }
285        let mut chunked_file = chunked_file.unwrap();
286
287        // If it's a file with a single chunk, just reply with the chunk
288        if chunked_file.len() == 1 && !chunked_file.is_dir() {
289            let chunk_hash = chunked_file.get_chunks()[0].0;
290            let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
291            if let Ok(chunk) = chunk {
292                if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.key {
293                    // TODO: Run geode GC
294                    return false;
295                }
296                let reply = FudChunkReply { chunk };
297                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
298                let _ = self.channel.send(&reply).await;
299                return true;
300            }
301            return false;
302        }
303
304        // Otherwise reply with the metadata
305        match chunked_file.is_dir() {
306            false => {
307                let reply = FudFileReply {
308                    chunk_hashes: chunked_file
309                        .get_chunks()
310                        .iter()
311                        .map(|(chunk, _)| *chunk)
312                        .collect(),
313                };
314                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.key));
315                let _ = self.channel.send(&reply).await;
316            }
317            true => {
318                let files = chunked_file
319                    .get_files()
320                    .iter()
321                    .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
322                        Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
323                        Err(e) => Err(e),
324                    })
325                    .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
326                if let Err(e) = files {
327                    error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
328                    return false;
329                }
330                let reply = FudDirectoryReply {
331                    chunk_hashes: chunked_file
332                        .get_chunks()
333                        .iter()
334                        .map(|(chunk, _)| *chunk)
335                        .collect(),
336                    files: files.unwrap(),
337                };
338                info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.key));
339                let _ = self.channel.send(&reply).await;
340            }
341        };
342
343        true
344    }
345
346    async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> {
347        debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START");
348
349        loop {
350            let request = match self.find_nodes_request_sub.receive().await {
351                Ok(v) => v,
352                Err(Error::ChannelStopped) => continue,
353                Err(e) => {
354                    error!("{e}");
355                    continue
356                }
357            };
358            info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
359
360            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
361            if let Some(node) = node {
362                self.fud.update_node(&node).await;
363            }
364
365            let reply = FudFindNodesReply {
366                nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
367            };
368            match self.channel.send(&reply).await {
369                Ok(()) => continue,
370                Err(_e) => continue,
371            }
372        }
373    }
374
375    async fn handle_fud_find_seeders_request(self: Arc<Self>) -> Result<()> {
376        debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START");
377
378        loop {
379            let request = match self.find_seeders_request_sub.receive().await {
380                Ok(v) => v,
381                Err(Error::ChannelStopped) => continue,
382                Err(e) => {
383                    error!("{e}");
384                    continue
385                }
386            };
387            info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key));
388
389            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
390            if let Some(node) = node {
391                self.fud.update_node(&node).await;
392            }
393
394            let router = self.fud.seeders_router.read().await;
395            let peers = router.get(&request.key);
396
397            match peers {
398                Some(seeders) => {
399                    let _ = self
400                        .channel
401                        .send(&FudFindSeedersReply { seeders: seeders.iter().cloned().collect() })
402                        .await;
403                }
404                None => {
405                    let _ = self.channel.send(&FudFindSeedersReply { seeders: vec![] }).await;
406                }
407            };
408        }
409    }
410
411    async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
412        debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
413
414        loop {
415            let request = match self.announce_sub.receive().await {
416                Ok(v) => v,
417                Err(Error::ChannelStopped) => continue,
418                Err(e) => {
419                    error!("{e}");
420                    continue
421                }
422            };
423            info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
424
425            let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
426            if let Some(node) = node {
427                self.fud.update_node(&node).await;
428            }
429
430            let mut seeders = vec![];
431
432            for seeder in request.seeders.clone() {
433                if seeder.node.addresses.is_empty() {
434                    continue
435                }
436                // TODO: Verify each address
437                seeders.push(seeder);
438            }
439
440            self.fud.add_to_router(self.fud.seeders_router.clone(), &request.key, seeders).await;
441        }
442    }
443}
444
445#[async_trait]
446impl ProtocolBase for ProtocolFud {
447    async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
448        debug!(target: "fud::ProtocolFud::start()", "START");
449        self.jobsman.clone().start(executor.clone());
450        self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
451        self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await;
452        self.jobsman
453            .clone()
454            .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone())
455            .await;
456        self.jobsman
457            .clone()
458            .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone())
459            .await;
460        self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
461        debug!(target: "fud::ProtocolFud::start()", "END");
462        Ok(())
463    }
464
465    fn name(&self) -> &'static str {
466        "ProtocolFud"
467    }
468}