darkfid/task/
consensus.rs1use std::str::FromStr;
20
21use darkfi::{
22 blockchain::HeaderHash,
23 rpc::{jsonrpc::JsonNotification, util::JsonValue},
24 system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25 util::{encoding::base64, time::Timestamp},
26 Error, Result,
27};
28use darkfi_sdk::{
29 crypto::{FuncId, PublicKey},
30 pasta::{group::ff::PrimeField, pallas},
31};
32use darkfi_serial::serialize_async;
33use log::{error, info};
34
35use crate::{
36 task::{garbage_collect_task, miner::MinerRewardsRecipientConfig, miner_task, sync_task},
37 DarkfiNodePtr,
38};
39
40#[derive(Clone)]
42pub struct ConsensusInitTaskConfig {
43 pub skip_sync: bool,
44 pub checkpoint_height: Option<u32>,
45 pub checkpoint: Option<String>,
46 pub miner: bool,
47 pub recipient: Option<String>,
48 pub spend_hook: Option<String>,
49 pub user_data: Option<String>,
50 pub bootstrap: u64,
51}
52
53pub async fn consensus_init_task(
55 node: DarkfiNodePtr,
56 config: ConsensusInitTaskConfig,
57 ex: ExecutorPtr,
58) -> Result<()> {
59 node.validator.consensus.healthcheck().await?;
63
64 let current = Timestamp::current_time().inner();
68 if current < config.bootstrap {
69 let diff = config.bootstrap - current;
70 info!(target: "darkfid::task::consensus_init_task", "Waiting for network bootstrap: {diff} seconds");
71 sleep(diff).await;
72 }
73
74 info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
76 node.validator.consensus.generate_empty_fork().await?;
77
78 let checkpoint = if !config.skip_sync {
80 if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
82 return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
83 }
84
85 let checkpoint = if let Some(height) = config.checkpoint_height {
86 Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
87 } else {
88 None
89 };
90
91 sync_task(&node, checkpoint).await?;
92 checkpoint
93 } else {
94 *node.validator.synced.write().await = true;
95 None
96 };
97
98 let recipient_config = if config.miner {
101 if config.recipient.is_none() {
102 return Err(Error::ParseFailed("Recipient address missing"))
103 }
104 let recipient = match PublicKey::from_str(config.recipient.as_ref().unwrap()) {
105 Ok(address) => address,
106 Err(_) => return Err(Error::InvalidAddress),
107 };
108
109 let spend_hook = match &config.spend_hook {
110 Some(s) => match FuncId::from_str(s) {
111 Ok(s) => Some(s),
112 Err(_) => return Err(Error::ParseFailed("Invalid spend hook")),
113 },
114 None => None,
115 };
116
117 let user_data = match &config.user_data {
118 Some(u) => {
119 let bytes: [u8; 32] = match bs58::decode(&u).into_vec()?.try_into() {
120 Ok(b) => b,
121 Err(_) => return Err(Error::ParseFailed("Invalid user data")),
122 };
123
124 match pallas::Base::from_repr(bytes).into() {
125 Some(v) => Some(v),
126 None => return Err(Error::ParseFailed("Invalid user data")),
127 }
128 }
129 None => None,
130 };
131
132 Some(MinerRewardsRecipientConfig { recipient, spend_hook, user_data })
133 } else {
134 None
135 };
136
137 loop {
139 let result = if config.miner {
140 miner_task(&node, recipient_config.as_ref().unwrap(), config.skip_sync, &ex).await
141 } else {
142 replicator_task(&node, &ex).await
143 };
144
145 match result {
146 Ok(_) => return Ok(()),
147 Err(Error::NetworkNotConnected) => {
148 *node.validator.synced.write().await = false;
150 node.validator.consensus.purge_forks().await?;
151 if !config.skip_sync {
152 sync_task(&node, checkpoint).await?;
153 } else {
154 *node.validator.synced.write().await = true;
155 }
156 }
157 Err(e) => return Err(e),
158 }
159 }
160}
161
162async fn replicator_task(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
164 let proposals_sub = node.subscribers.get("proposals").unwrap();
166 let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
167
168 let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
170
171 let result = smol::future::or(
172 monitor_network(&net_subscription),
173 consensus_task(node, &prop_subscription, ex),
174 )
175 .await;
176
177 prop_subscription.unsubscribe().await;
179 net_subscription.unsubscribe().await;
180
181 result
182}
183
184async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
186 Err(subscription.receive().await)
187}
188
189async fn consensus_task(
191 node: &DarkfiNodePtr,
192 subscription: &Subscription<JsonNotification>,
193 ex: &ExecutorPtr,
194) -> Result<()> {
195 info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
196
197 let block_sub = node.subscribers.get("blocks").unwrap();
199
200 let gc_task = StoppableTask::new();
202 gc_task.clone().start(
203 async { Ok(()) },
204 |_| async { },
205 Error::GarbageCollectionTaskStopped,
206 ex.clone(),
207 );
208
209 loop {
210 subscription.receive().await;
211
212 let confirmed = match node.validator.confirmation().await {
214 Ok(f) => f,
215 Err(e) => {
216 error!(
217 target: "darkfid::task::consensus_task",
218 "Confirmation failed: {e}"
219 );
220 continue
221 }
222 };
223
224 if confirmed.is_empty() {
225 continue
226 }
227
228 let mut notif_blocks = Vec::with_capacity(confirmed.len());
229 for block in confirmed {
230 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
231 }
232 block_sub.notify(JsonValue::Array(notif_blocks)).await;
233
234 gc_task.clone().stop().await;
236 gc_task.clone().start(
237 garbage_collect_task(node.clone()),
238 |res| async {
239 match res {
240 Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
241 Err(e) => {
242 error!(target: "darkfid", "Failed starting garbage collection task: {e}")
243 }
244 }
245 },
246 Error::GarbageCollectionTaskStopped,
247 ex.clone(),
248 );
249 }
250}