darkfi/geode/
mod.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

//! Chunk-based file storage implementation.
//! This is a building block for a DHT or something similar.
//!
//! The API supports file insertion and retrieval. There is intentionally no
//! `remove` support. File removal should be handled externally; afterwards
//! it is only necessary to run `garbage_collect()` to clean things up.
//!
//! The filesystem hierarchy contains a `files` directory which stores
//! metadata about each full file. The filename of a file in `files` is the
//! BLAKE3 hash of the file's chunk hashes, hashed in order. Inside the file
//! is the list of the chunks making up the full file.
//!
//! To get the chunks, split the full file into `MAX_CHUNK_SIZE`-sized
//! slices; only the last chunk may be smaller than that.
//!
//! It might look like the following:
//! ```text
//! /files/B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX
//! /files/8nA3ndjFFee3n5wMPLZampLpGaMJi3od4MSyaXPDoF91
//! /files/...
//! ```
//!
//! In the above example, the contents of `B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX`
//! may be:
//! ```text
//! 2bQPxSR8Frz7S7JW3DRAzEtkrHfLXB1CN65V7az77pUp
//! CvjvN6MfWQYK54DgKNR7MPgFSZqsCgpWKF2p8ot66CCP
//! ```
//!
//! This means the file `B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX`
//! is the concatenation of the chunks with the above hashes.
//!
//! The full file is not copied, and individual chunks are not stored by
//! geode. Additionally, geode does not keep track of the full file's path.
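//!
//! As an illustrative sketch (not a public API), the file hash is derived
//! from the chunk hashes like this, where `file_bytes` is a hypothetical
//! in-memory copy of the full file:
//! ```text
//! let mut file_hasher = blake3::Hasher::new();
//! for chunk in file_bytes.chunks(MAX_CHUNK_SIZE) {
//!     // Hash each chunk, then feed the chunk hashes (in order) into the
//!     // file hasher.
//!     file_hasher.update(blake3::hash(chunk).as_bytes());
//! }
//! let file_hash = file_hasher.finalize();
//! ```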

use std::{collections::HashSet, path::PathBuf};

use futures::{AsyncRead, AsyncSeek};
use log::{debug, info, warn};
use smol::{
    fs::{self, File, OpenOptions},
    io::{
        self, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor,
        SeekFrom,
    },
    stream::StreamExt,
};

use crate::{Error, Result};

/// Defined maximum size of a stored chunk (256 KiB)
pub const MAX_CHUNK_SIZE: usize = 262_144;

/// Path prefix where file metadata is stored
const FILES_PATH: &str = "files";

/// Encode a BLAKE3 hash as a base58 string.
pub fn hash_to_string(hash: &blake3::Hash) -> String {
    bs58::encode(hash.as_bytes()).into_string()
}

/// `ChunkedFile` is a representation of a file we're trying to
/// retrieve from `Geode`.
///
/// Each tuple contains the `blake3::Hash` of one of the file's chunks
/// and an optional flag telling whether a valid copy of the chunk is
/// available in the local file. If `None`, it is to be assumed that
/// the chunk is not available locally.
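///
/// A minimal usage sketch (`chunked_file` is a hypothetical value
/// returned by [`Geode::get`]):
/// ```text
/// if chunked_file.is_complete() {
///     // Every chunk is available and verified locally.
/// } else {
///     println!("{}/{} chunks available", chunked_file.local_chunks(), chunked_file.len());
/// }
/// ```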
#[derive(Clone)]
pub struct ChunkedFile(Vec<(blake3::Hash, Option<bool>)>);

impl ChunkedFile {
    fn new(hashes: &[blake3::Hash]) -> Self {
        Self(hashes.iter().map(|x| (*x, None)).collect())
    }

    /// Check whether we have all the chunks available locally.
    pub fn is_complete(&self) -> bool {
        self.0.iter().all(|(_, p)| p.is_some())
    }

    /// Return an iterator over the chunks and their local availability.
    pub fn iter(&self) -> core::slice::Iter<'_, (blake3::Hash, Option<bool>)> {
        self.0.iter()
    }

    /// Return the number of chunks.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Return `true` if the chunked file contains no chunks.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Return the number of chunks available locally.
    pub fn local_chunks(&self) -> usize {
        self.0.iter().filter(|(_, p)| p.is_some()).count()
    }
}

/// Chunk-based file storage interface.
pub struct Geode {
    /// Path to the filesystem directory where file metadata is stored
    files_path: PathBuf,
}

/// `smol::fs::File::read` does not guarantee that the buffer will be filled,
/// even if the buffer is smaller than the file. This is a workaround: it reads
/// the stream until the buffer is full or until we reach the end of the stream.
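///
/// A minimal usage sketch (`reader` is a hypothetical value implementing
/// `AsyncRead + Unpin`):
/// ```text
/// let mut buf = [0u8; MAX_CHUNK_SIZE];
/// let bytes_read = read_until_filled(&mut reader, &mut buf).await?;
/// let chunk = &buf[..bytes_read];
/// ```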
pub async fn read_until_filled(
    mut stream: impl AsyncRead + Unpin,
    buffer: &mut [u8],
) -> io::Result<usize> {
    let mut total_bytes_read = 0;

    while total_bytes_read < buffer.len() {
        let bytes_read = stream.read(&mut buffer[total_bytes_read..]).await?;
        if bytes_read == 0 {
            break; // EOF reached
        }
        total_bytes_read += bytes_read;
    }

    Ok(total_bytes_read)
}

impl Geode {
    /// Instantiate a new [`Geode`] object.
    /// `base_path` defines the root directory where Geode will store its
    /// file metadata.
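    ///
    /// A minimal usage sketch (hypothetical path; error handling elided):
    /// ```text
    /// let geode = Geode::new(&PathBuf::from("/tmp/geode")).await?;
    /// ```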
    pub async fn new(base_path: &PathBuf) -> Result<Self> {
        let mut files_path: PathBuf = base_path.into();
        files_path.push(FILES_PATH);

        // Create necessary directory structure if needed
        fs::create_dir_all(&files_path).await?;

        Ok(Self { files_path })
    }

    /// Attempt to read chunk hashes from a given file path and return
    /// a `Vec` containing the hashes in order.
    async fn read_metadata(path: &PathBuf) -> Result<Vec<blake3::Hash>> {
        debug!(target: "geode::read_metadata()", "Reading chunks from {:?}", path);
        let fd = File::open(path).await?;
        let mut read_chunks = vec![];
        let mut lines = BufReader::new(fd).lines();
        while let Some(line) = lines.next().await {
            let line = line?;
            let mut hash_buf = [0u8; 32];
            bs58::decode(line).onto(&mut hash_buf)?;
            let chunk_hash = blake3::Hash::from_bytes(hash_buf);
            read_chunks.push(chunk_hash);
        }

        Ok(read_chunks)
    }

    /// Perform garbage collection over the filesystem hierarchy.
    /// Returns the set of file hashes whose metadata was deleted.
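    ///
    /// A minimal usage sketch:
    /// ```text
    /// let deleted_files = geode.garbage_collect().await?;
    /// ```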
    pub async fn garbage_collect(&self) -> Result<HashSet<blake3::Hash>> {
        info!(target: "geode::garbage_collect()", "[Geode] Performing garbage collection");
        // We track corrupt files here.
        let mut deleted_files = HashSet::new();

        // Perform health check over file metadata. For now we just ensure they
        // have the correct format.
        let mut file_paths = fs::read_dir(&self.files_path).await?;
        while let Some(file) = file_paths.next().await {
            let Ok(entry) = file else { continue };
            let path = entry.path();

            // Skip if we're not a plain file
            if !path.is_file() {
                continue
            }

            // Make sure that the filename is a BLAKE3 hash
            let file_name = match path.file_name().and_then(|n| n.to_str()) {
                Some(v) => v,
                None => continue,
            };
            let mut hash_buf = [0u8; 32];
            let file_hash = match bs58::decode(file_name).onto(&mut hash_buf) {
                Ok(_) => blake3::Hash::from_bytes(hash_buf),
                Err(_) => continue,
            };

            // The filename is a BLAKE3 hash. The file should contain a
            // newline-separated list of chunk hashes which represent the full
            // file. If that is not the case, we consider it a corrupted file
            // and delete it.
            if Self::read_metadata(&path).await.is_err() {
                if let Err(e) = fs::remove_file(path).await {
                    warn!(
                        target: "geode::garbage_collect()",
                        "[Geode] Garbage collect failed to remove corrupted file: {}", e,
                    );
                }

                deleted_files.insert(file_hash);
            }
        }

        info!(target: "geode::garbage_collect()", "[Geode] Garbage collection finished");
        Ok(deleted_files)
    }

    /// Insert a file into Geode. The function accepts any kind of byte stream:
    /// another file on the filesystem, an in-memory buffer, etc.
    /// Returns a tuple of `(blake3::Hash, Vec<blake3::Hash>)` which represents the
    /// file hash and the file's chunk hashes, respectively.
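    ///
    /// A minimal usage sketch (hypothetical values; error handling elided):
    /// ```text
    /// let data = smol::io::Cursor::new(b"hello world".to_vec());
    /// let (file_hash, chunk_hashes) = geode.insert(data).await?;
    /// ```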
    pub async fn insert(
        &self,
        mut stream: impl AsyncRead + Unpin,
    ) -> Result<(blake3::Hash, Vec<blake3::Hash>)> {
        info!(target: "geode::insert()", "[Geode] Inserting file...");
        let mut file_hasher = blake3::Hasher::new();
        let mut chunk_hashes = vec![];

        loop {
            let mut buf = [0u8; MAX_CHUNK_SIZE];
            let bytes_read = read_until_filled(&mut stream, &mut buf).await?;
            if bytes_read == 0 {
                break
            }

            let chunk_slice = &buf[..bytes_read];
            let chunk_hash = blake3::hash(chunk_slice);
            file_hasher.update(chunk_hash.as_bytes());
            chunk_hashes.push(chunk_hash);
        }

        // This hash is the file's chunk hashes, hashed in order.
        let file_hash = file_hasher.finalize();
        let mut file_path = self.files_path.clone();
        file_path.push(hash_to_string(&file_hash).as_str());

        // We always overwrite the metadata.
        let mut file_fd = File::create(&file_path).await?;
        for ch in &chunk_hashes {
            file_fd.write_all(format!("{}\n", hash_to_string(ch)).as_bytes()).await?;
        }

        file_fd.flush().await?;

        Ok((file_hash, chunk_hashes))
    }

    /// Create and insert file metadata into Geode given a list of chunk hashes.
    /// Always overwrites any existing file.
    /// Verifies that the file hash matches the chunk hashes.
    pub async fn insert_file(
        &self,
        file_hash: &blake3::Hash,
        chunk_hashes: &[blake3::Hash],
    ) -> Result<()> {
        info!(target: "geode::insert_file()", "[Geode] Inserting file metadata");

        if !self.verify_file(file_hash, chunk_hashes) {
            // The chunk list or file hash is wrong
            return Err(Error::GeodeNeedsGc)
        }

        let mut file_path = self.files_path.clone();
        file_path.push(hash_to_string(file_hash).as_str());
        let mut file_fd = File::create(&file_path).await?;

        for ch in chunk_hashes {
            file_fd.write_all(format!("{}\n", hash_to_string(ch)).as_bytes()).await?;
        }
        file_fd.flush().await?;

        Ok(())
    }

    /// Write a single chunk into the file at `file_path`, given the chunk's bytes.
    /// The file's metadata must be inserted into Geode before calling this method.
    /// Always overwrites any existing chunk. Returns the chunk hash once inserted.
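    ///
    /// A minimal usage sketch (hypothetical values; `chunk_bytes` is assumed
    /// to be at most `MAX_CHUNK_SIZE` bytes):
    /// ```text
    /// let chunk_hash = geode.write_chunk(&file_hash, &file_path, &chunk_bytes).await?;
    /// ```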
    pub async fn write_chunk(
        &self,
        file_hash: &blake3::Hash,
        file_path: &PathBuf,
        stream: impl AsRef<[u8]>,
    ) -> Result<blake3::Hash> {
        info!(target: "geode::write_chunk()", "[Geode] Writing single chunk");

        let mut cursor = Cursor::new(&stream);
        let mut chunk = [0u8; MAX_CHUNK_SIZE];

        let bytes_read = read_until_filled(&mut cursor, &mut chunk).await?;
        let chunk_slice = &chunk[..bytes_read];
        let chunk_hash = blake3::hash(chunk_slice);

        let chunked_file = self.get(file_hash, file_path).await?;

        // Get the chunk index in the file from the chunk hash
        let chunk_index = match chunked_file.iter().position(|c| c.0 == chunk_hash) {
            Some(index) => index,
            None => return Err(Error::GeodeNeedsGc),
        };

        let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);

        // Open the file for writing, creating it if it does not exist.
        let mut file_fd = OpenOptions::new().write(true).create(true).open(&file_path).await?;
        file_fd.seek(SeekFrom::Start(position)).await?;
        file_fd.write_all(chunk_slice).await?;
        file_fd.flush().await?;

        Ok(chunk_hash)
    }

    /// Fetch file metadata from Geode. Returns a [`ChunkedFile`], which lists
    /// the file's chunk hashes along with flags telling which chunks are valid
    /// and available locally. Returns an error if the read failed in any way
    /// (which can also mean that the file does not exist).
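    ///
    /// A minimal usage sketch (hypothetical values):
    /// ```text
    /// let chunked_file = geode.get(&file_hash, &file_path).await?;
    /// if !chunked_file.is_complete() {
    ///     // Fetch the missing chunks from elsewhere, e.g. the network.
    /// }
    /// ```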
    pub async fn get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) -> Result<ChunkedFile> {
        let file_hash_str = hash_to_string(file_hash);
        info!(target: "geode::get()", "[Geode] Getting file chunks for {}...", file_hash_str);
        let mut file_metadata_path = self.files_path.clone();
        file_metadata_path.push(file_hash_str);

        // Try to read the file metadata. If it's corrupt, return an error signalling
        // that garbage collection needs to run.
        let chunk_hashes = match Self::read_metadata(&file_metadata_path).await {
            Ok(v) => v,
            Err(e) => {
                return match e {
                    // If the file is not found, return the corresponding error.
                    Error::Io(std::io::ErrorKind::NotFound) => Err(Error::GeodeFileNotFound),
                    // Anything else should tell the client to do garbage collection
                    _ => Err(Error::GeodeNeedsGc),
                }
            }
        };

        // Make sure the chunk hashes match the file hash
        if !self.verify_file(file_hash, &chunk_hashes) {
            return Err(Error::GeodeNeedsGc);
        }

        let mut chunked_file = ChunkedFile::new(&chunk_hashes);

        // Open the file. If we can't, we return the chunked file with no locally
        // available chunks.
        let mut file = match File::open(&file_path).await {
            Ok(v) => v,
            Err(_) => return Ok(chunked_file),
        };

        // Iterate over chunks and find which chunks we have available locally.
        for (chunk_index, (chunk_hash, chunk_valid)) in chunked_file.0.iter_mut().enumerate() {
            let chunk = self.read_chunk(&mut file, &chunk_index).await?;

            // Perform chunk consistency check
            if !self.verify_chunk(chunk_hash, &chunk) {
                continue
            }

            *chunk_valid = Some(true);
        }

        Ok(chunked_file)
    }

    /// Fetch a single chunk from Geode. Returns a `Vec` containing the chunk
    /// content if it is found.
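    ///
    /// A minimal usage sketch (hypothetical values):
    /// ```text
    /// let chunk: Vec<u8> = geode.get_chunk(&chunk_hash, &file_hash, &file_path).await?;
    /// ```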
    pub async fn get_chunk(
        &self,
        chunk_hash: &blake3::Hash,
        file_hash: &blake3::Hash,
        file_path: &PathBuf,
    ) -> Result<Vec<u8>> {
        info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", hash_to_string(chunk_hash));

        if !file_path.exists() || !file_path.is_file() {
            return Err(Error::GeodeChunkNotFound)
        }

        let mut file_metadata_path = self.files_path.clone();
        file_metadata_path.push(hash_to_string(file_hash));

        // Try to read the file metadata. If it's corrupt, return an error signalling
        // that garbage collection needs to run.
        let chunk_hashes = match Self::read_metadata(&file_metadata_path).await {
            Ok(v) => v,
            Err(e) => {
                return match e {
                    // If the file is not found, return the corresponding error.
                    Error::Io(std::io::ErrorKind::NotFound) => Err(Error::GeodeFileNotFound),
                    // Anything else should tell the client to do garbage collection
                    _ => Err(Error::GeodeNeedsGc),
                }
            }
        };

        // Get the chunk index in the file from the chunk hash
        let chunk_index = match chunk_hashes.iter().position(|&h| h == *chunk_hash) {
            Some(index) => index,
            None => return Err(Error::GeodeChunkNotFound),
        };

        // Read the file to get the chunk content
        let mut file = File::open(&file_path).await?;
        let chunk = self.read_chunk(&mut file, &chunk_index).await?;

        // Perform chunk consistency check
        if !self.verify_chunk(chunk_hash, &chunk) {
            return Err(Error::GeodeNeedsGc)
        }

        Ok(chunk)
    }

    /// Read the given seekable stream to extract the chunk with index `chunk_index`.
    /// Returns the chunk content in a `Vec`.
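    ///
    /// A minimal usage sketch (hypothetical values; `file` is an open
    /// `smol::fs::File`):
    /// ```text
    /// let second_chunk = geode.read_chunk(&mut file, &1).await?;
    /// ```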
    pub async fn read_chunk(
        &self,
        mut stream: impl AsyncRead + Unpin + AsyncSeek,
        chunk_index: &usize,
    ) -> Result<Vec<u8>> {
        let position = (*chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
        let mut buf = [0u8; MAX_CHUNK_SIZE];
        stream.seek(SeekFrom::Start(position)).await?;
        let bytes_read = read_until_filled(stream, &mut buf).await?;
        Ok(buf[..bytes_read].to_vec())
    }

    /// Verifies that the file hash matches the chunk hashes.
    pub fn verify_file(&self, file_hash: &blake3::Hash, chunk_hashes: &[blake3::Hash]) -> bool {
        info!(target: "geode::verify_file()", "[Geode] Verifying file metadata for {}", hash_to_string(file_hash));

        let mut file_hasher = blake3::Hasher::new();
        for chunk_hash in chunk_hashes {
            file_hasher.update(chunk_hash.as_bytes());
        }

        *file_hash == file_hasher.finalize()
    }

    /// Verifies that the chunk hash matches the content.
    pub fn verify_chunk(&self, chunk_hash: &blake3::Hash, chunk_slice: &[u8]) -> bool {
        blake3::hash(chunk_slice) == *chunk_hash
    }
}