darkfid/task/
consensus.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::str::FromStr;
20
21use darkfi::{
22    blockchain::HeaderHash,
23    rpc::{jsonrpc::JsonNotification, util::JsonValue},
24    system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25    util::{encoding::base64, time::Timestamp},
26    Error, Result,
27};
28use darkfi_sdk::{
29    crypto::{FuncId, PublicKey},
30    pasta::{group::ff::PrimeField, pallas},
31};
32use darkfi_serial::serialize_async;
33use log::{error, info};
34
35use crate::{
36    task::{garbage_collect_task, miner::MinerRewardsRecipientConfig, miner_task, sync_task},
37    DarkfiNodePtr,
38};
39
40/// Auxiliary structure representing node consensus init task configuration
41#[derive(Clone)]
42pub struct ConsensusInitTaskConfig {
43    pub skip_sync: bool,
44    pub checkpoint_height: Option<u32>,
45    pub checkpoint: Option<String>,
46    pub miner: bool,
47    pub recipient: Option<String>,
48    pub spend_hook: Option<String>,
49    pub user_data: Option<String>,
50    pub bootstrap: u64,
51}
52
53/// Sync the node consensus state and start the corresponding task, based on node type.
54pub async fn consensus_init_task(
55    node: DarkfiNodePtr,
56    config: ConsensusInitTaskConfig,
57    ex: ExecutorPtr,
58) -> Result<()> {
59    // Check current canonical blockchain for curruption
60    // TODO: create a restore method reverting each block backwards
61    //       until its healthy again
62    node.validator.consensus.healthcheck().await?;
63
64    // Check if network is configured to start in the future.
65    // NOTE: Always configure the network to start in the future when bootstrapping
66    // or restarting it.
67    let current = Timestamp::current_time().inner();
68    if current < config.bootstrap {
69        let diff = config.bootstrap - current;
70        info!(target: "darkfid::task::consensus_init_task", "Waiting for network bootstrap: {diff} seconds");
71        sleep(diff).await;
72    }
73
74    // Generate a new fork to be able to extend
75    info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
76    node.validator.consensus.generate_empty_fork().await?;
77
78    // Sync blockchain
79    let checkpoint = if !config.skip_sync {
80        // Parse configured checkpoint
81        if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
82            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
83        }
84
85        let checkpoint = if let Some(height) = config.checkpoint_height {
86            Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
87        } else {
88            None
89        };
90
91        sync_task(&node, checkpoint).await?;
92        checkpoint
93    } else {
94        *node.validator.synced.write().await = true;
95        None
96    };
97
98    // Grab rewards recipient public key(address) if node is a miner,
99    // along with configured spend hook and user data.
100    let recipient_config = if config.miner {
101        if config.recipient.is_none() {
102            return Err(Error::ParseFailed("Recipient address missing"))
103        }
104        let recipient = match PublicKey::from_str(config.recipient.as_ref().unwrap()) {
105            Ok(address) => address,
106            Err(_) => return Err(Error::InvalidAddress),
107        };
108
109        let spend_hook = match &config.spend_hook {
110            Some(s) => match FuncId::from_str(s) {
111                Ok(s) => Some(s),
112                Err(_) => return Err(Error::ParseFailed("Invalid spend hook")),
113            },
114            None => None,
115        };
116
117        let user_data = match &config.user_data {
118            Some(u) => {
119                let bytes: [u8; 32] = match bs58::decode(&u).into_vec()?.try_into() {
120                    Ok(b) => b,
121                    Err(_) => return Err(Error::ParseFailed("Invalid user data")),
122                };
123
124                match pallas::Base::from_repr(bytes).into() {
125                    Some(v) => Some(v),
126                    None => return Err(Error::ParseFailed("Invalid user data")),
127                }
128            }
129            None => None,
130        };
131
132        Some(MinerRewardsRecipientConfig { recipient, spend_hook, user_data })
133    } else {
134        None
135    };
136
137    // Gracefully handle network disconnections
138    loop {
139        let result = if config.miner {
140            miner_task(&node, recipient_config.as_ref().unwrap(), config.skip_sync, &ex).await
141        } else {
142            replicator_task(&node, &ex).await
143        };
144
145        match result {
146            Ok(_) => return Ok(()),
147            Err(Error::NetworkNotConnected) => {
148                // Sync node again
149                *node.validator.synced.write().await = false;
150                node.validator.consensus.purge_forks().await?;
151                if !config.skip_sync {
152                    sync_task(&node, checkpoint).await?;
153                } else {
154                    *node.validator.synced.write().await = true;
155                }
156            }
157            Err(e) => return Err(e),
158        }
159    }
160}
161
162/// Async task to start the consensus task, while monitoring for a network disconnections.
163async fn replicator_task(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
164    // Grab proposals subscriber and subscribe to it
165    let proposals_sub = node.subscribers.get("proposals").unwrap();
166    let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
167
168    // Subscribe to the network disconnect subscriber
169    let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
170
171    let result = smol::future::or(
172        monitor_network(&net_subscription),
173        consensus_task(node, &prop_subscription, ex),
174    )
175    .await;
176
177    // Terminate the subscriptions
178    prop_subscription.unsubscribe().await;
179    net_subscription.unsubscribe().await;
180
181    result
182}
183
184/// Async task to monitor network disconnections.
185async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
186    Err(subscription.receive().await)
187}
188
189/// Async task used for listening for new blocks and perform consensus.
190async fn consensus_task(
191    node: &DarkfiNodePtr,
192    subscription: &Subscription<JsonNotification>,
193    ex: &ExecutorPtr,
194) -> Result<()> {
195    info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
196
197    // Grab blocks subscriber
198    let block_sub = node.subscribers.get("blocks").unwrap();
199
200    // Create the garbage collection task using a dummy task
201    let gc_task = StoppableTask::new();
202    gc_task.clone().start(
203        async { Ok(()) },
204        |_| async { /* Do nothing */ },
205        Error::GarbageCollectionTaskStopped,
206        ex.clone(),
207    );
208
209    loop {
210        subscription.receive().await;
211
212        // Check if we can confirm anything and broadcast them
213        let confirmed = match node.validator.confirmation().await {
214            Ok(f) => f,
215            Err(e) => {
216                error!(
217                    target: "darkfid::task::consensus_task",
218                    "Confirmation failed: {e}"
219                );
220                continue
221            }
222        };
223
224        if confirmed.is_empty() {
225            continue
226        }
227
228        let mut notif_blocks = Vec::with_capacity(confirmed.len());
229        for block in confirmed {
230            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
231        }
232        block_sub.notify(JsonValue::Array(notif_blocks)).await;
233
234        // Invoke the detached garbage collection task
235        gc_task.clone().stop().await;
236        gc_task.clone().start(
237            garbage_collect_task(node.clone()),
238            |res| async {
239                match res {
240                    Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
241                    Err(e) => {
242                        error!(target: "darkfid", "Failed starting garbage collection task: {e}")
243                    }
244                }
245            },
246            Error::GarbageCollectionTaskStopped,
247            ex.clone(),
248        );
249    }
250}