darkfid/task/
unknown_proposal.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use log::{debug, error, info, warn};
20use tinyjson::JsonValue;
21
22use darkfi::{
23    blockchain::BlockDifficulty,
24    net::{ChannelPtr, P2pPtr},
25    rpc::jsonrpc::JsonSubscriber,
26    util::encoding::base64,
27    validator::{
28        consensus::{Fork, Proposal},
29        pow::PoWModule,
30        utils::{best_fork_index, header_rank},
31        verification::verify_fork_proposal,
32        ValidatorPtr,
33    },
34    Error, Result,
35};
36use darkfi_serial::serialize_async;
37
38use crate::proto::{
39    ForkHeaderHashRequest, ForkHeaderHashResponse, ForkHeadersRequest, ForkHeadersResponse,
40    ForkProposalsRequest, ForkProposalsResponse, ForkSyncRequest, ForkSyncResponse,
41    ProposalMessage, BATCH,
42};
43
44/// Background task to handle unknown proposals.
45pub async fn handle_unknown_proposal(
46    validator: ValidatorPtr,
47    p2p: P2pPtr,
48    proposals_sub: JsonSubscriber,
49    blocks_sub: JsonSubscriber,
50    channel: u32,
51    proposal: Proposal,
52) -> Result<()> {
53    // If proposal fork chain was not found, we ask our peer for its sequence
54    debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence");
55    let Some(channel) = p2p.get_channel(channel) else {
56        error!(target: "darkfid::task::handle_unknown_proposal", "Channel {channel} wasn't found.");
57        return Ok(())
58    };
59
60    // Communication setup
61    let Ok(response_sub) = channel.subscribe_msg::<ForkSyncResponse>().await else {
62        error!(target: "darkfid::task::handle_unknown_proposal", "Failure during `ForkSyncResponse` communication setup with peer: {channel:?}");
63        return Ok(())
64    };
65
66    // Grab last known block to create the request and execute it
67    let last = match validator.blockchain.last() {
68        Ok(l) => l,
69        Err(e) => {
70            debug!(target: "darkfid::task::handle_unknown_proposal", "Blockchain last retriaval failed: {e}");
71            return Ok(())
72        }
73    };
74    let request = ForkSyncRequest { tip: last.1, fork_tip: Some(proposal.hash) };
75    if let Err(e) = channel.send(&request).await {
76        debug!(target: "darkfid::task::handle_unknown_proposal", "Channel send failed: {e}");
77        return Ok(())
78    };
79
80    // Node waits for response
81    let response = match response_sub
82        .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
83        .await
84    {
85        Ok(r) => r,
86        Err(e) => {
87            debug!(target: "darkfid::task::handle_unknown_proposal", "Asking peer for fork sequence failed: {e}");
88            return Ok(())
89        }
90    };
91    debug!(target: "darkfid::task::handle_unknown_proposal", "Peer response: {response:?}");
92
93    // Verify and store retrieved proposals
94    debug!(target: "darkfid::task::handle_unknown_proposal", "Processing received proposals");
95
96    // Response should not be empty
97    if response.proposals.is_empty() {
98        warn!(target: "darkfid::task::handle_unknown_proposal", "Peer responded with empty sequence, node might be out of sync!");
99        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
100    }
101
102    // Sequence length must correspond to requested height
103    if response.proposals.len() as u32 != proposal.block.header.height - last.0 {
104        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence length is erroneous");
105        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
106    }
107
108    // First proposal must extend canonical
109    if response.proposals[0].block.header.previous != last.1 {
110        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't extend canonical");
111        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
112    }
113
114    // Last proposal must be the same as the one requested
115    if response.proposals.last().unwrap().hash != proposal.hash {
116        debug!(target: "darkfid::task::handle_unknown_proposal", "Response sequence doesn't correspond to requested tip");
117        return handle_reorg(validator, p2p, proposals_sub, blocks_sub, channel, proposal).await
118    }
119
120    // Process response proposals
121    for proposal in &response.proposals {
122        // Append proposal
123        match validator.append_proposal(proposal).await {
124            Ok(()) => { /* Do nothing */ }
125            // Skip already existing proposals
126            Err(Error::ProposalAlreadyExists) => continue,
127            Err(e) => {
128                error!(
129                    target: "darkfid::task::handle_unknown_proposal",
130                    "Error while appending response proposal: {e}"
131                );
132                break;
133            }
134        };
135
136        // Broadcast proposal to rest nodes
137        let message = ProposalMessage(proposal.clone());
138        p2p.broadcast_with_exclude(&message, &[channel.address().clone()]).await;
139
140        // Notify proposals subscriber
141        let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
142        proposals_sub.notify(vec![enc_prop].into()).await;
143    }
144
145    Ok(())
146}
147
148// TODO; If a reorg trigger is erroneous, disconnect from peer.
149/// Auxiliary function to handle a potential reorg.
150/// We first find our last common block with the peer,
151/// then grab the header sequence from that block until
152/// the proposal and check if it ranks higher than our
153/// current best ranking fork, to perform a reorg.
154async fn handle_reorg(
155    validator: ValidatorPtr,
156    p2p: P2pPtr,
157    proposals_sub: JsonSubscriber,
158    blocks_sub: JsonSubscriber,
159    channel: ChannelPtr,
160    proposal: Proposal,
161) -> Result<()> {
162    info!(target: "darkfid::task::handle_reorg", "Checking for potential reorg from proposal {} - {} by peer: {channel:?}", proposal.hash, proposal.block.header.height);
163
164    // Check if genesis proposal was provided
165    if proposal.block.header.height == 0 {
166        info!(target: "darkfid::task::handle_reorg", "Peer send a genesis proposal, skipping...");
167        return Ok(())
168    }
169
170    // Communication setup
171    let Ok(response_sub) = channel.subscribe_msg::<ForkHeaderHashResponse>().await else {
172        error!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeaderHashResponse` communication setup with peer: {channel:?}");
173        return Ok(())
174    };
175
176    // Keep track of received header hashes sequence
177    let mut peer_header_hashes = vec![];
178
179    // Find last common header, going backwards from the proposal
180    let mut previous_height = proposal.block.header.height;
181    let mut previous_hash = proposal.hash;
182    for height in (0..proposal.block.header.height).rev() {
183        // Request peer header hash for this height
184        let request = ForkHeaderHashRequest { height, fork_header: proposal.hash };
185        if let Err(e) = channel.send(&request).await {
186            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
187            return Ok(())
188        };
189
190        // Node waits for response
191        let response = match response_sub
192            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
193            .await
194        {
195            Ok(r) => r,
196            Err(e) => {
197                debug!(target: "darkfid::task::handle_reorg", "Asking peer for header hash failed: {e}");
198                return Ok(())
199            }
200        };
201        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
202
203        // Check if peer returned a header
204        let Some(peer_header) = response.fork_header else {
205            info!(target: "darkfid::task::handle_reorg", "Peer responded with an empty header");
206            return Ok(())
207        };
208
209        // Check if we know this header
210        match validator.blockchain.blocks.get_order(&[height], false)?[0] {
211            Some(known_header) => {
212                if known_header == peer_header {
213                    previous_height = height;
214                    previous_hash = known_header;
215                    break
216                }
217                // Since we retrieve in right -> left order we push them in reverse order
218                peer_header_hashes.insert(0, peer_header);
219            }
220            None => peer_header_hashes.insert(0, peer_header),
221        }
222    }
223
224    // Check if we have a sequence to process
225    if peer_header_hashes.is_empty() {
226        info!(target: "darkfid::task::handle_reorg", "No headers to process, skipping...");
227        return Ok(())
228    }
229
230    // Communication setup
231    let Ok(response_sub) = channel.subscribe_msg::<ForkHeadersResponse>().await else {
232        error!(target: "darkfid::task::handle_reorg", "Failure during `ForkHeadersResponse` communication setup with peer: {channel:?}");
233        return Ok(())
234    };
235
236    // Grab last common height ranks
237    let last_common_height = previous_height;
238    let last_difficulty = match previous_height {
239        0 => BlockDifficulty::genesis(validator.blockchain.genesis_block()?.header.timestamp),
240        _ => validator.blockchain.blocks.get_difficulty(&[last_common_height], true)?[0]
241            .clone()
242            .unwrap(),
243    };
244
245    // Create a new PoW from last common height
246    let module = PoWModule::new(
247        validator.consensus.blockchain.clone(),
248        validator.consensus.module.read().await.target,
249        validator.consensus.module.read().await.fixed_difficulty.clone(),
250        Some(last_common_height + 1),
251    )?;
252
253    // Retrieve the headers of the hashes sequence, in batches, keeping track of the sequence ranking
254    info!(target: "darkfid::task::handle_reorg", "Retrieving {} headers from peer...", peer_header_hashes.len());
255    let mut batch = Vec::with_capacity(BATCH);
256    let mut total_processed = 0;
257    let mut targets_rank = last_difficulty.ranks.targets_rank.clone();
258    let mut hashes_rank = last_difficulty.ranks.hashes_rank.clone();
259    let mut headers_module = module.clone();
260    for (index, hash) in peer_header_hashes.iter().enumerate() {
261        // Add hash in batch sequence
262        batch.push(*hash);
263
264        // Check if batch is full so we can send it
265        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
266            continue
267        }
268
269        // Request peer headers
270        let request = ForkHeadersRequest { headers: batch.clone(), fork_header: proposal.hash };
271        if let Err(e) = channel.send(&request).await {
272            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
273            return Ok(())
274        };
275
276        // Node waits for response
277        let response = match response_sub
278            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
279            .await
280        {
281            Ok(r) => r,
282            Err(e) => {
283                debug!(target: "darkfid::task::handle_reorg", "Asking peer for headers sequence failed: {e}");
284                return Ok(())
285            }
286        };
287        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
288
289        // Response sequence must be the same length as the one requested
290        if response.headers.len() != batch.len() {
291            error!(target: "darkfid::task::handle_reorg", "Peer responded with a different headers sequence length");
292            return Ok(())
293        }
294
295        // Process retrieved headers
296        for (peer_header_index, peer_header) in response.headers.iter().enumerate() {
297            let peer_header_hash = peer_header.hash();
298            info!(target: "darkfid::task::handle_reorg", "Processing header: {peer_header_hash} - {}", peer_header.height);
299
300            // Validate its the header we requested
301            if peer_header_hash != batch[peer_header_index] {
302                error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend header: {} - {peer_header_hash}", batch[peer_header_index]);
303                return Ok(())
304            }
305
306            // Validate sequence is correct
307            if peer_header.previous != previous_hash || peer_header.height != previous_height + 1 {
308                error!(target: "darkfid::task::handle_reorg", "Invalid header sequence detected");
309                return Ok(())
310            }
311
312            // Grab next mine target and difficulty
313            let (next_target, next_difficulty) =
314                headers_module.next_mine_target_and_difficulty()?;
315
316            // Verify header hash and calculate its rank
317            let (target_distance_sq, hash_distance_sq) = match header_rank(
318                peer_header,
319                &next_target,
320            ) {
321                Ok(distances) => distances,
322                Err(e) => {
323                    error!(target: "darkfid::task::handle_reorg", "Invalid header hash detected: {e}");
324                    return Ok(())
325                }
326            };
327
328            // Update sequence ranking
329            targets_rank += target_distance_sq.clone();
330            hashes_rank += hash_distance_sq.clone();
331
332            // Update PoW headers module
333            headers_module.append(peer_header.timestamp, &next_difficulty);
334
335            // Set previous header
336            previous_height = peer_header.height;
337            previous_hash = peer_header_hash;
338        }
339
340        total_processed += response.headers.len();
341        info!(target: "darkfid::task::handle_reorg", "Headers received and verified: {total_processed}/{}", peer_header_hashes.len());
342
343        // Reset batch
344        batch = Vec::with_capacity(BATCH);
345    }
346
347    // Check if the sequence ranks higher than our current best fork
348    let forks = validator.consensus.forks.read().await;
349    let best_fork = &forks[best_fork_index(&forks)?];
350    if targets_rank < best_fork.targets_rank ||
351        (targets_rank == best_fork.targets_rank && hashes_rank <= best_fork.hashes_rank)
352    {
353        info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks lower than our current best fork, skipping...");
354        drop(forks);
355        return Ok(())
356    }
357    drop(forks);
358
359    // Communication setup
360    let Ok(response_sub) = channel.subscribe_msg::<ForkProposalsResponse>().await else {
361        error!(target: "darkfid::task::handle_reorg", "Failure during `ForkProposalsResponse` communication setup with peer: {channel:?}");
362        return Ok(())
363    };
364
365    // Create a fork from last common height
366    let mut peer_fork = Fork::new(validator.consensus.blockchain.clone(), module).await?;
367    peer_fork.targets_rank = last_difficulty.ranks.targets_rank.clone();
368    peer_fork.hashes_rank = last_difficulty.ranks.hashes_rank.clone();
369
370    // Grab all state inverse diffs after last common height, and add them to the fork
371    let inverse_diffs =
372        validator.blockchain.blocks.get_state_inverse_diffs_after(last_common_height)?;
373    for inverse_diff in inverse_diffs.iter().rev() {
374        peer_fork.overlay.lock().unwrap().overlay.lock().unwrap().add_diff(inverse_diff)?;
375    }
376
377    // Retrieve the proposals of the hashes sequence, in batches
378    info!(target: "darkfid::task::handle_reorg", "Peer sequence ranks higher than our current best fork, retrieving {} proposals from peer...", peer_header_hashes.len());
379    let mut batch = Vec::with_capacity(BATCH);
380    let mut total_processed = 0;
381    for (index, hash) in peer_header_hashes.iter().enumerate() {
382        // Add hash in batch sequence
383        batch.push(*hash);
384
385        // Check if batch is full so we can send it
386        if batch.len() < BATCH && index != peer_header_hashes.len() - 1 {
387            continue
388        }
389
390        // Request peer proposals
391        let request = ForkProposalsRequest { headers: batch.clone(), fork_header: proposal.hash };
392        if let Err(e) = channel.send(&request).await {
393            debug!(target: "darkfid::task::handle_reorg", "Channel send failed: {e}");
394            return Ok(())
395        };
396
397        // Node waits for response
398        let response = match response_sub
399            .receive_with_timeout(p2p.settings().read().await.outbound_connect_timeout)
400            .await
401        {
402            Ok(r) => r,
403            Err(e) => {
404                debug!(target: "darkfid::task::handle_reorg", "Asking peer for proposals sequence failed: {e}");
405                return Ok(())
406            }
407        };
408        debug!(target: "darkfid::task::handle_reorg", "Peer response: {response:?}");
409
410        // Response sequence must be the same length as the one requested
411        if response.proposals.len() != batch.len() {
412            error!(target: "darkfid::task::handle_reorg", "Peer responded with a different proposals sequence length");
413            return Ok(())
414        }
415
416        // Process retrieved proposal
417        for (peer_proposal_index, peer_proposal) in response.proposals.iter().enumerate() {
418            info!(target: "darkfid::task::handle_reorg", "Processing proposal: {} - {}", peer_proposal.hash, peer_proposal.block.header.height);
419
420            // Validate its the proposal we requested
421            if peer_proposal.hash != batch[peer_proposal_index] {
422                error!(target: "darkfid::task::handle_reorg", "Peer responded with a differend proposal: {} - {}", batch[peer_proposal_index], peer_proposal.hash);
423                return Ok(())
424            }
425
426            // Verify proposal
427            if let Err(e) =
428                verify_fork_proposal(&peer_fork, peer_proposal, validator.verify_fees).await
429            {
430                error!(target: "darkfid::task::handle_reorg", "Verify fork proposal failed: {e}");
431                return Ok(())
432            }
433
434            // Append proposal
435            if let Err(e) = peer_fork.append_proposal(peer_proposal).await {
436                error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
437                return Ok(())
438            }
439        }
440
441        total_processed += response.proposals.len();
442        info!(target: "darkfid::task::handle_reorg", "Proposals received and verified: {total_processed}/{}", peer_header_hashes.len());
443
444        // Reset batch
445        batch = Vec::with_capacity(BATCH);
446    }
447
448    // Verify trigger proposal
449    if let Err(e) = verify_fork_proposal(&peer_fork, &proposal, validator.verify_fees).await {
450        error!(target: "darkfid::task::handle_reorg", "Verify proposal failed: {e}");
451        return Ok(())
452    }
453
454    // Append trigger proposal
455    if let Err(e) = peer_fork.append_proposal(&proposal).await {
456        error!(target: "darkfid::task::handle_reorg", "Appending proposal failed: {e}");
457        return Ok(())
458    }
459
460    // Check if the peer fork ranks higher than our current best fork
461    let mut forks = validator.consensus.forks.write().await;
462    let best_fork = &forks[best_fork_index(&forks)?];
463    if peer_fork.targets_rank < best_fork.targets_rank ||
464        (peer_fork.targets_rank == best_fork.targets_rank &&
465            peer_fork.hashes_rank <= best_fork.hashes_rank)
466    {
467        info!(target: "darkfid::task::handle_reorg", "Peer fork ranks lower than our current best fork, skipping...");
468        drop(forks);
469        return Ok(())
470    }
471
472    // Execute the reorg
473    info!(target: "darkfid::task::handle_reorg", "Peer fork ranks higher than our current best fork, executing reorg...");
474    *forks = vec![peer_fork];
475    drop(forks);
476
477    // Check if we can confirm anything and broadcast them
478    let confirmed = match validator.confirmation().await {
479        Ok(f) => f,
480        Err(e) => {
481            error!(target: "darkfid::task::handle_reorg", "Confirmation failed: {e}");
482            return Ok(())
483        }
484    };
485
486    if !confirmed.is_empty() {
487        let mut notif_blocks = Vec::with_capacity(confirmed.len());
488        for block in confirmed {
489            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
490        }
491        blocks_sub.notify(JsonValue::Array(notif_blocks)).await;
492    }
493
494    // Broadcast proposal to the network
495    let message = ProposalMessage(proposal.clone());
496    p2p.broadcast(&message).await;
497
498    // Notify proposals subscriber
499    let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
500    proposals_sub.notify(vec![enc_prop].into()).await;
501
502    Ok(())
503}