1use log::{debug, error, info, warn};
20use tinyjson::JsonValue;
21
22use darkfi::{
23 blockchain::BlockDifficulty,
24 net::{ChannelPtr, P2pPtr},
25 rpc::jsonrpc::JsonSubscriber,
26 util::encoding::base64,
27 validator::{
28 consensus::{Fork, Proposal},
29 pow::PoWModule,
30 utils::{best_fork_index, header_rank},
31 verification::verify_fork_proposal,
32 ValidatorPtr,
33 },
34 Error, Result,
35};
36use darkfi_serial::serialize_async;
37
38use crate::proto::{
39 ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
40 ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
41 ProposalMessage, BATCH,
42};
43
44pub async fn handle_unknown_proposal(
46 validator: ValidatorPtr,
47 p2p: P2pPtr,
48 proposals_sub: JsonSubscriber,
49 blocks_sub: JsonSubscriber,
50 channel: u32,
51 proposal: Proposal,
52) -> Result<()> {
53 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
55 let Some(channel) = p2p.get_channel(channel) else {
56 error!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
57 return Ok(())
58 };
59
60 let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
62 error!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
63 return Ok(())
64 };
65
66 let last = match validator.blockchain.last() {
68 Ok(l) => l,
69 Err(e) => {
70 debug!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
71 return Ok(())
72 }
73 };
74 let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
75 if let Err(e) = channel.send(&request).await {
76 debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
77 return Ok(())
78 };
79
80 let response = match response_sub
82 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
83 .await
84 {
85 Ok(r) => r,
86 Err(e) => {
87 debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
88 return Ok(())
89 }
90 };
91 debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
92
93 debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
95
96 if response.proposals.is_empty() {
98 warn!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
99 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
100 }
101
102 if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
104 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
105 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
106 }
107
108 if response.proposals[0].block.header.previous != last.1 {
110 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
111 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
112 }
113
114 if response.proposals.last().unwrap().hash != proposal.hash {
116 debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
117 return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
118 }
119
120 for proposal in &response.proposals {
122 match validator.append_proposal(proposal).await {
124 Ok(()) => { }
125 Err(Error::ProposalAlreadyExists) => continue,
127 Err(e) => {
128 error!(
129 target: "darkfid::task::handle_unknown_proposal",
130 "Error while appending response proposal: {e}"
131 );
132 break;
133 }
134 };
135
136 let message = ProposalMessage(proposal.clone());
138 p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
139
140 let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
142 proposals_sub.notify(vec![enc_prop].into()).await;
143 }
144
145 Ok(())
146}
147
148async fn handle_reorg(
155 validator: ValidatorPtr,
156 p2p: P2pPtr,
157 proposals_sub: JsonSubscriber,
158 blocks_sub: JsonSubscriber,
159 channel: ChannelPtr,
160 proposal: Proposal,
161) -> Result<()> {
162 info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);
163
164 if proposal.block.header.height == 0 {
166 info!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
167 return Ok(())
168 }
169
170 let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
172 error!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
173 return Ok(())
174 };
175
176 let mut peer_header_hashes = vec![];
178
179 let mut previous_height = proposal.block.header.height;
181 let mut previous_hash = proposal.hash;
182 for height in (0..proposal.block.header.height).rev() {
183 let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
185 if let Err(e) = channel.send(&request).await {
186 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
187 return Ok(())
188 };
189
190 let response = match response_sub
192 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
193 .await
194 {
195 Ok(r) => r,
196 Err(e) => {
197 debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
198 return Ok(())
199 }
200 };
201 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
202
203 let Some(peer_header) = response.fork_header else {
205 info!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
206 return Ok(())
207 };
208
209 match validator.blockchain.blocks.get_order(&[height], false)?[0] {
211 Some(known_header) => {
212 if known_header == peer_header {
213 previous_height = height;
214 previous_hash = known_header;
215 break
216 }
217 peer_header_hashes.insert(0, peer_header);
219 }
220 None => peer_header_hashes.insert(0, peer_header),
221 }
222 }
223
224 if peer_header_hashes.is_empty() {
226 info!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
227 return Ok(())
228 }
229
230 let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
232 error!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
233 return Ok(())
234 };
235
236 let last_common_height = previous_height;
238 let last_difficulty = match previous_height {
239 0 => BlockDifficulty::genesis(validator.blockchain.genesis_block()?.header.timestamp),
240 _ => validator.blockchain.blocks.get_difficulty(&[last_common_height], true)?[0]
241 .clone()
242 .unwrap(),
243 };
244
245 let module = PoWModule::new(
247 validator.consensus.blockchain.clone(),
248 validator.consensus.module.read().await.target,
249 validator.consensus.module.read().await.fixed_difficulty.clone(),
250 Some(last_common_height + 1),
251 )?;
252
253 info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
255 let mut batch = Vec::with_capacity(BATCH);
256 let mut total_processed = 0;
257 let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
258 let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
259 let mut headers_module = module.clone();
260 for (index, hash) in peer_header_hashes.iter().enumerate() {
261 batch.push(*hash);
263
264 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
266 continue
267 }
268
269 let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
271 if let Err(e) = channel.send(&request).await {
272 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
273 return Ok(())
274 };
275
276 let response = match response_sub
278 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
279 .await
280 {
281 Ok(r) => r,
282 Err(e) => {
283 debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
284 return Ok(())
285 }
286 };
287 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
288
289 if response.headers.len() != batch.len() {
291 error!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
292 return Ok(())
293 }
294
295 for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
297 let peer_header_hash = peer_header.hash();
298 info!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
299
300 if peer_header_hash != batch[peer_header_index] {
302 error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
303 return Ok(())
304 }
305
306 if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
308 error!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
309 return Ok(())
310 }
311
312 let (next_target, next_difficulty) =
314 headers_module.next_mine_target_and_difficulty()?;
315
316 let (target_distance_sq, hash_distance_sq) = match header_rank(
318 peer_header,
319 &next_target,
320 ) {
321 Ok(distances) => distances,
322 Err(e) => {
323 error!(target: "darkfid::task::handle_reorg", "Invalid header hash detected: {e}");
324 return Ok(())
325 }
326 };
327
328 targets_rank += target_distance_sq.clone();
330 hashes_rank += hash_distance_sq.clone();
331
332 headers_module.append(peer_header.timestamp, &next_difficulty);
334
335 previous_height = peer_header.height;
337 previous_hash = peer_header_hash;
338 }
339
340 total_processed += response.headers.len();
341 info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());
342
343 batch = Vec::with_capacity(BATCH);
345 }
346
347 let forks = validator.consensus.forks.read().await;
349 let best_fork = &forks[best_fork_index(&forks)?];
350 if targets_rank < best_fork.targets_rank ||
351 (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
352 {
353 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
354 drop(forks);
355 return Ok(())
356 }
357 drop(forks);
358
359 let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
361 error!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
362 return Ok(())
363 };
364
365 let mut peer_fork = Fork::new(validator.consensus.blockchain.clone(), module).await?;
367 peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
368 peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();
369
370 let inverse_diffs =
372 validator.blockchain.blocks.get_state_inverse_diffs_after(last_common_height)?;
373 for inverse_diff in inverse_diffs.iter().rev() {
374 peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)?;
375 }
376
377 info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
379 let mut batch = Vec::with_capacity(BATCH);
380 let mut total_processed = 0;
381 for (index, hash) in peer_header_hashes.iter().enumerate() {
382 batch.push(*hash);
384
385 if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
387 continue
388 }
389
390 let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
392 if let Err(e) = channel.send(&request).await {
393 debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
394 return Ok(())
395 };
396
397 let response = match response_sub
399 .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
400 .await
401 {
402 Ok(r) => r,
403 Err(e) => {
404 debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
405 return Ok(())
406 }
407 };
408 debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
409
410 if response.proposals.len() != batch.len() {
412 error!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
413 return Ok(())
414 }
415
416 for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
418 info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
419
420 if peer_proposal.hash != batch[peer_proposal_index] {
422 error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
423 return Ok(())
424 }
425
426 if let Err(e) =
428 verify_fork_proposal(&peer_fork, peer_proposal, validator.verify_fees).await
429 {
430 error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
431 return Ok(())
432 }
433
434 if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
436 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
437 return Ok(())
438 }
439 }
440
441 total_processed += response.proposals.len();
442 info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());
443
444 batch = Vec::with_capacity(BATCH);
446 }
447
448 if let Err(e) = verify_fork_proposal(&peer_fork, &proposal, validator.verify_fees).await {
450 error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
451 return Ok(())
452 }
453
454 if let Err(e) = peer_fork.append_proposal(&proposal).await {
456 error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
457 return Ok(())
458 }
459
460 let mut forks = validator.consensus.forks.write().await;
462 let best_fork = &forks[best_fork_index(&forks)?];
463 if peer_fork.targets_rank < best_fork.targets_rank ||
464 (peer_fork.targets_rank == best_fork.targets_rank &&
465 peer_fork.hashes_rank <= best_fork.hashes_rank)
466 {
467 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
468 drop(forks);
469 return Ok(())
470 }
471
472 info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
474 *forks = vec![peer_fork];
475 drop(forks);
476
477 let confirmed = match validator.confirmation().await {
479 Ok(f) => f,
480 Err(e) => {
481 error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
482 return Ok(())
483 }
484 };
485
486 if !confirmed.is_empty() {
487 let mut notif_blocks = Vec::with_capacity(confirmed.len());
488 for block in confirmed {
489 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
490 }
491 blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
492 }
493
494 let message = ProposalMessage(proposal.clone());
496 p2p.broadcast(&message).await;
497
498 let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
500 proposals_sub.notify(vec![enc_prop].into()).await;
501
502 Ok(())
503}