use std::collections::HashMap;

use darkfi::{
    blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
    util::encoding::base64, validator::consensus::Proposal, Error, Result,
};
use darkfi_serial::serialize_async;
use log::{debug, info, warn};
use rand::{prelude::SliceRandom, rngs::OsRng};
use tinyjson::JsonValue;

use crate::{
    proto::{
        ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
        SyncResponse, TipRequest, TipResponse, BATCH,
    },
    DarkfiNodePtr,
};

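/// Async task used for syncing the blockchain with the network.
/// If a checkpoint is provided and is ahead of our last known block, the chain is first
/// synced up to it using checkpoint blocks before continuing with regular syncing.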
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
    info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");

    let block_sub = node.subscribers.get("blocks").unwrap();

    let mut last = node.validator.blockchain.last()?;

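    // Purge any pending sync headers if a configured checkpoint is ahead of our last known block.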
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }

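    // If previously stored pending sync headers continue right after our last known block,
    // resume from the last of them, otherwise discard them.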
    if let Some(next) = node.validator.blockchain.headers.get_first_sync()? {
        if next.height == last.0 + 1 {
            if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? {
                last = (last_sync.height, last_sync.hash());
            }
        } else {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }
    info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);

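    // Grab the most common tip among our peers, along with the peers reporting it.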
    let (mut common_tip_height, common_tip_hash, mut common_tip_peers) =
        most_common_tip(node, &last.1, checkpoint).await;

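    // If no peer reported an actual tip, consider ourselves synced.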
    if common_tip_hash == [0u8; 32] {
        *node.validator.synced.write().await = true;
        info!(target: "darkfid::task::sync_task", "Blockchain synced!");
        return Ok(())
    }

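    // If a checkpoint ahead of us is configured, sync headers and blocks up to it first,
    // then refresh the most common tip.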
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
            retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;

            last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
            info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);

            (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
        }
    }

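    // Sync headers and blocks against the most common tip until no new blocks are received.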
    loop {
        retrieve_headers(node, &common_tip_peers, last.0, common_tip_height + 1).await?;

        let last_received =
            retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
        info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);

        if last == last_received {
            break
        }

        last = last_received;

        (common_tip_height, _, common_tip_peers) = most_common_tip(node, &last.1, None).await;
    }

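    // Sync the best fork proposals on top of our synced chain.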
    sync_best_fork(node, &common_tip_peers, &last.1).await;

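    // Run confirmation and notify the blocks subscriber about newly confirmed blocks.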
    let confirmed = node.validator.confirmation().await?;
    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        block_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

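    // Mark the node as synced.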
    *node.validator.synced.write().await = true;
    info!(target: "darkfid::task::sync_task", "Blockchain synced!");
    Ok(())
}

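/// Poll our connected peers for their tips until at least one synced peer responds,
/// returning a map from each reported tip (height, hash) to the peers reporting it.
/// If a checkpoint is provided, only peers that can serve the checkpoint header are considered.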
async fn synced_peers(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
    info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
    let mut tips = HashMap::new();
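    // Keep polling until at least one peer has reported a tip.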
    loop {
        let peers = node.p2p_handler.p2p.hosts().channels();

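        // Ask each connected peer for its tip. If a checkpoint is configured, skip peers
        // that cannot serve the checkpoint header.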
        for peer in peers {
            if let Some(c) = checkpoint {
                let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
                    continue
                };

                let request = HeaderSyncRequest { height: c.0 + 1 };
                if let Err(e) = peer.send(&request).await {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                    continue
                };

                let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                };

                if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
                    debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                }
            }

            let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
                continue
            };

            let request = TipRequest { tip: *last_tip };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
                continue
            };

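            // Only record tips coming from peers that report being synced.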
            if response.synced {
                let tip = if response.height.is_some() && response.hash.is_some() {
                    (response.height.unwrap(), *response.hash.unwrap().inner())
                } else {
                    (0, [0u8; 32])
                };
                let Some(tip_peers) = tips.get_mut(&tip) else {
                    tips.insert(tip, vec![peer.clone()]);
                    continue
                };
                tip_peers.push(peer.clone());
            }
        }

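        // Stop once at least one tip has been collected, otherwise wait for new connections and retry.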
        if !tips.is_empty() {
            break
        }

        warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
        let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
        let _ = subscription.receive().await;
        subscription.unsubscribe().await;

        info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} to allow for more nodes to connect...");
        sleep(comms_timeout).await;
    }

    tips
}

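/// Ask the connected synced peers for their tips and pick the tip reported by the most
/// peers, returning its height, hash and the peers that reported it.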
async fn most_common_tip(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> (u32, [u8; 32], Vec<ChannelPtr>) {
    let tips = synced_peers(node, last_tip, checkpoint).await;

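    // Keep the tip backed by the most peers; on equal peer counts prefer the higher tip.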
    info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
    let mut common_tip = (0, [0u8; 32], vec![]);
    for (tip, peers) in tips {
        if peers.len() < common_tip.2.len() {
            continue;
        }
        if peers.len() == common_tip.2.len() && tip.0 < common_tip.0 {
            continue;
        }
        common_tip = (tip.0, tip.1, peers);
    }

    info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
    common_tip
}

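/// Retrieve missing headers from the given peers in batches, walking backwards from
/// `tip_height` towards `last_known`, and then verify that the retrieved sequence
/// correctly extends the best fork's last header.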
async fn retrieve_headers(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: u32,
    tip_height: u32,
) -> Result<()> {
    info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<HeaderSyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;

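    // Retrieve headers in batches, walking backwards from the tip towards our last known height.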
    let total = tip_height - last_known - 1;
    let mut last_tip_height = tip_height;
    'headers_loop: loop {
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_headers", "All peer connections failed.");
            break
        }

        for (index, peer) in peers.iter().enumerate() {
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            let request = HeaderSyncRequest { height: last_tip_height };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

            let mut response_headers = response.headers.to_vec();
            response_headers.retain(|h| h.height > last_known);

            if response_headers.is_empty() {
                break 'headers_loop
            }

            node.validator.blockchain.headers.insert_sync(&response_headers)?;
            last_tip_height = response_headers[0].height;
            info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{total}", node.validator.blockchain.headers.len_sync());
        }
    }

    if node.validator.blockchain.headers.is_empty_sync() {
        return Ok(());
    }

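    // Verify the retrieved headers sequence: each header must reference its predecessor's
    // hash and increment the height by one, starting from the best fork's last header.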
    info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
    let mut verified_headers = 0;
    let total = node.validator.blockchain.headers.len_sync();
    let last_known = node.validator.consensus.best_fork_last_header().await?;
    let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
    if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
        node.validator.blockchain.headers.remove_all_sync()?;
        return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
    }
    verified_headers += 1;
    for (index, header) in headers[1..].iter().enumerate() {
        if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(header.hash().as_string()))
        }
        verified_headers += 1;
    }
    info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");

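    // Verify the remaining header batches.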
    let mut last_checked = headers.last().unwrap().clone();
    headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
    while !headers.is_empty() {
        if headers[0].previous != last_checked.hash() ||
            headers[0].height != last_checked.height + 1
        {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
        }
        verified_headers += 1;
        for (index, header) in headers[1..].iter().enumerate() {
            if header.previous != headers[index].hash() ||
                header.height != headers[index].height + 1
            {
                node.validator.blockchain.headers.remove_all_sync()?;
                return Err(Error::BlockIsInvalid(header.hash().as_string()))
            }
            verified_headers += 1;
        }
        last_checked = headers.last().unwrap().clone();
        headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
        info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {verified_headers}/{total}");
    }

    info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
    Ok(())
}

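/// Retrieve the blocks for all pending sync headers from the given peers. When
/// `checkpoint_blocks` is set, blocks are added as checkpoint blocks, otherwise each
/// block is appended as a proposal. Returns the last received block height and hash.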
async fn retrieve_blocks(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: (u32, HeaderHash),
    block_sub: &JsonSubscriber,
    checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
    info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
    let mut last_received = last_known;
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<SyncResponse>().await {
            Ok(response_sub) => peer_subs.push((Some(response_sub), false)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push((None, true))
            }
        }
    }
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;

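    // Request blocks for the pending sync headers in batches until none remain or all peers have failed.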
    let mut received_blocks = 0;
    let total = node.validator.blockchain.headers.len_sync();
    'blocks_loop: loop {
        let mut count = 0;
        for (peer_sub, failed) in &peer_subs {
            if peer_sub.is_none() || *failed {
                count += 1;
            }
        }
        if count == peer_subs.len() {
            debug!(target: "darkfid::task::sync::retrieve_blocks", "All peer connections failed.");
            break
        }

        'peers_loop: for (index, peer) in peers.iter().enumerate() {
            let (peer_sub, failed) = &mut peer_subs[index];
            if *failed {
                continue;
            }
            let Some(ref response_sub) = peer_sub else {
                continue;
            };

            let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
            if headers.is_empty() {
                break 'blocks_loop
            }
            let mut headers_hashes = Vec::with_capacity(headers.len());
            let mut synced_headers = Vec::with_capacity(headers.len());
            for header in &headers {
                headers_hashes.push(header.hash());
                synced_headers.push(header.height);
            }

            let request = SyncRequest { headers: headers_hashes.clone() };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
                *failed = true;
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
                *failed = true;
                continue
            };

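            // Apply the received blocks: as checkpoint blocks when syncing up to a
            // checkpoint, otherwise by appending each block as a proposal.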
            debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
            received_blocks += response.blocks.len();
            if checkpoint_blocks {
                if let Err(e) =
                    node.validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
                {
                    debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
                    continue
                };
            } else {
                for block in &response.blocks {
                    if let Err(e) =
                        node.validator.append_proposal(&Proposal::new(block.clone())).await
                    {
                        debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
                        continue 'peers_loop
                    };
                }
            }
            last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());

            node.validator.blockchain.headers.remove_sync(&synced_headers)?;

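            // Notify the blocks subscriber: directly for checkpoint blocks, otherwise
            // with whatever blocks got confirmed.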
            if checkpoint_blocks {
                let mut notif_blocks = Vec::with_capacity(response.blocks.len());
                info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
                for (index, block) in response.blocks.iter().enumerate() {
                    info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
                    notif_blocks
                        .push(JsonValue::String(base64::encode(&serialize_async(block).await)));
                }
                block_sub.notify(JsonValue::Array(notif_blocks)).await;
            } else {
                let confirmed = node.validator.confirmation().await?;
                if !confirmed.is_empty() {
                    let mut notif_blocks = Vec::with_capacity(confirmed.len());
                    for block in confirmed {
                        notif_blocks.push(JsonValue::String(base64::encode(
                            &serialize_async(&block).await,
                        )));
                    }
                    block_sub.notify(JsonValue::Array(notif_blocks)).await;
                }
            }

            info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {received_blocks}/{total}");
        }
    }

    Ok(last_received)
}

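/// Sync the best fork from a randomly chosen peer by requesting its proposals on top of
/// the given tip, appending each received proposal and notifying the proposals subscriber.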
async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
    info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
    let peer = &peers.choose(&mut OsRng).unwrap();

    let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
        return
    };
    let notif_sub = node.subscribers.get("proposals").unwrap();

    let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
    if let Err(e) = peer.send(&request).await {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
        return
    };

    let Ok(response) = response_sub
        .receive_with_timeout(node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout)
        .await
    else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
        return
    };

    debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
    for proposal in &response.proposals {
        if let Err(e) = node.validator.append_proposal(proposal).await {
            debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
            return
        };
        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
        notif_sub.notify(vec![enc_prop].into()).await;
    }
}