darkfi/event_graph/
mod.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{
20    collections::{BTreeMap, HashMap, HashSet, VecDeque},
21    path::PathBuf,
22    sync::Arc,
23};
24
25use darkfi_serial::{deserialize_async, serialize_async};
26use log::{debug, error, info, warn};
27use num_bigint::BigUint;
28use sled_overlay::{sled, SledTreeOverlay};
29use smol::{
30    lock::{OnceCell, RwLock},
31    Executor,
32};
33use tinyjson::JsonValue::{self};
34
35use crate::{
36    event_graph::util::replayer_log,
37    net::P2pPtr,
38    rpc::{
39        jsonrpc::{JsonResponse, JsonResult},
40        util::json_map,
41    },
42    system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
43    Error, Result,
44};
45
46/// An event graph event
47pub mod event;
48pub use event::Event;
49
50/// P2P protocol implementation for the Event Graph
51pub mod proto;
52use proto::{EventRep, EventReq, TipRep, TipReq};
53
54/// Utility functions
55pub mod util;
56use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp};
57
58// Debugging event graph
59pub mod deg;
60use deg::DegEvent;
61
62#[cfg(test)]
63mod tests;
64
65/// Initial genesis timestamp in millis (07 Sep 2023, 00:00:00 UTC)
66/// Must always be UTC midnight.
67pub const INITIAL_GENESIS: u64 = 1_694_044_800_000;
68/// Genesis event contents
69pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];
70
71/// The number of parents an event is supposed to have.
72pub const N_EVENT_PARENTS: usize = 5;
73/// Allowed timestamp drift in milliseconds
74const EVENT_TIME_DRIFT: u64 = 60_000;
75/// Null event ID
76pub const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]);
77
78/// Atomic pointer to an [`EventGraph`] instance.
79pub type EventGraphPtr = Arc<EventGraph>;
80
81/// An Event Graph instance
82pub struct EventGraph {
83    /// Pointer to the P2P network instance
84    p2p: P2pPtr,
85    /// Sled tree containing the DAG
86    dag: sled::Tree,
87    /// Replay logs path.
88    datastore: PathBuf,
89    /// Run in replay_mode where if set we log Sled DB instructions
90    /// into `datastore`, useful to reacreate a faulty DAG to debug.
91    replay_mode: bool,
92    /// The set of unreferenced DAG tips
93    unreferenced_tips: RwLock<BTreeMap<u64, HashSet<blake3::Hash>>>,
94    /// A `HashSet` containg event IDs and their 1-level parents.
95    /// These come from the events we've sent out using `EventPut`.
96    /// They are used with `EventReq` to decide if we should reply
97    /// or not. Additionally it is also used when we broadcast the
98    /// `TipRep` message telling peers about our unreferenced tips.
99    broadcasted_ids: RwLock<HashSet<blake3::Hash>>,
100    /// DAG Pruning Task
101    pub prune_task: OnceCell<StoppableTaskPtr>,
102    /// Event publisher, this notifies whenever an event is
103    /// inserted into the DAG
104    pub event_pub: PublisherPtr<Event>,
105    /// Current genesis event
106    current_genesis: RwLock<Event>,
107    /// Currently configured DAG rotation, in days
108    days_rotation: u64,
109    /// Flag signalling DAG has finished initial sync
110    pub synced: RwLock<bool>,
111    /// Enable graph debugging
112    pub deg_enabled: RwLock<bool>,
113    /// The publisher for which we can give deg info over
114    deg_publisher: PublisherPtr<DegEvent>,
115}
116
117impl EventGraph {
118    /// Create a new [`EventGraph`] instance, creates a new Genesis
119    /// event and checks if it
120    /// is containd in DAG, if not prunes DAG, may also start a pruning
121    /// task based on `days_rotation`, and return an atomic instance of
122    /// `Self`
123    /// * `p2p` atomic pointer to p2p.
124    /// * `sled_db` sled DB instance.
125    /// * `datastore` path where we should log db instrucion if run in
126    ///   replay mode.
127    /// * `replay_mode` set the flag to keep a log of db instructions.
128    /// * `dag_tree_name` the name of disk-backed tree (or DAG name).
129    /// * `days_rotation` marks the lifetime of the DAG before it's
130    ///   pruned.
131    pub async fn new(
132        p2p: P2pPtr,
133        sled_db: sled::Db,
134        datastore: PathBuf,
135        replay_mode: bool,
136        dag_tree_name: &str,
137        days_rotation: u64,
138        ex: Arc<Executor<'_>>,
139    ) -> Result<EventGraphPtr> {
140        let dag = sled_db.open_tree(dag_tree_name)?;
141        let unreferenced_tips = RwLock::new(BTreeMap::new());
142        let broadcasted_ids = RwLock::new(HashSet::new());
143        let event_pub = Publisher::new();
144
145        // Create the current genesis event based on the `days_rotation`
146        let current_genesis = generate_genesis(days_rotation);
147        let self_ = Arc::new(Self {
148            p2p,
149            dag: dag.clone(),
150            datastore,
151            replay_mode,
152            unreferenced_tips,
153            broadcasted_ids,
154            prune_task: OnceCell::new(),
155            event_pub,
156            current_genesis: RwLock::new(current_genesis.clone()),
157            days_rotation,
158            synced: RwLock::new(false),
159            deg_enabled: RwLock::new(false),
160            deg_publisher: Publisher::new(),
161        });
162
163        // Check if we have it in our DAG.
164        // If not, we can prune the DAG and insert this new genesis event.
165        if !dag.contains_key(current_genesis.id().as_bytes())? {
166            info!(
167                target: "event_graph::new()",
168                "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
169            );
170            self_.dag_prune(current_genesis).await?;
171        }
172
173        // Find the unreferenced tips in the current DAG state.
174        *self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await;
175
176        // Spawn the DAG pruning task
177        if days_rotation > 0 {
178            let prune_task = StoppableTask::new();
179            let _ = self_.prune_task.set(prune_task.clone()).await;
180
181            prune_task.clone().start(
182                self_.clone().dag_prune_task(days_rotation),
183                |res| async move {
184                    match res {
185                        Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
186                        Err(e) => error!(target: "event_graph::_handle_stop()", "[EVENTGRAPH] Failed stopping prune task: {e}")
187                    }
188                },
189                Error::DetachedTaskStopped,
190                ex.clone(),
191            );
192        }
193
194        Ok(self_)
195    }
196
197    pub fn days_rotation(&self) -> u64 {
198        self.days_rotation
199    }
200
201    /// Sync the DAG from connected peers
202    pub async fn dag_sync(&self) -> Result<()> {
203        // We do an optimistic sync where we ask all our connected peers for
204        // the latest layer DAG tips (unreferenced events) and then we accept
205        // the ones we see the most times.
206        // * Compare received tips with local ones, identify which we are missing.
207        // * Request these from peers
208        // * Recursively request these backward
209        //
210        // Verification:
211        // * Timestamps should go backwards
212        // * Cross-check with multiple peers, this means we should request the
213        //   same event from multiple peers and make sure it is the same.
214        // * Since we should be pruning, if we're not synced after some reasonable
215        //   amount of iterations, these could be faulty peers and we can try again
216        //   from the beginning
217
218        // Get references to all our peers.
219        let channels = self.p2p.hosts().peers();
220        let mut communicated_peers = channels.len();
221        info!(
222            target: "event_graph::dag_sync()",
223            "[EVENTGRAPH] Syncing DAG from {} peers...", communicated_peers,
224        );
225
226        // Here we keep track of the tips, their layers and how many time we've seen them.
227        let mut tips: HashMap<blake3::Hash, (u64, usize)> = HashMap::new();
228
229        // Let's first ask all of our peers for their tips and collect them
230        // in our hashmap above.
231        for channel in channels.iter() {
232            let url = channel.address();
233
234            let tip_rep_sub = match channel.subscribe_msg::<TipRep>().await {
235                Ok(v) => v,
236                Err(e) => {
237                    error!(
238                        target: "event_graph::dag_sync()",
239                        "[EVENTGRAPH] Sync: Couldn't subscribe TipReq for peer {}, skipping ({})",
240                        url, e,
241                    );
242                    communicated_peers -= 1;
243                    continue
244                }
245            };
246
247            if let Err(e) = channel.send(&TipReq {}).await {
248                error!(
249                    target: "event_graph::dag_sync()",
250                    "[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e,
251                );
252                communicated_peers -= 1;
253                continue
254            };
255
256            // Node waits for response
257            let Ok(peer_tips) = tip_rep_sub
258                .receive_with_timeout(self.p2p.settings().read().await.outbound_connect_timeout)
259                .await
260            else {
261                error!(
262                    target: "event_graph::dag_sync()",
263                    "[EVENTGRAPH] Sync: Peer {} didn't reply with tips in time, skipping", url,
264                );
265                communicated_peers -= 1;
266                continue
267            };
268
269            let peer_tips = &peer_tips.0;
270
271            // Note down the seen tips
272            for (layer, layer_tips) in peer_tips {
273                for tip in layer_tips {
274                    if let Some(seen_tip) = tips.get_mut(tip) {
275                        seen_tip.1 += 1;
276                    } else {
277                        tips.insert(*tip, (*layer, 1));
278                    }
279                }
280            }
281        }
282
283        // After we've communicated all the peers, let's see what happened.
284        if tips.is_empty() {
285            error!(
286                target: "event_graph::dag_sync()",
287                "[EVENTGRAPH] Sync: Could not find any DAG tips",
288            );
289            return Err(Error::DagSyncFailed)
290        }
291
292        // We know the number of peers we've communicated with,
293        // so we will consider events we saw at more than 2/3 of
294        // those peers.
295        let consideration_threshold = communicated_peers * 2 / 3;
296        let mut considered_tips = HashSet::new();
297        for (tip, (_, amount)) in tips.iter() {
298            if amount > &consideration_threshold {
299                considered_tips.insert(*tip);
300            }
301        }
302        drop(tips);
303
304        // Now begin fetching the events backwards.
305        let mut missing_parents = HashSet::new();
306        for tip in considered_tips.iter() {
307            assert!(tip != &NULL_ID);
308
309            if !self.dag.contains_key(tip.as_bytes()).unwrap() {
310                missing_parents.insert(*tip);
311            }
312        }
313
314        if missing_parents.is_empty() {
315            *self.synced.write().await = true;
316            info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
317            return Ok(())
318        }
319
320        info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
321        let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
322        let mut received_events_hashes = HashSet::new();
323
324        while !missing_parents.is_empty() {
325            let mut found_event = false;
326
327            for channel in channels.iter() {
328                let url = channel.address();
329
330                debug!(
331                    target: "event_graph::dag_sync()",
332                    "Requesting {:?} from {}...", missing_parents, url,
333                );
334
335                let ev_rep_sub = match channel.subscribe_msg::<EventRep>().await {
336                    Ok(v) => v,
337                    Err(e) => {
338                        error!(
339                            target: "event_graph::dag_sync()",
340                            "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
341                            url, e,
342                        );
343                        continue
344                    }
345                };
346
347                let request_missing_events = missing_parents.clone().into_iter().collect();
348                if let Err(e) = channel.send(&EventReq(request_missing_events)).await {
349                    error!(
350                        target: "event_graph::dag_sync()",
351                        "[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",
352                        missing_parents, url, e,
353                    );
354                    continue
355                }
356
357                // Node waits for response
358                let Ok(parent) = ev_rep_sub
359                    .receive_with_timeout(self.p2p.settings().read().await.outbound_connect_timeout)
360                    .await
361                else {
362                    error!(
363                        target: "event_graph::dag_sync()",
364                        "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
365                        missing_parents, url,
366                    );
367                    continue
368                };
369
370                let parents = parent.0.clone();
371
372                for parent in parents {
373                    let parent_id = parent.id();
374                    if !missing_parents.contains(&parent_id) {
375                        error!(
376                            target: "event_graph::dag_sync()",
377                            "[EVENTGRAPH] Sync: Peer {} replied with a wrong event: {}",
378                            url, parent.id(),
379                        );
380                        continue
381                    }
382
383                    debug!(
384                        target: "event_graph::dag_sync()",
385                        "Got correct parent event {}", parent_id,
386                    );
387
388                    if let Some(layer_events) = received_events.get_mut(&parent.layer) {
389                        layer_events.push(parent.clone());
390                    } else {
391                        let layer_events = vec![parent.clone()];
392                        received_events.insert(parent.layer, layer_events);
393                    }
394                    received_events_hashes.insert(parent_id);
395
396                    missing_parents.remove(&parent_id);
397                    found_event = true;
398
399                    // See if we have the upper parents
400                    for upper_parent in parent.parents.iter() {
401                        if upper_parent == &NULL_ID {
402                            continue
403                        }
404
405                        if !missing_parents.contains(upper_parent) &&
406                            !received_events_hashes.contains(upper_parent) &&
407                            !self.dag.contains_key(upper_parent.as_bytes()).unwrap()
408                        {
409                            debug!(
410                                target: "event_graph::dag_sync()",
411                                "Found upper missing parent event {}", upper_parent,
412                            );
413                            missing_parents.insert(*upper_parent);
414                        }
415                    }
416                }
417
418                break
419            }
420
421            if !found_event {
422                error!(
423                    target: "event_graph::dag_sync()",
424                    "[EVENTGRAPH] Sync: Failed to get all events",
425                );
426                return Err(Error::DagSyncFailed)
427            }
428        } // <-- while !missing_parents.is_empty
429
430        // At this point we should've got all the events.
431        // We should add them to the DAG.
432        let mut events = vec![];
433        for (_, tips) in received_events {
434            for tip in tips {
435                events.push(tip);
436            }
437        }
438        self.dag_insert(&events).await?;
439
440        *self.synced.write().await = true;
441
442        info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
443        Ok(())
444    }
445
446    /// Atomically prune the DAG and insert the given event as genesis.
447    async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
448        debug!(target: "event_graph::dag_prune()", "Pruning DAG...");
449
450        // Acquire exclusive locks to unreferenced_tips, broadcasted_ids and
451        // current_genesis while this operation is happening. We do this to
452        // ensure that during the pruning operation, no other operations are
453        // able to access the intermediate state which could lead to producing
454        // the wrong state after pruning.
455        let mut unreferenced_tips = self.unreferenced_tips.write().await;
456        let mut broadcasted_ids = self.broadcasted_ids.write().await;
457        let mut current_genesis = self.current_genesis.write().await;
458
459        // Atomically clear the DAG and write the new genesis event.
460        let mut batch = sled::Batch::default();
461        for key in self.dag.iter().keys() {
462            batch.remove(key.unwrap());
463        }
464        batch.insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event).await);
465
466        debug!(target: "event_graph::dag_prune()", "Applying batch...");
467        if let Err(e) = self.dag.apply_batch(batch) {
468            panic!("Failed pruning DAG, sled apply_batch error: {}", e);
469        }
470
471        // Clear unreferenced tips and bcast ids
472        *unreferenced_tips = BTreeMap::new();
473        unreferenced_tips.insert(0, HashSet::from([genesis_event.id()]));
474        *current_genesis = genesis_event;
475        *broadcasted_ids = HashSet::new();
476        drop(unreferenced_tips);
477        drop(broadcasted_ids);
478        drop(current_genesis);
479
480        debug!(target: "event_graph::dag_prune()", "DAG pruned successfully");
481        Ok(())
482    }
483
484    /// Background task periodically pruning the DAG.
485    async fn dag_prune_task(self: Arc<Self>, days_rotation: u64) -> Result<()> {
486        // The DAG should periodically be pruned. This can be a configurable
487        // parameter. By pruning, we should deterministically replace the
488        // genesis event (can use a deterministic timestamp) and drop everything
489        // in the DAG, leaving just the new genesis event.
490        debug!(target: "event_graph::dag_prune_task()", "Spawned background DAG pruning task");
491
492        loop {
493            // Find the next rotation timestamp:
494            let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
495
496            // Prepare the new genesis event
497            let current_genesis = Event {
498                timestamp: next_rotation,
499                content: GENESIS_CONTENTS.to_vec(),
500                parents: [NULL_ID; N_EVENT_PARENTS],
501                layer: 0,
502            };
503
504            // Sleep until it's time to rotate.
505            let s = millis_until_next_rotation(next_rotation);
506
507            debug!(target: "event_graph::dag_prune_task()", "Sleeping {}ms until next DAG prune", s);
508            msleep(s).await;
509            debug!(target: "event_graph::dag_prune_task()", "Rotation period reached");
510
511            // Trigger DAG prune
512            self.dag_prune(current_genesis).await?;
513        }
514    }
515
516    /// Atomically insert given events into the DAG and return the event IDs.
517    /// All provided events must be valid. An overlay is used over the DAG tree,
518    /// temporary writting each event in order. After all events have been
519    /// validated and inserted successfully, we write the overlay to sled.
520    /// This will append the new events into the unreferenced tips set, and
521    /// remove the events' parents from it. It will also append the events'
522    /// level-1 parents to the `broadcasted_ids` set, so the P2P protocol
523    /// knows that any requests for them are actually legitimate.
524    /// TODO: The `broadcasted_ids` set should periodically be pruned, when
525    /// some sensible time has passed after broadcasting the event.
526    pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
527        // Sanity check
528        if events.is_empty() {
529            return Ok(vec![])
530        }
531
532        // Acquire exclusive locks to `unreferenced_tips and broadcasted_ids`
533        let mut unreferenced_tips = self.unreferenced_tips.write().await;
534        let mut broadcasted_ids = self.broadcasted_ids.write().await;
535
536        // Here we keep the IDs to return
537        let mut ids = Vec::with_capacity(events.len());
538
539        // Create an overlay over the DAG tree
540        let mut overlay = SledTreeOverlay::new(&self.dag);
541
542        // Grab genesis timestamp
543        let genesis_timestamp = self.current_genesis.read().await.timestamp;
544
545        // Iterate over given events to validate them and
546        // write them to the overlay
547        for event in events {
548            let event_id = event.id();
549            debug!(
550                target: "event_graph::dag_insert()",
551                "Inserting event {} into the DAG", event_id,
552            );
553
554            if !event
555                .validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay))
556                .await?
557            {
558                error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id);
559                return Err(Error::EventIsInvalid)
560            }
561
562            let event_se = serialize_async(event).await;
563
564            // Add the event to the overlay
565            overlay.insert(event_id.as_bytes(), &event_se)?;
566
567            if self.replay_mode {
568                replayer_log(&self.datastore, "insert".to_owned(), event_se)?;
569            }
570            // Note down the event ID to return
571            ids.push(event_id);
572        }
573
574        // Aggregate changes into a single batch
575        let batch = overlay.aggregate().unwrap();
576
577        // Atomically apply the batch.
578        // Panic if something is corrupted.
579        if let Err(e) = self.dag.apply_batch(batch) {
580            panic!("Failed applying dag_insert batch to sled: {}", e);
581        }
582
583        // Iterate over given events to update references and
584        // send out notifications about them
585        for event in events {
586            let event_id = event.id();
587
588            // Update the unreferenced DAG tips set
589            debug!(
590                target: "event_graph::dag_insert()",
591                "Event {} parents {:#?}", event_id, event.parents,
592            );
593            for parent_id in event.parents.iter() {
594                if parent_id != &NULL_ID {
595                    debug!(
596                        target: "event_graph::dag_insert()",
597                        "Removing {} from unreferenced_tips", parent_id,
598                    );
599
600                    // Iterate over unreferenced tips in previous layers
601                    // and remove the parent
602                    // NOTE: this might be too exhaustive, but the
603                    // assumption is that previous layers unreferenced
604                    // tips will be few.
605                    for (layer, tips) in unreferenced_tips.iter_mut() {
606                        if layer >= &event.layer {
607                            continue
608                        }
609                        tips.remove(parent_id);
610                    }
611                    broadcasted_ids.insert(*parent_id);
612                }
613            }
614            unreferenced_tips.retain(|_, tips| !tips.is_empty());
615            debug!(
616                target: "event_graph::dag_insert()",
617                "Adding {} to unreferenced tips", event_id,
618            );
619
620            if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) {
621                layer_tips.insert(event_id);
622            } else {
623                let mut layer_tips = HashSet::new();
624                layer_tips.insert(event_id);
625                unreferenced_tips.insert(event.layer, layer_tips);
626            }
627
628            // Send out notifications about the new event
629            self.event_pub.notify(event.clone()).await;
630        }
631
632        // Drop the exclusive locks
633        drop(unreferenced_tips);
634        drop(broadcasted_ids);
635
636        Ok(ids)
637    }
638
639    /// Fetch an event from the DAG
640    pub async fn dag_get(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
641        let Some(bytes) = self.dag.get(event_id.as_bytes())? else { return Ok(None) };
642        let event: Event = deserialize_async(&bytes).await?;
643
644        Ok(Some(event))
645    }
646
647    /// Get next layer along with its N_EVENT_PARENTS from the unreferenced
648    /// tips of the DAG. Since tips are mapped by their layer, we go backwards
649    /// until we fill the vector, ensuring we always use latest layers tips as
650    /// parents.
651    async fn get_next_layer_with_parents(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
652        let unreferenced_tips = self.unreferenced_tips.read().await;
653
654        let mut parents = [NULL_ID; N_EVENT_PARENTS];
655        let mut index = 0;
656        'outer: for (_, tips) in unreferenced_tips.iter().rev() {
657            for tip in tips.iter() {
658                parents[index] = *tip;
659                index += 1;
660                if index >= N_EVENT_PARENTS {
661                    break 'outer
662                }
663            }
664        }
665
666        let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
667
668        assert!(parents.iter().any(|x| x != &NULL_ID));
669        (next_layer, parents)
670    }
671
672    /// Find the unreferenced tips in the current DAG state, mapped by their layers.
673    async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<blake3::Hash>> {
674        // First get all the event IDs
675        let mut tips = HashSet::new();
676        for iter_elem in self.dag.iter() {
677            let (id, _) = iter_elem.unwrap();
678            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
679            tips.insert(id);
680        }
681
682        // Iterate again to find unreferenced IDs
683        for iter_elem in self.dag.iter() {
684            let (_, event) = iter_elem.unwrap();
685            let event: Event = deserialize_async(&event).await.unwrap();
686            for parent in event.parents.iter() {
687                tips.remove(parent);
688            }
689        }
690
691        // Build the layers map
692        let mut map: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
693        for tip in tips {
694            let event = self.dag_get(&tip).await.unwrap().unwrap();
695            if let Some(layer_tips) = map.get_mut(&event.layer) {
696                layer_tips.insert(tip);
697            } else {
698                let mut layer_tips = HashSet::new();
699                layer_tips.insert(tip);
700                map.insert(event.layer, layer_tips);
701            }
702        }
703
704        map
705    }
706
707    /// Internal function used for DAG sorting.
708    async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
709        let (_, tips) = self.get_next_layer_with_parents().await;
710
711        // Convert the hash to BigUint for sorting
712        let mut sorted: Vec<_> =
713            tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
714        sorted.sort_unstable();
715
716        // Convert back to blake3
717        let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
718        for (i, id) in sorted.iter().enumerate() {
719            let mut bytes = id.to_bytes_be();
720
721            // Ensure we have 32 bytes
722            while bytes.len() < blake3::OUT_LEN {
723                bytes.insert(0, 0);
724            }
725
726            tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
727        }
728
729        tips_sorted
730    }
731
732    /// Perform a topological sort of the DAG.
733    pub async fn order_events(&self) -> Vec<Event> {
734        let mut ordered_events = VecDeque::new();
735        let mut visited = HashSet::new();
736
737        for tip in self.get_unreferenced_tips_sorted().await {
738            if !visited.contains(&tip) && tip != NULL_ID {
739                let tip = self.dag_get(&tip).await.unwrap().unwrap();
740                ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
741            }
742        }
743
744        let mut ord_events_vec = ordered_events.make_contiguous().to_vec();
745        // Order events based on thier layer numbers, or based on timestamp if they are equal
746        ord_events_vec
747            .sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.timestamp.cmp(&a.1.timestamp)));
748
749        ord_events_vec.iter().map(|a| a.1.clone()).collect::<Vec<Event>>()
750    }
751
752    /// We do a non-recursive DFS (<https://en.wikipedia.org/wiki/Depth-first_search>),
753    /// and additionally we consider the timestamps.
754    async fn dfs_topological_sort(
755        &self,
756        event: Event,
757        visited: &mut HashSet<blake3::Hash>,
758    ) -> VecDeque<(u64, Event)> {
759        let mut ordered_events = VecDeque::new();
760        let mut stack = VecDeque::new();
761        let event_id = event.id();
762        stack.push_back(event_id);
763
764        while let Some(event_id) = stack.pop_front() {
765            if !visited.contains(&event_id) && event_id != NULL_ID {
766                visited.insert(event_id);
767                if let Some(event) = self.dag_get(&event_id).await.unwrap() {
768                    for parent in event.parents.iter() {
769                        stack.push_back(*parent);
770                    }
771
772                    ordered_events.push_back((event.layer, event))
773                }
774            }
775        }
776
777        ordered_events
778    }
779
780    /// Enable graph debugging
781    pub async fn deg_enable(&self) {
782        *self.deg_enabled.write().await = true;
783        warn!("[EVENTGRAPH] Graph debugging enabled!");
784    }
785
786    /// Disable graph debugging
787    pub async fn deg_disable(&self) {
788        *self.deg_enabled.write().await = false;
789        warn!("[EVENTGRAPH] Graph debugging disabled!");
790    }
791
792    /// Subscribe to deg events
793    pub async fn deg_subscribe(&self) -> Subscription<DegEvent> {
794        self.deg_publisher.clone().subscribe().await
795    }
796
797    /// Send a deg notification over the publisher
798    pub async fn deg_notify(&self, event: DegEvent) {
799        self.deg_publisher.notify(event).await;
800    }
801
802    pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {
803        let mut graph = HashMap::new();
804        for iter_elem in self.dag.iter() {
805            let (id, val) = iter_elem.unwrap();
806            let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
807            let val: Event = deserialize_async(&val).await.unwrap();
808            graph.insert(id, val);
809        }
810
811        let json_graph = graph
812            .into_iter()
813            .map(|(k, v)| {
814                let key = k.to_string();
815                let value = JsonValue::from(v);
816                (key, value)
817            })
818            .collect();
819        let values = json_map([("dag", JsonValue::Object(json_graph))]);
820
821        let result = JsonValue::Object(HashMap::from([("eventgraph_info".to_string(), values)]));
822
823        JsonResponse::new(result, id).into()
824    }
825
826    /// Fetch all the events that are on a higher layers than the
827    /// provided ones.
828    pub async fn fetch_successors_of(
829        &self,
830        tips: BTreeMap<u64, HashSet<blake3::Hash>>,
831    ) -> Result<Vec<Event>> {
832        debug!(
833             target: "event_graph::fetch_successors_of()",
834             "fetching successors of {tips:?}"
835        );
836
837        let mut graph = HashMap::new();
838        for iter_elem in self.dag.iter() {
839            let (id, val) = iter_elem.unwrap();
840            let hash = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
841            let event: Event = deserialize_async(&val).await.unwrap();
842            graph.insert(hash, event);
843        }
844
845        let mut result = vec![];
846
847        'outer: for tip in tips.iter() {
848            for i in tip.1.iter() {
849                if !graph.contains_key(i) {
850                    continue 'outer;
851                }
852            }
853
854            for (_, ev) in graph.iter() {
855                if ev.layer > *tip.0 && !result.contains(ev) {
856                    result.push(ev.clone())
857                }
858            }
859        }
860
861        result.sort_by(|a, b| a.layer.cmp(&b.layer));
862
863        Ok(result)
864    }
865}