1use std::{collections::HashMap, sync::Arc};
20
21use log::{error, info, warn};
22
23use darkfi::{
24 dht::{DhtHandler, DhtNode},
25 geode::hash_to_string,
26 system::{sleep, StoppableTask},
27 Error, Result,
28};
29
30use crate::{
31 event,
32 event::notify_event,
33 proto::{FudAnnounce, FudChunkReply, FudDirectoryReply, FudFileReply},
34 Fud, FudEvent,
35};
36
37pub enum FetchReply {
38 Directory(FudDirectoryReply),
39 File(FudFileReply),
40 Chunk(FudChunkReply),
41}
42
43pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
48 loop {
49 let (hash, path, files) = fud.get_rx.recv().await.unwrap();
50
51 let mut fetch_tasks = fud.fetch_tasks.write().await;
53 let task = StoppableTask::new();
54 fetch_tasks.insert(hash, task.clone());
55 drop(fetch_tasks);
56
57 let fud_1 = fud.clone();
59 let fud_2 = fud.clone();
60 task.start(
61 async move { fud_1.fetch_resource(&hash, &path, &files).await },
62 move |res| async move {
63 let mut fetch_tasks = fud_2.fetch_tasks.write().await;
66 fetch_tasks.remove(&hash);
67 match res {
68 Ok(()) | Err(Error::DetachedTaskStopped) => { }
69 Err(e) => {
70 error!(target: "fud::get_task()", "Error while fetching resource: {e}");
71
72 notify_event!(fud_2, DownloadError, {
74 hash,
75 error: e.to_string(),
76 });
77 }
78 }
79 },
80 Error::DetachedTaskStopped,
81 fud.executor.clone(),
82 );
83 }
84}
85
86pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
88 loop {
89 let path = fud.put_rx.recv().await.unwrap();
90
91 let mut put_tasks = fud.put_tasks.write().await;
93 let task = StoppableTask::new();
94 put_tasks.insert(path.clone(), task.clone());
95 drop(put_tasks);
96
97 let fud_1 = fud.clone();
99 let fud_2 = fud.clone();
100 let path_ = path.clone();
101 task.start(
102 async move { fud_1.insert_resource(&path_).await },
103 move |res| async move {
104 let mut put_tasks = fud_2.put_tasks.write().await;
107 put_tasks.remove(&path);
108 match res {
109 Ok(()) | Err(Error::DetachedTaskStopped) => { }
110 Err(e) => {
111 error!(target: "fud::put_task()", "Error while inserting resource: {e}");
112
113 notify_event!(fud_2, InsertError, {
115 path,
116 error: e.to_string(),
117 });
118 }
119 }
120 },
121 Error::DetachedTaskStopped,
122 fud.executor.clone(),
123 );
124 }
125}
126
127pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
130 let interval = 3600; loop {
133 sleep(interval).await;
134
135 let seeders = vec![fud.node().await.into()];
136
137 info!(target: "fud::announce_seed_task()", "Verifying seeds...");
138 let seeding_resources = match fud.verify_resources(None).await {
139 Ok(resources) => resources,
140 Err(e) => {
141 error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
142 continue;
143 }
144 };
145
146 info!(target: "fud::announce_seed_task()", "Announcing files...");
147 for resource in seeding_resources {
148 let _ = fud
149 .announce(
150 &resource.hash,
151 &FudAnnounce { key: resource.hash, seeders: seeders.clone() },
152 fud.seeders_router.clone(),
153 )
154 .await;
155 }
156
157 info!(target: "fud::announce_seed_task()", "Pruning seeders...");
158 fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await;
159 }
160}
161
162pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
168 let interval = 600; loop {
171 sleep(interval).await;
172
173 let mut pow = fud.pow.write().await;
174 let btc = &mut pow.bitcoin_hash_cache;
175
176 if btc.update().await.is_err() {
177 continue
178 }
179
180 let block = fud.node_data.read().await.btc_block_hash;
181 let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
182 Some(i) => i < 6,
183 None => true,
184 };
185
186 if !needs_dht_reset {
187 let dht = fud.dht();
189 let mut buckets = dht.buckets.write().await;
190 for bucket in buckets.iter_mut() {
191 for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
192 if !btc.block_hashes.contains(&node.data.btc_block_hash) {
194 bucket.nodes.remove(i);
195 info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
196 }
197 }
198 }
199 drop(buckets);
200
201 let mut seeders_router = fud.seeders_router.write().await;
203 for (key, seeders) in seeders_router.iter_mut() {
204 for seeder in seeders.clone().iter() {
205 if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
206 seeders.remove(seeder);
207 info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
208 }
209 }
210 }
211
212 continue
213 }
214
215 info!(target: "fud::node_id_task()", "Creating a new node id...");
216 let (node_data, secret_key) = match pow.generate_node().await {
217 Ok(res) => res,
218 Err(e) => {
219 warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
220 continue
221 }
222 };
223 drop(pow);
224 info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
225
226 let dht = fud.dht();
228 let mut channel_cache = dht.channel_cache.write().await;
229 for channel in dht.p2p.hosts().channels().clone() {
230 channel.stop().await;
231 channel_cache.remove(&channel.info.id);
232 }
233 drop(channel_cache);
234
235 dht.reset().await;
237
238 *fud.seeders_router.write().await = HashMap::new();
240
241 *fud.node_data.write().await = node_data;
243 *fud.secret_key.write().await = secret_key;
244
245 }
247}
248
/// Starts a named background task and stores its handle.
///
/// Arguments:
/// * `$fud` - value providing `.clone()` and an `.executor` field. The
///   expression is expanded twice.
/// * `$task_name` - name used in the log messages and, via `.to_string()`, as
///   the key under which the task is stored. The expression is expanded
///   three times.
/// * `$task_fn` - callable invoked with a clone of `$fud`; its result is
///   awaited inside the task.
/// * `$tasks` - collection that receives the task through
///   `insert(String, task)`.
///
/// When the task ends with an error other than `Error::DetachedTaskStopped`,
/// that error is logged. Nothing is logged when it ends with `Ok(())` or is
/// stopped.
macro_rules! start_task {
    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
        info!(target: "fud", "Starting {} task", $task_name);
        let handle = StoppableTask::new();
        let task_fud = $fud.clone();
        // `start` is called on a clone so `handle` can still be stored below.
        handle.clone().start(
            async move { $task_fn(task_fud).await },
            |outcome| async {
                if let Err(e) = outcome {
                    // A manual stop is not a failure.
                    if !matches!(e, Error::DetachedTaskStopped) {
                        error!(target: "fud", "Failed starting {} task: {e}", $task_name);
                    }
                }
            },
            Error::DetachedTaskStopped,
            $fud.executor.clone(),
        );
        $tasks.insert($task_name.to_string(), handle);
    }};
}
pub(crate) use start_task;