1use std::{collections::HashSet, sync::Arc};
20
21use log::{debug, error, info};
22use smol::{channel::Receiver, lock::RwLock};
23use tinyjson::JsonValue;
24
25use darkfi::{
26 blockchain::BlockDifficulty,
27 net::{ChannelPtr, P2pPtr},
28 rpc::jsonrpc::JsonSubscriber,
29 util::encoding::base64,
30 validator::{
31 consensus::{Fork, Proposal},
32 pow::PoWModule,
33 utils::{best_fork_index, header_rank},
34 verification::verify_fork_proposal,
35 ValidatorPtr,
36 },
37 Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41use crate::proto::{
42 ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
43 ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
44 ProposalMessage, BATCH,
45};
46
47pub async fn handle_unknown_proposals(
49 receiver: Receiver<(Proposal, u32)>,
50 unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
51 validator: ValidatorPtr,
52 p2p: P2pPtr,
53 proposals_sub: JsonSubscriber,
54 blocks_sub: JsonSubscriber,
55) -> Result<()> {
56 debug!(target: "darkfid::task::handle_unknown_proposal", "START");
57 loop {
58 let (proposal, channel) = match receiver.recv().await {
60 Ok(m) => m,
61 Err(e) => {
62 debug!(
63 target: "darkfid::task::handle_unknown_proposal",
64 "recv fail: {e}"
65 );
66 continue
67 }
68 };
69
70 let lock = unknown_proposals.read().await;
72 let contains_proposal = lock.contains(proposal.hash.inner());
73 drop(lock);
74 if !contains_proposal {
75 debug!(
76 target: "darkfid::task::handle_unknown_proposal",
77 "Proposal {} is not in our unknown proposals queue.",
78 proposal.hash,
79 );
80 continue
81 };
82
83 if handle_unknown_proposal(
85 &validator,
86 &p2p,
87 &proposals_sub,
88 &blocks_sub,
89 channel,
90 &proposal,
91 )
92 .await
93 {
94 if let Some(channel) = p2p.get_channel(channel) {
96 channel.ban().await;
97 }
98 };
99
100 let mut lock = unknown_proposals.write().await;
102 lock.remove(proposal.hash.inner());
103 drop(lock);
104 }
105}
106
107async fn handle_unknown_proposal(
110 validator: &ValidatorPtr,
111 p2p: &P2pPtr,
112 proposals_sub: &JsonSubscriber,
113 blocks_sub: &JsonSubscriber,
114 channel: u32,
115 proposal: &Proposal,
116) -> bool {
117 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
119 let Some(channel) = p2p.get_channel(channel) else {
120 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
121 return false
122 };
123
124 let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
126 debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
127 return true
128 };
129
130 let last = match validator.blockchain.last() {
132 Ok(l) => l,
133 Err(e) => {
134 error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
135 return false
136 }
137 };
138 let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
139 if let Err(e) = channel.send(&request).await {
140 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
141 return true
142 };
143
144 let response = match response_sub
146 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
147 .await
148 {
149 Ok(r) => r,
150 Err(e) => {
151 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
152 return true
153 }
154 };
155 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
156
157 debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
159
160 if response.proposals.is_empty() {
162 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
163 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
164 }
165
166 if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
168 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
169 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
170 }
171
172 if response.proposals[0].block.header.previous != last.1 {
174 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
175 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
176 }
177
178 if response.proposals.last().unwrap().hash != proposal.hash {
180 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
181 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
182 }
183
184 for proposal in &response.proposals {
186 match validator.append_proposal(proposal).await {
188 Ok(()) => { }
189 Err(Error::ProposalAlreadyExists) => continue,
191 Err(e) => {
192 debug!(
193 target: "darkfid::task::handle_unknown_proposal",
194 "Error while appending response proposal: {e}"
195 );
196 break;
197 }
198 };
199
200 let message = ProposalMessage(proposal.clone());
202 p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
203
204 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
206 proposals_sub.notify(vec![enc_prop].into()).await;
207 }
208
209 false
210}
211
212async fn handle_reorg(
220 validator: &ValidatorPtr,
221 p2p: &P2pPtr,
222 proposals_sub: &JsonSubscriber,
223 blocks_sub: &JsonSubscriber,
224 channel: ChannelPtr,
225 proposal: &Proposal,
226) -> bool {
227 info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);
228
229 if proposal.block.header.height == 0 {
231 debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
232 return true
233 }
234
235 let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
237 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
238 return true
239 };
240
241 let mut peer_header_hashes = vec![];
243
244 let mut previous_height = proposal.block.header.height;
246 let mut previous_hash = proposal.hash;
247 for height in (0..proposal.block.header.height).rev() {
248 let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
250 if let Err(e) = channel.send(&request).await {
251 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
252 return true
253 };
254
255 let response = match response_sub
257 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
258 .await
259 {
260 Ok(r) => r,
261 Err(e) => {
262 debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
263 return true
264 }
265 };
266 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
267
268 let Some(peer_header) = response.fork_header else {
270 debug!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
271 return true
272 };
273
274 let headers = match validator.blockchain.blocks.get_order(&[height], false) {
276 Ok(r) => r,
277 Err(e) => {
278 error!(target: "darkfid::task::handle_reorg", "Retrieving headers failed: {e}");
279 return false
280 }
281 };
282 match headers[0] {
283 Some(known_header) => {
284 if known_header == peer_header {
285 previous_height = height;
286 previous_hash = known_header;
287 break
288 }
289 peer_header_hashes.insert(0, peer_header);
291 }
292 None => peer_header_hashes.insert(0, peer_header),
293 }
294 }
295
296 if peer_header_hashes.is_empty() {
298 debug!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
299 return true
300 }
301
302 let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
304 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
305 return true
306 };
307
308 let last_common_height = previous_height;
310 let last_difficulty = match previous_height {
311 0 => {
312 let genesis_timestamp = match validator.blockchain.genesis_block() {
313 Ok(b) => b.header.timestamp,
314 Err(e) => {
315 error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
316 return false
317 }
318 };
319 BlockDifficulty::genesis(genesis_timestamp)
320 }
321 _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
322 Ok(d) => d[0].clone().unwrap(),
323 Err(e) => {
324 error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
325 return false
326 }
327 },
328 };
329
330 let module = match PoWModule::new(
332 validator.consensus.blockchain.clone(),
333 validator.consensus.module.read().await.target,
334 validator.consensus.module.read().await.fixed_difficulty.clone(),
335 Some(last_common_height + 1),
336 ) {
337 Ok(m) => m,
338 Err(e) => {
339 error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
340 return false
341 }
342 };
343
344 info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
346 let mut batch = Vec::with_capacity(BATCH);
347 let mut total_processed = 0;
348 let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
349 let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
350 let mut headers_module = module.clone();
351 for (index, hash) in peer_header_hashes.iter().enumerate() {
352 batch.push(*hash);
354
355 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
357 continue
358 }
359
360 let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
362 if let Err(e) = channel.send(&request).await {
363 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
364 return true
365 };
366
367 let response = match response_sub
369 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
370 .await
371 {
372 Ok(r) => r,
373 Err(e) => {
374 debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
375 return true
376 }
377 };
378 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
379
380 if response.headers.len() != batch.len() {
382 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
383 return true
384 }
385
386 for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
388 let peer_header_hash = peer_header.hash();
389 debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
390
391 if peer_header_hash != batch[peer_header_index] {
393 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
394 return true
395 }
396
397 if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
399 debug!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
400 return true
401 }
402
403 let (next_target, next_difficulty) = match headers_module
405 .next_mine_target_and_difficulty()
406 {
407 Ok(p) => p,
408 Err(e) => {
409 debug!(target: "darkfid::task::handle_reorg", "Retrieving next mine target and difficulty failed: {e}");
410 return false
411 }
412 };
413
414 let (target_distance_sq, hash_distance_sq) = match header_rank(
416 peer_header,
417 &next_target,
418 ) {
419 Ok(distances) => distances,
420 Err(e) => {
421 debug!(target: "darkfid::task::handle_reorg", "Invalid header hash detected: {e}");
422 return true
423 }
424 };
425
426 targets_rank += target_distance_sq.clone();
428 hashes_rank += hash_distance_sq.clone();
429
430 headers_module.append(peer_header.timestamp, &next_difficulty);
432
433 previous_height = peer_header.height;
435 previous_hash = peer_header_hash;
436 }
437
438 total_processed += response.headers.len();
439 info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());
440
441 batch = Vec::with_capacity(BATCH);
443 }
444
445 let forks = validator.consensus.forks.read().await;
447 let index = match best_fork_index(&forks) {
448 Ok(i) => i,
449 Err(e) => {
450 debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
451 return false
452 }
453 };
454 let best_fork = &forks[index];
455 if targets_rank < best_fork.targets_rank ||
456 (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
457 {
458 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
459 drop(forks);
460 return true
461 }
462 drop(forks);
463
464 let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
466 debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
467 return true
468 };
469
470 let mut peer_fork =
472 match Fork::new(validator.consensus.blockchain.clone(), module.clone()).await {
473 Ok(f) => f,
474 Err(e) => {
475 error!(target: "darkfid::task::handle_reorg", "Generating peer fork failed: {e}");
476 return false
477 }
478 };
479 peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
480 peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();
481
482 let inverse_diffs = match validator
484 .blockchain
485 .blocks
486 .get_state_inverse_diffs_after(last_common_height)
487 {
488 Ok(i) => i,
489 Err(e) => {
490 error!(target: "darkfid::task::handle_reorg", "Retrieving state inverse diffs failed: {e}");
491 return false
492 }
493 };
494 for inverse_diff in inverse_diffs.iter().rev() {
495 if let Err(e) =
496 peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)
497 {
498 error!(target: "darkfid::task::handle_reorg", "Applying inverse diff failed: {e}");
499 return false
500 }
501 }
502
503 if let Err(e) = peer_fork.compute_monotree() {
505 error!(target: "darkfid::task::handle_reorg", "Rebuilding peer fork monotree failed: {e}");
506 return false
507 }
508
509 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
511 let mut batch = Vec::with_capacity(BATCH);
512 let mut total_processed = 0;
513 for (index, hash) in peer_header_hashes.iter().enumerate() {
514 batch.push(*hash);
516
517 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
519 continue
520 }
521
522 let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
524 if let Err(e) = channel.send(&request).await {
525 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
526 return true
527 };
528
529 let response = match response_sub
531 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
532 .await
533 {
534 Ok(r) => r,
535 Err(e) => {
536 debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
537 return true
538 }
539 };
540 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
541
542 if response.proposals.len() != batch.len() {
544 debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
545 return true
546 }
547
548 for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
550 info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
551
552 if peer_proposal.hash != batch[peer_proposal_index] {
554 error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
555 return true
556 }
557
558 if let Err(e) =
560 verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await
561 {
562 error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
563 return true
564 }
565
566 if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
568 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
569 return true
570 }
571 }
572
573 total_processed += response.proposals.len();
574 info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());
575
576 batch = Vec::with_capacity(BATCH);
578 }
579
580 if let Err(e) = verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await {
582 error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
583 return true
584 }
585
586 if let Err(e) = peer_fork.append_proposal(proposal).await {
588 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
589 return true
590 }
591
592 let mut forks = validator.consensus.forks.write().await;
594 let index = match best_fork_index(&forks) {
595 Ok(i) => i,
596 Err(e) => {
597 debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
598 return false
599 }
600 };
601 let best_fork = &forks[index];
602 if peer_fork.targets_rank < best_fork.targets_rank ||
603 (peer_fork.targets_rank == best_fork.targets_rank &&
604 peer_fork.hashes_rank <= best_fork.hashes_rank)
605 {
606 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
607 drop(forks);
608 return true
609 }
610
611 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
613 *validator.consensus.module.write().await = module;
614 *forks = vec![peer_fork];
615 drop(forks);
616
617 let confirmed = match validator.confirmation().await {
619 Ok(f) => f,
620 Err(e) => {
621 error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
622 return false
623 }
624 };
625
626 if !confirmed.is_empty() {
627 let mut notif_blocks = Vec::with_capacity(confirmed.len());
628 for block in confirmed {
629 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
630 }
631 blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
632 }
633
634 let message = ProposalMessage(proposal.clone());
636 p2p.broadcast(&message).await;
637
638 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
640 proposals_sub.notify(vec![enc_prop].into()).await;
641
642 false
643}