use std::str::FromStr;
use darkfi::{
blockchain::HeaderHash,
rpc::{jsonrpc::JsonNotification, util::JsonValue},
system::{sleep, ExecutorPtr, StoppableTask, Subscription},
util::{encoding::base64, time::Timestamp},
Error, Result,
};
use darkfi_sdk::{
crypto::{FuncId, PublicKey},
pasta::{group::ff::PrimeField, pallas},
};
use darkfi_serial::serialize_async;
use log::{error, info};
use crate::{
task::{garbage_collect_task, miner::MinerRewardsRecipientConfig, miner_task, sync_task},
DarkfiNodePtr,
};
/// Configuration consumed by `consensus_init_task`.
#[derive(Clone)]
pub struct ConsensusInitTaskConfig {
/// When `true`, `sync_task` is never run; the validator is flagged as
/// synced directly instead.
pub skip_sync: bool,
/// Optional checkpoint height. When set, `checkpoint` must be set too,
/// otherwise `consensus_init_task` fails with `Error::ParseFailed`.
pub checkpoint_height: Option<u32>,
/// Checkpoint header hash in string form, parsed with
/// `HeaderHash::from_str`. Only read when `checkpoint_height` is set.
pub checkpoint: Option<String>,
/// Selects the long-running task: `miner_task` when `true`,
/// `replicator_task` when `false`.
pub miner: bool,
/// Rewards recipient address string, parsed with `PublicKey::from_str`.
/// Required when `miner` is `true`.
pub recipient: Option<String>,
/// Optional spend hook string, parsed with `FuncId::from_str`.
/// Only read when `miner` is `true`.
pub spend_hook: Option<String>,
/// Optional base58 string that must decode to exactly 32 bytes accepted by
/// `pallas::Base::from_repr`. Only read when `miner` is `true`.
pub user_data: Option<String>,
/// Network bootstrap timestamp. It is compared against
/// `Timestamp::current_time()`, and the task sleeps for the difference
/// (logged as seconds) when it lies in the future.
pub bootstrap: u64,
}
pub async fn consensus_init_task(
node: DarkfiNodePtr,
config: ConsensusInitTaskConfig,
ex: ExecutorPtr,
) -> Result<()> {
let current = Timestamp::current_time().inner();
if current < config.bootstrap {
let diff = config.bootstrap - current;
info!(target: "darkfid::task::consensus_init_task", "Waiting for network bootstrap: {diff} seconds");
sleep(diff).await;
}
info!(target: "darkfid::task::consensus_init_task", "Generating new empty fork...");
node.validator.consensus.generate_empty_fork().await?;
let checkpoint = if !config.skip_sync {
if config.checkpoint_height.is_some() && config.checkpoint.is_none() {
return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
}
let checkpoint = if let Some(height) = config.checkpoint_height {
Some((height, HeaderHash::from_str(config.checkpoint.as_ref().unwrap())?))
} else {
None
};
sync_task(&node, checkpoint).await?;
checkpoint
} else {
*node.validator.synced.write().await = true;
None
};
let recipient_config = if config.miner {
if config.recipient.is_none() {
return Err(Error::ParseFailed("Recipient address missing"))
}
let recipient = match PublicKey::from_str(config.recipient.as_ref().unwrap()) {
Ok(address) => address,
Err(_) => return Err(Error::InvalidAddress),
};
let spend_hook = match &config.spend_hook {
Some(s) => match FuncId::from_str(s) {
Ok(s) => Some(s),
Err(_) => return Err(Error::ParseFailed("Invalid spend hook")),
},
None => None,
};
let user_data = match &config.user_data {
Some(u) => {
let bytes: [u8; 32] = match bs58::decode(&u).into_vec()?.try_into() {
Ok(b) => b,
Err(_) => return Err(Error::ParseFailed("Invalid user data")),
};
match pallas::Base::from_repr(bytes).into() {
Some(v) => Some(v),
None => return Err(Error::ParseFailed("Invalid user data")),
}
}
None => None,
};
Some(MinerRewardsRecipientConfig { recipient, spend_hook, user_data })
} else {
None
};
loop {
let result = if config.miner {
miner_task(&node, recipient_config.as_ref().unwrap(), config.skip_sync, &ex).await
} else {
replicator_task(&node, &ex).await
};
match result {
Ok(_) => return Ok(()),
Err(Error::NetworkNotConnected) => {
*node.validator.synced.write().await = false;
node.validator.consensus.purge_forks().await?;
if !config.skip_sync {
sync_task(&node, checkpoint).await?;
} else {
*node.validator.synced.write().await = true;
}
}
Err(e) => return Err(e),
}
}
}
/// Runs the consensus task for a non-mining node until the network drops.
///
/// Subscribes to the node's "proposals" publisher and to the hosts'
/// disconnect events, then races `monitor_network` against `consensus_task`.
/// Whichever future completes first decides the returned result. Both
/// subscriptions are released before returning.
///
/// # Panics
/// Panics if the node has no "proposals" subscriber registered.
async fn replicator_task(node: &DarkfiNodePtr, ex: &ExecutorPtr) -> Result<()> {
    // Proposals subscription first, then the disconnect subscription.
    let proposals =
        node.subscribers.get("proposals").unwrap().publisher.clone().subscribe().await;
    let disconnects = node.p2p_handler.p2p.hosts().subscribe_disconnect().await;

    // `or` is given the network monitor first and the consensus task second.
    let network_watch = monitor_network(&disconnects);
    let consensus = consensus_task(node, &proposals, ex);
    let outcome = smol::future::or(network_watch, consensus).await;

    // Release the subscriptions regardless of which future finished.
    proposals.unsubscribe().await;
    disconnects.unsubscribe().await;

    outcome
}
/// Waits for the next item published on `subscription` and returns it as the
/// `Err` variant; this function never returns `Ok`.
async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> {
    let received = subscription.receive().await;
    Err(received)
}
async fn consensus_task(
node: &DarkfiNodePtr,
subscription: &Subscription<JsonNotification>,
ex: &ExecutorPtr,
) -> Result<()> {
info!(target: "darkfid::task::consensus_task", "Starting consensus task...");
let block_sub = node.subscribers.get("blocks").unwrap();
let gc_task = StoppableTask::new();
gc_task.clone().start(
async { Ok(()) },
|_| async { },
Error::GarbageCollectionTaskStopped,
ex.clone(),
);
loop {
subscription.receive().await;
let finalized = match node.validator.finalization().await {
Ok(f) => f,
Err(e) => {
error!(
target: "darkfid::task::consensus_task",
"Finalization failed: {e}"
);
continue
}
};
if finalized.is_empty() {
continue
}
let mut notif_blocks = Vec::with_capacity(finalized.len());
for block in finalized {
notif_blocks.push(JsonValue::String(base64::encode(&serialize_async(&block).await)));
}
block_sub.notify(JsonValue::Array(notif_blocks)).await;
gc_task.clone().stop().await;
gc_task.clone().start(
garbage_collect_task(node.clone()),
|res| async {
match res {
Ok(()) | Err(Error::GarbageCollectionTaskStopped) => { }
Err(e) => {
error!(target: "darkfid", "Failed starting garbage collection task: {}", e)
}
}
},
Error::GarbageCollectionTaskStopped,
ex.clone(),
);
}
}