StoppableTask
Implementing a JSON-RPC
RequestHandler
also requires that we implement
a method called connections_mut
. This introduces us to an important
darkfi
type called StoppableTask
.
StoppableTask
is a async task that can be prematurely (and safely)
stopped at any time. We've already encountered this method when we
discussed p2p.stop
, which triggers StoppableTask
to cleanly shutdown
any inbound, outbound or manual sessions which are running.
This is the basic usage of StoppableTask
:
let task = StoppableTask::new();
task.clone().start(
my_method(),
|result| self_.handle_stop(result),
Error::MyStopError,
executor,
);
Then at any time we can call task.stop
to close the task.
To make use of this, we will need to import StoppableTask
to dchatd
and add it to the Dchat
struct definition. We'll wrap it in a Mutex
to ensure thread safety.
//...
use darkfi::system::{StoppableTask, StoppableTaskPrc};
//...
struct Dchat {
p2p: net::P2pPtr,
recv_msgs: DchatMsgsBuffer,
pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}
impl Dchat {
fn new(
p2p: net::P2pPtr,
recv_msgs: DchatMsgsBuffer,
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
) -> Self {
Self { p2p, recv_msgs, rpc_connections }
}
}
We'll then add the required trait method connections_mut
to the Dchat
RequestHandler
implementation that unlocks the Mutex
, returning a
HashSet
of StoppableTaskPtr
.
async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
self.rpc_connections.lock().await
}
Next, we invoke JSON-RPC
in the main function of dchatd
, wielding
StoppableTask
to start a JSON-RPC
server and wait for a stop signal as follows:
info!("Starting JSON-RPC server on port {}", args.rpc_listen);
let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
let rpc_connections = Mutex::new(HashSet::new());
let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections));
let _ex = ex.clone();
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
ex.clone(),
);
//...
info!("Stopping JSON-RPC server");
rpc_task.stop().await;
The method stop_connections
is implemented by RequestHandler
trait. Behind the scenes it calls the connections_mut
method we
implemented above, loops through the StoppableTaskPtr
's it returns
and calls stop
on them, safely closing each JSON-RPC
connection.
Notice that when we start the StoppableTask
using
rpc.task.clone().start
, we also pass a method called listen_and_serve
.
listen_and_serve
is a method defined in DarkFi's rpc
module.
It starts a JSON-RPC server that is bound to the provided accept address
and uses our previously implemented RequestHandler
to handle incoming
requests.
The async block uses the move
keyword to takes ownership of
the accept_addr
and RequestHandler
values and pass them into
listen_and_serve
.
We have enabled JSON-RPC.