darkfid/task/unknown_proposal.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{collections::HashSet, sync::Arc};

use smol::{channel::Receiver, lock::RwLock};
use tinyjson::JsonValue;
use tracing::{debug, error, info};

use darkfi::{
    blockchain::BlockDifficulty,
    net::{ChannelPtr, P2pPtr},
    rpc::jsonrpc::JsonSubscriber,
    util::encoding::base64,
    validator::{
        consensus::{Fork, Proposal},
        pow::PoWModule,
        utils::{best_fork_index, header_rank},
        verification::verify_fork_proposal,
        ValidatorPtr,
    },
    Error, Result,
};
use darkfi_serial::serialize_async;

use crate::proto::{
    ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
    ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
    ProposalMessage, BATCH,
};

/// Background task to handle unknown proposals.
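///
/// A minimal sketch of how this task might be wired up (not darkfid's
/// actual startup code); `validator`, `p2p`, `proposals_sub`, `blocks_sub`,
/// `proposal` and `channel_id` are assumed to already exist:
///
/// ```ignore
/// let (sender, receiver) = smol::channel::unbounded();
/// let unknown_proposals = Arc::new(RwLock::new(HashSet::new()));
/// smol::spawn(handle_unknown_proposals(
///     receiver,
///     unknown_proposals.clone(),
///     validator,
///     p2p,
///     proposals_sub,
///     blocks_sub,
/// ))
/// .detach();
///
/// // A protocol handler registers the unknown proposal's hash, then
/// // triggers this task with the proposal and the originating channel id.
/// unknown_proposals.write().await.insert(*proposal.hash.inner());
/// sender.send((proposal, channel_id)).await?;
/// ```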
pub async fn handle_unknown_proposals(
    receiver: Receiver<(Proposal, u32)>,
    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
    validator: ValidatorPtr,
    p2p: P2pPtr,
    proposals_sub: JsonSubscriber,
    blocks_sub: JsonSubscriber,
) -> Result<()> {
    debug!(target: "darkfid::task::handle_unknown_proposal", "START");
    loop {
        // Wait for a new unknown proposal trigger
        let (proposal, channel) = match receiver.recv().await {
            Ok(m) => m,
            Err(e) => {
                debug!(
                    target: "darkfid::task::handle_unknown_proposal",
                    "recv fail: {e}"
                );
                continue
            }
        };

        // Check if proposal exists in our queue
        let lock = unknown_proposals.read().await;
        let contains_proposal = lock.contains(proposal.hash.inner());
        drop(lock);
        if !contains_proposal {
            debug!(
                target: "darkfid::task::handle_unknown_proposal",
                "Proposal {} is not in our unknown proposals queue.",
                proposal.hash,
            );
            continue
        };

        // Handle the unknown proposal
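        // `handle_unknown_proposal` returns true when the peer is considered
        // to have misbehaved, in which case its channel is banned below.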
        if handle_unknown_proposal(
            &validator,
            &p2p,
            &proposals_sub,
            &blocks_sub,
            channel,
            &proposal,
        )
        .await
        {
            // Ban channel if it exists
            if let Some(channel) = p2p.get_channel(channel) {
                channel.ban().await;
            }
        };

        // Remove proposal from the queue
        let mut lock = unknown_proposals.write().await;
        lock.remove(proposal.hash.inner());
        drop(lock);
    }
}

/// Auxiliary function to handle an unknown proposal.
/// Returns a boolean flag indicating whether we should ban the channel.
async fn handle_unknown_proposal(
    validator: &ValidatorPtr,
    p2p: &P2pPtr,
    proposals_sub: &JsonSubscriber,
    blocks_sub: &JsonSubscriber,
    channel: u32,
    proposal: &Proposal,
) -> bool {
    // If the proposal's fork chain was not found, we ask our peer for its sequence
    debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
    let Some(channel) = p2p.get_channel(channel) else {
        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
        return false
    };

    // Communication setup
    let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
        debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
        return true
    };

    // Grab last known block to create the request and execute it
    let last = match validator.blockchain.last() {
        Ok(l) => l,
        Err(e) => {
            error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retrieval failed: {e}");
            return false
        }
    };
    let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
    if let Err(e) = channel.send(&request).await {
        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
        return true
    };

    // Node waits for response
    let response = match response_sub
        .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
        .await
    {
        Ok(r) => r,
        Err(e) => {
            debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
            return true
        }
    };
    debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");

    // Verify and store retrieved proposals
    debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");

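    // Each of the following sanity checks falls back to `handle_reorg`: if
    // the peer cannot provide a sequence that cleanly extends our canonical
    // chain, it may be following a fork that diverged before our last block.
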
    // Response should not be empty
    if response.proposals.is_empty() {
        debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
    }

    // Sequence length must correspond to requested height
    if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
    }

    // First proposal must extend canonical
    if response.proposals[0].block.header.previous != last.1 {
        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
    }

    // Last proposal must be the same as the one requested
    if response.proposals.last().unwrap().hash != proposal.hash {
        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
    }

    // Process response proposals
    for proposal in &response.proposals {
        // Append proposal
        match validator.append_proposal(proposal).await {
            Ok(()) => { /* Do nothing */ }
            // Skip already existing proposals
            Err(Error::ProposalAlreadyExists) => continue,
            Err(e) => {
                debug!(
                    target: "darkfid::task::handle_unknown_proposal",
                    "Error while appending response proposal: {e}"
                );
                break;
            }
        };

        // Broadcast proposal to the rest of the nodes
        let message = ProposalMessage(proposal.clone());
        p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;

        // Notify proposals subscriber
        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
        proposals_sub.notify(vec![enc_prop].into()).await;
    }

    false
}

/// Auxiliary function to handle a potential reorg.
/// We first find our last common block with the peer,
/// then grab the header sequence from that block up to
/// the proposal and, if it ranks higher than our current
/// best ranking fork, perform a reorg.
/// Returns a boolean flag indicating whether we should ban
/// the channel.
async fn handle_reorg(
    validator: &ValidatorPtr,
    p2p: &P2pPtr,
    proposals_sub: &JsonSubscriber,
    blocks_sub: &JsonSubscriber,
    channel: ChannelPtr,
    proposal: &Proposal,
) -> bool {
    info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);

    // Check if a genesis proposal was provided
    if proposal.block.header.height == 0 {
        debug!(target: "darkfid::task::handle_reorg", "Peer sent a genesis proposal, skipping...");
        return true
    }

    // Communication setup
    let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
        return true
    };

    // Keep track of the received header hashes sequence
    let mut peer_header_hashes = vec![];

    // Find the last common header, going backwards from the proposal
    let mut previous_height = proposal.block.header.height;
    let mut previous_hash = proposal.hash;
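    // Note: this walks back one height per request until a header hash we
    // also hold locally is found; every unknown hash is collected so the
    // full divergent sequence can be retrieved and verified below.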
    for height in (0..proposal.block.header.height).rev() {
        // Request peer header hash for this height
        let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
        if let Err(e) = channel.send(&request).await {
            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
            return true
        };

        // Node waits for response
        let response = match response_sub
            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
            .await
        {
            Ok(r) => r,
            Err(e) => {
                debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
                return true
            }
        };
        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");

        // Check if peer returned a header
        let Some(peer_header) = response.fork_header else {
            debug!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
            return true
        };

        // Check if we know this header
        let headers = match validator.blockchain.blocks.get_order(&[height], false) {
            Ok(r) => r,
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving headers failed: {e}");
                return false
            }
        };
        match headers[0] {
            Some(known_header) => {
                if known_header == peer_header {
                    previous_height = height;
                    previous_hash = known_header;
                    break
                }
                // Since we retrieve in right -> left order, we push them in reverse order
                peer_header_hashes.insert(0, peer_header);
            }
            None => peer_header_hashes.insert(0, peer_header),
        }
    }

    // Check if we have a sequence to process
    if peer_header_hashes.is_empty() {
        debug!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
        return true
    }

    // Communication setup
    let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
        return true
    };

    // Grab the ranks at the last common height
    let last_common_height = previous_height;
    let last_difficulty = match previous_height {
        0 => {
            let genesis_timestamp = match validator.blockchain.genesis_block() {
                Ok(b) => b.header.timestamp,
                Err(e) => {
                    error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
                    return false
                }
            };
            BlockDifficulty::genesis(genesis_timestamp)
        }
        _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
            Ok(d) => d[0].clone().unwrap(),
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
                return false
            }
        },
    };

    // Create a new PoW module from the last common height
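    // The module is bounded right after the last common block, so the mining
    // targets used to verify the peer's divergent headers below are derived
    // from the chain state at the divergence point, not from our current tip.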
    let module = match PoWModule::new(
        validator.consensus.blockchain.clone(),
        validator.consensus.module.read().await.target,
        validator.consensus.module.read().await.fixed_difficulty.clone(),
        Some(last_common_height + 1),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
            return false
        }
    };

    // Retrieve the headers of the hashes sequence, in batches, keeping track of the sequence ranking
    info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
    let mut batch = Vec::with_capacity(BATCH);
    let mut total_processed = 0;
    let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
    let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
    let mut headers_module = module.clone();
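    // Each BATCH-sized chunk of hashes is requested from the peer; every
    // returned header is checked against the requested hash, chained to the
    // previous header, PoW-verified, and accumulated into the sequence rank.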
    for (index, hash) in peer_header_hashes.iter().enumerate() {
        // Add hash to the batch sequence
        batch.push(*hash);

        // Check if batch is full so we can send it
        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
            continue
        }

        // Request peer headers
        let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
        if let Err(e) = channel.send(&request).await {
            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
            return true
        };

        // Node waits for response
        let response = match response_sub
            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
            .await
        {
            Ok(r) => r,
            Err(e) => {
                debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
                return true
            }
        };
        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");

        // Response sequence must be the same length as the one requested
        if response.headers.len() != batch.len() {
            debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
            return true
        }

        // Process retrieved headers
        for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
            let peer_header_hash = peer_header.hash();
            debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);

            // Validate it's the header we requested
            if peer_header_hash != batch[peer_header_index] {
                debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different header: {} - {peer_header_hash}", batch[peer_header_index]);
                return true
            }

            // Validate sequence is correct
            if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
                debug!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
                return true
            }

            // Grab next mine target and difficulty
            let (next_target, next_difficulty) = match headers_module
                .next_mine_target_and_difficulty()
            {
                Ok(p) => p,
                Err(e) => {
                    debug!(target: "darkfid::task::handle_reorg", "Retrieving next mine target and difficulty failed: {e}");
                    return false
                }
            };

            // Verify header hash and calculate its rank
            let (target_distance_sq, hash_distance_sq) = match header_rank(
                peer_header,
                &next_target,
            ) {
                Ok(distances) => distances,
                Err(e) => {
                    debug!(target: "darkfid::task::handle_reorg", "Invalid header hash detected: {e}");
                    return true
                }
            };

            // Update sequence ranking
            targets_rank += target_distance_sq.clone();
            hashes_rank += hash_distance_sq.clone();

            // Update PoW headers module
            if let Err(e) = headers_module.append(peer_header, &next_difficulty) {
                debug!(target: "darkfid::task::handle_reorg", "Error while appending header to module: {e}");
                return true
            };

            // Set previous header
            previous_height = peer_header.height;
            previous_hash = peer_header_hash;
        }

        total_processed += response.headers.len();
        info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());

        // Reset batch
        batch = Vec::with_capacity(BATCH);
    }

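    // This preliminary comparison uses header ranks only, so a losing fork
    // is rejected before any full proposals are downloaded from the peer.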
    // Check if the sequence ranks higher than our current best fork
    let forks = validator.consensus.forks.read().await;
    let index = match best_fork_index(&forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &forks[index];
    if targets_rank < best_fork.targets_rank ||
        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
        drop(forks);
        return true
    }
    drop(forks);

    // Communication setup
    let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
        return true
    };

    // Create a fork from last common height
    let mut peer_fork =
        match Fork::new(validator.consensus.blockchain.clone(), module.clone()).await {
            Ok(f) => f,
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Generating peer fork failed: {e}");
                return false
            }
        };
    peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
    peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();

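    // Applying the inverse diffs in reverse order rewinds the fork's overlay
    // to the state at the last common height, so the peer's proposals are
    // verified against the chain state they were originally built on.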
    // Grab all state inverse diffs after the last common height, and add them to the fork
    let inverse_diffs = match validator
        .blockchain
        .blocks
        .get_state_inverse_diffs_after(last_common_height)
    {
        Ok(i) => i,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Retrieving state inverse diffs failed: {e}");
            return false
        }
    };
    for inverse_diff in inverse_diffs.iter().rev() {
        if let Err(e) =
            peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)
        {
            error!(target: "darkfid::task::handle_reorg", "Applying inverse diff failed: {e}");
            return false
        }
    }

    // Rebuild the fork's contract states monotree
    if let Err(e) = peer_fork.compute_monotree() {
        error!(target: "darkfid::task::handle_reorg", "Rebuilding peer fork monotree failed: {e}");
        return false
    }

    // Retrieve the proposals of the hashes sequence, in batches
    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
    let mut batch = Vec::with_capacity(BATCH);
    let mut total_processed = 0;
    for (index, hash) in peer_header_hashes.iter().enumerate() {
        // Add hash to the batch sequence
        batch.push(*hash);

        // Check if batch is full so we can send it
        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
            continue
        }

        // Request peer proposals
        let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
        if let Err(e) = channel.send(&request).await {
            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
            return true
        };

        // Node waits for response
        let response = match response_sub
            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
            .await
        {
            Ok(r) => r,
            Err(e) => {
                debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
                return true
            }
        };
        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");

        // Response sequence must be the same length as the one requested
        if response.proposals.len() != batch.len() {
            debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
            return true
        }

        // Process retrieved proposals
        for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
            info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);

            // Validate it's the proposal we requested
            if peer_proposal.hash != batch[peer_proposal_index] {
                error!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
                return true
            }

            // Verify proposal
            if let Err(e) =
                verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await
            {
                error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
                return true
            }

            // Append proposal
            if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
                error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
                return true
            }
        }

        total_processed += response.proposals.len();
        info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());

        // Reset batch
        batch = Vec::with_capacity(BATCH);
    }

    // Verify trigger proposal
    if let Err(e) = verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await {
        error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
        return true
    }

    // Append trigger proposal
    if let Err(e) = peer_fork.append_proposal(proposal).await {
        error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
        return true
    }

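    // The rank comparison is repeated here, now under the write lock and
    // against the fully verified peer fork, since our own forks may have
    // advanced while the peer data was being fetched.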
    // Check if the peer fork ranks higher than our current best fork
    let mut forks = validator.consensus.forks.write().await;
    let index = match best_fork_index(&forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &forks[index];
    if peer_fork.targets_rank < best_fork.targets_rank ||
        (peer_fork.targets_rank == best_fork.targets_rank &&
            peer_fork.hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
        drop(forks);
        return true
    }

    // Execute the reorg
    info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
    *validator.consensus.module.write().await = module;
    *forks = vec![peer_fork];
    drop(forks);

    // Check if we can confirm any blocks, and broadcast them if so
    let confirmed = match validator.confirmation().await {
        Ok(f) => f,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
            return false
        }
    };

    if !confirmed.is_empty() {
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    // Broadcast the proposal to the network
    let message = ProposalMessage(proposal.clone());
    p2p.broadcast(&message).await;

    // Notify proposals subscriber
    let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
    proposals_sub.notify(vec![enc_prop].into()).await;

    false
}