use std::collections::HashMap;

use darkfi::{
    blockchain::HeaderHash, net::ChannelPtr, rpc::jsonrpc::JsonSubscriber, system::sleep,
    util::encoding::base64, validator::consensus::Proposal, Error, Result,
};
use darkfi_serial::serialize_async;
use log::{debug, info, warn};
use rand::{prelude::SliceRandom, rngs::OsRng};
use tinyjson::JsonValue;

use crate::{
    proto::{
        ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, SyncRequest,
        SyncResponse, TipRequest, TipResponse, BATCH,
    },
    DarkfiNodePtr,
};

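/// Sync the node's blockchain with the network.
///
/// The task finds the most common tip among synced peers, retrieves and
/// verifies the missing headers, downloads their blocks, syncs the best
/// fork and performs confirmation, then marks the validator as synced.
/// If a checkpoint is configured, blocks up to it are retrieved first as
/// checkpoint blocks.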
pub async fn sync_task(node: &DarkfiNodePtr, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
    info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");

    let block_sub = node.subscribers.get("blocks").unwrap();

    let mut last = node.validator.blockchain.last()?;

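    // If a checkpoint past our last known block is configured, purge any
    // previously synced headers so we sync towards the checkpoint first.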
    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }

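    // If we already have pending sync headers that extend our last known
    // block, resume from the last of them; otherwise purge them.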
    if let Some(next) = node.validator.blockchain.headers.get_first_sync()? {
        if next.height == last.0 + 1 {
            if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? {
                last = (last_sync.height, last_sync.hash());
            }
        } else {
            node.validator.blockchain.headers.remove_all_sync()?;
        }
    }
    info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);

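    // Grab the most common tip among our synced peers, along with the
    // peers reporting it.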
    let (mut common_tip_height, mut common_tip_peers) =
        most_common_tip(node, &last.1, checkpoint).await;

    if let Some(checkpoint) = checkpoint {
        if checkpoint.0 > last.0 {
            info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
            retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;

            last = retrieve_blocks(node, &common_tip_peers, last, block_sub, true).await?;
            info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);

            (common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await;
        }
    }

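    // Retrieve missing headers and blocks until our last received block
    // matches the most common tip reported by the network.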
    loop {
        retrieve_headers(node, &common_tip_peers, last.0, common_tip_height + 1).await?;

        let last_received =
            retrieve_blocks(node, &common_tip_peers, last, block_sub, false).await?;
        info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);

        if last == last_received {
            break
        }

        last = last_received;

        (common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await;
    }

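    // Sync the current best fork from a random synced peer.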
    sync_best_fork(node, &common_tip_peers, &last.1).await;

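    // Perform confirmation and notify subscribers about any newly
    // confirmed blocks.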
    let confirmed = node.validator.confirmation().await?;
    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        block_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    *node.validator.synced.write().await = true;
    info!(target: "darkfid::task::sync_task", "Blockchain synced!");
    Ok(())
}

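/// Block until tip information has been received from at least one synced
/// peer, returning a map of each reported (height, hash) tip to the peers
/// that reported it. If a checkpoint is configured, a peer's tip is only
/// recorded if the peer serves the same checkpoint header.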
async fn synced_peers(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> {
    info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;
    let mut tips = HashMap::new();
    loop {
        let peers = node.p2p_handler.p2p.hosts().channels();

        for peer in peers {
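            // If a checkpoint is configured, request its header from the
            // peer and skip the peer if it doesn't serve the same checkpoint.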
            if let Some(c) = checkpoint {
                let Ok(response_sub) = peer.subscribe_msg::<HeaderSyncResponse>().await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncResponse` communication setup with peer: {peer:?}");
                    continue
                };

                let request = HeaderSyncRequest { height: c.0 + 1 };
                if let Err(e) = peer.send(&request).await {
                    debug!(target: "darkfid::task::sync::synced_peers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                    continue
                };

                let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                    debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                };

                if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1 {
                    debug!(target: "darkfid::task::sync::synced_peers", "Invalid `HeaderSyncResponse` from peer: {peer:?}");
                    continue
                }
            }

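            // Request the peer's current tip, sending our last known tip
            // along with the request.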
            let Ok(response_sub) = peer.subscribe_msg::<TipResponse>().await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipResponse` communication setup with peer: {peer:?}");
                continue
            };

            let request = TipRequest { tip: *last_tip };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::synced_peers", "Failure during `TipRequest` send to peer {peer:?}: {e}");
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::synced_peers", "Timeout while waiting for `TipResponse` from peer: {peer:?}");
                continue
            };

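            // Only record tips from peers that report being synced and
            // provide both a height and a hash.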
            if response.synced && response.height.is_some() && response.hash.is_some() {
                let tip = (response.height.unwrap(), *response.hash.unwrap().inner());
                let Some(tip_peers) = tips.get_mut(&tip) else {
                    tips.insert(tip, vec![peer.clone()]);
                    continue
                };
                tip_peers.push(peer.clone());
            }
        }

        if !tips.is_empty() {
            break
        }

        warn!(target: "darkfid::task::sync::synced_peers", "Node is not connected to other synced nodes, waiting to retry...");
        let subscription = node.p2p_handler.p2p.hosts().subscribe_channel().await;
        let _ = subscription.receive().await;
        subscription.unsubscribe().await;

        info!(target: "darkfid::task::sync::synced_peers", "Sleeping for {comms_timeout} seconds to allow for more nodes to connect...");
        sleep(comms_timeout).await;
    }

    tips
}

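/// Query our synced peers and return the most common tip height, along
/// with the peers that reported it.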
async fn most_common_tip(
    node: &DarkfiNodePtr,
    last_tip: &HeaderHash,
    checkpoint: Option<(u32, HeaderHash)>,
) -> (u32, Vec<ChannelPtr>) {
    let tips = synced_peers(node, last_tip, checkpoint).await;

    info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
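    // Keep the tip reported by the most peers, breaking ties by height.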
    let mut common_tip = (0, [0u8; 32], vec![]);
    for (tip, peers) in tips {
        if peers.len() < common_tip.2.len() {
            continue;
        }
        if peers.len() == common_tip.2.len() && tip.0 < common_tip.0 {
            continue;
        }
        common_tip = (tip.0, tip.1, peers);
    }

    info!(target: "darkfid::task::sync::most_common_tip", "Most common tip: {} - {}", common_tip.0, HeaderHash::new(common_tip.1));
    (common_tip.0, common_tip.2)
}

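/// Retrieve the missing headers for heights between `last_known`
/// (exclusive) and `tip_height` (exclusive) from the given peers,
/// requesting batches backwards from the tip, then verify that the stored
/// sequence is contiguous and extends the best fork's last header. On
/// verification failure the synced headers are purged and an error is
/// returned.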
async fn retrieve_headers(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: u32,
    tip_height: u32,
) -> Result<()> {
    info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers...");
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<HeaderSyncResponse>().await {
            Ok(response_sub) => peer_subs.push(Some(response_sub)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push(None)
            }
        }
    }
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;

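    // Total number of headers to retrieve; tip_height is one past the
    // target tip, hence the -1.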
    let total = tip_height - last_known - 1;
    let mut last_tip_height = tip_height;
    'headers_loop: loop {
        for (index, peer) in peers.iter().enumerate() {
            let Some(ref response_sub) = peer_subs[index] else {
                continue;
            };

            let request = HeaderSyncRequest { height: last_tip_height };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Failure during `HeaderSyncRequest` send to peer {peer:?}: {e}");
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_headers", "Timeout while waiting for `HeaderSyncResponse` from peer: {peer:?}");
                continue
            };

            let mut response_headers = response.headers.to_vec();
            response_headers.retain(|h| h.height > last_known);

            if response_headers.is_empty() {
                break 'headers_loop
            }

            node.validator.blockchain.headers.insert_sync(&response_headers)?;
            last_tip_height = response_headers[0].height;
            info!(target: "darkfid::task::sync::retrieve_headers", "Headers received: {}/{}", node.validator.blockchain.headers.len_sync(), total);
        }
    }

    if node.validator.blockchain.headers.is_empty_sync() {
        return Ok(());
    }

    info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence...");
    let mut verified_headers = 0;
    let total = node.validator.blockchain.headers.len_sync();
    let last_known = node.validator.consensus.best_fork_last_header().await?;
    let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
    if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
        node.validator.blockchain.headers.remove_all_sync()?;
        return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
    }
    verified_headers += 1;
    for (index, header) in headers[1..].iter().enumerate() {
        if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(header.hash().as_string()))
        }
        verified_headers += 1;
    }
    info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total);

    let mut last_checked = headers.last().unwrap().clone();
    headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
    while !headers.is_empty() {
        if headers[0].previous != last_checked.hash() ||
            headers[0].height != last_checked.height + 1
        {
            node.validator.blockchain.headers.remove_all_sync()?;
            return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
        }
        verified_headers += 1;
        for (index, header) in headers[1..].iter().enumerate() {
            if header.previous != headers[index].hash() ||
                header.height != headers[index].height + 1
            {
                node.validator.blockchain.headers.remove_all_sync()?;
                return Err(Error::BlockIsInvalid(header.hash().as_string()))
            }
            verified_headers += 1;
        }
        last_checked = headers.last().unwrap().clone();
        headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?;
        info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total);
    }

    info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!");
    Ok(())
}

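/// Retrieve the blocks for the stored sync headers from the given peers
/// in batches, applying them either as checkpoint blocks or as proposals,
/// and notify block subscribers along the way. Returns the height and
/// hash of the last received block.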
async fn retrieve_blocks(
    node: &DarkfiNodePtr,
    peers: &[ChannelPtr],
    last_known: (u32, HeaderHash),
    block_sub: &JsonSubscriber,
    checkpoint_blocks: bool,
) -> Result<(u32, HeaderHash)> {
    info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers...");
    let mut last_received = last_known;
    let mut peer_subs = vec![];
    for peer in peers {
        match peer.subscribe_msg::<SyncResponse>().await {
            Ok(response_sub) => peer_subs.push(Some(response_sub)),
            Err(e) => {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncResponse` communication setup with peer {peer:?}: {e}");
                peer_subs.push(None)
            }
        }
    }
    let comms_timeout = node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout;

    let mut received_blocks = 0;
    let total = node.validator.blockchain.headers.len_sync();
    'blocks_loop: loop {
        'peers_loop: for (index, peer) in peers.iter().enumerate() {
            let Some(ref response_sub) = peer_subs[index] else {
                continue;
            };

            let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?;
            if headers.is_empty() {
                break 'blocks_loop
            }
            let mut headers_hashes = Vec::with_capacity(headers.len());
            let mut synced_headers = Vec::with_capacity(headers.len());
            for header in &headers {
                headers_hashes.push(header.hash());
                synced_headers.push(header.height);
            }

            let request = SyncRequest { headers: headers_hashes.clone() };
            if let Err(e) = peer.send(&request).await {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Failure during `SyncRequest` send to peer {peer:?}: {e}");
                continue
            };

            let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
                debug!(target: "darkfid::task::sync::retrieve_blocks", "Timeout while waiting for `SyncResponse` from peer: {peer:?}");
                continue
            };

            debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
            received_blocks += response.blocks.len();
            if checkpoint_blocks {
                if let Err(e) =
                    node.validator.add_checkpoint_blocks(&response.blocks, &headers_hashes).await
                {
                    debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while adding checkpoint blocks: {e}");
                    continue
                };
            } else {
                for block in &response.blocks {
                    if let Err(e) =
                        node.validator.append_proposal(&Proposal::new(block.clone())).await
                    {
                        debug!(target: "darkfid::task::sync::retrieve_blocks", "Error while appending proposal: {e}");
                        continue 'peers_loop
                    };
                }
            }
            last_received = (*synced_headers.last().unwrap(), *headers_hashes.last().unwrap());

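            // Remove the processed headers from the sync queue.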
            node.validator.blockchain.headers.remove_sync(&synced_headers)?;

            if checkpoint_blocks {
                let mut notif_blocks = Vec::with_capacity(response.blocks.len());
                info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks added:");
                for (index, block) in response.blocks.iter().enumerate() {
                    info!(target: "darkfid::task::sync::retrieve_blocks", "\t{} - {}", headers_hashes[index], headers[index].height);
                    notif_blocks
                        .push(JsonValue::String(base64::encode(&serialize_async(block).await)));
                }
                block_sub.notify(JsonValue::Array(notif_blocks)).await;
            } else {
                let confirmed = node.validator.confirmation().await?;
                if !confirmed.is_empty() {
                    let mut notif_blocks = Vec::with_capacity(confirmed.len());
                    for block in confirmed {
                        notif_blocks.push(JsonValue::String(base64::encode(
                            &serialize_async(&block).await,
                        )));
                    }
                    block_sub.notify(JsonValue::Array(notif_blocks)).await;
                }
            }

            info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {}/{}", received_blocks, total);
        }
    }

    Ok(last_received)
}

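/// Ask a random synced peer for the proposals of its best fork on top of
/// our last known tip and append them to the validator, notifying
/// proposal subscribers for each appended proposal.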
async fn sync_best_fork(node: &DarkfiNodePtr, peers: &[ChannelPtr], last_tip: &HeaderHash) {
    info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers...");
    let peer = &peers.choose(&mut OsRng).unwrap();

    let Ok(response_sub) = peer.subscribe_msg::<ForkSyncResponse>().await else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncResponse` communication setup with peer: {peer:?}");
        return
    };
    let notif_sub = node.subscribers.get("proposals").unwrap();

    let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
    if let Err(e) = peer.send(&request).await {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Failure during `ForkSyncRequest` send to peer {peer:?}: {e}");
        return
    };

    let Ok(response) = response_sub
        .receive_with_timeout(node.p2p_handler.p2p.settings().read().await.outbound_connect_timeout)
        .await
    else {
        debug!(target: "darkfid::task::sync::sync_best_fork", "Timeout while waiting for `ForkSyncResponse` from peer: {peer:?}");
        return
    };

    debug!(target: "darkfid::task::sync::sync_best_fork", "Processing received proposals");
    for proposal in &response.proposals {
        if let Err(e) = node.validator.append_proposal(proposal).await {
            debug!(target: "darkfid::task::sync::sync_best_fork", "Error while appending proposal: {e}");
            return
        };
        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
        notif_sub.notify(vec![enc_prop].into()).await;
    }
}