Services

Nodes and applications are composed out of services. These are long running components that may communicate with each other.

The standard signature for a service is of the form:

use darkfi::ExecutorPtr;

pub struct Service {
    // ...
}

impl Service {
    pub fn new(/* ... */, executor: ExecutorPtr) -> Arc<Self> {
        // ...
    }

    pub async fn start(self: Arc<Self>) {
        // ...
    }

    pub async fn stop(&self) {
    }
}

Both start() and stop() should return immediately without blocking the caller. Any long running tasks they need to perform should be done using StoppableTask (see below).

StoppableTask

Services will likely want to start any number of processes. For that you can use StoppableTask.

For example ManualSession looks like this:

pub struct ManualSession {
    p2p: Weak<P2p>,
    connect_slots: Mutex<Vec<StoppableTaskPtr>>,
    // ...
}

impl ManualSession {
    pub fn new(p2p: Weak<P2p>) -> ManualSessionPtr {
        Arc::new(Self {
            p2p,
            connect_slots: Mutex::new(Vec::new()),
            // ...
        })
    }

    pub async fn connect(self: Arc<Self>, addr: Url) {
        let ex = self.p2p().executor();
        let task = StoppableTask::new();

        task.clone().start(
            self.clone().channel_connect_loop(addr),
            // Ignore stop handler
            |_| async {},
            Error::NetworkServiceStopped,
            ex,
        );

        self.connect_slots.lock().await.push(task);
    }

    pub async fn stop(&self) {
        let connect_slots = &*self.connect_slots.lock().await;

        for slot in connect_slots {
            slot.stop().await;
        }
    }
    
    // ...
}

The method in start() is a future that returns Result<()>. If you do not want to return a result (for example with long running processes), then simply use the future:

    async {
        foo().await;
        unreachable!()
    }

Communicating Between Services

Another tool in our toolbox is the subscribe()/notify() paradigm.

We can use system::Subscriber. Then inside our method we can define a method like so:

    pub async fn subscribe_stop(&self) -> Result<Subscription<Error>> {
        let sub = self.stop_subscriber.clone().subscribe().await;
        Ok(sub)
    }

    // ...

    // Invoke it like this
    self.stop_subscriber.notify(Error::ChannelStopped).await;

Then the API user can simply do:

let stop_sub = channel.subscribe_stop().await?;
let err = stop_sub.receive().await;
stop_sub.unsubscribe().await;

Parent-Child Relationships

In the async context we are forced to use Arc<Self>, but often times we want a parent-child relationship where if both parties contain an Arc reference to the other it creates a circular loop. For this case, we can use std::sync::Weak and std::sync::Arc::new_cyclic().

pub struct Parent {
    child: Arc<Child>,
    // ...
}

impl Parent {
    pub async fn new(/* ... */) -> Arc<Self> {
        Arc::new_cyclic(|parent| Self {
            child: Child::new(parent.clone())
            // ...
        });
    }

    // ...
}


pub struct Child {
    pub parent: Weak<Parent>,
    // ...
}

impl Child {
    pub fn new(parent: Weak<Parent>) -> Arc<Self> {
        Arc::new(Self {
            parent: Weak::new(),
            // ...
        })
    }

    // ...
}

Otherwise if the relationship is just one way, use Arc<Foo>. For example if doing dependency injection where component B dependent on component A, then we could do:

let comp_a = Foo::new();
let comp_b = Bar::new(comp_a);