1use async_trait::async_trait;
20use smol::Executor;
21use std::{path::StripPrefixError, sync::Arc};
22use tracing::{debug, error, info, warn};
23
24use darkfi::{
25 dht::{event::DhtEvent, DhtHandler},
26 geode::hash_to_string,
27 impl_p2p_message,
28 net::{
29 metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30 session::SESSION_INBOUND,
31 ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
32 ProtocolJobsManager, ProtocolJobsManagerPtr,
33 },
34 Error, Result,
35};
36use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
37use darkfi_serial::{SerialDecodable, SerialEncodable};
38
39use crate::{
40 dht::{FudNode, FudSeeder},
41 Fud,
42};
43
44pub trait ResourceMessage {
47 fn resource_hash(&self) -> blake3::Hash;
48}
/// Implements `ResourceMessage` for a message type.
///
/// * `impl_resource_msg!(Type)` reads the hash from the `resource` field.
/// * `impl_resource_msg!(Type, field)` reads the hash from `field`.
///
/// The chosen field must be a `blake3::Hash`; it is returned by value.
macro_rules! impl_resource_msg {
    // Shorthand: the hash lives in the conventional `resource` field.
    ($msg:ty) => {
        impl_resource_msg!($msg, resource);
    };
    // General form: the caller names the field holding the hash.
    ($msg:ty, $field:ident) => {
        impl ResourceMessage for $msg {
            fn resource_hash(&self) -> blake3::Hash {
                self.$field
            }
        }
    };
}
61
62#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
64pub struct FudFileReply {
65 pub resource: blake3::Hash,
66 pub chunk_hashes: Vec<blake3::Hash>,
67}
68impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
69impl_resource_msg!(FudFileReply);
70
71#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
73pub struct FudDirectoryReply {
74 pub resource: blake3::Hash,
75 pub chunk_hashes: Vec<blake3::Hash>,
76 pub files: Vec<(String, u64)>, }
78impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
79impl_resource_msg!(FudDirectoryReply);
80
81#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
83pub struct FudAnnounce {
84 pub key: blake3::Hash,
85 pub seeders: Vec<FudSeeder>,
86}
87impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
88
89#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
91pub struct FudChunkReply {
92 pub resource: blake3::Hash,
93 pub chunk: Vec<u8>,
95}
96impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
97impl_resource_msg!(FudChunkReply);
98
99#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
101pub struct FudMetadataNotFound {
102 pub resource: blake3::Hash,
103}
104impl_p2p_message!(FudMetadataNotFound, "FudMetadataNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
105impl_resource_msg!(FudMetadataNotFound);
106
107#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
109pub struct FudChunkNotFound {
110 pub resource: blake3::Hash,
111 pub chunk: blake3::Hash,
112}
113impl_p2p_message!(FudChunkNotFound, "FudChunkNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
114impl_resource_msg!(FudChunkNotFound);
115
116#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
118pub struct FudPingRequest {
119 pub random: u64,
120}
121impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
122
123#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
125pub struct FudPingReply {
126 pub node: FudNode,
127 pub random: u64,
128 pub sig: Signature,
130}
131impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
132
133#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
135pub struct FudMetadataRequest {
136 pub resource: blake3::Hash,
137}
138impl_p2p_message!(FudMetadataRequest, "FudMetadataRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
139impl_resource_msg!(FudMetadataRequest);
140
141#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
143pub struct FudChunkRequest {
144 pub resource: blake3::Hash,
145 pub chunk: blake3::Hash,
146}
147impl_p2p_message!(FudChunkRequest, "FudChunkRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
148impl_resource_msg!(FudChunkRequest);
149
150#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
152pub struct FudNodesRequest {
153 pub key: blake3::Hash,
154}
155impl_p2p_message!(FudNodesRequest, "FudNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
156
157#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
159pub struct FudNodesReply {
160 pub key: blake3::Hash,
161 pub nodes: Vec<FudNode>,
162}
163impl_p2p_message!(FudNodesReply, "FudNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
164impl_resource_msg!(FudNodesReply, key);
165
166#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
168pub struct FudSeedersRequest {
169 pub key: blake3::Hash,
170}
171impl_p2p_message!(FudSeedersRequest, "FudSeedersRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
172
173#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
175pub struct FudSeedersReply {
176 pub key: blake3::Hash,
177 pub seeders: Vec<FudSeeder>,
178 pub nodes: Vec<FudNode>,
179}
180impl_p2p_message!(FudSeedersReply, "FudSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
181impl_resource_msg!(FudSeedersReply, key);
182
183pub struct ProtocolFud {
185 channel: ChannelPtr,
186 ping_request_sub: MessageSubscription<FudPingRequest>,
187 find_metadata_request_sub: MessageSubscription<FudMetadataRequest>,
188 find_chunk_request_sub: MessageSubscription<FudChunkRequest>,
189 find_nodes_request_sub: MessageSubscription<FudNodesRequest>,
190 find_seeders_request_sub: MessageSubscription<FudSeedersRequest>,
191 announce_sub: MessageSubscription<FudAnnounce>,
192 fud: Arc<Fud>,
193 jobsman: ProtocolJobsManagerPtr,
194}
195
196impl ProtocolFud {
197 pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
198 debug!(
199 target: "fud::proto::ProtocolFud::init()",
200 "Adding ProtocolFud to the protocol registry"
201 );
202
203 let msg_subsystem = channel.message_subsystem();
204 msg_subsystem.add_dispatch::<FudPingRequest>().await;
205 msg_subsystem.add_dispatch::<FudPingReply>().await;
206 msg_subsystem.add_dispatch::<FudMetadataRequest>().await;
207 msg_subsystem.add_dispatch::<FudChunkRequest>().await;
208 msg_subsystem.add_dispatch::<FudChunkReply>().await;
209 msg_subsystem.add_dispatch::<FudChunkNotFound>().await;
210 msg_subsystem.add_dispatch::<FudFileReply>().await;
211 msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
212 msg_subsystem.add_dispatch::<FudMetadataNotFound>().await;
213 msg_subsystem.add_dispatch::<FudNodesRequest>().await;
214 msg_subsystem.add_dispatch::<FudNodesReply>().await;
215 msg_subsystem.add_dispatch::<FudSeedersRequest>().await;
216 msg_subsystem.add_dispatch::<FudSeedersReply>().await;
217 msg_subsystem.add_dispatch::<FudAnnounce>().await;
218
219 let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
220 let find_metadata_request_sub = channel.subscribe_msg::<FudMetadataRequest>().await?;
221 let find_chunk_request_sub = channel.subscribe_msg::<FudChunkRequest>().await?;
222 let find_nodes_request_sub = channel.subscribe_msg::<FudNodesRequest>().await?;
223 let find_seeders_request_sub = channel.subscribe_msg::<FudSeedersRequest>().await?;
224 let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
225
226 Ok(Arc::new(Self {
227 channel: channel.clone(),
228 ping_request_sub,
229 find_metadata_request_sub,
230 find_chunk_request_sub,
231 find_nodes_request_sub,
232 find_seeders_request_sub,
233 announce_sub,
234 fud,
235 jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
236 }))
237 }
238
239 async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
240 debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
241
242 loop {
243 let ping_req = match self.ping_request_sub.receive().await {
244 Ok(v) => v,
245 Err(Error::ChannelStopped) => continue,
246 Err(_) => continue,
247 };
248 info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST from {}", self.channel.display_address());
249 self.fud.dht.update_channel(self.channel.info.id).await;
250
251 let self_node = self.fud.node().await;
252 if self_node.is_err() {
253 self.channel.stop().await;
254 continue
255 }
256 let state = self.fud.state.read().await;
257 if state.is_none() {
258 self.channel.stop().await;
259 continue
260 }
261
262 let reply = FudPingReply {
263 node: self_node.unwrap(),
264 random: ping_req.random,
265 sig: state.clone().unwrap().secret_key.sign(&ping_req.random.to_be_bytes()),
266 };
267 drop(state);
268
269 if let Err(e) = self.channel.send(&reply).await {
270 self.fud
271 .dht
272 .event_publisher
273 .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Err(e) })
274 .await;
275 continue;
276 }
277 self.fud
278 .dht
279 .event_publisher
280 .notify(DhtEvent::PingSent { to: self.channel.clone(), result: Ok(()) })
281 .await;
282
283 if self.channel.session_type_id() & SESSION_INBOUND != 0 {
285 let _ = self.fud.ping(self.channel.clone()).await;
286 }
287 }
288 }
289
290 async fn handle_fud_metadata_request(self: Arc<Self>) -> Result<()> {
291 debug!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "START");
292
293 loop {
294 let request = match self.find_metadata_request_sub.receive().await {
295 Ok(v) => v,
296 Err(Error::ChannelStopped) => continue,
297 Err(_) => continue,
298 };
299 info!(target: "fud::ProtocolFud::handle_fud_request()", "Received METADATA REQUEST for {}", hash_to_string(&request.resource));
300 self.fud.dht.update_channel(self.channel.info.id).await;
301
302 let notfound = async || {
303 let reply = FudMetadataNotFound { resource: request.resource };
304 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "We do not have the metadata of {}", hash_to_string(&request.resource));
305 let _ = self.channel.send(&reply).await;
306 };
307
308 let path = self.fud.hash_to_path(&request.resource).ok().flatten();
309 if path.is_none() {
310 notfound().await;
311 continue
312 }
313 let path = path.unwrap();
314
315 let chunked_file = self.fud.geode.get(&request.resource, &path).await.ok();
316 if chunked_file.is_none() {
317 notfound().await;
318 continue
319 }
320 let mut chunked_file = chunked_file.unwrap();
321
322 if chunked_file.len() == 1 && !chunked_file.is_dir() {
324 let chunk_hash = chunked_file.get_chunks()[0].0;
325 let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
326 if let Ok(chunk) = chunk {
327 if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.resource {
328 notfound().await;
330 continue
331 }
332 let reply = FudChunkReply { resource: request.resource, chunk };
333 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
334 let _ = self.channel.send(&reply).await;
335 continue
336 }
337 }
339
340 match chunked_file.is_dir() {
342 false => {
343 let reply = FudFileReply {
344 resource: request.resource,
345 chunk_hashes: chunked_file
346 .get_chunks()
347 .iter()
348 .map(|(chunk, _)| *chunk)
349 .collect(),
350 };
351 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.resource));
352 let _ = self.channel.send(&reply).await;
353 }
354 true => {
355 let files = chunked_file
356 .get_files()
357 .iter()
358 .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
359 Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
360 Err(e) => Err(e),
361 })
362 .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
363 if let Err(e) = files {
364 error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
365 notfound().await;
366 continue
367 }
368 let reply = FudDirectoryReply {
369 resource: request.resource,
370 chunk_hashes: chunked_file
371 .get_chunks()
372 .iter()
373 .map(|(chunk, _)| *chunk)
374 .collect(),
375 files: files.unwrap(),
376 };
377 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.resource));
378 let _ = self.channel.send(&reply).await;
379 }
380 };
381 }
382 }
383
384 async fn handle_fud_chunk_request(self: Arc<Self>) -> Result<()> {
385 debug!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "START");
386
387 loop {
388 let request = match self.find_chunk_request_sub.receive().await {
389 Ok(v) => v,
390 Err(Error::ChannelStopped) => continue,
391 Err(_) => continue,
392 };
393 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Received CHUNK REQUEST for {}", hash_to_string(&request.resource));
394 self.fud.dht.update_channel(self.channel.info.id).await;
395
396 let notfound = async || {
397 let reply = FudChunkNotFound { resource: request.resource, chunk: request.chunk };
398 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "We do not have chunk {} of resource {}", hash_to_string(&request.resource), hash_to_string(&request.chunk));
399 let _ = self.channel.send(&reply).await;
400 };
401
402 let path = self.fud.hash_to_path(&request.resource).ok().flatten();
403 if path.is_none() {
404 notfound().await;
405 continue
406 }
407 let path = path.unwrap();
408
409 let chunked = self.fud.geode.get(&request.resource, &path).await;
410 if chunked.is_err() {
411 notfound().await;
412 continue
413 }
414
415 let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.chunk).await;
416 if let Ok(chunk) = chunk {
417 if !self.fud.geode.verify_chunk(&request.chunk, &chunk) {
418 notfound().await;
420 continue
421 }
422 let reply = FudChunkReply { resource: request.resource, chunk };
423 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.chunk));
424 let _ = self.channel.send(&reply).await;
425 continue
426 }
427
428 notfound().await;
429 }
430 }
431
432 async fn handle_fud_nodes_request(self: Arc<Self>) -> Result<()> {
433 debug!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "START");
434
435 loop {
436 let request = match self.find_nodes_request_sub.receive().await {
437 Ok(v) => v,
438 Err(Error::ChannelStopped) => continue,
439 Err(_) => continue,
440 };
441 info!(target: "fud::ProtocolFud::handle_fud_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
442 self.fud.dht.update_channel(self.channel.info.id).await;
443
444 let reply = FudNodesReply {
445 key: request.key,
446 nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
447 };
448 match self.channel.send(&reply).await {
449 Ok(()) => continue,
450 Err(_e) => continue,
451 }
452 }
453 }
454
455 async fn handle_fud_seeders_request(self: Arc<Self>) -> Result<()> {
456 debug!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "START");
457
458 loop {
459 let request = match self.find_seeders_request_sub.receive().await {
460 Ok(v) => v,
461 Err(Error::ChannelStopped) => continue,
462 Err(_) => continue,
463 };
464 info!(target: "fud::ProtocolFud::handle_fud_seeders_request()", "Received FIND SEEDERS for {} from {:?}", hash_to_string(&request.key), self.channel);
465 self.fud.dht.update_channel(self.channel.info.id).await;
466
467 let router = self.fud.dht.hash_table.read().await;
468 let peers = router.get(&request.key);
469
470 match peers {
471 Some(seeders) => {
472 let _ = self
473 .channel
474 .send(&FudSeedersReply {
475 key: request.key,
476 seeders: seeders.to_vec(),
477 nodes: self
478 .fud
479 .dht()
480 .find_neighbors(&request.key, self.fud.dht().settings.k)
481 .await,
482 })
483 .await;
484 }
485 None => {
486 let _ = self
487 .channel
488 .send(&FudSeedersReply {
489 key: request.key,
490 seeders: vec![],
491 nodes: self
492 .fud
493 .dht()
494 .find_neighbors(&request.key, self.fud.dht().settings.k)
495 .await,
496 })
497 .await;
498 }
499 };
500 }
501 }
502
503 async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
504 debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
505
506 loop {
507 let request = match self.announce_sub.receive().await {
508 Ok(v) => v,
509 Err(Error::ChannelStopped) => continue,
510 Err(_) => continue,
511 };
512 info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
513 self.fud.dht.update_channel(self.channel.info.id).await;
514
515 let mut seeders = vec![];
516
517 for seeder in request.seeders.clone() {
518 if seeder.node.addresses.is_empty() {
519 continue
520 }
521 if let Err(e) = self.fud.pow.write().await.verify_node(&seeder.node.data).await {
522 warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid PoW: {e}");
523 continue
524 }
525 if !seeder.verify_signature().await {
526 warn!(target: "fud::ProtocolFud::handle_fud_announce()", "Received seeder with invalid signature");
527 continue
528 }
529
530 seeders.push(seeder);
533 }
534
535 self.fud.add_value(&request.key, &seeders).await;
536 }
537 }
538}
539
540#[async_trait]
541impl ProtocolBase for ProtocolFud {
542 async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
543 debug!(target: "fud::ProtocolFud::start()", "START");
544 self.jobsman.clone().start(executor.clone());
545 self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
546 self.jobsman
547 .clone()
548 .spawn(self.clone().handle_fud_metadata_request(), executor.clone())
549 .await;
550 self.jobsman.clone().spawn(self.clone().handle_fud_chunk_request(), executor.clone()).await;
551 self.jobsman.clone().spawn(self.clone().handle_fud_nodes_request(), executor.clone()).await;
552 self.jobsman
553 .clone()
554 .spawn(self.clone().handle_fud_seeders_request(), executor.clone())
555 .await;
556 self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
557 debug!(target: "fud::ProtocolFud::start()", "END");
558 Ok(())
559 }
560
561 fn name(&self) -> &'static str {
562 "ProtocolFud"
563 }
564}