darkfid/task/
consensus.rs1use std::str::FromStr;
20
21use darkfi::{
22 blockchain::HeaderHash,
23 rpc::{jsonrpc::JsonNotification, util::JsonValue},
24 system::{sleep, ExecutorPtr, StoppableTask, Subscription},
25 util::{encoding::base64, time::Timestamp},
26 Error, Result,
27};
28use darkfi_serial::serialize_async;
29use tracing::{error, info};
30
31use crate::{
32 task::{garbage_collect_task, sync_task},
33 DarkfiNodePtr,
34};
35
36#[derive(Clone)]
38pub struct ConsensusInitTaskConfig {
39 pub skip_sync: bool,
41 pub checkpoint_height: Option<u32>,
43 pub checkpoint: Option<String>,
45 pub bootstrap: u64,
47}
48
49pub async fn consensus_init_task(
51 node: DarkfiNodePtr,
52 config: ConsensusInitTaskConfig,
53 ex: ExecutorPtr,
54) -> Result<()> {
55 node.validator.consensus.healthcheck().await?;
59
60 let current = Timestamp::current_time().inner();
64 if current < config.bootstrap {
65 let diff = config.bootstrap - current;
66 info!(target: "darkfid::task::consensus_init_task", "Waiting for network bootstrap: {diff} seconds");
67 sleep(diff).await;
68 }
69
70 info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
72 node.validator.consensus.generate_empty_fork().await?;
73
74 let checkpoint = if !config.skip_sync {
76 if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
78 return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
79 }
80
81 let checkpoint = if let Some(height) = config.checkpoint_height {
82 Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
83 } else {
84 None
85 };
86
87 sync_task(&node, checkpoint).await?;
88 checkpoint
89 } else {
90 *node.validator.synced.write().await = true;
91 None
92 };
93
94 loop {
96 match listen_to_network(&node, &ex).await {
97 Ok(_) => return Ok(()),
98 Err(Error::NetworkNotConnected) => {
99 *node.validator.synced.write().await = false;
101 node.validator.consensus.purge_forks().await?;
102 if !config.skip_sync {
103 sync_task(&node, checkpoint).await?;
104 } else {
105 *node.validator.synced.write().await = true;
106 }
107 }
108 Err(e) => return Err(e),
109 }
110 }
111}
112
113async fn listen_to_network(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
115 let proposals_sub = node.subscribers.get("proposals").unwrap();
117 let prop_subscription = proposals_sub.publisher.clone().subscribe().await;
118
119 let net_subscription = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;
121
122 let result = smol::future::or(
123 monitor_network(&net_subscription),
124 consensus_task(node, &prop_subscription, ex),
125 )
126 .await;
127
128 prop_subscription.unsubscribe().await;
130 net_subscription.unsubscribe().await;
131
132 result
133}
134
135async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
137 Err(subscription.receive().await)
138}
139
140async fn consensus_task(
142 node: &DarkfiNodePtr,
143 subscription: &Subscription<JsonNotification>,
144 ex: &ExecutorPtr,
145) -> Result<()> {
146 info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
147
148 let block_sub = node.subscribers.get("blocks").unwrap();
150
151 let gc_task = StoppableTask::new();
153 gc_task.clone().start(
154 async { Ok(()) },
155 |_| async { },
156 Error::GarbageCollectionTaskStopped,
157 ex.clone(),
158 );
159
160 loop {
161 subscription.receive().await;
162
163 let confirmed = match node.validator.confirmation().await {
165 Ok(f) => f,
166 Err(e) => {
167 error!(
168 target: "darkfid::task::consensus_task",
169 "Confirmation failed: {e}"
170 );
171 continue
172 }
173 };
174
175 if confirmed.is_empty() {
176 continue
177 }
178
179 if let Err(e) = clean_blocktemplates(node).await {
180 error!(target: "darkfid", "Failed cleaning mining block templates: {e}")
181 }
182
183 let mut notif_blocks = Vec::with_capacity(confirmed.len());
184 for block in confirmed {
185 notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
186 }
187 block_sub.notify(JsonValue::Array(notif_blocks)).await;
188
189 gc_task.clone().stop().await;
191 gc_task.clone().start(
192 garbage_collect_task(node.clone()),
193 |res| async {
194 match res {
195 Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
196 Err(e) => {
197 error!(target: "darkfid", "Failed starting garbage collection task: {e}")
198 }
199 }
200 },
201 Error::GarbageCollectionTaskStopped,
202 ex.clone(),
203 );
204 }
205}
206
207async fn clean_blocktemplates(node: &DarkfiNodePtr) -> Result<()> {
210 let mut blocktemplates = node.blocktemplates.lock().await;
212 let mut mm_blocktemplates = node.mm_blocktemplates.lock().await;
213
214 if blocktemplates.is_empty() && mm_blocktemplates.is_empty() {
216 return Ok(())
217 }
218
219 let forks = node.validator.consensus.forks.read().await;
221
222 let (_, last_confirmed) = node.validator.blockchain.last()?;
224
225 let mut dropped_templates = vec![];
227 'outer: for (key, blocktemplate) in blocktemplates.iter() {
228 for fork in forks.iter() {
230 for p_hash in fork.proposals.iter().rev() {
232 if &blocktemplate.block.header.previous == p_hash {
234 continue 'outer
235 }
236 }
237 }
238
239 if blocktemplate.block.header.previous == last_confirmed {
241 continue
242 }
243
244 dropped_templates.push(key.clone());
246 }
247
248 for key in dropped_templates {
250 blocktemplates.remove(&key);
251 }
252
253 let mut dropped_templates = vec![];
255 'outer: for (key, (block, _, _)) in mm_blocktemplates.iter() {
256 for fork in forks.iter() {
258 for p_hash in fork.proposals.iter().rev() {
260 if &block.header.previous == p_hash {
262 continue 'outer
263 }
264 }
265 }
266
267 if block.header.previous == last_confirmed {
269 continue
270 }
271
272 dropped_templates.push(key.clone());
274 }
275
276 for key in dropped_templates {
278 mm_blocktemplates.remove(&key);
279 }
280
281 Ok(())
282}