1use std::sync::Arc;
20
21use log::{error, info, warn};
22
23use darkfi::{
24 dht::{DhtHandler, DhtNode},
25 geode::hash_to_string,
26 system::{sleep, StoppableTask},
27 Error, Result,
28};
29
30use crate::{event, event::notify_event, proto::FudAnnounce, Fud, FudEvent};
31
32pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
37 loop {
38 let (hash, path, files) = fud.get_rx.recv().await.unwrap();
39
40 let mut fetch_tasks = fud.fetch_tasks.write().await;
42 let task = StoppableTask::new();
43 fetch_tasks.insert(hash, task.clone());
44 drop(fetch_tasks);
45
46 let fud_1 = fud.clone();
48 let fud_2 = fud.clone();
49 task.start(
50 async move { fud_1.fetch_resource(&hash, &path, &files).await },
51 move |res| async move {
52 let mut fetch_tasks = fud_2.fetch_tasks.write().await;
55 fetch_tasks.remove(&hash);
56
57 let lookup_tasks = fud_2.lookup_tasks.read().await;
59 if let Some(lookup_task) = lookup_tasks.get(&hash) {
60 lookup_task.stop().await;
61 }
62
63 match res {
64 Ok(()) | Err(Error::DetachedTaskStopped) => { }
65 Err(e) => {
66 error!(target: "fud::get_task()", "Error while fetching resource: {e}");
67
68 notify_event!(fud_2, DownloadError, {
70 hash,
71 error: e.to_string(),
72 });
73 }
74 }
75 },
76 Error::DetachedTaskStopped,
77 fud.executor.clone(),
78 );
79 }
80}
81
82pub async fn put_task(fud: Arc<Fud>) -> Result<()> {
84 loop {
85 let path = fud.put_rx.recv().await.unwrap();
86
87 let mut put_tasks = fud.put_tasks.write().await;
89 let task = StoppableTask::new();
90 put_tasks.insert(path.clone(), task.clone());
91 drop(put_tasks);
92
93 let fud_1 = fud.clone();
95 let fud_2 = fud.clone();
96 let path_ = path.clone();
97 task.start(
98 async move { fud_1.insert_resource(&path_).await },
99 move |res| async move {
100 let mut put_tasks = fud_2.put_tasks.write().await;
103 put_tasks.remove(&path);
104 match res {
105 Ok(()) | Err(Error::DetachedTaskStopped) => { }
106 Err(e) => {
107 error!(target: "fud::put_task()", "Error while inserting resource: {e}");
108
109 notify_event!(fud_2, InsertError, {
111 path,
112 error: e.to_string(),
113 });
114 }
115 }
116 },
117 Error::DetachedTaskStopped,
118 fud.executor.clone(),
119 );
120 }
121}
122
123pub async fn lookup_task(fud: Arc<Fud>) -> Result<()> {
125 loop {
126 let (key, seeders_pub) = fud.lookup_rx.recv().await.unwrap();
127
128 let mut lookup_tasks = fud.lookup_tasks.write().await;
129 let task = StoppableTask::new();
130 lookup_tasks.insert(key, task.clone());
131 drop(lookup_tasks);
132
133 let fud_1 = fud.clone();
134 let fud_2 = fud.clone();
135 task.start(
136 async move {
137 fud_1.dht.lookup_value(&key, seeders_pub).await?;
138 Ok(())
139 },
140 move |res| async move {
141 let mut lookup_tasks = fud_2.lookup_tasks.write().await;
144 lookup_tasks.remove(&key);
145 match res {
146 Ok(()) | Err(Error::DetachedTaskStopped) => { }
147 Err(e) => {
148 error!(target: "dht::lookup_task()", "Error in DHT lookup task: {e}");
149 }
150 }
151 },
152 Error::DetachedTaskStopped,
153 fud.executor.clone(),
154 );
155 }
156}
157
158pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
161 let interval = 3600; loop {
164 sleep(interval).await;
165
166 info!(target: "fud::announce_seed_task()", "Verifying seeds...");
167 let seeding_resources = match fud.verify_resources(None).await {
168 Ok(resources) => resources,
169 Err(e) => {
170 error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {e}");
171 continue;
172 }
173 };
174
175 info!(target: "fud::announce_seed_task()", "Announcing files...");
176 for resource in seeding_resources {
177 let seeders = vec![fud.new_seeder(&resource.hash).await];
178 let _ = fud
179 .dht
180 .announce(
181 &resource.hash,
182 &seeders.clone(),
183 &FudAnnounce { key: resource.hash, seeders },
184 )
185 .await;
186 }
187
188 info!(target: "fud::announce_seed_task()", "Pruning seeders...");
189 fud.prune_seeders(interval.try_into().unwrap()).await;
190 }
191}
192
193pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
199 let interval = 600; loop {
202 sleep(interval).await;
203
204 let mut pow = fud.pow.write().await;
205 let btc = &mut pow.bitcoin_hash_cache;
206
207 if btc.update().await.is_err() {
208 continue
209 }
210
211 let block = fud.node_data.read().await.btc_block_hash;
212 let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
213 Some(i) => i < 6,
214 None => true,
215 };
216
217 if !needs_dht_reset {
218 let dht = fud.dht();
220 let mut buckets = dht.buckets.write().await;
221 for bucket in buckets.iter_mut() {
222 for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
223 if !btc.block_hashes.contains(&node.data.btc_block_hash) {
225 bucket.nodes.remove(i);
226 info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
227 }
228 }
229 }
230 drop(buckets);
231
232 let mut seeders_table = fud.dht.hash_table.write().await;
234 for (key, seeders) in seeders_table.iter_mut() {
235 for (i, seeder) in seeders.clone().iter().enumerate().rev() {
236 if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
237 seeders.remove(i);
238 info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
239 }
240 }
241 }
242
243 continue
244 }
245
246 info!(target: "fud::node_id_task()", "Creating a new node id...");
247 let (node_data, secret_key) = match pow.generate_node().await {
248 Ok(res) => res,
249 Err(e) => {
250 warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
251 continue
252 }
253 };
254 drop(pow);
255 info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
256
257 let dht = fud.dht();
259 let mut channel_cache = dht.channel_cache.write().await;
260 for channel in dht.p2p.hosts().channels().clone() {
261 channel.stop().await;
262 channel_cache.remove(&channel.info.id);
263 }
264 drop(channel_cache);
265
266 dht.reset().await;
268
269 *fud.node_data.write().await = node_data;
271 *fud.secret_key.write().await = secret_key;
272
273 }
275}
276
/// Starts `$task_fn` as a named background task and records it in `$tasks`.
///
/// - `$fud`: cloned and passed to `$task_fn`; also supplies `executor`.
/// - `$task_name`: used in the log messages and, via `to_string()`, as the key
///   under which the task is inserted into `$tasks`.
/// - `$task_fn`: called with the cloned `$fud`; the returned future is awaited
///   inside the task.
/// - `$tasks`: collection receiving `(name, task)` through `insert`.
///
/// When the task ends, any error other than `Error::DetachedTaskStopped` is
/// logged.
macro_rules! start_task {
    ($fud:expr, $task_name:expr, $task_fn:expr, $tasks:expr) => {{
        info!(target: "fud", "Starting {} task", $task_name);
        let stoppable = StoppableTask::new();
        let fud_handle = $fud.clone();
        stoppable.clone().start(
            async move { $task_fn(fud_handle).await },
            |res| async {
                if let Err(e) = res {
                    // A requested stop is not a failure and is not logged.
                    if !matches!(e, Error::DetachedTaskStopped) {
                        error!(target: "fud", "Failed starting {} task: {e}", $task_name);
                    }
                }
            },
            Error::DetachedTaskStopped,
            $fud.executor.clone(),
        );
        $tasks.insert($task_name.to_string(), stoppable);
    }};
}
296pub(crate) use start_task;