1use std::{collections::HashSet, sync::Arc};
20
21use smol::{channel::Receiver, lock::RwLock};
22use tinyjson::JsonValue;
23use tracing::{debug, error, info};
24
25use darkfi::{
26 blockchain::BlockDifficulty,
27 net::{ChannelPtr, P2pPtr},
28 rpc::jsonrpc::JsonSubscriber,
29 util::encoding::base64,
30 validator::{
31 consensus::{Fork, Proposal},
32 pow::PoWModule,
33 utils::{best_fork_index, header_rank},
34 verification::verify_fork_proposal,
35 ValidatorPtr,
36 },
37 Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41use crate::proto::{
42 ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
43 ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
44 ProposalMessage, BATCH,
45};
46
47pub async fn handle_unknown_proposals(
49 receiver: Receiver<(Proposal, u32)>,
50 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
51 validator: ValidatorPtr,
52 p2p: P2pPtr,
53 proposals_sub: JsonSubscriber,
54 blocks_sub: JsonSubscriber,
55) -> Result<()> {
56 debug!(target: "darkfid::task::handle_unknown_proposal", "START");
57 loop {
58 let (proposal, channel) = match receiver.recv().await {
60 Ok(m) => m,
61 Err(e) => {
62 debug!(
63 target: "darkfid::task::handle_unknown_proposal",
64 "recv fail: {e}"
65 );
66 continue
67 }
68 };
69
70 let lock = unknown_proposals.read().await;
72 let contains_proposal = lock.contains(proposal.hash.inner());
73 drop(lock);
74 if !contains_proposal {
75 debug!(
76 target: "darkfid::task::handle_unknown_proposal",
77 "Proposal {} is not in our unknown proposals queue.",
78 proposal.hash,
79 );
80 continue
81 };
82
83 if handle_unknown_proposal(
85 &validator,
86 &p2p,
87 &proposals_sub,
88 &blocks_sub,
89 channel,
90 &proposal,
91 )
92 .await
93 {
94 if let Some(channel) = p2p.get_channel(channel) {
96 channel.ban().await;
97 }
98 };
99
100 let mut lock = unknown_proposals.write().await;
102 lock.remove(proposal.hash.inner());
103 drop(lock);
104 }
105}
106
107async fn handle_unknown_proposal(
110 validator: &ValidatorPtr,
111 p2p: &P2pPtr,
112 proposals_sub: &JsonSubscriber,
113 blocks_sub: &JsonSubscriber,
114 channel: u32,
115 proposal: &Proposal,
116) -> bool {
117 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
119 let Some(channel) = p2p.get_channel(channel) else {
120 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
121 return false
122 };
123
124 let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
126 debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
127 return true
128 };
129
130 let last = match validator.blockchain.last() {
132 Ok(l) => l,
133 Err(e) => {
134 error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
135 return false
136 }
137 };
138 let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
139 if let Err(e) = channel.send(&request).await {
140 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
141 return true
142 };
143
144 let response = match response_sub
146 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
147 .await
148 {
149 Ok(r) => r,
150 Err(e) => {
151 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
152 return true
153 }
154 };
155 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
156
157 debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
159
160 if response.proposals.is_empty() {
162 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
163 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
164 }
165
166 if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
168 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
169 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
170 }
171
172 if response.proposals[0].block.header.previous != last.1 {
174 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
175 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
176 }
177
178 if response.proposals.last().unwrap().hash != proposal.hash {
180 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
181 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
182 }
183
184 for proposal in &response.proposals {
186 match validator.append_proposal(proposal).await {
188 Ok(()) => { }
189 Err(Error::ProposalAlreadyExists) => continue,
191 Err(e) => {
192 debug!(
193 target: "darkfid::task::handle_unknown_proposal",
194 "Error while appending response proposal: {e}"
195 );
196 break;
197 }
198 };
199
200 let message = ProposalMessage(proposal.clone());
202 p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
203
204 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
206 proposals_sub.notify(vec![enc_prop].into()).await;
207 }
208
209 false
210}
211
212async fn handle_reorg(
220 validator: &ValidatorPtr,
221 p2p: &P2pPtr,
222 proposals_sub: &JsonSubscriber,
223 blocks_sub: &JsonSubscriber,
224 channel: ChannelPtr,
225 proposal: &Proposal,
226) -> bool {
227 info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);
228
229 if proposal.block.header.height == 0 {
231 debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
232 return true
233 }
234
235 let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
237 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
238 return true
239 };
240
241 let mut peer_header_hashes = vec![];
243
244 let mut previous_height = proposal.block.header.height;
246 let mut previous_hash = proposal.hash;
247 for height in (0..proposal.block.header.height).rev() {
248 let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
250 if let Err(e) = channel.send(&request).await {
251 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
252 return true
253 };
254
255 let response = match response_sub
257 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
258 .await
259 {
260 Ok(r) => r,
261 Err(e) => {
262 debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
263 return true
264 }
265 };
266 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
267
268 let Some(peer_header) = response.fork_header else {
270 debug!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
271 return true
272 };
273
274 let headers = match validator.blockchain.blocks.get_order(&[height], false) {
276 Ok(r) => r,
277 Err(e) => {
278 error!(target: "darkfid::task::handle_reorg", "Retrieving headers failed: {e}");
279 return false
280 }
281 };
282 match headers[0] {
283 Some(known_header) => {
284 if known_header == peer_header {
285 previous_height = height;
286 previous_hash = known_header;
287 break
288 }
289 peer_header_hashes.insert(0, peer_header);
291 }
292 None => peer_header_hashes.insert(0, peer_header),
293 }
294 }
295
296 if peer_header_hashes.is_empty() {
298 debug!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
299 return true
300 }
301
302 let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
304 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
305 return true
306 };
307
308 let last_common_height = previous_height;
310 let last_difficulty = match previous_height {
311 0 => {
312 let genesis_timestamp = match validator.blockchain.genesis_block() {
313 Ok(b) => b.header.timestamp,
314 Err(e) => {
315 error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
316 return false
317 }
318 };
319 BlockDifficulty::genesis(genesis_timestamp)
320 }
321 _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
322 Ok(d) => d[0].clone().unwrap(),
323 Err(e) => {
324 error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
325 return false
326 }
327 },
328 };
329
330 let module = match PoWModule::new(
332 validator.consensus.blockchain.clone(),
333 validator.consensus.module.read().await.target,
334 validator.consensus.module.read().await.fixed_difficulty.clone(),
335 Some(last_common_height + 1),
336 ) {
337 Ok(m) => m,
338 Err(e) => {
339 error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
340 return false
341 }
342 };
343
344 info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
346 let mut batch = Vec::with_capacity(BATCH);
347 let mut total_processed = 0;
348 let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
349 let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
350 let mut headers_module = module.clone();
351 for (index, hash) in peer_header_hashes.iter().enumerate() {
352 batch.push(*hash);
354
355 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
357 continue
358 }
359
360 let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
362 if let Err(e) = channel.send(&request).await {
363 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
364 return true
365 };
366
367 let response = match response_sub
369 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
370 .await
371 {
372 Ok(r) => r,
373 Err(e) => {
374 debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
375 return true
376 }
377 };
378 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
379
380 if response.headers.len() != batch.len() {
382 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
383 return true
384 }
385
386 for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
388 let peer_header_hash = peer_header.hash();
389 debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
390
391 if peer_header_hash != batch[peer_header_index] {
393 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
394 return true
395 }
396
397 if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
399 debug!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
400 return true
401 }
402
403 let (next_target, next_difficulty) = match headers_module
405 .next_mine_target_and_difficulty()
406 {
407 Ok(p) => p,
408 Err(e) => {
409 debug!(target: "darkfid::task::handle_reorg", "Retrieving next mine target and difficulty failed: {e}");
410 return false
411 }
412 };
413
414 let (target_distance_sq, hash_distance_sq) = match header_rank(
416 peer_header,
417 &next_target,
418 ) {
419 Ok(distances) => distances,
420 Err(e) => {
421 debug!(target: "darkfid::task::handle_reorg", "Invalid header hash detected: {e}");
422 return true
423 }
424 };
425
426 targets_rank += target_distance_sq.clone();
428 hashes_rank += hash_distance_sq.clone();
429
430 if let Err(e) = headers_module.append(peer_header, &next_difficulty) {
432 debug!(target: "darkfid::task::handle_reorg", "Error while appending header to module: {e}");
433 return true
434 };
435
436 previous_height = peer_header.height;
438 previous_hash = peer_header_hash;
439 }
440
441 total_processed += response.headers.len();
442 info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());
443
444 batch = Vec::with_capacity(BATCH);
446 }
447
448 let forks = validator.consensus.forks.read().await;
450 let index = match best_fork_index(&forks) {
451 Ok(i) => i,
452 Err(e) => {
453 debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
454 return false
455 }
456 };
457 let best_fork = &forks[index];
458 if targets_rank < best_fork.targets_rank ||
459 (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
460 {
461 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
462 drop(forks);
463 return true
464 }
465 drop(forks);
466
467 let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
469 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
470 return true
471 };
472
473 let mut peer_fork =
475 match Fork::new(validator.consensus.blockchain.clone(), module.clone()).await {
476 Ok(f) => f,
477 Err(e) => {
478 error!(target: "darkfid::task::handle_reorg", "Generating peer fork failed: {e}");
479 return false
480 }
481 };
482 peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
483 peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();
484
485 let inverse_diffs = match validator
487 .blockchain
488 .blocks
489 .get_state_inverse_diffs_after(last_common_height)
490 {
491 Ok(i) => i,
492 Err(e) => {
493 error!(target: "darkfid::task::handle_reorg", "Retrieving state inverse diffs failed: {e}");
494 return false
495 }
496 };
497 for inverse_diff in inverse_diffs.iter().rev() {
498 if let Err(e) =
499 peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)
500 {
501 error!(target: "darkfid::task::handle_reorg", "Applying inverse diff failed: {e}");
502 return false
503 }
504 }
505
506 if let Err(e) = peer_fork.compute_monotree() {
508 error!(target: "darkfid::task::handle_reorg", "Rebuilding peer fork monotree failed: {e}");
509 return false
510 }
511
512 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
514 let mut batch = Vec::with_capacity(BATCH);
515 let mut total_processed = 0;
516 for (index, hash) in peer_header_hashes.iter().enumerate() {
517 batch.push(*hash);
519
520 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
522 continue
523 }
524
525 let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
527 if let Err(e) = channel.send(&request).await {
528 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
529 return true
530 };
531
532 let response = match response_sub
534 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
535 .await
536 {
537 Ok(r) => r,
538 Err(e) => {
539 debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
540 return true
541 }
542 };
543 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
544
545 if response.proposals.len() != batch.len() {
547 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
548 return true
549 }
550
551 for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
553 info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
554
555 if peer_proposal.hash != batch[peer_proposal_index] {
557 error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
558 return true
559 }
560
561 if let Err(e) =
563 verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await
564 {
565 error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
566 return true
567 }
568
569 if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
571 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
572 return true
573 }
574 }
575
576 total_processed += response.proposals.len();
577 info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());
578
579 batch = Vec::with_capacity(BATCH);
581 }
582
583 if let Err(e) = verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await {
585 error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
586 return true
587 }
588
589 if let Err(e) = peer_fork.append_proposal(proposal).await {
591 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
592 return true
593 }
594
595 let mut forks = validator.consensus.forks.write().await;
597 let index = match best_fork_index(&forks) {
598 Ok(i) => i,
599 Err(e) => {
600 debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
601 return false
602 }
603 };
604 let best_fork = &forks[index];
605 if peer_fork.targets_rank < best_fork.targets_rank ||
606 (peer_fork.targets_rank == best_fork.targets_rank &&
607 peer_fork.hashes_rank <= best_fork.hashes_rank)
608 {
609 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
610 drop(forks);
611 return true
612 }
613
614 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
616 *validator.consensus.module.write().await = module;
617 *forks = vec![peer_fork];
618 drop(forks);
619
620 let confirmed = match validator.confirmation().await {
622 Ok(f) => f,
623 Err(e) => {
624 error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
625 return false
626 }
627 };
628
629 if !confirmed.is_empty() {
630 let mut notif_blocks = Vec::with_capacity(confirmed.len());
631 for block in confirmed {
632 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
633 }
634 blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
635 }
636
637 let message = ProposalMessage(proposal.clone());
639 p2p.broadcast(&message).await;
640
641 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
643 proposals_sub.notify(vec![enc_prop].into()).await;
644
645 false
646}