darkfid/task/unknown_proposal.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
19use std::{collections::HashSet, sync::Arc};
20
21use log::{debug, error, info};
22use smol::{channel::Receiver, lock::RwLock};
23use tinyjson::JsonValue;
24
25use darkfi::{
26    blockchain::BlockDifficulty,
27    net::{ChannelPtr, P2pPtr},
28    rpc::jsonrpc::JsonSubscriber,
29    util::encoding::base64,
30    validator::{
31        consensus::{Fork, Proposal},
32        pow::PoWModule,
33        utils::{best_fork_index, header_rank},
34        verification::verify_fork_proposal,
35        ValidatorPtr,
36    },
37    Error, Result,
38};
39use darkfi_serial::serialize_async;
40
41use crate::proto::{
42    ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
43    ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
44    ProposalMessage, BATCH,
45};
46
47/// Background task to handle unknown proposals.
48pub async fn handle_unknown_proposals(
49    receiver: Receiver<(Proposal, u32)>,
50    unknown_proposals: Arc<RwLock<HashSet<[u8; 32]>>>,
51    validator: ValidatorPtr,
52    p2p: P2pPtr,
53    proposals_sub: JsonSubscriber,
54    blocks_sub: JsonSubscriber,
55) -> Result<()> {
56    debug!(target: "darkfid::task::handle_unknown_proposal", "START");
57    loop {
58        // Wait for a new unknown proposal trigger
59        let (proposal, channel) = match receiver.recv().await {
60            Ok(m) => m,
61            Err(e) => {
62                debug!(
63                    target: "darkfid::task::handle_unknown_proposal",
64                    "recv fail: {e}"
65                );
66                continue
67            }
68        };
69
70        // Check if proposal exists in our queue
71        let lock = unknown_proposals.read().await;
72        let contains_proposal = lock.contains(proposal.hash.inner());
73        drop(lock);
74        if !contains_proposal {
75            debug!(
76                target: "darkfid::task::handle_unknown_proposal",
77                "Proposal {} is not in our unknown proposals queue.",
78                proposal.hash,
79            );
80            continue
81        };
82
83        // Handle the unknown proposal
84        if handle_unknown_proposal(
85            &validator,
86            &p2p,
87            &proposals_sub,
88            &blocks_sub,
89            channel,
90            &proposal,
91        )
92        .await
93        {
94            // Ban channel if it exists
95            if let Some(channel) = p2p.get_channel(channel) {
96                channel.ban().await;
97            }
98        };
99
100        // Remove proposal from the queue
101        let mut lock = unknown_proposals.write().await;
102        lock.remove(proposal.hash.inner());
103        drop(lock);
104    }
105}
106
107/// Background task to handle an unknown proposal.
108/// Returns a boolean flag indicate if we should ban the channel.
109async fn handle_unknown_proposal(
110    validator: &ValidatorPtr,
111    p2p: &P2pPtr,
112    proposals_sub: &JsonSubscriber,
113    blocks_sub: &JsonSubscriber,
114    channel: u32,
115    proposal: &Proposal,
116) -> bool {
117    // If proposal fork chain was not found, we ask our peer for its sequence
118    debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
119    let Some(channel) = p2p.get_channel(channel) else {
120        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
121        return false
122    };
123
124    // Communication setup
125    let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
126        debug!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
127        return true
128    };
129
130    // Grab last known block to create the request and execute it
131    let last = match validator.blockchain.last() {
132        Ok(l) => l,
133        Err(e) => {
134            error!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
135            return false
136        }
137    };
138    let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
139    if let Err(e) = channel.send(&request).await {
140        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
141        return true
142    };
143
144    // Node waits for response
145    let response = match response_sub
146        .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
147        .await
148    {
149        Ok(r) => r,
150        Err(e) => {
151            debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
152            return true
153        }
154    };
155    debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
156
157    // Verify and store retrieved proposals
158    debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
159
160    // Response should not be empty
161    if response.proposals.is_empty() {
162        debug!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
163        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
164    }
165
166    // Sequence length must correspond to requested height
167    if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
168        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
169        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
170    }
171
172    // First proposal must extend canonical
173    if response.proposals[0].block.header.previous != last.1 {
174        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
175        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
176    }
177
178    // Last proposal must be the same as the one requested
179    if response.proposals.last().unwrap().hash != proposal.hash {
180        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
181        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
182    }
183
184    // Process response proposals
185    for proposal in &response.proposals {
186        // Append proposal
187        match validator.append_proposal(proposal).await {
188            Ok(()) => { /* Do nothing */ }
189            // Skip already existing proposals
190            Err(Error::ProposalAlreadyExists) => continue,
191            Err(e) => {
192                debug!(
193                    target: "darkfid::task::handle_unknown_proposal",
194                    "Error while appending response proposal: {e}"
195                );
196                break;
197            }
198        };
199
200        // Broadcast proposal to rest nodes
201        let message = ProposalMessage(proposal.clone());
202        p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
203
204        // Notify proposals subscriber
205        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
206        proposals_sub.notify(vec![enc_prop].into()).await;
207    }
208
209    false
210}
211
/// Auxiliary function to handle a potential reorg.
/// We first find our last common block with the peer,
/// then grab the header sequence from that block until
/// the proposal and check if it ranks higher than our
/// current best ranking fork, to perform a reorg.
///
/// The flow is:
/// 1. Walk backwards from the proposal, one header-hash request per
///    height, until a header we also have is found (last common block).
/// 2. Fetch the unknown headers in batches of `BATCH`, verifying hash,
///    linkage and PoW rank as they arrive.
/// 3. If the header sequence outranks our best fork, rebuild the fork
///    state at the common height (via inverse diffs), fetch and verify
///    the full proposals, and replace our forks with the peer fork.
///
/// Returns a boolean flag indicating if we should ban the
/// channel.
async fn handle_reorg(
    validator: &ValidatorPtr,
    p2p: &P2pPtr,
    proposals_sub: &JsonSubscriber,
    blocks_sub: &JsonSubscriber,
    channel: ChannelPtr,
    proposal: &Proposal,
) -> bool {
    info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);

    // Check if genesis proposal was provided
    if proposal.block.header.height == 0 {
        debug!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
        return true
    }

    // Communication setup
    let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
        return true
    };

    // Keep track of received header hashes sequence
    let mut peer_header_hashes = vec![];

    // Find last common header, going backwards from the proposal.
    // NOTE(review): this performs one request/response round-trip per
    // height walked, so a deep divergence costs as many round-trips.
    let mut previous_height = proposal.block.header.height;
    let mut previous_hash = proposal.hash;
    for height in (0..proposal.block.header.height).rev() {
        // Request peer header hash for this height
        let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
        if let Err(e) = channel.send(&request).await {
            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
            return true
        };

        // Node waits for response
        let response = match response_sub
            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
            .await
        {
            Ok(r) => r,
            Err(e) => {
                debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
                return true
            }
        };
        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");

        // Check if peer returned a header
        let Some(peer_header) = response.fork_header else {
            debug!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
            return true
        };

        // Check if we know this header
        let headers = match validator.blockchain.blocks.get_order(&[height], false) {
            Ok(r) => r,
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving headers failed: {e}");
                return false
            }
        };
        match headers[0] {
            Some(known_header) => {
                // A matching hash means this is the last common block
                if known_header == peer_header {
                    previous_height = height;
                    previous_hash = known_header;
                    break
                }
                // Since we retrieve in right -> left order we push them in reverse order
                peer_header_hashes.insert(0, peer_header);
            }
            None => peer_header_hashes.insert(0, peer_header),
        }
    }

    // Check if we have a sequence to process
    if peer_header_hashes.is_empty() {
        debug!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
        return true
    }

    // Communication setup
    let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
        return true
    };

    // Grab last common height ranks. Height 0 falls back to the
    // genesis difficulty since no stored difficulty record exists.
    let last_common_height = previous_height;
    let last_difficulty = match previous_height {
        0 => {
            let genesis_timestamp = match validator.blockchain.genesis_block() {
                Ok(b) => b.header.timestamp,
                Err(e) => {
                    error!(target: "darkfid::task::handle_reorg", "Retrieving genesis block failed: {e}");
                    return false
                }
            };
            BlockDifficulty::genesis(genesis_timestamp)
        }
        _ => match validator.blockchain.blocks.get_difficulty(&[last_common_height], true) {
            Ok(d) => d[0].clone().unwrap(),
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Retrieving block difficulty failed: {e}");
                return false
            }
        },
    };

    // Create a new PoW from last common height
    let module = match PoWModule::new(
        validator.consensus.blockchain.clone(),
        validator.consensus.module.read().await.target,
        validator.consensus.module.read().await.fixed_difficulty.clone(),
        Some(last_common_height + 1),
    ) {
        Ok(m) => m,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "PoWModule generation failed: {e}");
            return false
        }
    };

    // Retrieve the headers of the hashes sequence, in batches, keeping track of the sequence ranking
    info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
    let mut batch = Vec::with_capacity(BATCH);
    let mut total_processed = 0;
    // Ranks accumulate on top of the last common block's ranks
    let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
    let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
    // Scratch module used only to derive per-header targets/difficulties
    let mut headers_module = module.clone();
    for (index, hash) in peer_header_hashes.iter().enumerate() {
        // Add hash in batch sequence
        batch.push(*hash);

        // Check if batch is full so we can send it (last batch may be partial)
        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
            continue
        }

        // Request peer headers
        let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
        if let Err(e) = channel.send(&request).await {
            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
            return true
        };

        // Node waits for response
        let response = match response_sub
            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
            .await
        {
            Ok(r) => r,
            Err(e) => {
                debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
                return true
            }
        };
        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");

        // Response sequence must be the same length as the one requested
        if response.headers.len() != batch.len() {
            debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
            return true
        }

        // Process retrieved headers
        for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
            let peer_header_hash = peer_header.hash();
            debug!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);

            // Validate it's the header we requested
            if peer_header_hash != batch[peer_header_index] {
                debug!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
                return true
            }

            // Validate sequence is correct (contiguous heights, linked hashes)
            if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
                debug!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
                return true
            }

            // Grab next mine target and difficulty
            let (next_target, next_difficulty) = match headers_module
                .next_mine_target_and_difficulty()
            {
                Ok(p) => p,
                Err(e) => {
                    debug!(target: "darkfid::task::handle_reorg", "Retrieving next mine target and difficulty failed: {e}");
                    return false
                }
            };

            // Verify header hash and calculate its rank
            let (target_distance_sq, hash_distance_sq) = match header_rank(
                peer_header,
                &next_target,
            ) {
                Ok(distances) => distances,
                Err(e) => {
                    debug!(target: "darkfid::task::handle_reorg", "Invalid header hash detected: {e}");
                    return true
                }
            };

            // Update sequence ranking
            targets_rank += target_distance_sq.clone();
            hashes_rank += hash_distance_sq.clone();

            // Update PoW headers module
            headers_module.append(peer_header.timestamp, &next_difficulty);

            // Set previous header
            previous_height = peer_header.height;
            previous_hash = peer_header_hash;
        }

        total_processed += response.headers.len();
        info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());

        // Reset batch
        batch = Vec::with_capacity(BATCH);
    }

    // Check if the sequence ranks higher than our current best fork.
    // Higher rank wins; ties (or lower) keep our fork and drop the peer's.
    let forks = validator.consensus.forks.read().await;
    let index = match best_fork_index(&forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &forks[index];
    if targets_rank < best_fork.targets_rank ||
        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
        drop(forks);
        return true
    }
    drop(forks);

    // Communication setup
    let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
        debug!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
        return true
    };

    // Create a fork from last common height
    let mut peer_fork =
        match Fork::new(validator.consensus.blockchain.clone(), module.clone()).await {
            Ok(f) => f,
            Err(e) => {
                error!(target: "darkfid::task::handle_reorg", "Generating peer fork failed: {e}");
                return false
            }
        };
    // Start the fork's ranks from the last common block's ranks
    peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
    peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();

    // Grab all state inverse diffs after last common height, and add them to the fork,
    // effectively rewinding the fork state back to the last common block
    let inverse_diffs = match validator
        .blockchain
        .blocks
        .get_state_inverse_diffs_after(last_common_height)
    {
        Ok(i) => i,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Retrieving state inverse diffs failed: {e}");
            return false
        }
    };
    for inverse_diff in inverse_diffs.iter().rev() {
        if let Err(e) =
            peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)
        {
            error!(target: "darkfid::task::handle_reorg", "Applying inverse diff failed: {e}");
            return false
        }
    }

    // Rebuild fork contracts states monotree
    if let Err(e) = peer_fork.compute_monotree() {
        error!(target: "darkfid::task::handle_reorg", "Rebuilding peer fork monotree failed: {e}");
        return false
    }

    // Retrieve the proposals of the hashes sequence, in batches
    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
    let mut batch = Vec::with_capacity(BATCH);
    let mut total_processed = 0;
    for (index, hash) in peer_header_hashes.iter().enumerate() {
        // Add hash in batch sequence
        batch.push(*hash);

        // Check if batch is full so we can send it (last batch may be partial)
        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
            continue
        }

        // Request peer proposals
        let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
        if let Err(e) = channel.send(&request).await {
            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
            return true
        };

        // Node waits for response
        let response = match response_sub
            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
            .await
        {
            Ok(r) => r,
            Err(e) => {
                debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
                return true
            }
        };
        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");

        // Response sequence must be the same length as the one requested
        if response.proposals.len() != batch.len() {
            debug!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
            return true
        }

        // Process retrieved proposals
        for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
            info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);

            // Validate it's the proposal we requested
            if peer_proposal.hash != batch[peer_proposal_index] {
                error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
                return true
            }

            // Verify proposal
            if let Err(e) =
                verify_fork_proposal(&mut peer_fork, peer_proposal, validator.verify_fees).await
            {
                error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
                return true
            }

            // Append proposal
            if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
                error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
                return true
            }
        }

        total_processed += response.proposals.len();
        info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());

        // Reset batch
        batch = Vec::with_capacity(BATCH);
    }

    // Verify trigger proposal (the one that started this whole check)
    if let Err(e) = verify_fork_proposal(&mut peer_fork, proposal, validator.verify_fees).await {
        error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
        return true
    }

    // Append trigger proposal
    if let Err(e) = peer_fork.append_proposal(proposal).await {
        error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
        return true
    }

    // Check if the peer fork ranks higher than our current best fork.
    // Re-checked here since our forks may have advanced while we were
    // fetching and verifying the peer's proposals.
    let mut forks = validator.consensus.forks.write().await;
    let index = match best_fork_index(&forks) {
        Ok(i) => i,
        Err(e) => {
            debug!(target: "darkfid::task::handle_reorg", "Retrieving best fork index failed: {e}");
            return false
        }
    };
    let best_fork = &forks[index];
    if peer_fork.targets_rank < best_fork.targets_rank ||
        (peer_fork.targets_rank == best_fork.targets_rank &&
            peer_fork.hashes_rank <= best_fork.hashes_rank)
    {
        info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
        drop(forks);
        return true
    }

    // Execute the reorg.
    // NOTE(review): the forks write lock is still held while acquiring the
    // module write lock below — verify no other task takes these two locks
    // in the opposite order.
    info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
    *validator.consensus.module.write().await = module;
    *forks = vec![peer_fork];
    drop(forks);

    // Check if we can confirm anything and broadcast them
    let confirmed = match validator.confirmation().await {
        Ok(f) => f,
        Err(e) => {
            error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
            return false
        }
    };

    if !confirmed.is_empty() {
        // Notify blocks subscriber about the newly confirmed blocks
        let mut notif_blocks = Vec::with_capacity(confirmed.len());
        for block in confirmed {
            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
        }
        blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
    }

    // Broadcast proposal to the network
    let message = ProposalMessage(proposal.clone());
    p2p.broadcast(&message).await;

    // Notify proposals subscriber
    let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
    proposals_sub.notify(vec![enc_prop].into()).await;

    false
}