1use async_trait::async_trait;
20use log::{debug, error, info};
21use smol::Executor;
22use std::{path::StripPrefixError, sync::Arc};
23
24use darkfi::{
25 dht::DhtHandler,
26 geode::hash_to_string,
27 impl_p2p_message,
28 net::{
29 metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30 ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
31 ProtocolJobsManager, ProtocolJobsManagerPtr,
32 },
33 Error, Result,
34};
35use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
36use darkfi_serial::{SerialDecodable, SerialEncodable};
37
38use crate::{
39 dht::{FudNode, FudSeeder},
40 Fud,
41};
42
43#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
45pub struct FudFileReply {
46 pub chunk_hashes: Vec<blake3::Hash>,
47}
48impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
49
50#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
52pub struct FudDirectoryReply {
53 pub chunk_hashes: Vec<blake3::Hash>,
54 pub files: Vec<(String, u64)>, }
56impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
57
58#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
60pub struct FudAnnounce {
61 pub key: blake3::Hash,
62 pub seeders: Vec<FudSeeder>,
63}
64impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
65
66#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
68pub struct FudChunkReply {
69 pub chunk: Vec<u8>,
71}
72impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
73
74#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
76pub struct FudNotFound;
77impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
78
79#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
81pub struct FudPingRequest {
82 pub random: u64,
83}
84impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
85
86#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
88pub struct FudPingReply {
89 pub node: FudNode,
90 pub sig: Signature,
92}
93impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
94
95#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
97pub struct FudFindRequest {
98 pub info: Option<blake3::Hash>,
99 pub key: blake3::Hash,
100}
101impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
102
103#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
105pub struct FudFindNodesRequest {
106 pub key: blake3::Hash,
107}
108impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
109
110#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
112pub struct FudFindNodesReply {
113 pub nodes: Vec<FudNode>,
114}
115impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
116
117#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
119pub struct FudFindSeedersRequest {
120 pub key: blake3::Hash,
121}
122impl_p2p_message!(
123 FudFindSeedersRequest,
124 "FudFindSeedersRequest",
125 0,
126 0,
127 DEFAULT_METERING_CONFIGURATION
128);
129
130#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
132pub struct FudFindSeedersReply {
133 pub seeders: Vec<FudSeeder>,
134 pub nodes: Vec<FudNode>,
135}
136impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
137
138pub struct ProtocolFud {
140 channel: ChannelPtr,
141 ping_request_sub: MessageSubscription<FudPingRequest>,
142 find_request_sub: MessageSubscription<FudFindRequest>,
143 find_nodes_request_sub: MessageSubscription<FudFindNodesRequest>,
144 find_seeders_request_sub: MessageSubscription<FudFindSeedersRequest>,
145 announce_sub: MessageSubscription<FudAnnounce>,
146 fud: Arc<Fud>,
147 jobsman: ProtocolJobsManagerPtr,
148}
149
150impl ProtocolFud {
151 pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
152 debug!(
153 target: "fud::proto::ProtocolFud::init()",
154 "Adding ProtocolFud to the protocol registry"
155 );
156
157 let msg_subsystem = channel.message_subsystem();
158 msg_subsystem.add_dispatch::<FudPingRequest>().await;
159 msg_subsystem.add_dispatch::<FudFindRequest>().await;
160 msg_subsystem.add_dispatch::<FudFindNodesRequest>().await;
161 msg_subsystem.add_dispatch::<FudFindSeedersRequest>().await;
162 msg_subsystem.add_dispatch::<FudAnnounce>().await;
163
164 let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
165 let find_request_sub = channel.subscribe_msg::<FudFindRequest>().await?;
166 let find_nodes_request_sub = channel.subscribe_msg::<FudFindNodesRequest>().await?;
167 let find_seeders_request_sub = channel.subscribe_msg::<FudFindSeedersRequest>().await?;
168 let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
169
170 Ok(Arc::new(Self {
171 channel: channel.clone(),
172 ping_request_sub,
173 find_request_sub,
174 find_nodes_request_sub,
175 find_seeders_request_sub,
176 announce_sub,
177 fud,
178 jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
179 }))
180 }
181
182 async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
183 debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
184
185 loop {
186 let ping_req = match self.ping_request_sub.receive().await {
187 Ok(v) => v,
188 Err(Error::ChannelStopped) => continue,
189 Err(e) => {
190 error!("{e}");
191 continue
192 }
193 };
194 info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST");
195
196 let reply = FudPingReply {
197 node: self.fud.node().await,
198 sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()),
199 };
200 match self.channel.send(&reply).await {
201 Ok(()) => continue,
202 Err(_e) => continue,
203 }
204 }
205 }
206
207 async fn handle_fud_find_request(self: Arc<Self>) -> Result<()> {
208 debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START");
209
210 loop {
211 let request = match self.find_request_sub.receive().await {
212 Ok(v) => v,
213 Err(Error::ChannelStopped) => continue,
214 Err(e) => {
215 error!("{e}");
216 continue
217 }
218 };
219 info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND for {}", hash_to_string(&request.key));
220
221 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
222 if let Some(node) = node {
223 self.fud.dht.update_node(&node).await;
224 }
225
226 if self.handle_fud_chunk_request(&request).await {
227 continue;
228 }
229
230 if self.handle_fud_metadata_request(&request).await {
231 continue;
232 }
233
234 let reply = FudNotFound {};
236 info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key));
237 let _ = self.channel.send(&reply).await;
238 }
239 }
240
241 async fn handle_fud_chunk_request(&self, request: &FudFindRequest) -> bool {
244 let hash = request.info;
245 if hash.is_none() {
246 return false;
247 }
248 let hash = hash.unwrap();
249
250 let path = self.fud.hash_to_path(&hash).ok().flatten();
251 if path.is_none() {
252 return false;
253 }
254 let path = path.unwrap();
255
256 let chunked = self.fud.geode.get(&hash, &path).await;
257 if chunked.is_err() {
258 return false;
259 }
260
261 let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key).await;
262 if let Ok(chunk) = chunk {
263 if !self.fud.geode.verify_chunk(&request.key, &chunk) {
264 return false;
266 }
267 let reply = FudChunkReply { chunk };
268 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key));
269 let _ = self.channel.send(&reply).await;
270 return true;
271 }
272
273 false
274 }
275
276 async fn handle_fud_metadata_request(&self, request: &FudFindRequest) -> bool {
279 let path = self.fud.hash_to_path(&request.key).ok().flatten();
280 if path.is_none() {
281 return false;
282 }
283 let path = path.unwrap();
284
285 let chunked_file = self.fud.geode.get(&request.key, &path).await.ok();
286 if chunked_file.is_none() {
287 return false;
288 }
289 let mut chunked_file = chunked_file.unwrap();
290
291 if chunked_file.len() == 1 && !chunked_file.is_dir() {
293 let chunk_hash = chunked_file.get_chunks()[0].0;
294 let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
295 if let Ok(chunk) = chunk {
296 if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.key {
297 return false;
299 }
300 let reply = FudChunkReply { chunk };
301 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
302 let _ = self.channel.send(&reply).await;
303 return true;
304 }
305 return false;
306 }
307
308 match chunked_file.is_dir() {
310 false => {
311 let reply = FudFileReply {
312 chunk_hashes: chunked_file
313 .get_chunks()
314 .iter()
315 .map(|(chunk, _)| *chunk)
316 .collect(),
317 };
318 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.key));
319 let _ = self.channel.send(&reply).await;
320 }
321 true => {
322 let files = chunked_file
323 .get_files()
324 .iter()
325 .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
326 Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
327 Err(e) => Err(e),
328 })
329 .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
330 if let Err(e) = files {
331 error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
332 return false;
333 }
334 let reply = FudDirectoryReply {
335 chunk_hashes: chunked_file
336 .get_chunks()
337 .iter()
338 .map(|(chunk, _)| *chunk)
339 .collect(),
340 files: files.unwrap(),
341 };
342 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.key));
343 let _ = self.channel.send(&reply).await;
344 }
345 };
346
347 true
348 }
349
350 async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> {
351 debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START");
352
353 loop {
354 let request = match self.find_nodes_request_sub.receive().await {
355 Ok(v) => v,
356 Err(Error::ChannelStopped) => continue,
357 Err(e) => {
358 error!("{e}");
359 continue
360 }
361 };
362 info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
363
364 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
365 if let Some(node) = node {
366 self.fud.dht.update_node(&node).await;
367 }
368
369 let reply = FudFindNodesReply {
370 nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
371 };
372 match self.channel.send(&reply).await {
373 Ok(()) => continue,
374 Err(_e) => continue,
375 }
376 }
377 }
378
379 async fn handle_fud_find_seeders_request(self: Arc<Self>) -> Result<()> {
380 debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START");
381
382 loop {
383 let request = match self.find_seeders_request_sub.receive().await {
384 Ok(v) => v,
385 Err(Error::ChannelStopped) => continue,
386 Err(e) => {
387 error!("{e}");
388 continue
389 }
390 };
391 info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key));
392
393 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
394 if let Some(node) = node {
395 self.fud.dht.update_node(&node).await;
396 }
397
398 let router = self.fud.dht.hash_table.read().await;
399 let peers = router.get(&request.key);
400
401 match peers {
402 Some(seeders) => {
403 let _ = self
404 .channel
405 .send(&FudFindSeedersReply {
406 seeders: seeders.to_vec(),
407 nodes: self
408 .fud
409 .dht()
410 .find_neighbors(&request.key, self.fud.dht().settings.k)
411 .await,
412 })
413 .await;
414 }
415 None => {
416 let _ = self
417 .channel
418 .send(&FudFindSeedersReply {
419 seeders: vec![],
420 nodes: self
421 .fud
422 .dht()
423 .find_neighbors(&request.key, self.fud.dht().settings.k)
424 .await,
425 })
426 .await;
427 }
428 };
429 }
430 }
431
432 async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
433 debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
434
435 loop {
436 let request = match self.announce_sub.receive().await {
437 Ok(v) => v,
438 Err(Error::ChannelStopped) => continue,
439 Err(e) => {
440 error!("{e}");
441 continue
442 }
443 };
444 info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
445
446 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
447 if let Some(node) = node {
448 self.fud.dht.update_node(&node).await;
449 }
450
451 let mut seeders = vec![];
452
453 for seeder in request.seeders.clone() {
454 if seeder.node.addresses.is_empty() {
455 continue
456 }
457 seeders.push(seeder);
459 }
460
461 self.fud.add_value(&request.key, &seeders).await;
462 }
463 }
464}
465
466#[async_trait]
467impl ProtocolBase for ProtocolFud {
468 async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
469 debug!(target: "fud::ProtocolFud::start()", "START");
470 self.jobsman.clone().start(executor.clone());
471 self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
472 self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await;
473 self.jobsman
474 .clone()
475 .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone())
476 .await;
477 self.jobsman
478 .clone()
479 .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone())
480 .await;
481 self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
482 debug!(target: "fud::ProtocolFud::start()", "END");
483 Ok(())
484 }
485
486 fn name(&self) -> &'static str {
487 "ProtocolFud"
488 }
489}