fud/
dht.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use log::debug;
23use num_bigint::BigUint;
24use rand::{rngs::OsRng, Rng};
25use url::Url;
26
27use darkfi::{
28    dht::{impl_dht_node_defaults, Dht, DhtHandler, DhtLookupReply, DhtNode},
29    geode::hash_to_string,
30    net::ChannelPtr,
31    util::time::Timestamp,
32    Error, Result,
33};
34use darkfi_sdk::crypto::schnorr::SchnorrPublic;
35use darkfi_serial::{SerialDecodable, SerialEncodable};
36
37use crate::{
38    pow::VerifiableNodeData,
39    proto::{
40        FudAnnounce, FudFindNodesReply, FudFindNodesRequest, FudFindSeedersReply,
41        FudFindSeedersRequest, FudPingReply, FudPingRequest,
42    },
43    Fud,
44};
45
46#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
47pub struct FudNode {
48    pub data: VerifiableNodeData,
49    pub addresses: Vec<Url>,
50}
51impl_dht_node_defaults!(FudNode);
52
53impl DhtNode for FudNode {
54    fn id(&self) -> blake3::Hash {
55        self.data.id()
56    }
57    fn addresses(&self) -> Vec<Url> {
58        self.addresses.clone()
59    }
60}
61
62/// The values of the DHT are `Vec<FudSeeder>`, mapping resource hashes to lists of [`FudSeeder`]s
63#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
64pub struct FudSeeder {
65    /// Resource that this seeder provides
66    pub key: blake3::Hash,
67    /// Seeder's node data
68    pub node: FudNode,
69    /// When this [`FudSeeder`] was added to our hash table.
70    /// This is not sent to other nodes.
71    #[skip_serialize]
72    pub timestamp: u64,
73}
74
75impl PartialEq for FudSeeder {
76    fn eq(&self, other: &Self) -> bool {
77        self.key == other.key && self.node.id() == other.node.id()
78    }
79}
80
81/// [`DhtHandler`] implementation for fud
82#[async_trait]
83impl DhtHandler for Fud {
84    type Value = Vec<FudSeeder>;
85    type Node = FudNode;
86
87    fn dht(&self) -> Arc<Dht<Self>> {
88        self.dht.clone()
89    }
90
91    async fn node(&self) -> FudNode {
92        FudNode {
93            data: self.node_data.read().await.clone(),
94            addresses: self
95                .p2p
96                .clone()
97                .hosts()
98                .external_addrs()
99                .await
100                .iter()
101                .filter(|addr| !addr.to_string().contains("[::]"))
102                .cloned()
103                .collect(),
104        }
105    }
106
107    async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
108        debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
109        let msg_subsystem = channel.message_subsystem();
110        msg_subsystem.add_dispatch::<FudPingReply>().await;
111        let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
112
113        // Send `FudPingRequest`
114        let mut rng = OsRng;
115        let request = FudPingRequest { random: rng.gen() };
116        channel.send(&request).await?;
117
118        // Wait for `FudPingReply`
119        let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
120        msg_subscriber.unsubscribe().await;
121        let reply = reply?;
122
123        // Verify the signature
124        if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
125            channel.ban().await;
126            return Err(Error::InvalidSignature)
127        }
128
129        // Verify PoW
130        if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await {
131            channel.ban().await;
132            return Err(e)
133        }
134
135        Ok(reply.node.clone())
136    }
137
138    // TODO: Optimize this
139    async fn on_new_node(&self, node: &FudNode) -> Result<()> {
140        debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id()));
141
142        // If this is the first node we know about, then bootstrap and announce our files
143        if !self.dht.is_bootstrapped().await {
144            let _ = self.init().await;
145        }
146
147        // Send keys that are closer to this node than we are
148        let self_id = self.node_data.read().await.id();
149        let channel = self.dht.get_channel(node, None).await?;
150        for (key, seeders) in self.dht.hash_table.read().await.iter() {
151            let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id()));
152            let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
153            if node_distance <= self_distance {
154                let _ = channel.send(&FudAnnounce { key: *key, seeders: seeders.clone() }).await;
155            }
156        }
157        self.dht.cleanup_channel(channel).await;
158
159        Ok(())
160    }
161
162    async fn find_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> {
163        debug!(target: "fud::DhtHandler::find_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
164
165        let channel = self.dht.get_channel(node, None).await?;
166        let msg_subsystem = channel.message_subsystem();
167        msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
168        let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();
169
170        let request = FudFindNodesRequest { key: *key };
171        channel.send(&request).await?;
172
173        let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await;
174
175        msg_subscriber_nodes.unsubscribe().await;
176        self.dht.cleanup_channel(channel).await;
177
178        Ok(reply?.nodes.clone())
179    }
180
181    async fn find_value(
182        &self,
183        node: &FudNode,
184        key: &blake3::Hash,
185    ) -> Result<DhtLookupReply<FudNode, Vec<FudSeeder>>> {
186        debug!(target: "fud::DhtHandler::find_value()", "Fetching value {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
187
188        let channel = self.dht.get_channel(node, None).await?;
189        let msg_subsystem = channel.message_subsystem();
190        msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
191        let msg_subscriber = channel.subscribe_msg::<FudFindSeedersReply>().await.unwrap();
192
193        let request = FudFindSeedersRequest { key: *key };
194        channel.send(&request).await?;
195
196        let recv = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
197
198        msg_subscriber.unsubscribe().await;
199        self.dht.cleanup_channel(channel).await;
200
201        let rep = recv?;
202        Ok(DhtLookupReply::NodesAndValue(rep.nodes.clone(), rep.seeders.clone()))
203    }
204
205    async fn add_value(&self, key: &blake3::Hash, value: &Vec<FudSeeder>) {
206        let mut seeders = value.clone();
207
208        // Remove seeders with no external addresses
209        seeders.retain(|item| !item.node.addresses().is_empty());
210
211        // Set all seeders' timestamp. They are not sent to others nodes so they default to 0.
212        let timestamp = Timestamp::current_time().inner();
213        for seeder in &mut seeders {
214            seeder.timestamp = timestamp;
215        }
216
217        debug!(target: "fud::DhtHandler::add_value()", "Inserting {} seeders for resource {}", seeders.len(), hash_to_string(key));
218
219        let mut seeders_write = self.dht.hash_table.write().await;
220        let existing_seeders = seeders_write.get_mut(key);
221
222        if let Some(existing_seeders) = existing_seeders {
223            existing_seeders.retain(|it| !seeders.contains(it));
224            existing_seeders.extend(seeders.clone());
225        } else {
226            let mut vec = Vec::new();
227            vec.extend(seeders.clone());
228            seeders_write.insert(*key, vec);
229        }
230    }
231
232    fn key_to_string(key: &blake3::Hash) -> String {
233        hash_to_string(key)
234    }
235}