darkfid/task/
consensus.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::str::FromStr;
20
21use darkfi::{
22    blockchain::HeaderHash,
23    rpc::{jsonrpc::JsonNotification, util::JsonValue},
24    system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25    util::{encoding::base64, time::Timestamp},
26    Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32    task::{garbage_collect_task, sync_task},
33    DarkfiNodePtr,
34};
35
36/// Auxiliary structure representing node consensus init task configuration.
37#[derive(Clone)]
38pub struct ConsensusInitTaskConfig {
39    /// Skip syncing process and start node right away
40    pub skip_sync: bool,
41    /// Optional sync checkpoint height
42    pub checkpoint_height: Option<u32>,
43    /// Optional sync checkpoint hash
44    pub checkpoint: Option<String>,
45    /// Optional bootstrap timestamp
46    pub bootstrap: u64,
47}
48
49/// Sync the node consensus state and start the corresponding task, based on node type.
50pub async fn consensus_init_task(
51    node: DarkfiNodePtr,
52    config: ConsensusInitTaskConfig,
53    ex: ExecutorPtr,
54) -> Result<()> {
55    // Check current canonical blockchain for curruption
56    // TODO: create a restore method reverting each block backwards
57    //       until its healthy again
58    node.validator.consensus.healthcheck().await?;
59
60    // Check if network is configured to start in the future.
61    // NOTE: Always configure the network to start in the future when bootstrapping
62    // or restarting it.
63    let current = Timestamp::current_time().inner();
64    if current < config.bootstrap {
65        let diff = config.bootstrap - current;
66        info!(target: "darkfid::task::consensus_init_task", "Waiting for network bootstrap: {diff} seconds");
67        sleep(diff).await;
68    }
69
70    // Generate a new fork to be able to extend
71    info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
72    node.validator.consensus.generate_empty_fork().await?;
73
74    // Sync blockchain
75    let checkpoint = if !config.skip_sync {
76        // Parse configured checkpoint
77        if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
78            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
79        }
80
81        let checkpoint = if let Some(height) = config.checkpoint_height {
82            Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
83        } else {
84            None
85        };
86
87        sync_task(&node, checkpoint).await?;
88        checkpoint
89    } else {
90        *node.validator.synced.write().await = true;
91        None
92    };
93
94    // Gracefully handle network disconnections
95    loop {
96        match listen_to_network(&node, &ex).await {
97            Ok(_) => return Ok(()),
98            Err(Error::NetworkNotConnected) => {
99                // Sync node again
100                *node.validator.synced.write().await = false;
101                node.validator.consensus.purge_forks().await?;
102                if !config.skip_sync {
103                    sync_task(&node, checkpoint).await?;
104                } else {
105                    *node.validator.synced.write().await = true;
106                }
107            }
108            Err(e) => return Err(e),
109        }
110    }
111}
112
113/// Async task to start the consensus task, while monitoring for a network disconnections.
114async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
115    // Grab proposals subscriber and subscribe to it
116    let proposals_sub = node.subscribers.get("proposals").unwrap();
117    let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
118
119    // Subscribe to the network disconnect subscriber
120    let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
121
122    let result = smol::future::or(
123        monitor_network(&net_subscription),
124        consensus_task(node, &prop_subscription, ex),
125    )
126    .await;
127
128    // Terminate the subscriptions
129    prop_subscription.unsubscribe().await;
130    net_subscription.unsubscribe().await;
131
132    result
133}
134
135/// Async task to monitor network disconnections.
136async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
137    Err(subscription.receive().await)
138}
139
140/// Async task used for listening for new blocks and perform consensus.
141async fn consensus_task(
142    node: &DarkfiNodePtr,
143    subscription: &Subscription<JsonNotification>,
144    ex: &ExecutorPtr,
145) -> Result<()> {
146    info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
147
148    // Grab blocks subscriber
149    let block_sub = node.subscribers.get("blocks").unwrap();
150
151    // Create the garbage collection task using a dummy task
152    let gc_task = StoppableTask::new();
153    gc_task.clone().start(
154        async { Ok(()) },
155        |_| async { /* Do nothing */ },
156        Error::GarbageCollectionTaskStopped,
157        ex.clone(),
158    );
159
160    loop {
161        subscription.receive().await;
162
163        // Check if we can confirm anything and broadcast them
164        let confirmed = match node.validator.confirmation().await {
165            Ok(f) => f,
166            Err(e) => {
167                error!(
168                    target: "darkfid::task::consensus_task",
169                    "Confirmation failed: {e}"
170                );
171                continue
172            }
173        };
174
175        if confirmed.is_empty() {
176            continue
177        }
178
179        if let Err(e) = clean_blocktemplates(node).await {
180            error!(target: "darkfid", "Failed cleaning mining block templates: {e}")
181        }
182
183        let mut notif_blocks = Vec::with_capacity(confirmed.len());
184        for block in confirmed {
185            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
186        }
187        block_sub.notify(JsonValue::Array(notif_blocks)).await;
188
189        // Invoke the detached garbage collection task
190        gc_task.clone().stop().await;
191        gc_task.clone().start(
192            garbage_collect_task(node.clone()),
193            |res| async {
194                match res {
195                    Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
196                    Err(e) => {
197                        error!(target: "darkfid", "Failed starting garbage collection task: {e}")
198                    }
199                }
200            },
201            Error::GarbageCollectionTaskStopped,
202            ex.clone(),
203        );
204    }
205}
206
207/// Auxiliary function to drop mining block templates not referencing
208/// active forks or last confirmed block.
209async fn clean_blocktemplates(node: &DarkfiNodePtr) -> Result<()> {
210    // Grab a lock over node mining templates
211    let mut blocktemplates = node.blocktemplates.lock().await;
212    let mut mm_blocktemplates = node.mm_blocktemplates.lock().await;
213
214    // Early return if no mining block templates exist
215    if blocktemplates.is_empty() && mm_blocktemplates.is_empty() {
216        return Ok(())
217    }
218
219    // Grab a lock over node forks
220    let forks = node.validator.consensus.forks.read().await;
221
222    // Grab last confirmed block for checks
223    let (_, last_confirmed) = node.validator.blockchain.last()?;
224
225    // Loop through templates to find which can be dropped
226    let mut dropped_templates = vec![];
227    'outer: for (key, blocktemplate) in blocktemplates.iter() {
228        // Loop through all the forks
229        for fork in forks.iter() {
230            // Traverse fork proposals sequence in reverse
231            for p_hash in fork.proposals.iter().rev() {
232                // Check if job extends this fork
233                if &blocktemplate.block.header.previous == p_hash {
234                    continue 'outer
235                }
236            }
237        }
238
239        // Check if it extends last confirmed block
240        if blocktemplate.block.header.previous == last_confirmed {
241            continue
242        }
243
244        // This job doesn't reference something so we drop it
245        dropped_templates.push(key.clone());
246    }
247
248    // Drop jobs not referencing active forks or last confirmed block
249    for key in dropped_templates {
250        blocktemplates.remove(&key);
251    }
252
253    // Loop through merge mining templates to find which can be dropped
254    let mut dropped_templates = vec![];
255    'outer: for (key, (block, _, _)) in mm_blocktemplates.iter() {
256        // Loop through all the forks
257        for fork in forks.iter() {
258            // Traverse fork proposals sequence in reverse
259            for p_hash in fork.proposals.iter().rev() {
260                // Check if job extends this fork
261                if &block.header.previous == p_hash {
262                    continue 'outer
263                }
264            }
265        }
266
267        // Check if it extends last confirmed block
268        if block.header.previous == last_confirmed {
269            continue
270        }
271
272        // This job doesn't reference something so we drop it
273        dropped_templates.push(key.clone());
274    }
275
276    // Drop jobs not referencing active forks or last confirmed block
277    for key in dropped_templates {
278        mm_blocktemplates.remove(&key);
279    }
280
281    Ok(())
282}