1use async_trait::async_trait;
20use log::{debug, error, info};
21use smol::Executor;
22use std::{path::StripPrefixError, sync::Arc};
23
24use darkfi::{
25 dht::{DhtHandler, DhtRouterItem},
26 geode::hash_to_string,
27 impl_p2p_message,
28 net::{
29 metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
30 ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
31 ProtocolJobsManager, ProtocolJobsManagerPtr,
32 },
33 Error, Result,
34};
35use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
36use darkfi_serial::{SerialDecodable, SerialEncodable};
37
38use super::{Fud, FudNode};
39
40#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
42pub struct FudFileReply {
43 pub chunk_hashes: Vec<blake3::Hash>,
44}
45impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
46
47#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
49pub struct FudDirectoryReply {
50 pub chunk_hashes: Vec<blake3::Hash>,
51 pub files: Vec<(String, u64)>, }
53impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
54
55#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
57pub struct FudAnnounce {
58 pub key: blake3::Hash,
59 pub seeders: Vec<DhtRouterItem<FudNode>>,
60}
61impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
62
63#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
65pub struct FudChunkReply {
66 pub chunk: Vec<u8>,
68}
69impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
70
71#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
73pub struct FudNotFound;
74impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
75
76#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
78pub struct FudPingRequest {
79 pub random: u64,
80}
81impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
82
83#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
85pub struct FudPingReply {
86 pub node: FudNode,
87 pub sig: Signature,
89}
90impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
91
92#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
94pub struct FudFindRequest {
95 pub info: Option<blake3::Hash>,
96 pub key: blake3::Hash,
97}
98impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
99
100#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
102pub struct FudFindNodesRequest {
103 pub key: blake3::Hash,
104}
105impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
106
107#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
109pub struct FudFindNodesReply {
110 pub nodes: Vec<FudNode>,
111}
112impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
113
114#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
116pub struct FudFindSeedersRequest {
117 pub key: blake3::Hash,
118}
119impl_p2p_message!(
120 FudFindSeedersRequest,
121 "FudFindSeedersRequest",
122 0,
123 0,
124 DEFAULT_METERING_CONFIGURATION
125);
126
127#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
129pub struct FudFindSeedersReply {
130 pub seeders: Vec<DhtRouterItem<FudNode>>,
131}
132impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
133
134pub struct ProtocolFud {
136 channel: ChannelPtr,
137 ping_request_sub: MessageSubscription<FudPingRequest>,
138 find_request_sub: MessageSubscription<FudFindRequest>,
139 find_nodes_request_sub: MessageSubscription<FudFindNodesRequest>,
140 find_seeders_request_sub: MessageSubscription<FudFindSeedersRequest>,
141 announce_sub: MessageSubscription<FudAnnounce>,
142 fud: Arc<Fud>,
143 jobsman: ProtocolJobsManagerPtr,
144}
145
146impl ProtocolFud {
147 pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
148 debug!(
149 target: "fud::proto::ProtocolFud::init()",
150 "Adding ProtocolFud to the protocol registry"
151 );
152
153 let msg_subsystem = channel.message_subsystem();
154 msg_subsystem.add_dispatch::<FudPingRequest>().await;
155 msg_subsystem.add_dispatch::<FudFindRequest>().await;
156 msg_subsystem.add_dispatch::<FudFindNodesRequest>().await;
157 msg_subsystem.add_dispatch::<FudFindSeedersRequest>().await;
158 msg_subsystem.add_dispatch::<FudAnnounce>().await;
159
160 let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
161 let find_request_sub = channel.subscribe_msg::<FudFindRequest>().await?;
162 let find_nodes_request_sub = channel.subscribe_msg::<FudFindNodesRequest>().await?;
163 let find_seeders_request_sub = channel.subscribe_msg::<FudFindSeedersRequest>().await?;
164 let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
165
166 Ok(Arc::new(Self {
167 channel: channel.clone(),
168 ping_request_sub,
169 find_request_sub,
170 find_nodes_request_sub,
171 find_seeders_request_sub,
172 announce_sub,
173 fud,
174 jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
175 }))
176 }
177
178 async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
179 debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
180
181 loop {
182 let ping_req = match self.ping_request_sub.receive().await {
183 Ok(v) => v,
184 Err(Error::ChannelStopped) => continue,
185 Err(e) => {
186 error!("{e}");
187 continue
188 }
189 };
190 info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST");
191
192 let reply = FudPingReply {
193 node: self.fud.node().await,
194 sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()),
195 };
196 match self.channel.send(&reply).await {
197 Ok(()) => continue,
198 Err(_e) => continue,
199 }
200 }
201 }
202
203 async fn handle_fud_find_request(self: Arc<Self>) -> Result<()> {
204 debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START");
205
206 loop {
207 let request = match self.find_request_sub.receive().await {
208 Ok(v) => v,
209 Err(Error::ChannelStopped) => continue,
210 Err(e) => {
211 error!("{e}");
212 continue
213 }
214 };
215 info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND for {}", hash_to_string(&request.key));
216
217 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
218 if let Some(node) = node {
219 self.fud.update_node(&node).await;
220 }
221
222 if self.handle_fud_chunk_request(&request).await {
223 continue;
224 }
225
226 if self.handle_fud_metadata_request(&request).await {
227 continue;
228 }
229
230 let reply = FudNotFound {};
232 info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key));
233 let _ = self.channel.send(&reply).await;
234 }
235 }
236
237 async fn handle_fud_chunk_request(&self, request: &FudFindRequest) -> bool {
240 let hash = request.info;
241 if hash.is_none() {
242 return false;
243 }
244 let hash = hash.unwrap();
245
246 let path = self.fud.hash_to_path(&hash).ok().flatten();
247 if path.is_none() {
248 return false;
249 }
250 let path = path.unwrap();
251
252 let chunked = self.fud.geode.get(&hash, &path).await;
253 if chunked.is_err() {
254 return false;
255 }
256
257 let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key).await;
258 if let Ok(chunk) = chunk {
259 if !self.fud.geode.verify_chunk(&request.key, &chunk) {
260 return false;
262 }
263 let reply = FudChunkReply { chunk };
264 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key));
265 let _ = self.channel.send(&reply).await;
266 return true;
267 }
268
269 false
270 }
271
272 async fn handle_fud_metadata_request(&self, request: &FudFindRequest) -> bool {
275 let path = self.fud.hash_to_path(&request.key).ok().flatten();
276 if path.is_none() {
277 return false;
278 }
279 let path = path.unwrap();
280
281 let chunked_file = self.fud.geode.get(&request.key, &path).await.ok();
282 if chunked_file.is_none() {
283 return false;
284 }
285 let mut chunked_file = chunked_file.unwrap();
286
287 if chunked_file.len() == 1 && !chunked_file.is_dir() {
289 let chunk_hash = chunked_file.get_chunks()[0].0;
290 let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
291 if let Ok(chunk) = chunk {
292 if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.key {
293 return false;
295 }
296 let reply = FudChunkReply { chunk };
297 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
298 let _ = self.channel.send(&reply).await;
299 return true;
300 }
301 return false;
302 }
303
304 match chunked_file.is_dir() {
306 false => {
307 let reply = FudFileReply {
308 chunk_hashes: chunked_file
309 .get_chunks()
310 .iter()
311 .map(|(chunk, _)| *chunk)
312 .collect(),
313 };
314 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.key));
315 let _ = self.channel.send(&reply).await;
316 }
317 true => {
318 let files = chunked_file
319 .get_files()
320 .iter()
321 .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
322 Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
323 Err(e) => Err(e),
324 })
325 .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
326 if let Err(e) = files {
327 error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
328 return false;
329 }
330 let reply = FudDirectoryReply {
331 chunk_hashes: chunked_file
332 .get_chunks()
333 .iter()
334 .map(|(chunk, _)| *chunk)
335 .collect(),
336 files: files.unwrap(),
337 };
338 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.key));
339 let _ = self.channel.send(&reply).await;
340 }
341 };
342
343 true
344 }
345
346 async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> {
347 debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START");
348
349 loop {
350 let request = match self.find_nodes_request_sub.receive().await {
351 Ok(v) => v,
352 Err(Error::ChannelStopped) => continue,
353 Err(e) => {
354 error!("{e}");
355 continue
356 }
357 };
358 info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
359
360 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
361 if let Some(node) = node {
362 self.fud.update_node(&node).await;
363 }
364
365 let reply = FudFindNodesReply {
366 nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
367 };
368 match self.channel.send(&reply).await {
369 Ok(()) => continue,
370 Err(_e) => continue,
371 }
372 }
373 }
374
375 async fn handle_fud_find_seeders_request(self: Arc<Self>) -> Result<()> {
376 debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START");
377
378 loop {
379 let request = match self.find_seeders_request_sub.receive().await {
380 Ok(v) => v,
381 Err(Error::ChannelStopped) => continue,
382 Err(e) => {
383 error!("{e}");
384 continue
385 }
386 };
387 info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key));
388
389 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
390 if let Some(node) = node {
391 self.fud.update_node(&node).await;
392 }
393
394 let router = self.fud.seeders_router.read().await;
395 let peers = router.get(&request.key);
396
397 match peers {
398 Some(seeders) => {
399 let _ = self
400 .channel
401 .send(&FudFindSeedersReply { seeders: seeders.iter().cloned().collect() })
402 .await;
403 }
404 None => {
405 let _ = self.channel.send(&FudFindSeedersReply { seeders: vec![] }).await;
406 }
407 };
408 }
409 }
410
411 async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
412 debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
413
414 loop {
415 let request = match self.announce_sub.receive().await {
416 Ok(v) => v,
417 Err(Error::ChannelStopped) => continue,
418 Err(e) => {
419 error!("{e}");
420 continue
421 }
422 };
423 info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
424
425 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
426 if let Some(node) = node {
427 self.fud.update_node(&node).await;
428 }
429
430 let mut seeders = vec![];
431
432 for seeder in request.seeders.clone() {
433 if seeder.node.addresses.is_empty() {
434 continue
435 }
436 seeders.push(seeder);
438 }
439
440 self.fud.add_to_router(self.fud.seeders_router.clone(), &request.key, seeders).await;
441 }
442 }
443}
444
445#[async_trait]
446impl ProtocolBase for ProtocolFud {
447 async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
448 debug!(target: "fud::ProtocolFud::start()", "START");
449 self.jobsman.clone().start(executor.clone());
450 self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
451 self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await;
452 self.jobsman
453 .clone()
454 .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone())
455 .await;
456 self.jobsman
457 .clone()
458 .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone())
459 .await;
460 self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
461 debug!(target: "fud::ProtocolFud::start()", "END");
462 Ok(())
463 }
464
465 fn name(&self) -> &'static str {
466 "ProtocolFud"
467 }
468}