1use std::{collections::HashSet, time::Instant};
20
21use async_trait::async_trait;
22use log::{debug, error, info, warn};
23use smol::lock::MutexGuard;
24use tinyjson::JsonValue;
25use url::Url;
26
27use darkfi::{
28 net::P2pPtr,
29 rpc::{
30 client::RpcChadClient,
31 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
32 p2p_method::HandlerP2p,
33 server::RequestHandler,
34 },
35 system::{sleep, ExecutorPtr, StoppableTaskPtr},
36 util::time::Timestamp,
37 Error, Result,
38};
39
40use crate::{
41 error::{server_error, RpcError},
42 DarkfiNode,
43};
44
/// Unit marker type used as the type parameter of the `RequestHandler`
/// implementation on `DarkfiNode` that serves the main JSON-RPC methods
/// (`ping`, `clock`, `blockchain.*`, `tx.*`, ...).
pub struct DefaultRpcHandler;
/// Unit marker type used as the type parameter of the `RequestHandler`
/// implementation on `DarkfiNode` that serves the `merge_mining_*`
/// JSON-RPC methods.
pub struct MmRpcHandler;
49
/// Connection details for the miner daemon, plus the live RPC client when
/// one could be created.
pub struct MinerRpcClient {
    /// Endpoint of the miner daemon; kept so the client can be re-created later.
    endpoint: Url,
    /// Executor passed to `RpcChadClient::new` every time a client is created.
    ex: ExecutorPtr,
    /// Active client, or `None` when the initial connection attempt failed.
    client: Option<RpcChadClient>,
}
57
58impl MinerRpcClient {
59 pub async fn new(endpoint: Url, ex: ExecutorPtr) -> Self {
60 let client = match RpcChadClient::new(endpoint.clone(), ex.clone()).await {
61 Ok(c) => Some(c),
62 Err(_) => {
63 warn!(target: "darkfid::Darkfid::init", "Failed to initialize miner daemon rpc client, will try later");
64 None
65 }
66 };
67 Self { endpoint, ex, client }
68 }
69
70 pub async fn stop(&self) {
72 if let Some(ref client) = self.client {
73 client.stop().await
74 }
75 }
76}
77
78#[async_trait]
79#[rustfmt::skip]
80impl RequestHandler<DefaultRpcHandler> for DarkfiNode {
81 async fn handle_request(&self, req: JsonRequest) -> JsonResult {
82 debug!(target: "darkfid::rpc", "--> {}", req.stringify().unwrap());
83
84 match req.method.as_str() {
85 "ping" => <DarkfiNode as RequestHandler<DefaultRpcHandler>>::pong(self, req.id, req.params).await,
89 "clock" => self.clock(req.id, req.params).await,
90 "ping_miner" => self.ping_miner(req.id, req.params).await,
91 "dnet.switch" => self.dnet_switch(req.id, req.params).await,
92 "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
93 "p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
94
95 "blockchain.get_block" => self.blockchain_get_block(req.id, req.params).await,
99 "blockchain.get_tx" => self.blockchain_get_tx(req.id, req.params).await,
100 "blockchain.last_confirmed_block" => self.blockchain_last_confirmed_block(req.id, req.params).await,
101 "blockchain.best_fork_next_block_height" => self.blockchain_best_fork_next_block_height(req.id, req.params).await,
102 "blockchain.block_target" => self.blockchain_block_target(req.id, req.params).await,
103 "blockchain.lookup_zkas" => self.blockchain_lookup_zkas(req.id, req.params).await,
104 "blockchain.get_contract_state" => self.blockchain_get_contract_state(req.id, req.params).await,
105 "blockchain.get_contract_state_key" => self.blockchain_get_contract_state_key(req.id, req.params).await,
106 "blockchain.subscribe_blocks" => self.blockchain_subscribe_blocks(req.id, req.params).await,
107 "blockchain.subscribe_txs" => self.blockchain_subscribe_txs(req.id, req.params).await,
108 "blockchain.subscribe_proposals" => self.blockchain_subscribe_proposals(req.id, req.params).await,
109
110 "tx.simulate" => self.tx_simulate(req.id, req.params).await,
114 "tx.broadcast" => self.tx_broadcast(req.id, req.params).await,
115 "tx.pending" => self.tx_pending(req.id, req.params).await,
116 "tx.clean_pending" => self.tx_pending(req.id, req.params).await,
117 "tx.calculate_fee" => self.tx_calculate_fee(req.id, req.params).await,
118
119 _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
123 }
124 }
125
126 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
127 self.rpc_connections.lock().await
128 }
129}
130
131#[async_trait]
132#[rustfmt::skip]
133impl RequestHandler<MmRpcHandler> for DarkfiNode {
134 async fn handle_request(&self, req: JsonRequest) -> JsonResult {
135 debug!(target: "darkfid::mm_rpc", "--> {}", req.stringify().unwrap());
136
137 match req.method.as_str() {
138 "merge_mining_get_chain_id" => self.xmr_merge_mining_get_chain_id(req.id, req.params).await,
142
143 _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
147 }
148 }
149
150 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
151 self.mm_rpc_connections.lock().await
152 }
153}
154
155impl DarkfiNode {
156 async fn clock(&self, id: u16, _params: JsonValue) -> JsonResult {
162 JsonResponse::new(JsonValue::String(Timestamp::current_time().inner().to_string()), id)
163 .into()
164 }
165
166 async fn dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
174 let params = params.get::<Vec<JsonValue>>().unwrap();
175 if params.len() != 1 || !params[0].is_bool() {
176 return JsonError::new(ErrorCode::InvalidParams, None, id).into()
177 }
178
179 let switch = params[0].get::<bool>().unwrap();
180
181 if *switch {
182 self.p2p_handler.p2p.dnet_enable();
183 } else {
184 self.p2p_handler.p2p.dnet_disable();
185 }
186
187 JsonResponse::new(JsonValue::Boolean(true), id).into()
188 }
189
190 pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
198 let params = params.get::<Vec<JsonValue>>().unwrap();
199 if !params.is_empty() {
200 return JsonError::new(ErrorCode::InvalidParams, None, id).into()
201 }
202
203 self.subscribers.get("dnet").unwrap().clone().into()
204 }
205
206 async fn ping_miner(&self, id: u16, _params: JsonValue) -> JsonResult {
213 if let Err(e) = self.ping_miner_daemon().await {
214 error!(target: "darkfid::rpc::ping_miner", "Failed to ping miner daemon: {}", e);
215 return server_error(RpcError::PingFailed, id, None)
216 }
217 JsonResponse::new(JsonValue::Boolean(true), id).into()
218 }
219
220 pub async fn ping_miner_daemon(&self) -> Result<()> {
222 debug!(target: "darkfid::ping_miner_daemon", "Pinging miner daemon...");
223 self.miner_daemon_request("ping", &JsonValue::Array(vec![])).await?;
224 Ok(())
225 }
226
227 pub async fn miner_daemon_request(
229 &self,
230 method: &str,
231 params: &JsonValue,
232 ) -> Result<JsonValue> {
233 let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
234 debug!(target: "darkfid::rpc::miner_daemon_request", "Executing request {} with params: {:?}", method, params);
235 let latency = Instant::now();
236 let req = JsonRequest::new(method, params.clone());
237 let lock = rpc_client.lock().await;
238 let Some(ref client) = lock.client else { return Err(Error::RpcClientStopped) };
239 let rep = client.request(req).await?;
240 drop(lock);
241 let latency = latency.elapsed();
242 debug!(target: "darkfid::rpc::miner_daemon_request", "Got reply: {:?}", rep);
243 debug!(target: "darkfid::rpc::miner_daemon_request", "Latency: {:?}", latency);
244 Ok(rep)
245 }
246
247 pub async fn miner_daemon_request_with_retry(
250 &self,
251 method: &str,
252 params: &JsonValue,
253 ) -> JsonValue {
254 loop {
255 match self.miner_daemon_request(method, params).await {
257 Ok(v) => return v,
258 Err(e) => {
259 error!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Failed to execute miner daemon request: {}", e);
260 }
261 }
262 loop {
263 info!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Sleeping so we can retry later");
265 sleep(10).await;
266 let mut rpc_client = self.rpc_client.as_ref().unwrap().lock().await;
268 let Ok(client) =
269 RpcChadClient::new(rpc_client.endpoint.clone(), rpc_client.ex.clone()).await
270 else {
271 error!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Failed to initialize miner daemon rpc client, check if minerd is running");
272 drop(rpc_client);
273 continue
274 };
275 info!(target: "darkfid::rpc::miner_daemon_request_with_retry", "Connection re-established!");
276 rpc_client.client = Some(client);
278 break;
279 }
280 }
281 }
282}
283
284impl HandlerP2p for DarkfiNode {
285 fn p2p(&self) -> P2pPtr {
286 self.p2p_handler.p2p.clone()
287 }
288}