use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use log::{debug, error, info};
use smol::lock::Mutex;
use url::Url;
use darkfi::{
net::settings::Settings,
rpc::{
client::RpcChadClient,
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
validator::{Validator, ValidatorConfig, ValidatorPtr},
Error, Result,
};
#[cfg(test)]
mod tests;
mod error;
use error::{server_error, RpcError};
mod rpc;
mod rpc_blockchain;
mod rpc_tx;
pub mod task;
use task::{consensus::ConsensusInitTaskConfig, consensus_init_task};
mod proto;
use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
pub struct MinerRpcClient {
endpoint: Url,
ex: ExecutorPtr,
client: RpcChadClient,
}
impl MinerRpcClient {
pub async fn new(endpoint: Url, ex: ExecutorPtr) -> Result<Self> {
let client = RpcChadClient::new(endpoint.clone(), ex.clone()).await?;
Ok(Self { endpoint, ex, client })
}
}
pub type DarkfiNodePtr = Arc<DarkfiNode>;
pub struct DarkfiNode {
p2p_handler: DarkfidP2pHandlerPtr,
validator: ValidatorPtr,
txs_batch_size: usize,
subscribers: HashMap<&'static str, JsonSubscriber>,
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
rpc_client: Option<Mutex<MinerRpcClient>>,
}
impl DarkfiNode {
pub async fn new(
p2p_handler: DarkfidP2pHandlerPtr,
validator: ValidatorPtr,
txs_batch_size: usize,
subscribers: HashMap<&'static str, JsonSubscriber>,
rpc_client: Option<Mutex<MinerRpcClient>>,
) -> DarkfiNodePtr {
Arc::new(Self {
p2p_handler,
validator,
txs_batch_size,
subscribers,
rpc_connections: Mutex::new(HashSet::new()),
rpc_client,
})
}
}
pub type DarkfidPtr = Arc<Darkfid>;
pub struct Darkfid {
node: DarkfiNodePtr,
dnet_task: StoppableTaskPtr,
rpc_task: StoppableTaskPtr,
consensus_task: StoppableTaskPtr,
}
impl Darkfid {
pub async fn init(
sled_db: &sled_overlay::sled::Db,
config: &ValidatorConfig,
net_settings: &Settings,
minerd_endpoint: &Option<Url>,
txs_batch_size: &Option<usize>,
ex: &ExecutorPtr,
) -> Result<DarkfidPtr> {
info!(target: "darkfid::Darkfid::init", "Initializing a Darkfi daemon...");
let validator = Validator::new(sled_db, config).await?;
let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
let txs_batch_size = match txs_batch_size {
Some(b) => {
if *b > 0 {
*b
} else {
50
}
}
None => 50,
};
let mut subscribers = HashMap::new();
subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));
subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
let rpc_client = match minerd_endpoint {
Some(endpoint) => {
let Ok(rpc_client) = MinerRpcClient::new(endpoint.clone(), ex.clone()).await else {
error!(target: "darkfid::Darkfid::init", "Failed to initialize miner daemon rpc client, check if minerd is running");
return Err(Error::RpcClientStopped)
};
Some(Mutex::new(rpc_client))
}
None => None,
};
let node =
DarkfiNode::new(p2p_handler, validator, txs_batch_size, subscribers, rpc_client).await;
let dnet_task = StoppableTask::new();
let rpc_task = StoppableTask::new();
let consensus_task = StoppableTask::new();
info!(target: "darkfid::Darkfid::init", "Darkfi daemon initialized successfully!");
Ok(Arc::new(Self { node, dnet_task, rpc_task, consensus_task }))
}
pub async fn start(
&self,
executor: &ExecutorPtr,
rpc_listen: &Url,
config: &ConsensusInitTaskConfig,
) -> Result<()> {
info!(target: "darkfid::Darkfid::start", "Starting Darkfi daemon...");
if self.node.rpc_client.is_some() {
if let Err(e) = self.node.ping_miner_daemon().await {
error!(target: "darkfid::Darkfid::start", "Failed to ping miner daemon: {}", e);
return Err(Error::RpcClientStopped)
}
}
info!(target: "darkfid::Darkfid::start", "Starting dnet subs task");
let dnet_sub_ = self.node.subscribers.get("dnet").unwrap().clone();
let p2p_ = self.node.p2p_handler.p2p.clone();
self.dnet_task.clone().start(
async move {
let dnet_sub = p2p_.dnet_subscribe().await;
loop {
let event = dnet_sub.receive().await;
debug!(target: "darkfid::Darkfid::dnet_task", "Got dnet event: {:?}", event);
dnet_sub_.notify(vec![event.into()].into()).await;
}
},
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { }
Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting dnet subs task: {}", e),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
info!(target: "darkfid::Darkfid::start", "Starting JSON-RPC server");
let node_ = self.node.clone();
self.rpc_task.clone().start(
listen_and_serve(rpc_listen.clone(), self.node.clone(), None, executor.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => node_.stop_connections().await,
Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
executor.clone(),
);
info!(target: "darkfid::Darkfid::start", "Starting P2P network");
self.node
.p2p_handler
.clone()
.start(executor, &self.node.validator, &self.node.subscribers)
.await?;
info!(target: "darkfid::Darkfid::start", "Starting consensus protocol task");
self.consensus_task.clone().start(
consensus_init_task(
self.node.clone(),
config.clone(),
executor.clone(),
),
|res| async move {
match res {
Ok(()) | Err(Error::ConsensusTaskStopped) | Err(Error::MinerTaskStopped) => { }
Err(e) => error!(target: "darkfid::Darkfid::start", "Failed starting consensus initialization task: {}", e),
}
},
Error::ConsensusTaskStopped,
executor.clone(),
);
info!(target: "darkfid::Darkfid::start", "Darkfi daemon started successfully!");
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!(target: "darkfid::Darkfid::stop", "Terminating Darkfi daemon...");
info!(target: "darkfid::Darkfid::stop", "Stopping dnet subs task...");
self.dnet_task.stop().await;
info!(target: "darkfid::Darkfid::stop", "Stopping JSON-RPC server...");
self.rpc_task.stop().await;
info!(target: "darkfid::Darkfid::stop", "Stopping P2P network protocols handler...");
self.node.p2p_handler.stop().await;
info!(target: "darkfid::Darkfid::stop", "Stopping consensus task...");
self.consensus_task.stop().await;
info!(target: "darkfid::Darkfid::stop", "Flushing sled database...");
let flushed_bytes = self.node.validator.blockchain.sled_db.flush_async().await?;
info!(target: "darkfid::Darkfid::stop", "Flushed {} bytes", flushed_bytes);
if let Some(ref rpc_client) = self.node.rpc_client {
info!(target: "darkfid::Darkfid::stop", "Stopping JSON-RPC client...");
rpc_client.lock().await.client.stop().await;
};
info!(target: "darkfid::Darkfid::stop", "Darkfi daemon terminated successfully!");
Ok(())
}
}