StoppableTask

Implementing a JSON-RPC RequestHandler also requires that we implement a method called connections_mut. This introduces us to an important darkfi type called StoppableTask.

StoppableTask is a async task that can be prematurely (and safely) stopped at any time. We've already encountered this method when we discussed p2p.stop, which triggers StoppableTask to cleanly shutdown any inbound, outbound or manual sessions which are running.

This is the basic usage of StoppableTask:

    let task = StoppableTask::new();
    task.clone().start(
        my_method(),
        |result| self_.handle_stop(result),
        Error::MyStopError,
        executor,
    );

Then at any time we can call task.stop to close the task.

To make use of this, we will need to import StoppableTask to dchatd and add it to the Dchat struct definition. We'll wrap it in a Mutex to ensure thread safety.

//...

use darkfi::system::{StoppableTask, StoppableTaskPrc};

//...

struct Dchat {
    p2p: net::P2pPtr,
    recv_msgs: DchatMsgsBuffer,
    pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
}

impl Dchat {
    fn new(
        p2p: net::P2pPtr,
        recv_msgs: DchatMsgsBuffer,
        rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    ) -> Self {
        Self { p2p, recv_msgs, rpc_connections }
    }
}

We'll then add the required trait method connections_mut to the Dchat RequestHandler implementation that unlocks the Mutex, returning a HashSet of StoppableTaskPtr.

    async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
        self.rpc_connections.lock().await
    }

Next, we invoke JSON-RPC in the main function of dchatd, wielding StoppableTask to start a JSON-RPC server and wait for a stop signal as follows:

    info!("Starting JSON-RPC server on port {}", args.rpc_listen);
    let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
    let rpc_connections = Mutex::new(HashSet::new());
    let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections));
    let _ex = ex.clone();

    let rpc_task = StoppableTask::new();
    rpc_task.clone().start(
        listen_and_serve(args.rpc_listen, dchat.clone(), None, ex.clone()),
        |res| async move {
            match res {
                Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
                Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
            }
        },
        Error::RpcServerStopped,
        ex.clone(),
    );

    //...

    info!("Stopping JSON-RPC server");
    rpc_task.stop().await;

The method stop_connections is implemented by RequestHandler trait. Behind the scenes it calls the connections_mut method we implemented above, loops through the StoppableTaskPtr's it returns and calls stop on them, safely closing each JSON-RPC connection.

Notice that when we start the StoppableTask using rpc.task.clone().start, we also pass a method called listen_and_serve. listen_and_serve is a method defined in DarkFi's rpc module. It starts a JSON-RPC server that is bound to the provided accept address and uses our previously implemented RequestHandler to handle incoming requests.

The async block uses the move keyword to takes ownership of the accept_addr and RequestHandler values and pass them into listen_and_serve.

We have enabled JSON-RPC.