1use std::{collections::HashSet, sync::Arc};
20
21use log::{error, info};
22use smol::{
23 channel::{Receiver, Sender},
24 lock::Mutex,
25};
26
27use darkfi::{
28 rpc::{
29 server::{listen_and_serve, RequestHandler},
30 settings::RpcSettings,
31 },
32 system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
33 Error, Result,
34};
35
36mod error;
38
39mod rpc;
41
42pub type MinerNodePtr = Arc<MinerNode>;
44
45pub struct MinerNode {
47 threads: usize,
49 stop_at_height: u32,
51 sender: Sender<()>,
53 stop_signal: Receiver<()>,
55 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
57}
58
59impl MinerNode {
60 pub fn new(
61 threads: usize,
62 stop_at_height: u32,
63 sender: Sender<()>,
64 stop_signal: Receiver<()>,
65 ) -> MinerNodePtr {
66 Arc::new(Self {
67 threads,
68 stop_at_height,
69 sender,
70 stop_signal,
71 rpc_connections: Mutex::new(HashSet::new()),
72 })
73 }
74}
75
76pub type MinerdPtr = Arc<Minerd>;
78
79pub struct Minerd {
81 node: MinerNodePtr,
83 rpc_task: StoppableTaskPtr,
85}
86
87impl Minerd {
88 pub fn init(threads: usize, stop_at_height: u32) -> MinerdPtr {
93 info!(target: "minerd::Minerd::init", "Initializing a new mining daemon...");
94
95 let (sender, stop_signal) = smol::channel::bounded(1);
97
98 let node = MinerNode::new(threads, stop_at_height, sender, stop_signal);
100
101 let rpc_task = StoppableTask::new();
103
104 info!(target: "minerd::Minerd::init", "Mining daemon initialized successfully!");
105
106 Arc::new(Self { node, rpc_task })
107 }
108
109 pub fn start(&self, executor: &ExecutorPtr, rpc_settings: &RpcSettings) {
111 info!(target: "minerd::Minerd::start", "Starting mining daemon...");
112
113 let node_ = self.node.clone();
115 self.rpc_task.clone().start(
116 listen_and_serve(rpc_settings.clone(), self.node.clone(), None, executor.clone()),
117 |res| async move {
118 match res {
119 Ok(()) | Err(Error::RpcServerStopped) => node_.stop_connections().await,
120 Err(e) => error!(target: "minerd::Minerd::start", "Failed starting JSON-RPC server: {}", e),
121 }
122 },
123 Error::RpcServerStopped,
124 executor.clone(),
125 );
126
127 info!(target: "minerd::Minerd::start", "Mining daemon started successfully!");
128 }
129
130 pub async fn stop(&self) -> Result<()> {
132 info!(target: "minerd::Minerd::stop", "Terminating mining daemon...");
133
134 info!(target: "minerd::Minerd::stop", "Stopping miner threads...");
136 self.node.sender.send(()).await?;
137
138 info!(target: "minerd::Minerd::stop", "Stopping JSON-RPC server...");
140 self.rpc_task.stop().await;
141
142 if self.node.stop_signal.is_full() {
144 self.node.stop_signal.recv().await?;
145 }
146
147 info!(target: "minerd::Minerd::stop", "Mining daemon terminated successfully!");
148 Ok(())
149 }
150}
151
152#[cfg(test)]
153use url::Url;
154
/// Drives a daemon purely through its public API: start it, submit a mining
/// job over JSON-RPC, stop it while the job is pending, then start and stop
/// it once more.
#[test]
fn minerd_programmatic_control() -> Result<()> {
    // Logger setup. A failed init is tolerated and only noted at debug level.
    let mut log_config = simplelog::ConfigBuilder::new();
    let logger_init = simplelog::TermLogger::init(
        simplelog::LevelFilter::Info,
        log_config.build(),
        simplelog::TerminalMode::Mixed,
        simplelog::ColorChoice::Auto,
    );
    if logger_init.is_err() {
        log::debug!(target: "minerd_programmatic_control", "Logger initialized");
    }

    // Daemon configuration
    let threads = 4;
    let listen = Url::parse("tcp://127.0.0.1:28467")?;
    let rpc_settings = RpcSettings { listen, ..RpcSettings::default() };

    // Executor shared by the worker threads, plus the channel that keeps
    // them running until the test body is done.
    let ex = Arc::new(smol::Executor::new());
    let (signal, shutdown) = smol::channel::unbounded::<()>();

    // JSON-RPC `mine` request: params are a target built from 32 bytes of
    // 0xFF (as a decimal string) and a base64 encoded default block.
    let max_target = num_bigint::BigUint::from_bytes_be(&[0xFF; 32]);
    let target = darkfi::rpc::util::JsonValue::String(max_target.to_string());
    let block_bytes = darkfi_serial::serialize(&darkfi::blockchain::BlockInfo::default());
    let block = darkfi::rpc::util::JsonValue::String(darkfi::util::encoding::base64::encode(
        &block_bytes,
    ));
    let mining_job = darkfi::rpc::jsonrpc::JsonRequest::new(
        "mine",
        darkfi::rpc::util::JsonValue::Array(vec![target, block]),
    );

    easy_parallel::Parallel::new()
        // Each worker drives the executor until the shutdown channel closes.
        .each(0..threads, |_| smol::block_on(ex.run(shutdown.recv())))
        .finish(|| {
            smol::block_on(async {
                let daemon = Minerd::init(threads, 0);

                daemon.start(&ex, &rpc_settings);

                // Keep retrying until a client connection is established.
                let rpc_client = loop {
                    let attempt = darkfi::rpc::client::RpcClient::new(
                        rpc_settings.listen.clone(),
                        ex.clone(),
                    )
                    .await;
                    if let Ok(client) = attempt {
                        break client;
                    }
                };

                // Race the mining request against a delayed daemon stop;
                // whichever finishes first ends the race.
                let submit_job = async {
                    rpc_client.request(mining_job).await.unwrap();
                };
                let delayed_stop = async {
                    darkfi::system::sleep(2).await;
                    daemon.stop().await.unwrap();
                };
                smol::future::or(submit_job, delayed_stop).await;
                rpc_client.stop().await;

                // Second start/stop cycle on the same daemon.
                daemon.start(&ex, &rpc_settings);

                daemon.stop().await.unwrap();

                // Closing the channel releases the worker threads.
                drop(signal);
            })
        });

    Ok(())
}