1use std::sync::Arc;
20
21use async_trait::async_trait;
22use log::debug;
23use num_bigint::BigUint;
24use rand::{rngs::OsRng, Rng};
25use url::Url;
26
27use darkfi::{
28 dht::{impl_dht_node_defaults, Dht, DhtHandler, DhtLookupReply, DhtNode},
29 geode::hash_to_string,
30 net::ChannelPtr,
31 util::time::Timestamp,
32 Error, Result,
33};
34use darkfi_sdk::crypto::schnorr::SchnorrPublic;
35use darkfi_serial::{SerialDecodable, SerialEncodable};
36
37use crate::{
38 pow::VerifiableNodeData,
39 proto::{
40 FudAnnounce, FudFindNodesReply, FudFindNodesRequest, FudFindSeedersReply,
41 FudFindSeedersRequest, FudPingReply, FudPingRequest,
42 },
43 Fud,
44};
45
46#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
47pub struct FudNode {
48 pub data: VerifiableNodeData,
49 pub addresses: Vec<Url>,
50}
51impl_dht_node_defaults!(FudNode);
52
53impl DhtNode for FudNode {
54 fn id(&self) -> blake3::Hash {
55 self.data.id()
56 }
57 fn addresses(&self) -> Vec<Url> {
58 self.addresses.clone()
59 }
60}
61
62#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
64pub struct FudSeeder {
65 pub key: blake3::Hash,
67 pub node: FudNode,
69 #[skip_serialize]
72 pub timestamp: u64,
73}
74
75impl PartialEq for FudSeeder {
76 fn eq(&self, other: &Self) -> bool {
77 self.key == other.key && self.node.id() == other.node.id()
78 }
79}
80
81#[async_trait]
83impl DhtHandler for Fud {
84 type Value = Vec<FudSeeder>;
85 type Node = FudNode;
86
87 fn dht(&self) -> Arc<Dht<Self>> {
88 self.dht.clone()
89 }
90
91 async fn node(&self) -> FudNode {
92 FudNode {
93 data: self.node_data.read().await.clone(),
94 addresses: self
95 .p2p
96 .clone()
97 .hosts()
98 .external_addrs()
99 .await
100 .iter()
101 .filter(|addr| !addr.to_string().contains("[::]"))
102 .cloned()
103 .collect(),
104 }
105 }
106
107 async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
108 debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
109 let msg_subsystem = channel.message_subsystem();
110 msg_subsystem.add_dispatch::<FudPingReply>().await;
111 let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
112
113 let mut rng = OsRng;
115 let request = FudPingRequest { random: rng.gen() };
116 channel.send(&request).await?;
117
118 let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
120 msg_subscriber.unsubscribe().await;
121 let reply = reply?;
122
123 if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
125 channel.ban().await;
126 return Err(Error::InvalidSignature)
127 }
128
129 if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await {
131 channel.ban().await;
132 return Err(e)
133 }
134
135 Ok(reply.node.clone())
136 }
137
138 async fn on_new_node(&self, node: &FudNode) -> Result<()> {
140 debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id()));
141
142 if !self.dht.is_bootstrapped().await {
144 let _ = self.init().await;
145 }
146
147 let self_id = self.node_data.read().await.id();
149 let channel = self.dht.get_channel(node, None).await?;
150 for (key, seeders) in self.dht.hash_table.read().await.iter() {
151 let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id()));
152 let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
153 if node_distance <= self_distance {
154 let _ = channel.send(&FudAnnounce { key: *key, seeders: seeders.clone() }).await;
155 }
156 }
157 self.dht.cleanup_channel(channel).await;
158
159 Ok(())
160 }
161
162 async fn find_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> {
163 debug!(target: "fud::DhtHandler::find_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
164
165 let channel = self.dht.get_channel(node, None).await?;
166 let msg_subsystem = channel.message_subsystem();
167 msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
168 let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();
169
170 let request = FudFindNodesRequest { key: *key };
171 channel.send(&request).await?;
172
173 let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await;
174
175 msg_subscriber_nodes.unsubscribe().await;
176 self.dht.cleanup_channel(channel).await;
177
178 Ok(reply?.nodes.clone())
179 }
180
181 async fn find_value(
182 &self,
183 node: &FudNode,
184 key: &blake3::Hash,
185 ) -> Result<DhtLookupReply<FudNode, Vec<FudSeeder>>> {
186 debug!(target: "fud::DhtHandler::find_value()", "Fetching value {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
187
188 let channel = self.dht.get_channel(node, None).await?;
189 let msg_subsystem = channel.message_subsystem();
190 msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
191 let msg_subscriber = channel.subscribe_msg::<FudFindSeedersReply>().await.unwrap();
192
193 let request = FudFindSeedersRequest { key: *key };
194 channel.send(&request).await?;
195
196 let recv = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
197
198 msg_subscriber.unsubscribe().await;
199 self.dht.cleanup_channel(channel).await;
200
201 let rep = recv?;
202 Ok(DhtLookupReply::NodesAndValue(rep.nodes.clone(), rep.seeders.clone()))
203 }
204
205 async fn add_value(&self, key: &blake3::Hash, value: &Vec<FudSeeder>) {
206 let mut seeders = value.clone();
207
208 seeders.retain(|item| !item.node.addresses().is_empty());
210
211 let timestamp = Timestamp::current_time().inner();
213 for seeder in &mut seeders {
214 seeder.timestamp = timestamp;
215 }
216
217 debug!(target: "fud::DhtHandler::add_value()", "Inserting {} seeders for resource {}", seeders.len(), hash_to_string(key));
218
219 let mut seeders_write = self.dht.hash_table.write().await;
220 let existing_seeders = seeders_write.get_mut(key);
221
222 if let Some(existing_seeders) = existing_seeders {
223 existing_seeders.retain(|it| !seeders.contains(it));
224 existing_seeders.extend(seeders.clone());
225 } else {
226 let mut vec = Vec::new();
227 vec.extend(seeders.clone());
228 seeders_write.insert(*key, vec);
229 }
230 }
231
232 fn key_to_string(key: &blake3::Hash) -> String {
233 hash_to_string(key)
234 }
235}