1use std::{collections::HashSet, sync::Arc};
20
21use log::{error, info};
22use smol::{
23 channel::{Receiver, Sender},
24 lock::Mutex,
25};
26
27use darkfi::{
28 rpc::{
29 server::{listen_and_serve, RequestHandler},
30 settings::RpcSettings,
31 },
32 system::{sleep, ExecutorPtr, StoppableTask, StoppableTaskPtr},
33 Error, Result,
34};
35
36mod error;
38
39mod rpc;
41
42pub type MinerNodePtr = Arc<MinerNode>;
44
45pub struct MinerNode {
47 threads: usize,
49 stop_at_height: u32,
51 sender: Sender<()>,
53 stop_signal: Receiver<()>,
55 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
57}
58
59impl MinerNode {
60 pub fn new(
61 threads: usize,
62 stop_at_height: u32,
63 sender: Sender<()>,
64 stop_signal: Receiver<()>,
65 ) -> MinerNodePtr {
66 Arc::new(Self {
67 threads,
68 stop_at_height,
69 sender,
70 stop_signal,
71 rpc_connections: Mutex::new(HashSet::new()),
72 })
73 }
74}
75
76pub type MinerdPtr = Arc<Minerd>;
78
79pub struct Minerd {
81 node: MinerNodePtr,
83 rpc_task: StoppableTaskPtr,
85}
86
87impl Minerd {
88 pub fn init(threads: usize, stop_at_height: u32) -> MinerdPtr {
93 info!(target: "minerd::Minerd::init", "Initializing a new mining daemon...");
94
95 let (sender, stop_signal) = smol::channel::bounded(1);
97
98 let node = MinerNode::new(threads, stop_at_height, sender, stop_signal);
100
101 let rpc_task = StoppableTask::new();
103
104 info!(target: "minerd::Minerd::init", "Mining daemon initialized successfully!");
105
106 Arc::new(Self { node, rpc_task })
107 }
108
109 pub fn start(&self, executor: &ExecutorPtr, rpc_settings: &RpcSettings) {
111 info!(target: "minerd::Minerd::start", "Starting mining daemon...");
112
113 let node_ = self.node.clone();
115 self.rpc_task.clone().start(
116 listen_and_serve(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
117 |res| async move {
118 match res {
119 Ok(()) | Err(Error::RpcServerStopped) => node_.stop_connections().await,
120 Err(e) => error!(target: "minerd::Minerd::start", "Failed starting JSON-RPC server: {e}"),
121 }
122 },
123 Error::RpcServerStopped,
124 executor.clone(),
125 );
126
127 info!(target: "minerd::Minerd::start", "Mining daemon started successfully!");
128 }
129
130 pub async fn stop(&self) -> Result<()> {
132 info!(target: "minerd::Minerd::stop", "Terminating mining daemon...");
133
134 info!(target: "minerd::Minerd::stop", "Stopping miner threads...");
136 if self.node.stop_signal.is_empty() {
137 self.node.sender.send(()).await?;
138 }
139 while self.node.stop_signal.receiver_count() > 1 {
140 sleep(1).await;
141 }
142
143 info!(target: "minerd::Minerd::stop", "Stopping JSON-RPC server...");
145 self.rpc_task.stop().await;
146
147 if self.node.stop_signal.is_full() {
149 self.node.stop_signal.recv().await?;
150 }
151
152 info!(target: "minerd::Minerd::stop", "Mining daemon terminated successfully!");
153 Ok(())
154 }
155}
156
157#[cfg(test)]
158use url::Url;
159
160#[test]
161fn minerd_programmatic_control() -> Result<()> {
166 let mut cfg = simplelog::ConfigBuilder::new();
168
169 if simplelog::TermLogger::init(
172 simplelog::LevelFilter::Info,
173 cfg.build(),
176 simplelog::TerminalMode::Mixed,
177 simplelog::ColorChoice::Auto,
178 )
179 .is_err()
180 {
181 log::debug!(target: "minerd_programmatic_control", "Logger initialized");
182 }
183
184 let threads = 4;
186 let rpc_settings =
187 RpcSettings { listen: Url::parse("tcp://127.0.0.1:28467")?, ..RpcSettings::default() };
188
189 let ex = Arc::new(smol::Executor::new());
191 let (signal, shutdown) = smol::channel::unbounded::<()>();
192
193 let target = darkfi::rpc::util::JsonValue::String(
195 num_bigint::BigUint::from_bytes_be(&[0xFF; 32]).to_string(),
196 );
197 let block = darkfi::rpc::util::JsonValue::String(darkfi::util::encoding::base64::encode(
198 &darkfi_serial::serialize(&darkfi::blockchain::BlockInfo::default()),
199 ));
200 let mining_job = darkfi::rpc::jsonrpc::JsonRequest::new(
201 "mine",
202 darkfi::rpc::util::JsonValue::Array(vec![target, block]),
203 );
204
205 easy_parallel::Parallel::new()
206 .each(0..threads, |_| smol::block_on(ex.run(shutdown.recv())))
207 .finish(|| {
208 smol::block_on(async {
209 let daemon = Minerd::init(threads, 0);
211
212 daemon.start(&ex, &rpc_settings);
214
215 let mut rpc_client =
217 darkfi::rpc::client::RpcClient::new(rpc_settings.listen.clone(), ex.clone())
218 .await;
219 while rpc_client.is_err() {
220 rpc_client = darkfi::rpc::client::RpcClient::new(
221 rpc_settings.listen.clone(),
222 ex.clone(),
223 )
224 .await;
225 }
226 let rpc_client = rpc_client.unwrap();
227
228 smol::future::or(
230 async {
231 let _ = rpc_client.request(mining_job).await;
232 },
233 async {
234 darkfi::system::sleep(2).await;
236 daemon.stop().await.unwrap();
237 },
238 )
239 .await;
240 rpc_client.stop().await;
241
242 daemon.start(&ex, &rpc_settings);
244
245 daemon.stop().await.unwrap();
247
248 drop(signal);
250 })
251 });
252
253 Ok(())
254}