use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
};
use log::{error, info};
use smol::{lock::Mutex, stream::StreamExt};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;
use darkfi::{
async_daemonize,
blockchain::{BlockInfo, HeaderHash},
cli_desc,
net::{settings::SettingsOpt, P2pPtr},
rpc::{
client::RpcChadClient,
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{StoppableTask, StoppableTaskPtr},
util::{encoding::base64, path::expand_path},
validator::{Validator, ValidatorConfig, ValidatorPtr},
Error, Result,
};
use darkfi_sdk::crypto::PublicKey;
use darkfi_serial::deserialize_async;
#[cfg(test)]
mod tests;
mod error;
use error::{server_error, RpcError};
mod rpc;
mod rpc_blockchain;
mod rpc_tx;
mod task;
use task::{consensus_task, miner_task, sync_task};
mod proto;
mod utils;
use utils::{parse_blockchain_config, spawn_p2p};
/// File name of the daemon configuration file.
/// NOTE(review): not referenced in the visible code; presumably consumed by
/// `async_daemonize!` — confirm.
const CONFIG_FILE: &str = "darkfid_config.toml";
/// Contents of `../darkfid_config.toml`, embedded at compile time via `include_str!`.
/// NOTE(review): not referenced in the visible code; presumably consumed by
/// `async_daemonize!` — confirm.
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
/// Embedded localnet genesis block. `realmain` trims it, base64-decodes it and
/// deserializes the bytes into a `BlockInfo` when `network` is `"localnet"`.
const GENESIS_BLOCK_LOCALNET: &str = include_str!("../genesis_block_localnet");
/// Embedded testnet genesis block, handled like the localnet one when `network`
/// is `"testnet"`.
const GENESIS_BLOCK_TESTNET: &str = include_str!("../genesis_block_testnet");
/// Embedded mainnet genesis block, handled like the localnet one when `network`
/// is `"mainnet"`.
const GENESIS_BLOCK_MAINNET: &str = include_str!("../genesis_block_mainnet");
// Top-level daemon arguments, received by `realmain` as `args`.
// NOTE(review): plain `//` comments are used here on purpose. structopt turns `///`
// doc comments on a derived struct and its fields into `--help` text, so doc
// comments would change the CLI output.
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "darkfid", about = cli_desc!())]
struct Args {
// Optional configuration file, handed to `parse_blockchain_config` in `realmain`.
// Presumably a file path — confirm against `utils::parse_blockchain_config`.
#[structopt(short, long)]
config: Option<String>,
// URL the JSON-RPC server listens on; passed to `listen_and_serve` in `realmain`.
#[structopt(short, long, default_value = "tcp://127.0.0.1:8340")]
rpc_listen: Url,
// Network to run on. `realmain` accepts "localnet", "testnet" and "mainnet" and
// returns `Error::UnsupportedChain` for anything else.
#[structopt(short, long, default_value = "testnet")]
network: String,
// Not read in the visible code.
// NOTE(review): presumably a log file path consumed by `async_daemonize!` — confirm.
#[structopt(short, long)]
log: Option<String>,
// Number of times `-v` was given (`parse(from_occurrences)`). Not read in the
// visible code; presumably a verbosity level consumed by `async_daemonize!` — confirm.
#[structopt(short, parse(from_occurrences))]
verbose: u8,
}
// Per-network blockchain configuration. `realmain` reads these fields from the value
// returned by `parse_blockchain_config`.
// NOTE(review): plain `//` comments are used here on purpose. structopt turns `///`
// doc comments on a derived struct and its fields into `--help` text, so doc
// comments would change the CLI output.
#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)]
#[structopt()]
pub struct BlockchainNetwork {
// Database location; expanded with `expand_path` and opened with `sled::open`.
#[structopt(long, default_value = "~/.local/darkfi/darkfid_blockchain_localnet")]
pub database: String,
// Forwarded as `ValidatorConfig::finalization_threshold`.
#[structopt(long, default_value = "3")]
pub threshold: usize,
// Endpoint of the miner daemon; used to build the `MinerRpcCLient` when `miner` is set.
#[structopt(long, default_value = "tcp://127.0.0.1:28467")]
pub minerd_endpoint: Url,
// Forwarded as `ValidatorConfig::pow_target`.
// NOTE(review): unit is not visible here — confirm against `ValidatorConfig`.
#[structopt(long, default_value = "10")]
pub pow_target: usize,
// Optional fixed PoW difficulty; when set it is logged and forwarded (via `.into()`)
// as `ValidatorConfig::pow_fixed_difficulty`.
#[structopt(long)]
pub pow_fixed_difficulty: Option<usize>,
// When set, `realmain` connects to and pings the miner daemon and runs `miner_task`
// instead of `consensus_task`.
#[structopt(long)]
pub miner: bool,
// Reward recipient; parsed with `PublicKey::from_str`. Required when `miner` is set.
#[structopt(long)]
pub recipient: Option<String>,
// When set, `sync_task` is skipped and the validator is marked as synced directly.
// The flag is also forwarded to `miner_task`.
#[structopt(long)]
pub skip_sync: bool,
// Inverted into `ValidatorConfig::verify_fees` (`verify_fees = !skip_fees`).
#[structopt(long)]
pub skip_fees: bool,
// Optional checkpoint height; paired with `checkpoint` and passed to `sync_task`.
#[structopt(long)]
pub checkpoint_height: Option<u32>,
// Checkpoint header hash, parsed with `HeaderHash::from_str`. Must be present
// whenever `checkpoint_height` is set and syncing is not skipped.
#[structopt(long)]
pub checkpoint: Option<String>,
// P2P network settings; converted with `.into()` and passed to `spawn_p2p`.
#[structopt(flatten)]
pub net: SettingsOpt,
}
/// JSON-RPC client handle for the miner daemon, bundled with the parameters
/// that were used to create it.
pub struct MinerRpcCLient {
/// Endpoint the client was created for.
endpoint: Url,
/// Executor that was handed to `RpcChadClient::new`.
ex: Arc<smol::Executor<'static>>,
/// The underlying RPC client.
client: RpcChadClient,
}
impl MinerRpcCLient {
    /// Creates an `RpcChadClient` for `endpoint` on executor `ex` and wraps it
    /// together with both parameters.
    ///
    /// # Errors
    /// Propagates any error returned by `RpcChadClient::new`.
    pub async fn new(endpoint: Url, ex: Arc<smol::Executor<'static>>) -> Result<Self> {
        // The client takes its own copies; the originals are stored on `Self`.
        let connecting = RpcChadClient::new(endpoint.clone(), Arc::clone(&ex));
        let client = connecting.await?;

        Ok(Self { endpoint, ex, client })
    }
}
/// Daemon state shared (behind an `Arc`) between the JSON-RPC server and the
/// sync/miner/consensus tasks started in `realmain`.
pub struct Darkfid {
/// P2P network handle.
p2p: P2pPtr,
/// Validator handle; `realmain` sets its `synced` flag when syncing is skipped.
validator: ValidatorPtr,
/// Whether the node was configured as a miner.
miner: bool,
/// JSON-RPC subscribers keyed by a static name; `realmain` registers
/// "blocks", "txs" and "proposals".
subscribers: HashMap<&'static str, JsonSubscriber>,
/// Set of stoppable tasks, initially empty.
/// NOTE(review): presumably the active JSON-RPC connection tasks that
/// `stop_connections` tears down — confirm against the `rpc` module.
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// Miner daemon RPC client; `realmain` sets it to `Some` only when mining is enabled.
rpc_client: Option<Mutex<MinerRpcCLient>>,
}
impl Darkfid {
    /// Assembles the daemon state from already-initialized components.
    ///
    /// The set of RPC connections always starts out empty; every other field is
    /// taken verbatim from the corresponding argument. The function is `async`
    /// because callers `.await` it, although it awaits nothing itself.
    pub async fn new(
        p2p: P2pPtr,
        validator: ValidatorPtr,
        miner: bool,
        subscribers: HashMap<&'static str, JsonSubscriber>,
        rpc_client: Option<Mutex<MinerRpcCLient>>,
    ) -> Self {
        // No connections are tracked until the JSON-RPC server accepts one.
        let rpc_connections = Mutex::new(HashSet::new());

        Self { p2p, validator, miner, subscribers, rpc_connections, rpc_client }
    }
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!(target: "darkfid", "Initializing DarkFi node...");
let (blockchain_config, genesis_block) = match args.network.as_str() {
"localnet" => {
(parse_blockchain_config(args.config, "localnet").await?, GENESIS_BLOCK_LOCALNET)
}
"testnet" => {
(parse_blockchain_config(args.config, "testnet").await?, GENESIS_BLOCK_TESTNET)
}
"mainnet" => {
(parse_blockchain_config(args.config, "mainnet").await?, GENESIS_BLOCK_MAINNET)
}
_ => {
error!("Unsupported chain `{}`", args.network);
return Err(Error::UnsupportedChain)
}
};
let bytes = base64::decode(genesis_block.trim()).unwrap();
let genesis_block: BlockInfo = deserialize_async(&bytes).await?;
let db_path = expand_path(&blockchain_config.database)?;
let sled_db = sled::open(&db_path)?;
let pow_fixed_difficulty = if let Some(diff) = blockchain_config.pow_fixed_difficulty {
info!(target: "darkfid", "Node is configured to run with fixed PoW difficulty: {}", diff);
Some(diff.into())
} else {
None
};
let config = ValidatorConfig {
finalization_threshold: blockchain_config.threshold,
pow_target: blockchain_config.pow_target,
pow_fixed_difficulty,
genesis_block,
verify_fees: !blockchain_config.skip_fees,
};
let validator = Validator::new(&sled_db, config).await?;
let mut subscribers = HashMap::new();
subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
let p2p = spawn_p2p(&blockchain_config.net.into(), &validator, &subscribers, ex.clone()).await;
let rpc_client = if blockchain_config.miner {
let Ok(rpc_client) =
MinerRpcCLient::new(blockchain_config.minerd_endpoint, ex.clone()).await
else {
error!(target: "darkfid", "Failed to initialize miner daemon rpc client, check if minerd is running");
return Err(Error::RpcClientStopped)
};
Some(Mutex::new(rpc_client))
} else {
None
};
let darkfid =
Darkfid::new(p2p.clone(), validator, blockchain_config.miner, subscribers, rpc_client)
.await;
let darkfid = Arc::new(darkfid);
info!(target: "darkfid", "Node initialized successfully!");
if blockchain_config.miner {
if let Err(e) = darkfid.ping_miner_daemon().await {
error!(target: "darkfid", "Failed to ping miner daemon: {}", e);
return Err(Error::RpcClientStopped)
}
}
info!(target: "darkfid", "Starting JSON-RPC server");
let rpc_task = StoppableTask::new();
let darkfid_ = darkfid.clone();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, darkfid.clone(), None, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => darkfid_.stop_connections().await,
Err(e) => error!(target: "darkfid", "Failed starting sync JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
ex.clone(),
);
info!(target: "darkfid", "Starting P2P network");
p2p.clone().start().await?;
if !blockchain_config.skip_sync {
if blockchain_config.checkpoint_height.is_some() && blockchain_config.checkpoint.is_none() {
return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
}
let checkpoint = if let Some(height) = blockchain_config.checkpoint_height {
Some((height, HeaderHash::from_str(&blockchain_config.checkpoint.unwrap())?))
} else {
None
};
sync_task(&darkfid, checkpoint).await?;
} else {
*darkfid.validator.synced.write().await = true;
}
info!(target: "darkfid", "Starting consensus protocol task");
let consensus_task = if blockchain_config.miner {
if blockchain_config.recipient.is_none() {
return Err(Error::ParseFailed("Recipient address missing"))
}
let recipient = match PublicKey::from_str(&blockchain_config.recipient.unwrap()) {
Ok(address) => address,
Err(_) => return Err(Error::InvalidAddress),
};
let task = StoppableTask::new();
task.clone().start(
miner_task(darkfid, recipient, blockchain_config.skip_sync, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::MinerTaskStopped) => { }
Err(e) => error!(target: "darkfid", "Failed starting miner task: {}", e),
}
},
Error::MinerTaskStopped,
ex.clone(),
);
task
} else {
let task = StoppableTask::new();
task.clone().start(
consensus_task(darkfid, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::ConsensusTaskStopped) => { }
Err(e) => error!(target: "darkfid", "Failed starting consensus task: {}", e),
}
},
Error::ConsensusTaskStopped,
ex.clone(),
);
task
};
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
info!(target: "darkfid", "Caught termination signal, cleaning up and exiting...");
info!(target: "darkfid", "Stopping JSON-RPC server...");
rpc_task.stop().await;
info!(target: "darkfid", "Stopping P2P network...");
p2p.stop().await;
info!(target: "darkfid", "Stopping consensus task...");
consensus_task.stop().await;
info!(target: "darkfid", "Flushing sled database...");
let flushed_bytes = sled_db.flush_async().await?;
info!(target: "darkfid", "Flushed {} bytes", flushed_bytes);
Ok(())
}