use std::{collections::HashSet, time::Instant};
use async_trait::async_trait;
use log::{debug, error, info};
use smol::lock::MutexGuard;
use tinyjson::JsonValue;
use darkfi::{
net::P2pPtr,
rpc::{
client::RpcChadClient,
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
p2p_method::HandlerP2p,
server::RequestHandler,
},
system::{sleep, StoppableTaskPtr},
util::time::Timestamp,
Error, Result,
};
use crate::{
error::{server_error, RpcError},
DarkfiNode,
};
#[async_trait]
#[rustfmt::skip]
impl RequestHandler for DarkfiNode {
    /// Dispatches an incoming JSON-RPC request to the handler matching its
    /// method name.
    ///
    /// The request is logged at debug level first. Every handler receives the
    /// request `id` and `params`; a method name without a matching arm is
    /// answered with `ErrorCode::MethodNotFound`.
    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
        debug!(target: "darkfid::rpc", "--> {}", req.stringify().unwrap());

        match req.method.as_str() {
            // Miscellaneous methods
            // NOTE(review): `pong` is not defined in this file; presumably it
            // is provided by the `RequestHandler` trait itself.
            "ping" => self.pong(req.id, req.params).await,
            "clock" => self.clock(req.id, req.params).await,
            "ping_miner" => self.ping_miner(req.id, req.params).await,
            "dnet.switch" => self.dnet_switch(req.id, req.params).await,
            "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
            "p2p.get_info" => self.p2p_get_info(req.id, req.params).await,

            // Blockchain methods
            "blockchain.get_block" => self.blockchain_get_block(req.id, req.params).await,
            "blockchain.get_tx" => self.blockchain_get_tx(req.id, req.params).await,
            "blockchain.last_finalized_block" => self.blockchain_last_finalized_block(req.id, req.params).await,
            "blockchain.best_fork_next_block_height" => self.blockchain_best_fork_next_block_height(req.id, req.params).await,
            "blockchain.block_target" => self.blockchain_block_target(req.id, req.params).await,
            "blockchain.lookup_zkas" => self.blockchain_lookup_zkas(req.id, req.params).await,
            "blockchain.subscribe_blocks" => self.blockchain_subscribe_blocks(req.id, req.params).await,
            "blockchain.subscribe_txs" => self.blockchain_subscribe_txs(req.id, req.params).await,
            "blockchain.subscribe_proposals" => self.blockchain_subscribe_proposals(req.id, req.params).await,
            "merge_mining_get_chain_id" => self.merge_mining_get_chain_id(req.id, req.params).await,

            // Transaction methods
            "tx.simulate" => self.tx_simulate(req.id, req.params).await,
            "tx.broadcast" => self.tx_broadcast(req.id, req.params).await,
            "tx.pending" => self.tx_pending(req.id, req.params).await,
            // Routed to its own handler; it must not share `tx_pending`.
            // NOTE(review): `tx_clean_pending` lives outside this view, like
            // the other `tx_*` handlers.
            "tx.clean_pending" => self.tx_clean_pending(req.id, req.params).await,
            "tx.calculate_gas" => self.tx_calculate_gas(req.id, req.params).await,

            // Anything else is not a known method
            _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
        }
    }

    /// Locks `rpc_connections` and returns the guard over the set of
    /// connection task pointers.
    ///
    /// NOTE(review): `'life0` appears to be the lifetime name that
    /// `#[async_trait]` assigns to the elided `&self` borrow in its
    /// expansion; it is not declared anywhere in this file.
    async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
        self.rpc_connections.lock().await
    }
}
impl DarkfiNode {
async fn clock(&self, id: u16, _params: JsonValue) -> JsonResult {
JsonResponse::new(JsonValue::String(Timestamp::current_time().inner().to_string()), id)
.into()
}
async fn dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_bool() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let switch = params[0].get::<bool>().unwrap();
if *switch {
self.p2p_handler.p2p.dnet_enable();
} else {
self.p2p_handler.p2p.dnet_disable();
}
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.is_empty() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
self.subscribers.get("dnet").unwrap().clone().into()
}
async fn ping_miner(&self, id: u16, _params: JsonValue) -> JsonResult {
if let Err(e) = self.ping_miner_daemon().await {
error!(target: "darkfid::rpc::ping_miner", "Failed to ping miner daemon: {}", e);
return server_error(RpcError::PingFailed, id, None)
}
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
pub async fn ping_miner_daemon(&self) -> Result<()> {
debug!(target: "darkfid::ping_miner_daemon", "Pinging miner daemon...");
self.miner_daemon_request("ping", &JsonValue::Array(vec![])).await?;
Ok(())
}
pub async fn miner_daemon_request(
&self,
method: &str,
params: &JsonValue,
) -> Result<JsonValue> {
let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
debug!(target: "darkfid::rpc::miner_daemon_request", "Executing request {} with params: {:?}", method, params);
let latency = Instant::now();
let req = JsonRequest::new(method, params.clone());
let lock = rpc_client.lock().await;
let rep = lock.client.request(req).await?;
drop(lock);
let latency = latency.elapsed();
debug!(target: "darkfid::rpc::miner_daemon_request", "Got reply: {:?}", rep);
debug!(target: "darkfid::rpc::miner_daemon_request", "Latency: {:?}", latency);
Ok(rep)
}
pub async fn miner_daemon_request_with_retry(
&self,
method: &str,
params: &JsonValue,
) -> JsonValue {
loop {
match self.miner_daemon_request(method, params).await {
Ok(v) => return v,
Err(e) => {
error!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Failed to execute miner daemon request: {}", e);
}
}
loop {
info!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Sleeping so we can retry later");
sleep(10).await;
let mut rpc_client = self.rpc_client.as_ref().unwrap().lock().await;
let Ok(client) =
RpcChadClient::new(rpc_client.endpoint.clone(), rpc_client.ex.clone()).await
else {
error!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Failed to initialize miner daemon rpc client, check if minerd is running");
drop(rpc_client);
continue
};
info!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Connection re-established!");
rpc_client.client = client;
break;
}
}
}
}
impl HandlerP2p for DarkfiNode {
    /// Returns a clone of the P2P pointer held by this node's `p2p_handler`.
    fn p2p(&self) -> P2pPtr {
        let p2p = &self.p2p_handler.p2p;
        p2p.clone()
    }
}