darkfid/task/
consensus.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::str::FromStr;
20
21use darkfi::{
22    blockchain::HeaderHash,
23    rpc::{jsonrpc::JsonNotification, util::JsonValue},
24    system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25    util::{encoding::base64, time::Timestamp},
26    Error, Result,
27};
28use darkfi_sdk::{
29    crypto::{FuncId, PublicKey},
30    pasta::{group::ff::PrimeField, pallas},
31};
32use darkfi_serial::serialize_async;
33use log::{error, info};
34
35use crate::{
36    task::{garbage_collect_task, miner::MinerRewardsRecipientConfig, miner_task, sync_task},
37    DarkfiNodePtr,
38};
39
40/// Auxiliary structure representing node consensus init task configuration
41#[derive(Clone)]
42pub struct ConsensusInitTaskConfig {
43    pub skip_sync: bool,
44    pub checkpoint_height: Option<u32>,
45    pub checkpoint: Option<String>,
46    pub miner: bool,
47    pub recipient: Option<String>,
48    pub spend_hook: Option<String>,
49    pub user_data: Option<String>,
50    pub bootstrap: u64,
51}
52
53/// Sync the node consensus state and start the corresponding task, based on node type.
54pub async fn consensus_init_task(
55    node: DarkfiNodePtr,
56    config: ConsensusInitTaskConfig,
57    ex: ExecutorPtr,
58) -> Result<()> {
59    // Check if network is configured to start in the future.
60    // NOTE: Always configure the network to start in the future when bootstrapping
61    // or restarting it.
62    let current = Timestamp::current_time().inner();
63    if current < config.bootstrap {
64        let diff = config.bootstrap - current;
65        info!(target: "darkfid::task::consensus_init_task", "Waiting for network bootstrap: {diff} seconds");
66        sleep(diff).await;
67    }
68
69    // Generate a new fork to be able to extend
70    info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
71    node.validator.consensus.generate_empty_fork().await?;
72
73    // Sync blockchain
74    let checkpoint = if !config.skip_sync {
75        // Parse configured checkpoint
76        if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
77            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
78        }
79
80        let checkpoint = if let Some(height) = config.checkpoint_height {
81            Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
82        } else {
83            None
84        };
85
86        sync_task(&node, checkpoint).await?;
87        checkpoint
88    } else {
89        *node.validator.synced.write().await = true;
90        None
91    };
92
93    // Grab rewards recipient public key(address) if node is a miner,
94    // along with configured spend hook and user data.
95    let recipient_config = if config.miner {
96        if config.recipient.is_none() {
97            return Err(Error::ParseFailed("Recipient address missing"))
98        }
99        let recipient = match PublicKey::from_str(config.recipient.as_ref().unwrap()) {
100            Ok(address) => address,
101            Err(_) => return Err(Error::InvalidAddress),
102        };
103
104        let spend_hook = match &config.spend_hook {
105            Some(s) => match FuncId::from_str(s) {
106                Ok(s) => Some(s),
107                Err(_) => return Err(Error::ParseFailed("Invalid spend hook")),
108            },
109            None => None,
110        };
111
112        let user_data = match &config.user_data {
113            Some(u) => {
114                let bytes: [u8; 32] = match bs58::decode(&u).into_vec()?.try_into() {
115                    Ok(b) => b,
116                    Err(_) => return Err(Error::ParseFailed("Invalid user data")),
117                };
118
119                match pallas::Base::from_repr(bytes).into() {
120                    Some(v) => Some(v),
121                    None => return Err(Error::ParseFailed("Invalid user data")),
122                }
123            }
124            None => None,
125        };
126
127        Some(MinerRewardsRecipientConfig { recipient, spend_hook, user_data })
128    } else {
129        None
130    };
131
132    // Gracefully handle network disconnections
133    loop {
134        let result = if config.miner {
135            miner_task(&node, recipient_config.as_ref().unwrap(), config.skip_sync, &ex).await
136        } else {
137            replicator_task(&node, &ex).await
138        };
139
140        match result {
141            Ok(_) => return Ok(()),
142            Err(Error::NetworkNotConnected) => {
143                // Sync node again
144                *node.validator.synced.write().await = false;
145                node.validator.consensus.purge_forks().await?;
146                if !config.skip_sync {
147                    sync_task(&node, checkpoint).await?;
148                } else {
149                    *node.validator.synced.write().await = true;
150                }
151            }
152            Err(e) => return Err(e),
153        }
154    }
155}
156
157/// Async task to start the consensus task, while monitoring for a network disconnections.
158async fn replicator_task(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
159    // Grab proposals subscriber and subscribe to it
160    let proposals_sub = node.subscribers.get("proposals").unwrap();
161    let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
162
163    // Subscribe to the network disconnect subscriber
164    let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
165
166    let result = smol::future::or(
167        monitor_network(&net_subscription),
168        consensus_task(node, &prop_subscription, ex),
169    )
170    .await;
171
172    // Terminate the subscriptions
173    prop_subscription.unsubscribe().await;
174    net_subscription.unsubscribe().await;
175
176    result
177}
178
179/// Async task to monitor network disconnections.
180async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
181    Err(subscription.receive().await)
182}
183
184/// Async task used for listening for new blocks and perform consensus.
185async fn consensus_task(
186    node: &DarkfiNodePtr,
187    subscription: &Subscription<JsonNotification>,
188    ex: &ExecutorPtr,
189) -> Result<()> {
190    info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
191
192    // Grab blocks subscriber
193    let block_sub = node.subscribers.get("blocks").unwrap();
194
195    // Create the garbage collection task using a dummy task
196    let gc_task = StoppableTask::new();
197    gc_task.clone().start(
198        async { Ok(()) },
199        |_| async { /* Do nothing */ },
200        Error::GarbageCollectionTaskStopped,
201        ex.clone(),
202    );
203
204    loop {
205        subscription.receive().await;
206
207        // Check if we can confirm anything and broadcast them
208        let confirmed = match node.validator.confirmation().await {
209            Ok(f) => f,
210            Err(e) => {
211                error!(
212                    target: "darkfid::task::consensus_task",
213                    "Confirmation failed: {e}"
214                );
215                continue
216            }
217        };
218
219        if confirmed.is_empty() {
220            continue
221        }
222
223        let mut notif_blocks = Vec::with_capacity(confirmed.len());
224        for block in confirmed {
225            notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
226        }
227        block_sub.notify(JsonValue::Array(notif_blocks)).await;
228
229        // Invoke the detached garbage collection task
230        gc_task.clone().stop().await;
231        gc_task.clone().start(
232            garbage_collect_task(node.clone()),
233            |res| async {
234                match res {
235                    Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { /* Do nothing */ }
236                    Err(e) => {
237                        error!(target: "darkfid", "Failed starting garbage collection task: {}", e)
238                    }
239                }
240            },
241            Error::GarbageCollectionTaskStopped,
242            ex.clone(),
243        );
244    }
245}