darkfid/task/
consensus.rs
1use std::str::FromStr;
20
21use darkfi::{
22 blockchain::HeaderHash,
23 rpc::{jsonrpc::JsonNotification, util::JsonValue},
24 system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25 util::{encoding::base64, time::Timestamp},
26 Error, Result,
27};
28use darkfi_sdk::{
29 crypto::{FuncId, PublicKey},
30 pasta::{group::ff::PrimeField, pallas},
31};
32use darkfi_serial::serialize_async;
33use log::{error, info};
34
35use crate::{
36 task::{garbage_collect_task, miner::MinerRewardsRecipientConfig, miner_task, sync_task},
37 DarkfiNodePtr,
38};
39
40#[derive(Clone)]
42pub struct ConsensusInitTaskConfig {
43 pub skip_sync: bool,
44 pub checkpoint_height: Option<u32>,
45 pub checkpoint: Option<String>,
46 pub miner: bool,
47 pub recipient: Option<String>,
48 pub spend_hook: Option<String>,
49 pub user_data: Option<String>,
50 pub bootstrap: u64,
51}
52
53pub async fn consensus_init_task(
55 node: DarkfiNodePtr,
56 config: ConsensusInitTaskConfig,
57 ex: ExecutorPtr,
58) -> Result<()> {
59 let current = Timestamp::current_time().inner();
63 if current < config.bootstrap {
64 let diff = config.bootstrap - current;
65 info!(target: "darkfid::task::consensus_init_task", "Waiting for network bootstrap: {diff} seconds");
66 sleep(diff).await;
67 }
68
69 info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
71 node.validator.consensus.generate_empty_fork().await?;
72
73 let checkpoint = if !config.skip_sync {
75 if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
77 return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
78 }
79
80 let checkpoint = if let Some(height) = config.checkpoint_height {
81 Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
82 } else {
83 None
84 };
85
86 sync_task(&node, checkpoint).await?;
87 checkpoint
88 } else {
89 *node.validator.synced.write().await = true;
90 None
91 };
92
93 let recipient_config = if config.miner {
96 if config.recipient.is_none() {
97 return Err(Error::ParseFailed("Recipient address missing"))
98 }
99 let recipient = match PublicKey::from_str(config.recipient.as_ref().unwrap()) {
100 Ok(address) => address,
101 Err(_) => return Err(Error::InvalidAddress),
102 };
103
104 let spend_hook = match &config.spend_hook {
105 Some(s) => match FuncId::from_str(s) {
106 Ok(s) => Some(s),
107 Err(_) => return Err(Error::ParseFailed("Invalid spend hook")),
108 },
109 None => None,
110 };
111
112 let user_data = match &config.user_data {
113 Some(u) => {
114 let bytes: [u8; 32] = match bs58::decode(&u).into_vec()?.try_into() {
115 Ok(b) => b,
116 Err(_) => return Err(Error::ParseFailed("Invalid user data")),
117 };
118
119 match pallas::Base::from_repr(bytes).into() {
120 Some(v) => Some(v),
121 None => return Err(Error::ParseFailed("Invalid user data")),
122 }
123 }
124 None => None,
125 };
126
127 Some(MinerRewardsRecipientConfig { recipient, spend_hook, user_data })
128 } else {
129 None
130 };
131
132 loop {
134 let result = if config.miner {
135 miner_task(&node, recipient_config.as_ref().unwrap(), config.skip_sync, &ex).await
136 } else {
137 replicator_task(&node, &ex).await
138 };
139
140 match result {
141 Ok(_) => return Ok(()),
142 Err(Error::NetworkNotConnected) => {
143 *node.validator.synced.write().await = false;
145 node.validator.consensus.purge_forks().await?;
146 if !config.skip_sync {
147 sync_task(&node, checkpoint).await?;
148 } else {
149 *node.validator.synced.write().await = true;
150 }
151 }
152 Err(e) => return Err(e),
153 }
154 }
155}
156
157async fn replicator_task(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
159 let proposals_sub = node.subscribers.get("proposals").unwrap();
161 let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
162
163 let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
165
166 let result = smol::future::or(
167 monitor_network(&net_subscription),
168 consensus_task(node, &prop_subscription, ex),
169 )
170 .await;
171
172 prop_subscription.unsubscribe().await;
174 net_subscription.unsubscribe().await;
175
176 result
177}
178
179async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
181 Err(subscription.receive().await)
182}
183
184async fn consensus_task(
186 node: &DarkfiNodePtr,
187 subscription: &Subscription<JsonNotification>,
188 ex: &ExecutorPtr,
189) -> Result<()> {
190 info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
191
192 let block_sub = node.subscribers.get("blocks").unwrap();
194
195 let gc_task = StoppableTask::new();
197 gc_task.clone().start(
198 async { Ok(()) },
199 |_| async { },
200 Error::GarbageCollectionTaskStopped,
201 ex.clone(),
202 );
203
204 loop {
205 subscription.receive().await;
206
207 let confirmed = match node.validator.confirmation().await {
209 Ok(f) => f,
210 Err(e) => {
211 error!(
212 target: "darkfid::task::consensus_task",
213 "Confirmation failed: {e}"
214 );
215 continue
216 }
217 };
218
219 if confirmed.is_empty() {
220 continue
221 }
222
223 let mut notif_blocks = Vec::with_capacity(confirmed.len());
224 for block in confirmed {
225 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
226 }
227 block_sub.notify(JsonValue::Array(notif_blocks)).await;
228
229 gc_task.clone().stop().await;
231 gc_task.clone().start(
232 garbage_collect_task(node.clone()),
233 |res| async {
234 match res {
235 Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
236 Err(e) => {
237 error!(target: "darkfid", "Failed starting garbage collection task: {}", e)
238 }
239 }
240 },
241 Error::GarbageCollectionTaskStopped,
242 ex.clone(),
243 );
244 }
245}