1use std::{sync::Arc, time::Duration};
20
21use async_trait::async_trait;
22use rand::{rngs::OsRng, Rng};
23use tinyjson::JsonValue;
24use tracing::{debug, warn};
25use url::Url;
26
27use darkfi::{
28 dht::{
29 event::DhtEvent, impl_dht_node_defaults, Dht, DhtHandler, DhtLookupReply, DhtNode,
30 HostCacheItem,
31 },
32 geode::hash_to_string,
33 net::{
34 session::{SESSION_DIRECT, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND},
35 ChannelPtr,
36 },
37 rpc::util::json_map,
38 system::timeout::timeout,
39 util::time::Timestamp,
40 Error, Result,
41};
42use darkfi_sdk::crypto::schnorr::{SchnorrPublic, Signature};
43use darkfi_serial::{serialize_async, SerialDecodable, SerialEncodable};
44
45use crate::{
46 pow::VerifiableNodeData,
47 proto::{
48 FudAnnounce, FudNodesReply, FudNodesRequest, FudPingReply, FudPingRequest, FudSeedersReply,
49 FudSeedersRequest,
50 },
51 util::receive_resource_msg,
52 Fud,
53};
54
/// A fud participant as represented in the DHT.
///
/// The type is (de)serializable via the `SerialEncodable`/`SerialDecodable`
/// derives, so field order is part of its serialized layout.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNode {
    /// Verifiable node data. The node id is obtained from it (`data.id()`),
    /// and its `public_key` is what signatures made by this node are checked
    /// against.
    pub data: VerifiableNodeData,
    /// Addresses stored for this node.
    pub addresses: Vec<Url>,
}
// NOTE(review): presumably generates the remaining boilerplate impls the DHT
// expects from a node type -- confirm against the macro definition in
// `darkfi::dht`.
impl_dht_node_defaults!(FudNode);
61
62impl DhtNode for FudNode {
63 fn id(&self) -> blake3::Hash {
64 self.data.id()
65 }
66 fn addresses(&self) -> Vec<Url> {
67 self.addresses.clone()
68 }
69}
70
71impl From<FudNode> for JsonValue {
72 fn from(node: FudNode) -> JsonValue {
73 json_map([
74 ("id", JsonValue::String(hash_to_string(&node.id()))),
75 (
76 "addresses",
77 JsonValue::Array(
78 node.addresses.iter().map(|addr| JsonValue::String(addr.to_string())).collect(),
79 ),
80 ),
81 ])
82 }
83}
84
/// A signed record associating a node with a key in the DHT.
///
/// `Eq` is derived, but `PartialEq` is implemented by hand elsewhere in this
/// file and only compares `key` and the node id.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
pub struct FudSeeder {
    /// Key this seeder record is stored under.
    pub key: blake3::Hash,
    /// The node acting as seeder.
    pub node: FudNode,
    /// Signature checked by `verify_signature` against the node's public key
    /// over the key bytes followed by the serialized node.
    pub sig: Signature,
    /// Left out of the serialized form (`#[skip_serialize]`); `add_value`
    /// overwrites it with the local current time when the record is stored.
    #[skip_serialize]
    pub timestamp: u64,
}
99
100impl PartialEq for FudSeeder {
101 fn eq(&self, other: &Self) -> bool {
102 self.key == other.key && self.node.id() == other.node.id()
103 }
104}
105
106impl From<FudSeeder> for JsonValue {
107 fn from(seeder: FudSeeder) -> JsonValue {
108 json_map([
109 ("key", JsonValue::String(hash_to_string(&seeder.key))),
110 ("node", seeder.node.into()),
111 ])
112 }
113}
114
115impl FudSeeder {
116 pub async fn verify_signature(&self) -> bool {
117 self.node.data.public_key.verify(
118 &[self.key.as_bytes().to_vec(), serialize_async(&self.node).await].concat(),
119 &self.sig,
120 )
121 }
122}
123
124#[async_trait]
126impl DhtHandler for Fud {
127 type Value = Vec<FudSeeder>;
128 type Node = FudNode;
129
130 fn dht(&self) -> Arc<Dht<Self>> {
131 self.dht.clone()
132 }
133
134 async fn node(&self) -> Result<FudNode> {
135 let state = self.state.read().await;
136 if state.is_none() {
137 return Err(Error::Custom("Fud is not ready yet".to_string()));
138 }
139
140 return Ok(FudNode {
141 data: state.clone().unwrap().node_data,
142 addresses: self
143 .p2p
144 .clone()
145 .hosts()
146 .external_addrs()
147 .await
148 .iter()
149 .filter(|addr| !addr.to_string().contains("[::]"))
150 .cloned()
151 .collect(),
152 })
153 }
154
155 async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
156 debug!(target: "fud::DhtHandler::ping()", "Sending ping to {}", channel.display_address());
157
158 let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
160
161 let mut rng = OsRng;
163 let request = FudPingRequest { random: rng.gen() };
164 if channel.is_stopped() {
165 return Err(Error::ChannelStopped)
166 }
167 channel.send(&request).await?;
168
169 let reply = msg_subscriber.receive_with_timeout(self.dht.settings.timeout).await;
171 msg_subscriber.unsubscribe().await;
172 let reply = reply?;
173 let node = &reply.node;
174
175 if !node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
177 warn!(target: "fud::DhtHandler::ping()", "Received an invalid signature while pinging {}", channel.display_address());
178 self.dht
179 .event_publisher
180 .notify(DhtEvent::PingReceived {
181 from: channel.clone(),
182 result: Err(Error::InvalidSignature),
183 })
184 .await;
185 self.dht.cleanup_channel(channel.clone()).await;
186 channel.ban().await;
187 return Err(Error::InvalidSignature)
188 }
189
190 if let Err(e) = self.pow.write().await.verify_node(&node.data).await {
192 warn!(target: "fud::DhtHandler::ping()", "Received an invalid PoW while pinging {}: {e}", channel.display_address());
193 self.dht
194 .event_publisher
195 .notify(DhtEvent::PingReceived { from: channel.clone(), result: Err(e.clone()) })
196 .await;
197 self.dht.cleanup_channel(channel.clone()).await;
198 channel.ban().await;
199 return Err(e)
200 }
201 self.dht
202 .event_publisher
203 .notify(DhtEvent::PingReceived { from: channel.clone(), result: Ok(node.id()) })
204 .await;
205
206 if channel.session_type_id() & (SESSION_OUTBOUND | SESSION_DIRECT | SESSION_MANUAL) != 0 {
207 let ping_timeout = Duration::from_secs(10);
209
210 if let Err(e) = timeout(ping_timeout, self.dht.wait_fully_pinged(channel.info.id)).await
211 {
212 self.dht.cleanup_channel(channel).await;
213 return Err(e.into())
214 }
215
216 let mut host_cache = self.dht.host_cache.write().await;
217
218 if let Some(cached) = host_cache.get(channel.address()) {
221 if cached.node_id != node.id() {
222 self.dht.remove_node(&cached.node_id).await;
223
224 for (_, seeders) in self.dht.hash_table.write().await.iter_mut() {
225 seeders.retain(|seeder| seeder.node.id() != cached.node_id);
226 }
227 }
228 }
229
230 host_cache.insert(
232 channel.address().clone(),
233 HostCacheItem { last_ping: Timestamp::current_time(), node_id: node.id() },
234 );
235
236 drop(host_cache);
237
238 if !node.addresses().is_empty() {
240 self.dht.update_node(&node.clone(), channel.clone()).await;
241 }
242 } else if channel.session_type_id() & SESSION_INBOUND != 0 {
243 let _ = self.verify_node_tx.send(node.clone()).await;
248 }
249
250 self.dht.add_channel_to_cache(channel.info.id, node).await;
252
253 Ok(node.clone())
254 }
255
256 async fn store(
257 &self,
258 channel: ChannelPtr,
259 key: &blake3::Hash,
260 value: &Vec<FudSeeder>,
261 ) -> Result<()> {
262 debug!(target: "fud::DhtHandler::store()", "Announcing {} to {}", hash_to_string(key), channel.display_address());
263
264 channel.send(&FudAnnounce { key: *key, seeders: value.clone() }).await
265 }
266
267 async fn find_nodes(&self, channel: ChannelPtr, key: &blake3::Hash) -> Result<Vec<FudNode>> {
268 debug!(target: "fud::DhtHandler::find_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), channel.display_address());
269
270 let msg_subscriber_nodes = channel.subscribe_msg::<FudNodesReply>().await.unwrap();
271
272 let request = FudNodesRequest { key: *key };
273 channel.send(&request).await?;
274
275 let reply =
276 receive_resource_msg(&msg_subscriber_nodes, *key, self.dht().settings.timeout).await;
277
278 msg_subscriber_nodes.unsubscribe().await;
279
280 Ok(reply?.nodes.clone())
281 }
282
283 async fn find_value(
284 &self,
285 channel: ChannelPtr,
286 key: &blake3::Hash,
287 ) -> Result<DhtLookupReply<FudNode, Vec<FudSeeder>>> {
288 debug!(target: "fud::DhtHandler::find_value()", "Fetching value {} (or close nodes) from {}", hash_to_string(key), channel.display_address());
289
290 let msg_subscriber = channel.subscribe_msg::<FudSeedersReply>().await.unwrap();
291
292 let request = FudSeedersRequest { key: *key };
293 channel.send(&request).await?;
294
295 let recv = receive_resource_msg(&msg_subscriber, *key, self.dht().settings.timeout).await;
296
297 msg_subscriber.unsubscribe().await;
298
299 let rep = recv?;
300 Ok(DhtLookupReply::NodesAndValue(rep.nodes.clone(), rep.seeders.clone()))
301 }
302
303 async fn add_value(&self, key: &blake3::Hash, value: &Vec<FudSeeder>) {
304 let mut seeders = value.clone();
305
306 seeders.retain(|item| !item.node.addresses().is_empty());
308
309 let timestamp = Timestamp::current_time().inner();
311 for seeder in &mut seeders {
312 seeder.timestamp = timestamp;
313 }
314
315 debug!(target: "fud::DhtHandler::add_value()", "Inserting {} seeders for resource {}", seeders.len(), hash_to_string(key));
316
317 let mut seeders_write = self.dht.hash_table.write().await;
318 let existing_seeders = seeders_write.get_mut(key);
319
320 if let Some(existing_seeders) = existing_seeders {
321 existing_seeders.retain(|it| !seeders.contains(it));
322 existing_seeders.extend(seeders.clone());
323 } else {
324 let mut vec = Vec::new();
325 vec.extend(seeders.clone());
326 seeders_write.insert(*key, vec);
327 }
328 }
329
330 fn key_to_string(key: &blake3::Hash) -> String {
331 hash_to_string(key)
332 }
333}