Registering a protocol

We've now successfully created a custom protocol. The next step is the register the protocol with the ProtocolRegistry.

We'll define a new function inside the Dchat implementation called register_protocol(). It will invoke the ProtocolRegistry using the handle to the p2p network contained in the Dchat struct. It will then call register() on the registry and pass the ProtocolDchat constructor.

    async fn register_protocol(&self, msgs: DchatMsgsBuffer) -> Result<()> {
        debug!(target: "dchat", "Dchat::register_protocol() [START]");
        let registry = self.p2p.protocol_registry();
        registry
            .register(!net::SESSION_SEED, move |channel, _p2p| {
                let msgs2 = msgs.clone();
                async move { ProtocolDchat::init(channel, msgs2).await }
            })
            .await;
        debug!(target: "dchat", "Dchat::register_protocol() [STOP]");
        Ok(())
    }

There's a lot going on here. register() takes a closure with two arguments, channel and p2p. We use move to capture these values. We then create an async closure that captures these values and the value msgs and use them to call ProtocolDchat::init() in the async block.

The code would be expressed more simply as:

registry.register(!net::SESSION_SEED, async move |channel, _p2p| {
        ProtocolDchat::init(channel, msgs).await
    })
    .await;

However we cannot do this due to limitation with async closures. So instead we wrap the async move in a move in order to capture the variables needed by ProtocolDchat::init().

Notice the use of a bitflag. We use !SESSION_SEED to specify that this protocol should be performed by all sessions aside from the seed session.

Also notice that register_protocol() requires a DchatMsgsBuffer that we send to the ProtocolDchat constructor. We'll create the DchatMsgsBuffer in main() and pass it to Dchat::new(). Let's add DchatMsgsBuffer to the Dchat struct definition first.

struct Dchat {
    p2p: net::P2pPtr,
    recv_msgs: DchatMsgsBuffer,
}

And initialize it:

#[async_std::main]
async fn main() -> Result<()> {
    //...

    let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));

    let mut dchat = Dchat::new(p2p.clone(), msgs);

    //...
    let (_, result) = Parallel::new()
        .each(0..nthreads, |_| smol::future::block_on(ex2.run(shutdown.recv())))
        .finish(|| {
            smol::future::block_on(async move {
                dchat.start(ex3).await?;
                drop(signal);
                Ok(())
            })
        });

    result
}

Finally, call register_protocol() in dchat::start():

    async fn start(&mut self, ex: Arc<Executor<'_>>) -> Result<()> {
        let ex2 = ex.clone();

        self.register_protocol(self.recv_msgs.clone()).await?;
        self.p2p.clone().start(ex.clone()).await?;
        ex2.spawn(self.p2p.clone().run(ex.clone())).detach();

        self.p2p.stop().await;

        Ok(())
    }

Now try running Alice and Bob and seeing what debug output you get. Keep an eye out for the following:

[DEBUG] (1) net: Channel::subscribe_msg() [START, command="DchatMsg", address=tcp://127.0.0.1:55555]
[DEBUG] (1) net: Channel::subscribe_msg() [END, command="DchatMsg", address=tcp://127.0.0.1:55555]
[DEBUG] (1) net: Attached ProtocolDchat

If you see that, we have successfully:

  • Implemented a custom Message and created a MessageSubscription.
  • Implemented a custom Protocol and registered it with the ProtocolRegistry.