fud/
dht.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{sync::Arc, time::Duration};
20
21use async_trait::async_trait;
22use rand::{rngs::OsRng, Rng};
23use tinyjson::JsonValue;
24use tracing::{debug, warn};
25use url::Url;
26
27use darkfi::{
28    dht::{
29        event::DhtEvent, impl_dht_node_defaults, Dht, DhtHandler, DhtLookupReply, DhtNode,
30        HostCacheItem,
31    },
32    geode::hash_to_string,
33    net::{
34        session::{SESSION_DIRECT, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND},
35        ChannelPtr,
36    },
37    rpc::util::json_map,
38    system::timeout::timeout,
39    util::time::Timestamp,
40    Error, Result,
41};
42use darkfi_sdk::crypto::schnorr::{SchnorrPublic, Signature};
43use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
44
45use crate::{
46    pow::VerifiableNodeData,
47    proto::{
48        FudAnnounce, FudNodesReply, FudNodesRequest, FudPingReply, FudPingRequest, FudSeedersReply,
49        FudSeedersRequest,
50    },
51    util::receive_resource_msg,
52    Fud,
53};
54
55#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
56pub struct FudNode {
57    pub data: VerifiableNodeData,
58    pub addresses: Vec<Url>,
59}
60impl_dht_node_defaults!(FudNode);
61
62impl DhtNode for FudNode {
63    fn id(&self) -> blake3::Hash {
64        self.data.id()
65    }
66    fn addresses(&self) -> Vec<Url> {
67        self.addresses.clone()
68    }
69}
70
71impl From<FudNode> for JsonValue {
72    fn from(node: FudNode) -> JsonValue {
73        json_map([
74            ("id", JsonValue::String(hash_to_string(&node.id()))),
75            (
76                "addresses",
77                JsonValue::Array(
78                    node.addresses.iter().map(|addr| JsonValue::String(addr.to_string())).collect(),
79                ),
80            ),
81        ])
82    }
83}
84
85/// The values of the DHT are `Vec<FudSeeder>`, mapping resource hashes to lists of [`FudSeeder`]s
86#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
87pub struct FudSeeder {
88    /// Resource that this seeder provides
89    pub key: blake3::Hash,
90    /// Seeder's node data
91    pub node: FudNode,
92    /// Seeder's signature of (key || node)
93    pub sig: Signature,
94    /// When this [`FudSeeder`] was added to our hash table.
95    /// This is not sent to other nodes.
96    #[skip_serialize]
97    pub timestamp: u64,
98}
99
100impl PartialEq for FudSeeder {
101    fn eq(&self, other: &Self) -> bool {
102        self.key == other.key && self.node.id() == other.node.id()
103    }
104}
105
106impl From<FudSeeder> for JsonValue {
107    fn from(seeder: FudSeeder) -> JsonValue {
108        json_map([
109            ("key", JsonValue::String(hash_to_string(&seeder.key))),
110            ("node", seeder.node.into()),
111        ])
112    }
113}
114
115impl FudSeeder {
116    pub async fn verify_signature(&self) -> bool {
117        self.node.data.public_key.verify(
118            &[self.key.as_bytes().to_vec(), serialize_async(&self.node).await].concat(),
119            &self.sig,
120        )
121    }
122}
123
124/// [`DhtHandler`] implementation for fud
125#[async_trait]
126impl DhtHandler for Fud {
127    type Value = Vec<FudSeeder>;
128    type Node = FudNode;
129
130    fn dht(&self) -> Arc<Dht<Self>> {
131        self.dht.clone()
132    }
133
134    async fn node(&self) -> Result<FudNode> {
135        let state = self.state.read().await;
136        if state.is_none() {
137            return Err(Error::Custom("Fud is not ready yet".to_string()));
138        }
139
140        return Ok(FudNode {
141            data: state.clone().unwrap().node_data,
142            addresses: self
143                .p2p
144                .clone()
145                .hosts()
146                .external_addrs()
147                .await
148                .iter()
149                .filter(|addr| !addr.to_string().contains("[::]"))
150                .cloned()
151                .collect(),
152        })
153    }
154
155    async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
156        debug!(target: "fud::DhtHandler::ping()", "Sending ping to {}", channel.display_address());
157
158        // Setup `FudPingReply` subscriber
159        let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
160
161        // Send `FudPingRequest`
162        let mut rng = OsRng;
163        let request = FudPingRequest { random: rng.gen() };
164        if channel.is_stopped() {
165            return Err(Error::ChannelStopped)
166        }
167        channel.send(&request).await?;
168
169        // Wait for `FudPingReply`
170        let reply = msg_subscriber.receive_with_timeout(self.dht.settings.timeout).await;
171        msg_subscriber.unsubscribe().await;
172        let reply = reply?;
173        let node = &reply.node;
174
175        // Verify the signature
176        if !node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
177            warn!(target: "fud::DhtHandler::ping()", "Received an invalid signature while pinging {}", channel.display_address());
178            self.dht
179                .event_publisher
180                .notify(DhtEvent::PingReceived {
181                    from: channel.clone(),
182                    result: Err(Error::InvalidSignature),
183                })
184                .await;
185            self.dht.cleanup_channel(channel.clone()).await;
186            channel.ban().await;
187            return Err(Error::InvalidSignature)
188        }
189
190        // Verify PoW
191        if let Err(e) = self.pow.write().await.verify_node(&node.data).await {
192            warn!(target: "fud::DhtHandler::ping()", "Received an invalid PoW while pinging {}: {e}", channel.display_address());
193            self.dht
194                .event_publisher
195                .notify(DhtEvent::PingReceived { from: channel.clone(), result: Err(e.clone()) })
196                .await;
197            self.dht.cleanup_channel(channel.clone()).await;
198            channel.ban().await;
199            return Err(e)
200        }
201        self.dht
202            .event_publisher
203            .notify(DhtEvent::PingReceived { from: channel.clone(), result: Ok(node.id()) })
204            .await;
205
206        if channel.session_type_id() & (SESSION_OUTBOUND | SESSION_DIRECT | SESSION_MANUAL) != 0 {
207            // Wait for the other node to ping us
208            let ping_timeout = Duration::from_secs(10);
209
210            if let Err(e) = timeout(ping_timeout, self.dht.wait_fully_pinged(channel.info.id)).await
211            {
212                self.dht.cleanup_channel(channel).await;
213                return Err(e.into())
214            }
215
216            let mut host_cache = self.dht.host_cache.write().await;
217
218            // If we had another node id for this host in our cache, remove
219            // the old one from the buckets and seeders
220            if let Some(cached) = host_cache.get(channel.address()) {
221                if cached.node_id != node.id() {
222                    self.dht.remove_node(&cached.node_id).await;
223
224                    for (_, seeders) in self.dht.hash_table.write().await.iter_mut() {
225                        seeders.retain(|seeder| seeder.node.id() != cached.node_id);
226                    }
227                }
228            }
229
230            // Update host cache
231            host_cache.insert(
232                channel.address().clone(),
233                HostCacheItem { last_ping: Timestamp::current_time(), node_id: node.id() },
234            );
235
236            drop(host_cache);
237
238            // Update our buckets
239            if !node.addresses().is_empty() {
240                self.dht.update_node(&node.clone(), channel.clone()).await;
241            }
242        } else if channel.session_type_id() & SESSION_INBOUND != 0 {
243            // If it's an inbound connection, verify that we can connect to at
244            // least one of the provided external addresses.
245            // This may try to create a new outbound channel and it will update
246            // our buckets if successful.
247            let _ = self.verify_node_tx.send(node.clone()).await;
248        }
249
250        // Update the channel cache
251        self.dht.add_channel_to_cache(channel.info.id, node).await;
252
253        Ok(node.clone())
254    }
255
256    async fn store(
257        &self,
258        channel: ChannelPtr,
259        key: &blake3::Hash,
260        value: &Vec<FudSeeder>,
261    ) -> Result<()> {
262        debug!(target: "fud::DhtHandler::store()", "Announcing {} to {}", hash_to_string(key), channel.display_address());
263
264        channel.send(&FudAnnounce { key: *key, seeders: value.clone() }).await
265    }
266
267    async fn find_nodes(&self, channel: ChannelPtr, key: &blake3::Hash) -> Result<Vec<FudNode>> {
268        debug!(target: "fud::DhtHandler::find_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), channel.display_address());
269
270        let msg_subscriber_nodes = channel.subscribe_msg::<FudNodesReply>().await.unwrap();
271
272        let request = FudNodesRequest { key: *key };
273        channel.send(&request).await?;
274
275        let reply =
276            receive_resource_msg(&msg_subscriber_nodes, *key, self.dht().settings.timeout).await;
277
278        msg_subscriber_nodes.unsubscribe().await;
279
280        Ok(reply?.nodes.clone())
281    }
282
283    async fn find_value(
284        &self,
285        channel: ChannelPtr,
286        key: &blake3::Hash,
287    ) -> Result<DhtLookupReply<FudNode, Vec<FudSeeder>>> {
288        debug!(target: "fud::DhtHandler::find_value()", "Fetching value {} (or close nodes) from {}", hash_to_string(key), channel.display_address());
289
290        let msg_subscriber = channel.subscribe_msg::<FudSeedersReply>().await.unwrap();
291
292        let request = FudSeedersRequest { key: *key };
293        channel.send(&request).await?;
294
295        let recv = receive_resource_msg(&msg_subscriber, *key, self.dht().settings.timeout).await;
296
297        msg_subscriber.unsubscribe().await;
298
299        let rep = recv?;
300        Ok(DhtLookupReply::NodesAndValue(rep.nodes.clone(), rep.seeders.clone()))
301    }
302
303    async fn add_value(&self, key: &blake3::Hash, value: &Vec<FudSeeder>) {
304        let mut seeders = value.clone();
305
306        // Remove seeders with no external addresses
307        seeders.retain(|item| !item.node.addresses().is_empty());
308
309        // Set all seeders' timestamp. They are not sent to others nodes so they default to 0.
310        let timestamp = Timestamp::current_time().inner();
311        for seeder in &mut seeders {
312            seeder.timestamp = timestamp;
313        }
314
315        debug!(target: "fud::DhtHandler::add_value()", "Inserting {} seeders for resource {}", seeders.len(), hash_to_string(key));
316
317        let mut seeders_write = self.dht.hash_table.write().await;
318        let existing_seeders = seeders_write.get_mut(key);
319
320        if let Some(existing_seeders) = existing_seeders {
321            existing_seeders.retain(|it| !seeders.contains(it));
322            existing_seeders.extend(seeders.clone());
323        } else {
324            let mut vec = Vec::new();
325            vec.extend(seeders.clone());
326            seeders_write.insert(*key, vec);
327        }
328    }
329
330    fn key_to_string(key: &blake3::Hash) -> String {
331        hash_to_string(key)
332    }
333}