1use std::{
20 collections::{BTreeMap, HashMap, HashSet, VecDeque},
21 path::PathBuf,
22 sync::Arc,
23};
24
25use darkfi_serial::{deserialize_async, serialize_async};
26use log::{debug, error, info, warn};
27use num_bigint::BigUint;
28use sled_overlay::{sled, SledTreeOverlay};
29use smol::{
30 lock::{OnceCell, RwLock},
31 Executor,
32};
33use tinyjson::JsonValue::{self};
34
35use crate::{
36 event_graph::util::replayer_log,
37 net::P2pPtr,
38 rpc::{
39 jsonrpc::{JsonResponse, JsonResult},
40 util::json_map,
41 },
42 system::{msleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr, Subscription},
43 Error, Result,
44};
45
46pub mod event;
48pub use event::Event;
49
50pub mod proto;
52use proto::{EventRep, EventReq, TipRep, TipReq};
53
54pub mod util;
56use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp};
57
58pub mod deg;
60use deg::DegEvent;
61
62#[cfg(test)]
63mod tests;
64
65pub const INITIAL_GENESIS: u64 = 1_694_044_800_000;
68pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];
70
71pub const N_EVENT_PARENTS: usize = 5;
73const EVENT_TIME_DRIFT: u64 = 60_000;
75pub const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]);
77
78pub type EventGraphPtr = Arc<EventGraph>;
80
81pub struct EventGraph {
83 p2p: P2pPtr,
85 dag: sled::Tree,
87 datastore: PathBuf,
89 replay_mode: bool,
92 unreferenced_tips: RwLock<BTreeMap<u64, HashSet<blake3::Hash>>>,
94 broadcasted_ids: RwLock<HashSet<blake3::Hash>>,
100 pub prune_task: OnceCell<StoppableTaskPtr>,
102 pub event_pub: PublisherPtr<Event>,
105 current_genesis: RwLock<Event>,
107 days_rotation: u64,
109 pub synced: RwLock<bool>,
111 pub deg_enabled: RwLock<bool>,
113 deg_publisher: PublisherPtr<DegEvent>,
115}
116
117impl EventGraph {
118 pub async fn new(
132 p2p: P2pPtr,
133 sled_db: sled::Db,
134 datastore: PathBuf,
135 replay_mode: bool,
136 dag_tree_name: &str,
137 days_rotation: u64,
138 ex: Arc<Executor<'_>>,
139 ) -> Result<EventGraphPtr> {
140 let dag = sled_db.open_tree(dag_tree_name)?;
141 let unreferenced_tips = RwLock::new(BTreeMap::new());
142 let broadcasted_ids = RwLock::new(HashSet::new());
143 let event_pub = Publisher::new();
144
145 let current_genesis = generate_genesis(days_rotation);
147 let self_ = Arc::new(Self {
148 p2p,
149 dag: dag.clone(),
150 datastore,
151 replay_mode,
152 unreferenced_tips,
153 broadcasted_ids,
154 prune_task: OnceCell::new(),
155 event_pub,
156 current_genesis: RwLock::new(current_genesis.clone()),
157 days_rotation,
158 synced: RwLock::new(false),
159 deg_enabled: RwLock::new(false),
160 deg_publisher: Publisher::new(),
161 });
162
163 if !dag.contains_key(current_genesis.id().as_bytes())? {
166 info!(
167 target: "event_graph::new()",
168 "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
169 );
170 self_.dag_prune(current_genesis).await?;
171 }
172
173 *self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await;
175
176 if days_rotation > 0 {
178 let prune_task = StoppableTask::new();
179 let _ = self_.prune_task.set(prune_task.clone()).await;
180
181 prune_task.clone().start(
182 self_.clone().dag_prune_task(days_rotation),
183 |res| async move {
184 match res {
185 Ok(()) | Err(Error::DetachedTaskStopped) => { }
186 Err(e) => error!(target: "event_graph::_handle_stop()", "[EVENTGRAPH] Failed stopping prune task: {e}")
187 }
188 },
189 Error::DetachedTaskStopped,
190 ex.clone(),
191 );
192 }
193
194 Ok(self_)
195 }
196
197 pub fn days_rotation(&self) -> u64 {
198 self.days_rotation
199 }
200
201 pub async fn dag_sync(&self) -> Result<()> {
203 let channels = self.p2p.hosts().peers();
220 let mut communicated_peers = channels.len();
221 info!(
222 target: "event_graph::dag_sync()",
223 "[EVENTGRAPH] Syncing DAG from {} peers...", communicated_peers,
224 );
225
226 let mut tips: HashMap<blake3::Hash, (u64, usize)> = HashMap::new();
228
229 for channel in channels.iter() {
232 let url = channel.address();
233
234 let tip_rep_sub = match channel.subscribe_msg::<TipRep>().await {
235 Ok(v) => v,
236 Err(e) => {
237 error!(
238 target: "event_graph::dag_sync()",
239 "[EVENTGRAPH] Sync: Couldn't subscribe TipReq for peer {}, skipping ({})",
240 url, e,
241 );
242 communicated_peers -= 1;
243 continue
244 }
245 };
246
247 if let Err(e) = channel.send(&TipReq {}).await {
248 error!(
249 target: "event_graph::dag_sync()",
250 "[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e,
251 );
252 communicated_peers -= 1;
253 continue
254 };
255
256 let Ok(peer_tips) = tip_rep_sub
258 .receive_with_timeout(self.p2p.settings().read().await.outbound_connect_timeout)
259 .await
260 else {
261 error!(
262 target: "event_graph::dag_sync()",
263 "[EVENTGRAPH] Sync: Peer {} didn't reply with tips in time, skipping", url,
264 );
265 communicated_peers -= 1;
266 continue
267 };
268
269 let peer_tips = &peer_tips.0;
270
271 for (layer, layer_tips) in peer_tips {
273 for tip in layer_tips {
274 if let Some(seen_tip) = tips.get_mut(tip) {
275 seen_tip.1 += 1;
276 } else {
277 tips.insert(*tip, (*layer, 1));
278 }
279 }
280 }
281 }
282
283 if tips.is_empty() {
285 error!(
286 target: "event_graph::dag_sync()",
287 "[EVENTGRAPH] Sync: Could not find any DAG tips",
288 );
289 return Err(Error::DagSyncFailed)
290 }
291
292 let consideration_threshold = communicated_peers * 2 / 3;
296 let mut considered_tips = HashSet::new();
297 for (tip, (_, amount)) in tips.iter() {
298 if amount > &consideration_threshold {
299 considered_tips.insert(*tip);
300 }
301 }
302 drop(tips);
303
304 let mut missing_parents = HashSet::new();
306 for tip in considered_tips.iter() {
307 assert!(tip != &NULL_ID);
308
309 if !self.dag.contains_key(tip.as_bytes()).unwrap() {
310 missing_parents.insert(*tip);
311 }
312 }
313
314 if missing_parents.is_empty() {
315 *self.synced.write().await = true;
316 info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
317 return Ok(())
318 }
319
320 info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
321 let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
322 let mut received_events_hashes = HashSet::new();
323
324 while !missing_parents.is_empty() {
325 let mut found_event = false;
326
327 for channel in channels.iter() {
328 let url = channel.address();
329
330 debug!(
331 target: "event_graph::dag_sync()",
332 "Requesting {:?} from {}...", missing_parents, url,
333 );
334
335 let ev_rep_sub = match channel.subscribe_msg::<EventRep>().await {
336 Ok(v) => v,
337 Err(e) => {
338 error!(
339 target: "event_graph::dag_sync()",
340 "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
341 url, e,
342 );
343 continue
344 }
345 };
346
347 let request_missing_events = missing_parents.clone().into_iter().collect();
348 if let Err(e) = channel.send(&EventReq(request_missing_events)).await {
349 error!(
350 target: "event_graph::dag_sync()",
351 "[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",
352 missing_parents, url, e,
353 );
354 continue
355 }
356
357 let Ok(parent) = ev_rep_sub
359 .receive_with_timeout(self.p2p.settings().read().await.outbound_connect_timeout)
360 .await
361 else {
362 error!(
363 target: "event_graph::dag_sync()",
364 "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
365 missing_parents, url,
366 );
367 continue
368 };
369
370 let parents = parent.0.clone();
371
372 for parent in parents {
373 let parent_id = parent.id();
374 if !missing_parents.contains(&parent_id) {
375 error!(
376 target: "event_graph::dag_sync()",
377 "[EVENTGRAPH] Sync: Peer {} replied with a wrong event: {}",
378 url, parent.id(),
379 );
380 continue
381 }
382
383 debug!(
384 target: "event_graph::dag_sync()",
385 "Got correct parent event {}", parent_id,
386 );
387
388 if let Some(layer_events) = received_events.get_mut(&parent.layer) {
389 layer_events.push(parent.clone());
390 } else {
391 let layer_events = vec![parent.clone()];
392 received_events.insert(parent.layer, layer_events);
393 }
394 received_events_hashes.insert(parent_id);
395
396 missing_parents.remove(&parent_id);
397 found_event = true;
398
399 for upper_parent in parent.parents.iter() {
401 if upper_parent == &NULL_ID {
402 continue
403 }
404
405 if !missing_parents.contains(upper_parent) &&
406 !received_events_hashes.contains(upper_parent) &&
407 !self.dag.contains_key(upper_parent.as_bytes()).unwrap()
408 {
409 debug!(
410 target: "event_graph::dag_sync()",
411 "Found upper missing parent event {}", upper_parent,
412 );
413 missing_parents.insert(*upper_parent);
414 }
415 }
416 }
417
418 break
419 }
420
421 if !found_event {
422 error!(
423 target: "event_graph::dag_sync()",
424 "[EVENTGRAPH] Sync: Failed to get all events",
425 );
426 return Err(Error::DagSyncFailed)
427 }
428 } let mut events = vec![];
433 for (_, tips) in received_events {
434 for tip in tips {
435 events.push(tip);
436 }
437 }
438 self.dag_insert(&events).await?;
439
440 *self.synced.write().await = true;
441
442 info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
443 Ok(())
444 }
445
446 async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
448 debug!(target: "event_graph::dag_prune()", "Pruning DAG...");
449
450 let mut unreferenced_tips = self.unreferenced_tips.write().await;
456 let mut broadcasted_ids = self.broadcasted_ids.write().await;
457 let mut current_genesis = self.current_genesis.write().await;
458
459 let mut batch = sled::Batch::default();
461 for key in self.dag.iter().keys() {
462 batch.remove(key.unwrap());
463 }
464 batch.insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event).await);
465
466 debug!(target: "event_graph::dag_prune()", "Applying batch...");
467 if let Err(e) = self.dag.apply_batch(batch) {
468 panic!("Failed pruning DAG, sled apply_batch error: {}", e);
469 }
470
471 *unreferenced_tips = BTreeMap::new();
473 unreferenced_tips.insert(0, HashSet::from([genesis_event.id()]));
474 *current_genesis = genesis_event;
475 *broadcasted_ids = HashSet::new();
476 drop(unreferenced_tips);
477 drop(broadcasted_ids);
478 drop(current_genesis);
479
480 debug!(target: "event_graph::dag_prune()", "DAG pruned successfully");
481 Ok(())
482 }
483
484 async fn dag_prune_task(self: Arc<Self>, days_rotation: u64) -> Result<()> {
486 debug!(target: "event_graph::dag_prune_task()", "Spawned background DAG pruning task");
491
492 loop {
493 let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
495
496 let current_genesis = Event {
498 timestamp: next_rotation,
499 content: GENESIS_CONTENTS.to_vec(),
500 parents: [NULL_ID; N_EVENT_PARENTS],
501 layer: 0,
502 };
503
504 let s = millis_until_next_rotation(next_rotation);
506
507 debug!(target: "event_graph::dag_prune_task()", "Sleeping {}ms until next DAG prune", s);
508 msleep(s).await;
509 debug!(target: "event_graph::dag_prune_task()", "Rotation period reached");
510
511 self.dag_prune(current_genesis).await?;
513 }
514 }
515
516 pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
527 if events.is_empty() {
529 return Ok(vec![])
530 }
531
532 let mut unreferenced_tips = self.unreferenced_tips.write().await;
534 let mut broadcasted_ids = self.broadcasted_ids.write().await;
535
536 let mut ids = Vec::with_capacity(events.len());
538
539 let mut overlay = SledTreeOverlay::new(&self.dag);
541
542 let genesis_timestamp = self.current_genesis.read().await.timestamp;
544
545 for event in events {
548 let event_id = event.id();
549 debug!(
550 target: "event_graph::dag_insert()",
551 "Inserting event {} into the DAG", event_id,
552 );
553
554 if !event
555 .validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay))
556 .await?
557 {
558 error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id);
559 return Err(Error::EventIsInvalid)
560 }
561
562 let event_se = serialize_async(event).await;
563
564 overlay.insert(event_id.as_bytes(), &event_se)?;
566
567 if self.replay_mode {
568 replayer_log(&self.datastore, "insert".to_owned(), event_se)?;
569 }
570 ids.push(event_id);
572 }
573
574 let batch = overlay.aggregate().unwrap();
576
577 if let Err(e) = self.dag.apply_batch(batch) {
580 panic!("Failed applying dag_insert batch to sled: {}", e);
581 }
582
583 for event in events {
586 let event_id = event.id();
587
588 debug!(
590 target: "event_graph::dag_insert()",
591 "Event {} parents {:#?}", event_id, event.parents,
592 );
593 for parent_id in event.parents.iter() {
594 if parent_id != &NULL_ID {
595 debug!(
596 target: "event_graph::dag_insert()",
597 "Removing {} from unreferenced_tips", parent_id,
598 );
599
600 for (layer, tips) in unreferenced_tips.iter_mut() {
606 if layer >= &event.layer {
607 continue
608 }
609 tips.remove(parent_id);
610 }
611 broadcasted_ids.insert(*parent_id);
612 }
613 }
614 unreferenced_tips.retain(|_, tips| !tips.is_empty());
615 debug!(
616 target: "event_graph::dag_insert()",
617 "Adding {} to unreferenced tips", event_id,
618 );
619
620 if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) {
621 layer_tips.insert(event_id);
622 } else {
623 let mut layer_tips = HashSet::new();
624 layer_tips.insert(event_id);
625 unreferenced_tips.insert(event.layer, layer_tips);
626 }
627
628 self.event_pub.notify(event.clone()).await;
630 }
631
632 drop(unreferenced_tips);
634 drop(broadcasted_ids);
635
636 Ok(ids)
637 }
638
639 pub async fn dag_get(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
641 let Some(bytes) = self.dag.get(event_id.as_bytes())? else { return Ok(None) };
642 let event: Event = deserialize_async(&bytes).await?;
643
644 Ok(Some(event))
645 }
646
647 async fn get_next_layer_with_parents(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
652 let unreferenced_tips = self.unreferenced_tips.read().await;
653
654 let mut parents = [NULL_ID; N_EVENT_PARENTS];
655 let mut index = 0;
656 'outer: for (_, tips) in unreferenced_tips.iter().rev() {
657 for tip in tips.iter() {
658 parents[index] = *tip;
659 index += 1;
660 if index >= N_EVENT_PARENTS {
661 break 'outer
662 }
663 }
664 }
665
666 let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
667
668 assert!(parents.iter().any(|x| x != &NULL_ID));
669 (next_layer, parents)
670 }
671
672 async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<blake3::Hash>> {
674 let mut tips = HashSet::new();
676 for iter_elem in self.dag.iter() {
677 let (id, _) = iter_elem.unwrap();
678 let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
679 tips.insert(id);
680 }
681
682 for iter_elem in self.dag.iter() {
684 let (_, event) = iter_elem.unwrap();
685 let event: Event = deserialize_async(&event).await.unwrap();
686 for parent in event.parents.iter() {
687 tips.remove(parent);
688 }
689 }
690
691 let mut map: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
693 for tip in tips {
694 let event = self.dag_get(&tip).await.unwrap().unwrap();
695 if let Some(layer_tips) = map.get_mut(&event.layer) {
696 layer_tips.insert(tip);
697 } else {
698 let mut layer_tips = HashSet::new();
699 layer_tips.insert(tip);
700 map.insert(event.layer, layer_tips);
701 }
702 }
703
704 map
705 }
706
707 async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
709 let (_, tips) = self.get_next_layer_with_parents().await;
710
711 let mut sorted: Vec<_> =
713 tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
714 sorted.sort_unstable();
715
716 let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
718 for (i, id) in sorted.iter().enumerate() {
719 let mut bytes = id.to_bytes_be();
720
721 while bytes.len() < blake3::OUT_LEN {
723 bytes.insert(0, 0);
724 }
725
726 tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
727 }
728
729 tips_sorted
730 }
731
732 pub async fn order_events(&self) -> Vec<Event> {
734 let mut ordered_events = VecDeque::new();
735 let mut visited = HashSet::new();
736
737 for tip in self.get_unreferenced_tips_sorted().await {
738 if !visited.contains(&tip) && tip != NULL_ID {
739 let tip = self.dag_get(&tip).await.unwrap().unwrap();
740 ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
741 }
742 }
743
744 let mut ord_events_vec = ordered_events.make_contiguous().to_vec();
745 ord_events_vec
747 .sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.timestamp.cmp(&a.1.timestamp)));
748
749 ord_events_vec.iter().map(|a| a.1.clone()).collect::<Vec<Event>>()
750 }
751
752 async fn dfs_topological_sort(
755 &self,
756 event: Event,
757 visited: &mut HashSet<blake3::Hash>,
758 ) -> VecDeque<(u64, Event)> {
759 let mut ordered_events = VecDeque::new();
760 let mut stack = VecDeque::new();
761 let event_id = event.id();
762 stack.push_back(event_id);
763
764 while let Some(event_id) = stack.pop_front() {
765 if !visited.contains(&event_id) && event_id != NULL_ID {
766 visited.insert(event_id);
767 if let Some(event) = self.dag_get(&event_id).await.unwrap() {
768 for parent in event.parents.iter() {
769 stack.push_back(*parent);
770 }
771
772 ordered_events.push_back((event.layer, event))
773 }
774 }
775 }
776
777 ordered_events
778 }
779
780 pub async fn deg_enable(&self) {
782 *self.deg_enabled.write().await = true;
783 warn!("[EVENTGRAPH] Graph debugging enabled!");
784 }
785
786 pub async fn deg_disable(&self) {
788 *self.deg_enabled.write().await = false;
789 warn!("[EVENTGRAPH] Graph debugging disabled!");
790 }
791
792 pub async fn deg_subscribe(&self) -> Subscription<DegEvent> {
794 self.deg_publisher.clone().subscribe().await
795 }
796
797 pub async fn deg_notify(&self, event: DegEvent) {
799 self.deg_publisher.notify(event).await;
800 }
801
802 pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {
803 let mut graph = HashMap::new();
804 for iter_elem in self.dag.iter() {
805 let (id, val) = iter_elem.unwrap();
806 let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
807 let val: Event = deserialize_async(&val).await.unwrap();
808 graph.insert(id, val);
809 }
810
811 let json_graph = graph
812 .into_iter()
813 .map(|(k, v)| {
814 let key = k.to_string();
815 let value = JsonValue::from(v);
816 (key, value)
817 })
818 .collect();
819 let values = json_map([("dag", JsonValue::Object(json_graph))]);
820
821 let result = JsonValue::Object(HashMap::from([("eventgraph_info".to_string(), values)]));
822
823 JsonResponse::new(result, id).into()
824 }
825
826 pub async fn fetch_successors_of(
829 &self,
830 tips: BTreeMap<u64, HashSet<blake3::Hash>>,
831 ) -> Result<Vec<Event>> {
832 debug!(
833 target: "event_graph::fetch_successors_of()",
834 "fetching successors of {tips:?}"
835 );
836
837 let mut graph = HashMap::new();
838 for iter_elem in self.dag.iter() {
839 let (id, val) = iter_elem.unwrap();
840 let hash = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
841 let event: Event = deserialize_async(&val).await.unwrap();
842 graph.insert(hash, event);
843 }
844
845 let mut result = vec![];
846
847 'outer: for tip in tips.iter() {
848 for i in tip.1.iter() {
849 if !graph.contains_key(i) {
850 continue 'outer;
851 }
852 }
853
854 for (_, ev) in graph.iter() {
855 if ev.layer > *tip.0 && !result.contains(ev) {
856 result.push(ev.clone())
857 }
858 }
859 }
860
861 result.sort_by(|a, b| a.layer.cmp(&b.layer));
862
863 Ok(result)
864 }
865}