Start-Run-Stop

Now that we have initialized the network settings, we can create an instance of the p2p network.

Add the following to main():

    let p2p = net::P2p::new(settings?).await;

We will next create a Dchat struct that will store all the data required by dchat. For now, it will just hold a pointer to the p2p network.

    struct Dchat {
        p2p: net::P2pPtr,
    }

    impl Dchat {
        fn new(p2p: net::P2pPtr) -> Self {
            Self { p2p }
        }
    }

Now let's add a start() function to the Dchat implementation. start() takes an executor and calls three p2p methods: p2p::start(), p2p::run(), and p2p::stop(). Note that run() is spawned on the executor as a detached task rather than awaited directly.

    async fn start(&mut self, ex: Arc<Executor<'_>>) -> Result<()> {
        let ex2 = ex.clone();

        self.p2p.clone().start(ex.clone()).await?;
        ex2.spawn(self.p2p.clone().run(ex.clone())).detach();

        self.p2p.stop().await;

        Ok(())
    }
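The repeated self.p2p.clone() calls are cheap if net::P2pPtr is a reference-counted pointer. The sketch below assumes it is an alias for Arc<P2p>, which this section does not state, and uses a hypothetical P2p stand-in rather than the real darkfi type. It shows that a method taking self: Arc<Self> consumes only the temporary clone, while Dchat keeps its own handle.

```rust
use std::sync::Arc;

// Hypothetical stand-in for net::P2p; the real struct holds network state.
struct P2p {
    name: &'static str,
}

// Assumed shape of net::P2pPtr: a shared, reference-counted pointer.
type P2pPtr = Arc<P2p>;

impl P2p {
    // Takes ownership of one Arc handle, like a `self: Arc<Self>` method.
    fn start(self: Arc<Self>) -> &'static str {
        self.name
    }

    // Borrows the instance, like a `&self` method.
    fn stop(&self) -> &'static str {
        self.name
    }
}

struct Dchat {
    p2p: P2pPtr,
}

// Calls start() on a temporary clone and returns how many handles remain.
fn handles_after_start(dchat: &Dchat) -> usize {
    // The clone is consumed by start(); dchat keeps its own handle.
    dchat.p2p.clone().start();
    Arc::strong_count(&dchat.p2p)
}

fn main() {
    let p2p: P2pPtr = Arc::new(P2p { name: "dchat" });
    let dchat = Dchat { p2p: p2p.clone() };
    println!("handles after start: {}", handles_after_start(&dchat));
    println!("stopping {}", dchat.p2p.stop());
}
```

Calling stop() needs no clone because it only borrows the instance through &self.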

Let's take a quick look at the underlying p2p methods we're using here.

Start

This is start():

    pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
        debug!(target: "net::p2p::start()", "P2p::start() [BEGIN]");

        *self.state.lock().await = P2pState::Start;

        // Start seed session
        let seed = SeedSyncSession::new(Arc::downgrade(&self));
        // This will block until all seed queries have finished
        seed.start(executor.clone()).await?;

        *self.state.lock().await = P2pState::Started;

        debug!(target: "net::p2p::start()", "P2p::start() [END]");
        Ok(())
    }

start() sets the P2pState to P2pState::Start, runs a seed session, and sets the state to P2pState::Started once the seed session has finished.

The seed session loops through the seed addresses specified in our Settings and tries to connect to each of them. Each connection attempt either succeeds, fails with an error, or times out.

When a connection to a seed node succeeds, the session runs the version exchange protocol, stores the channel in the p2p list of channels, and then disconnects, removing the channel from the channel list.
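To make the seed session lifecycle concrete, here is a simplified synchronous model of it. Everything in it (SeedOutcome, ChannelList, seed_sync, the example addresses) is hypothetical and exists only for illustration; the real session is asynchronous and performs actual network I/O.

```rust
use std::collections::HashSet;

// Hypothetical outcome of a single seed connection attempt.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SeedOutcome {
    Connected,
    Failed,
    TimedOut,
}

// Hypothetical stand-in for the p2p list of channels.
#[derive(Default)]
struct ChannelList {
    channels: HashSet<String>,
}

// Walks the seed addresses and returns how many seeds were reached.
fn seed_sync(list: &mut ChannelList, seeds: &[(&str, SeedOutcome)]) -> usize {
    let mut synced = 0;
    for (addr, outcome) in seeds {
        match outcome {
            SeedOutcome::Connected => {
                // The version exchange would run here, then the channel is stored.
                list.channels.insert(addr.to_string());
                // Once the seed has been queried we disconnect,
                // which removes the channel from the list again.
                list.channels.remove(*addr);
                synced += 1;
            }
            SeedOutcome::Failed | SeedOutcome::TimedOut => {
                eprintln!("seed {addr} not reached: {outcome:?}");
            }
        }
    }
    synced
}

fn main() {
    let mut list = ChannelList::default();
    let seeds = [
        ("tcp://seed-a.example:1", SeedOutcome::Connected),
        ("tcp://seed-b.example:1", SeedOutcome::TimedOut),
        ("tcp://seed-c.example:1", SeedOutcome::Failed),
    ];
    let synced = seed_sync(&mut list, &seeds);
    println!("{synced} of {} seeds answered", seeds.len());
    println!("channels left open: {}", list.channels.len());
}
```

The point of the model is that no seed channel outlives the seed session: the channel list is empty again by the time start() returns.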

Run

This is run():

    pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
        debug!(target: "net::p2p::run()", "P2p::run() [BEGIN]");

        *self.state.lock().await = P2pState::Run;

        let manual = self.session_manual().await;
        for peer in &self.settings.peers {
            manual.clone().connect(peer, executor.clone()).await;
        }

        let inbound = self.session_inbound().await;
        inbound.clone().start(executor.clone()).await?;

        let outbound = self.session_outbound().await;
        outbound.clone().start(executor.clone()).await?;

        let stop_sub = self.subscribe_stop().await;
        // Wait for stop signal
        stop_sub.receive().await;

        // Stop the sessions
        manual.stop().await;
        inbound.stop().await;
        outbound.stop().await;

        debug!(target: "net::p2p::run()", "P2p::run() [END]");
        Ok(())
    }

run() changes the P2pState to P2pState::Run. It then connects to each peer listed in settings.peers through the manual session and calls start() on the inbound and outbound sessions contained within the P2p struct. The outcome depends on how your node is configured. run() tries to start each kind of session, but if the configuration doesn't match, attempting to start a session simply returns without doing anything. For example, if you are an outbound node, inbound.start() will return with the following message:
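The early-return behaviour can be sketched as follows. This is a simplified model, not the darkfi implementation: the trimmed-down Settings struct, its optional inbound field, and the boolean return value are assumptions made for illustration.

```rust
// Hypothetical, trimmed-down settings: only the field this sketch needs.
struct Settings {
    // Assumed: the address to accept connections on, if any.
    inbound: Option<String>,
}

// Hypothetical stand-in for the inbound session.
struct InboundSession;

impl InboundSession {
    // Returns Ok(true) if the session began listening and Ok(false)
    // if it returned early because the node is not configured for it.
    fn start(&self, settings: &Settings) -> Result<bool, String> {
        match &settings.inbound {
            None => {
                // Not an error: the session simply has nothing to do.
                println!("Not configured for accepting incoming connections.");
                Ok(false)
            }
            Some(addr) => {
                println!("Accepting incoming connections on {addr}");
                Ok(true)
            }
        }
    }
}

fn main() {
    // An outbound-only node has no inbound address configured.
    let outbound_only = Settings { inbound: None };
    let started = InboundSession.start(&outbound_only);
    println!("inbound session started: {started:?}");
}
```

Treating a missing configuration as a successful no-op rather than an error is what lets run() call start() on every session type unconditionally.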

    info!(target: "net", "Not configured for accepting incoming connections.");

run() then waits for a stop signal and shuts down the sessions when it is received.

Stop

This is stop():

    pub async fn stop(&self) {
        self.stop_subscriber.notify(()).await
    }

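stop() sends a shutdown signal to every subscriber of the stop signal. run() is one such subscriber: when it receives the signal, it stops the manual, inbound and outbound sessions, which safely shuts down the network.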
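The subscribe, wait, notify pattern behind run() and stop() can be modelled with standard library channels and threads. The StopSubscriber type and run_until_stopped function below are hypothetical stand-ins written for this sketch; the real code is asynchronous and uses its own subscriber type.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for the stop subscriber: it fans one signal
// out to every receiver that subscribed beforehand.
#[derive(Default)]
struct StopSubscriber {
    subs: Mutex<Vec<Sender<()>>>,
}

impl StopSubscriber {
    // Registers a new subscriber and hands back its receiving end.
    fn subscribe(&self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.subs.lock().unwrap().push(tx);
        rx
    }

    // Sends the stop signal to every current subscriber.
    fn notify(&self) {
        for tx in self.subs.lock().unwrap().iter() {
            // A subscriber that already dropped its receiver is ignored.
            let _ = tx.send(());
        }
    }
}

// Starts one waiting task per name, signals stop, and returns the
// names of the tasks that shut down, in the order they were started.
fn run_until_stopped(names: &[&'static str]) -> Vec<&'static str> {
    let stop = StopSubscriber::default();
    let handles: Vec<_> = names
        .iter()
        .map(|&name| {
            let rx = stop.subscribe();
            thread::spawn(move || {
                // Wait for the stop signal, as run() does.
                rx.recv().unwrap();
                name
            })
        })
        .collect();

    // Equivalent of calling stop(): wake every waiting task.
    stop.notify();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let stopped = run_until_stopped(&["manual", "inbound", "outbound"]);
    println!("stopped sessions: {stopped:?}");
}
```

Because the signal is delivered through subscriptions, the caller of stop() does not need to know which tasks are waiting or how each one cleans up.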