Start-Run-Stop
Now that we have initialized the network settings, we can create an instance of the p2p network.
Add the following to main():
let p2p = net::P2p::new(settings?).await;
We will next create a Dchat struct that will store all the data required
by dchat. For now, it will just hold a pointer to the p2p network.
struct Dchat {
    p2p: net::P2pPtr,
}

impl Dchat {
    fn new(p2p: net::P2pPtr) -> Self {
        Self { p2p }
    }
}
Now let's add a start() function to the Dchat implementation. start()
takes an executor and runs three p2p methods: p2p::start(), p2p::run(),
and p2p::stop().
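async fn start(&mut self, ex: Arc<Executor<'_>>) -> Result<()> {
    let ex2 = ex.clone();

    // Run the seed session; this blocks until the seed queries have finished
    self.p2p.clone().start(ex.clone()).await?;
    // Run the network as a detached background task
    ex2.spawn(self.p2p.clone().run(ex.clone())).detach();

    // Send the stop signal
    self.p2p.stop().await;

    Ok(())
}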
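The ordering in start() matters: p2p::start() is awaited to completion, p2p::run() is handed off to run in the background, and p2p::stop() then signals that background task to finish. The following is a minimal sketch of that ordering only, using a standard-library thread and channel as stand-ins for the executor and the stop signal. The function name lifecycle() and the log strings are hypothetical and not part of dchat or the net module.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Returns the order in which the three phases complete.
/// Hypothetical helper: a thread stands in for the spawned run() task.
fn lifecycle() -> Vec<&'static str> {
    let mut log = Vec::new();

    // 1. start: awaited inline, so nothing else happens until it returns.
    log.push("start finished");

    // 2. run: handed to the background, where it waits for the stop signal.
    let (stop_tx, stop_rx) = channel::<()>();
    let run_task = thread::spawn(move || {
        stop_rx.recv().ok();
        "run finished"
    });

    // 3. stop: send the signal that lets the background task return.
    stop_tx.send(()).expect("run task is still waiting");
    log.push("stop sent");

    // The code above detaches the task instead; we join here only so the
    // result can be observed in this sketch.
    log.push(run_task.join().expect("run task panicked"));
    log
}
```

Calling lifecycle() yields the phases in the order start, stop sent, run finished, which is the sequence the three p2p calls are meant to follow.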
Let's take a quick look at the underlying p2p methods we're using here.
Start
This is start():
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
    debug!(target: "net::p2p::start()", "P2p::start() [BEGIN]");

    *self.state.lock().await = P2pState::Start;

    // Start seed session
    let seed = SeedSyncSession::new(Arc::downgrade(&self));
    // This will block until all seed queries have finished
    seed.start(executor.clone()).await?;

    *self.state.lock().await = P2pState::Started;

    debug!(target: "net::p2p::start()", "P2p::start() [END]");
    Ok(())
}
start() changes the P2pState to P2pState::Start and runs a seed session.
This loops through the seed addresses specified in our Settings and
tries to connect to them. Each connection attempt either succeeds,
fails with an error, or times out.
If a seed node connects successfully, it runs a version exchange protocol, stores the channel in the p2p list of channels, and then disconnects, removing the channel from the channel list. Once the seed session has finished, the state is set to P2pState::Started.
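To make the three possible outcomes concrete, here is a minimal sketch of a seed loop under simplifying assumptions. SeedOutcome, ToyP2p and seed_sync() are hypothetical names invented for this illustration, and the connect argument is a caller-supplied stub rather than a real network call. It is not how SeedSyncSession is implemented.

```rust
/// Hypothetical outcome of a single seed connection attempt.
#[derive(Debug, Clone, PartialEq)]
enum SeedOutcome {
    Connected,
    Failed(String),
    TimedOut,
}

/// Toy stand-in for the p2p object: only the list of open channels.
#[derive(Default)]
struct ToyP2p {
    channels: Vec<String>,
}

/// Visit every seed address once and record what happened.
/// `connect` is a stub supplied by the caller, not a real network call.
fn seed_sync<F>(p2p: &mut ToyP2p, seeds: &[&str], connect: F) -> Vec<SeedOutcome>
where
    F: Fn(&str) -> SeedOutcome,
{
    let mut outcomes = Vec::new();
    for seed in seeds {
        let outcome = connect(seed);
        if outcome == SeedOutcome::Connected {
            // The version exchange would run here (omitted in this sketch).
            // Store the channel in the channel list...
            p2p.channels.push(seed.to_string());
            // ...then disconnect, which removes it from the list again.
            p2p.channels.retain(|c| c.as_str() != *seed);
        }
        outcomes.push(outcome);
    }
    outcomes
}
```

Whatever mix of outcomes the stub returns, the channel list is empty again when seed_sync() returns, which mirrors the connect-then-disconnect behaviour described above.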
Run
This is run():
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
    debug!(target: "net::p2p::run()", "P2p::run() [BEGIN]");

    *self.state.lock().await = P2pState::Run;

    let manual = self.session_manual().await;
    for peer in &self.settings.peers {
        manual.clone().connect(peer, executor.clone()).await;
    }

    let inbound = self.session_inbound().await;
    inbound.clone().start(executor.clone()).await?;

    let outbound = self.session_outbound().await;
    outbound.clone().start(executor.clone()).await?;

    let stop_sub = self.subscribe_stop().await;
    // Wait for stop signal
    stop_sub.receive().await;

    // Stop the sessions
    manual.stop().await;
    inbound.stop().await;
    outbound.stop().await;

    debug!(target: "net::p2p::run()", "P2p::run() [END]");
    Ok(())
}
run() changes the P2pState to P2pState::Run. It then starts the manual,
inbound and outbound sessions contained within the P2p struct: the manual
session connects to each peer listed in the settings, while the inbound and
outbound sessions are started with start(). The outcome of start() depends
on how your node is configured. run() tries to start each kind of session,
but if the configuration doesn't match, attempting to start a session will
simply return without doing anything. For example, if you are an outbound
node, inbound.start() will return with the following message:
info!(target: "net", "Not configured for accepting incoming connections.");
run() then waits for a stop signal and shuts down the sessions when it
is received.
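The shape of run() can be sketched as follows: attempt every session kind, let the configuration decide which ones actually do anything, then block until the stop signal arrives. ToySettings, its fields accepts_inbound and outbound_slots, and toy_run() are hypothetical and chosen only for illustration. The real Settings struct may use different fields, and a standard-library channel stands in for the stop subscription.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical, simplified node configuration.
struct ToySettings {
    peers: Vec<String>,    // peers for the manual session
    accepts_inbound: bool, // whether we listen for incoming connections
    outbound_slots: u32,   // how many outbound connections to maintain
}

/// Try to start every session kind, wait for the stop signal, then shut down.
/// Returns a log of what happened so the behaviour can be inspected.
fn toy_run(settings: &ToySettings, stop: Receiver<()>) -> Vec<String> {
    let mut log = Vec::new();

    // Manual session: one connection attempt per configured peer.
    for peer in &settings.peers {
        log.push(format!("manual: connect {}", peer));
    }

    // Inbound session: a no-op unless the node is configured to accept.
    if settings.accepts_inbound {
        log.push("inbound: started".to_string());
    } else {
        log.push("inbound: not configured, skipped".to_string());
    }

    // Outbound session: a no-op when there are no slots to fill.
    if settings.outbound_slots > 0 {
        log.push("outbound: started".to_string());
    } else {
        log.push("outbound: not configured, skipped".to_string());
    }

    // Block until the stop signal arrives (or the sender is dropped).
    let _ = stop.recv();

    log.push("sessions stopped".to_string());
    log
}
```

An outbound-only node in this sketch logs that the inbound session was skipped, just as the real inbound.start() returns early with an info message.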
Stop
This is stop():
pub async fn stop(&self) {
    self.stop_subscriber.notify(()).await
}
stop() sends a shutdown signal to every subscriber of the stop signal.
run() is one such subscriber: on receiving the signal it stops the manual,
inbound and outbound sessions, shutting the network down safely.
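The notify-all-subscribers idea can be illustrated with a small broadcast helper built on standard-library channels. ToyStopSubscriber and its methods are hypothetical and are not the subscriber type used by the net module. The sketch only shows how a single notify() call can reach every party that subscribed earlier.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy broadcast subscriber: each call to `subscribe` gets its own receiver.
#[derive(Default)]
struct ToyStopSubscriber {
    subs: Vec<Sender<()>>,
}

impl ToyStopSubscriber {
    /// Register a new listener and hand back its receiving end.
    fn subscribe(&mut self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.subs.push(tx);
        rx
    }

    /// Send the stop signal to every listener.
    /// Returns how many listeners were still around to receive it.
    fn notify(&self) -> usize {
        self.subs.iter().filter(|tx| tx.send(()).is_ok()).count()
    }
}
```

Each listener blocks on its own receiver, much as run() waits on stop_sub.receive(), and all of them are released by the same notify() call.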