use std::{collections::HashSet, path::PathBuf};

use futures::{AsyncRead, AsyncSeek};
use log::{debug, info, warn};
use smol::{
    fs::{self, File, OpenOptions},
    io::{
        self, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor,
        SeekFrom,
    },
    stream::StreamExt,
};

use crate::{Error, Result};

/// Maximum size of a single chunk: 256 KiB.
pub const MAX_CHUNK_SIZE: usize = 262_144;

/// Subdirectory of the base path where file metadata is kept.
const FILES_PATH: &str = "files";

/// Convert a `blake3::Hash` into its base58 string representation.
pub fn hash_to_string(hash: &blake3::Hash) -> String {
    bs58::encode(hash.as_bytes()).into_string()
}

/// `ChunkedFile` is an in-memory representation of a file's chunks.
/// Each element pairs a chunk hash with its local availability:
/// `Some(true)` means the chunk was found on disk and verified,
/// `None` means it is missing or failed verification.
#[derive(Clone)]
pub struct ChunkedFile(Vec<(blake3::Hash, Option<bool>)>);

impl ChunkedFile {
    fn new(hashes: &[blake3::Hash]) -> Self {
        Self(hashes.iter().map(|x| (*x, None)).collect())
    }

    /// Returns `true` if all the chunks of the file are available locally.
    pub fn is_complete(&self) -> bool {
        !self.0.iter().any(|(_, p)| p.is_none())
    }

    /// Returns an iterator over the chunk hashes and their availability.
    pub fn iter(&self) -> core::slice::Iter<'_, (blake3::Hash, Option<bool>)> {
        self.0.iter()
    }

    /// Returns the total number of chunks making up the file.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns `true` if the file consists of no chunks.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Returns the number of chunks available locally.
    pub fn local_chunks(&self) -> usize {
        self.0.iter().filter(|(_, p)| p.is_some()).count()
    }
}
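
// Usage sketch (hypothetical): each entry pairs a chunk hash with its
// local availability, so a caller can decide what still has to be
// fetched from the network:
//
//     for (hash, avail) in chunked_file.iter() {
//         if avail.is_none() { /* request chunk `hash` from a peer */ }
//     }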

/// `Geode` implements chunk-based file storage. Files are split into
/// chunks of at most `MAX_CHUNK_SIZE` bytes, and for every file a
/// metadata file named after the file hash stores the ordered chunk
/// hashes.
pub struct Geode {
    /// Path to the directory where file metadata is stored.
    files_path: PathBuf,
}

/// Read bytes from a stream until `buffer` is filled or EOF is reached.
/// Returns the number of bytes read, which is only smaller than the
/// buffer when the stream ended early.
pub async fn read_until_filled(
    mut stream: impl AsyncRead + Unpin,
    buffer: &mut [u8],
) -> io::Result<usize> {
    let mut total_bytes_read = 0;

    while total_bytes_read < buffer.len() {
        let bytes_read = stream.read(&mut buffer[total_bytes_read..]).await?;
        if bytes_read == 0 {
            // EOF before the buffer was filled.
            break
        }
        total_bytes_read += bytes_read;
    }

    Ok(total_bytes_read)
}
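
// Usage sketch (hypothetical): unlike a single `read()` call, which may
// return fewer bytes than requested, `read_until_filled` only returns a
// short count at end-of-stream, so chunking loops can treat a non-full
// buffer as the final chunk:
//
//     let mut buf = [0u8; MAX_CHUNK_SIZE];
//     let n = read_until_filled(&mut stream, &mut buf).await?;
//     let chunk = &buf[..n]; // empty at EOF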

impl Geode {
    /// Instantiate a new [`Geode`] object, creating the file metadata
    /// directory under `base_path` if it does not exist yet.
    pub async fn new(base_path: &PathBuf) -> Result<Self> {
        let mut files_path: PathBuf = base_path.into();
        files_path.push(FILES_PATH);

        fs::create_dir_all(&files_path).await?;

        Ok(Self { files_path })
    }

    /// Attempt to read a file's chunk hashes from its metadata file.
    /// The metadata consists of one base58-encoded BLAKE3 chunk hash
    /// per line, in file order.
    async fn read_metadata(path: &PathBuf) -> Result<Vec<blake3::Hash>> {
        debug!(target: "geode::read_metadata()", "Reading chunks from {:?}", path);
        let fd = File::open(path).await?;
        let mut read_chunks = vec![];
        let mut lines = BufReader::new(fd).lines();
        while let Some(line) = lines.next().await {
            let line = line?;
            let mut hash_buf = [0u8; 32];
            bs58::decode(line).onto(&mut hash_buf)?;
            let chunk_hash = blake3::Hash::from_bytes(hash_buf);
            read_chunks.push(chunk_hash);
        }

        Ok(read_chunks)
    }

    /// Perform garbage collection over the metadata directory. Any entry
    /// whose metadata cannot be read is considered corrupted and removed.
    /// Returns the set of file hashes that were deleted.
    pub async fn garbage_collect(&self) -> Result<HashSet<blake3::Hash>> {
        info!(target: "geode::garbage_collect()", "[Geode] Performing garbage collection");
        let mut deleted_files = HashSet::new();

        // Iterate over the metadata directory, skipping anything that is
        // not a regular file named after a base58-encoded hash.
        let mut file_paths = fs::read_dir(&self.files_path).await?;
        while let Some(file) = file_paths.next().await {
            let Ok(entry) = file else { continue };
            let path = entry.path();

            if !path.is_file() {
                continue
            }

            let file_name = match path.file_name().and_then(|n| n.to_str()) {
                Some(v) => v,
                None => continue,
            };
            let mut hash_buf = [0u8; 32];
            let file_hash = match bs58::decode(file_name).onto(&mut hash_buf) {
                Ok(_) => blake3::Hash::from_bytes(hash_buf),
                Err(_) => continue,
            };

            if Self::read_metadata(&path).await.is_err() {
                if let Err(e) = fs::remove_file(path).await {
                    warn!(
                        target: "geode::garbage_collect()",
                        "[Geode] Garbage collect failed to remove corrupted file: {}", e,
                    );
                }

                deleted_files.insert(file_hash);
            }
        }

        info!(target: "geode::garbage_collect()", "[Geode] Garbage collection finished");
        Ok(deleted_files)
    }

    /// Insert a file stream into Geode. The stream is consumed in chunks
    /// of at most `MAX_CHUNK_SIZE` bytes; each chunk is hashed with
    /// BLAKE3, and the file hash is the BLAKE3 hash of the concatenated
    /// chunk hashes. Writes the file's metadata and returns the file hash
    /// together with the ordered chunk hashes.
    pub async fn insert(
        &self,
        mut stream: impl AsyncRead + Unpin,
    ) -> Result<(blake3::Hash, Vec<blake3::Hash>)> {
        info!(target: "geode::insert()", "[Geode] Inserting file...");
        let mut file_hasher = blake3::Hasher::new();
        let mut chunk_hashes = vec![];
        let mut buf = [0u8; MAX_CHUNK_SIZE];

        loop {
            let bytes_read = read_until_filled(&mut stream, &mut buf).await?;
            if bytes_read == 0 {
                break
            }

            let chunk_slice = &buf[..bytes_read];
            let chunk_hash = blake3::hash(chunk_slice);
            file_hasher.update(chunk_hash.as_bytes());
            chunk_hashes.push(chunk_hash);
        }

        let file_hash = file_hasher.finalize();
        let mut file_path = self.files_path.clone();
        file_path.push(hash_to_string(&file_hash));

        // Write the metadata: one base58-encoded chunk hash per line.
        let mut file_fd = File::create(&file_path).await?;
        for ch in &chunk_hashes {
            file_fd.write_all(format!("{}\n", hash_to_string(ch)).as_bytes()).await?;
        }

        file_fd.flush().await?;

        Ok((file_hash, chunk_hashes))
    }
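
    // Usage sketch (hypothetical): chunking a local file, where `base_path`
    // and `path` are placeholders:
    //
    //     let geode = Geode::new(&base_path).await?;
    //     let fd = File::open(&path).await?;
    //     let (file_hash, chunk_hashes) = geode.insert(fd).await?;
    //     println!("File hash: {}", hash_to_string(&file_hash));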

    /// Insert externally received file metadata into Geode. The chunk
    /// hashes are verified to actually produce `file_hash` before the
    /// metadata is written to disk.
    pub async fn insert_file(
        &self,
        file_hash: &blake3::Hash,
        chunk_hashes: &[blake3::Hash],
    ) -> Result<()> {
        info!(target: "geode::insert_file()", "[Geode] Inserting file metadata");

        if !self.verify_file(file_hash, chunk_hashes) {
            return Err(Error::GeodeNeedsGc)
        }

        let mut file_path = self.files_path.clone();
        file_path.push(hash_to_string(file_hash));
        let mut file_fd = File::create(&file_path).await?;

        for ch in chunk_hashes {
            file_fd.write_all(format!("{}\n", hash_to_string(ch)).as_bytes()).await?;
        }
        file_fd.flush().await?;

        Ok(())
    }
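
    // Usage sketch (hypothetical): seeding metadata learned from the
    // network, e.g. a (file_hash, chunk_hashes) pair taken from a peer
    // announcement. `insert_file` rejects metadata that does not hash
    // back to `file_hash`:
    //
    //     geode.insert_file(&file_hash, &chunk_hashes).await?;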

    /// Write a single chunk into the file at `file_path`. The chunk must
    /// belong to the file identified by `file_hash`, otherwise
    /// `Error::GeodeNeedsGc` is returned. Returns the hash of the written
    /// chunk.
    pub async fn write_chunk(
        &self,
        file_hash: &blake3::Hash,
        file_path: &PathBuf,
        stream: impl AsRef<[u8]>,
    ) -> Result<blake3::Hash> {
        info!(target: "geode::write_chunk()", "[Geode] Writing single chunk");

        let mut cursor = Cursor::new(&stream);
        let mut chunk = [0u8; MAX_CHUNK_SIZE];

        let bytes_read = read_until_filled(&mut cursor, &mut chunk).await?;
        let chunk_slice = &chunk[..bytes_read];
        let chunk_hash = blake3::hash(chunk_slice);

        // Find the chunk's index in the file metadata to know where it
        // belongs in the file.
        let chunked_file = self.get(file_hash, file_path).await?;
        let chunk_index = match chunked_file.iter().position(|c| c.0 == chunk_hash) {
            Some(index) => index,
            None => return Err(Error::GeodeNeedsGc),
        };

        // Chunks live at fixed offsets; only the final chunk of a file
        // may be shorter than `MAX_CHUNK_SIZE`.
        let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);

        if !file_path.exists() {
            File::create(file_path).await?;
        }

        let mut file_fd = OpenOptions::new().write(true).open(file_path).await?;
        file_fd.seek(SeekFrom::Start(position)).await?;
        file_fd.write_all(chunk_slice).await?;
        file_fd.flush().await?;

        Ok(chunk_hash)
    }
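
    // Usage sketch (hypothetical): reassembling a file from chunks fetched
    // over the network, where `fetch_chunk` is a placeholder:
    //
    //     for (hash, avail) in geode.get(&file_hash, &file_path).await?.iter() {
    //         if avail.is_none() {
    //             let data: Vec<u8> = fetch_chunk(hash).await?;
    //             geode.write_chunk(&file_hash, &file_path, &data).await?;
    //         }
    //     }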

    /// Fetch the [`ChunkedFile`] for a given file hash, marking which of
    /// its chunks can be read and verified from `file_path`. Returns
    /// `Error::GeodeFileNotFound` if the metadata does not exist, and
    /// `Error::GeodeNeedsGc` if it is unreadable or fails verification.
    pub async fn get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) -> Result<ChunkedFile> {
        let file_hash_str = hash_to_string(file_hash);
        info!(target: "geode::get()", "[Geode] Getting file chunks for {}...", file_hash_str);
        let mut file_metadata_path = self.files_path.clone();
        file_metadata_path.push(file_hash_str);

        let chunk_hashes = match Self::read_metadata(&file_metadata_path).await {
            Ok(v) => v,
            Err(e) => {
                return match e {
                    Error::Io(std::io::ErrorKind::NotFound) => Err(Error::GeodeFileNotFound),
                    _ => Err(Error::GeodeNeedsGc),
                }
            }
        };

        if !self.verify_file(file_hash, &chunk_hashes) {
            return Err(Error::GeodeNeedsGc);
        }

        let mut chunked_file = ChunkedFile::new(&chunk_hashes);

        // If the file itself cannot be opened, none of its chunks are
        // available locally.
        let mut file = match File::open(file_path).await {
            Ok(v) => v,
            Err(_) => return Ok(chunked_file),
        };

        // Mark every chunk that reads back from disk with the expected hash.
        for (chunk_index, (chunk_hash, chunk_valid)) in chunked_file.0.iter_mut().enumerate() {
            let chunk = self.read_chunk(&mut file, &chunk_index).await?;

            if !self.verify_chunk(chunk_hash, &chunk) {
                continue
            }

            *chunk_valid = Some(true);
        }

        Ok(chunked_file)
    }
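
    // Usage sketch (hypothetical): deciding whether a file still needs
    // to be downloaded:
    //
    //     let chunked = geode.get(&file_hash, &file_path).await?;
    //     if !chunked.is_complete() {
    //         let missing = chunked.len() - chunked.local_chunks();
    //         println!("missing {} of {} chunks", missing, chunked.len());
    //     }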

    /// Fetch a single chunk of a file, returning its bytes after
    /// verification. Returns `Error::GeodeChunkNotFound` if the file is
    /// missing or the chunk is not part of it, `Error::GeodeFileNotFound`
    /// if there is no metadata, and `Error::GeodeNeedsGc` if the on-disk
    /// data does not match the chunk hash.
    pub async fn get_chunk(
        &self,
        chunk_hash: &blake3::Hash,
        file_hash: &blake3::Hash,
        file_path: &PathBuf,
    ) -> Result<Vec<u8>> {
        info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", hash_to_string(chunk_hash));

        if !file_path.exists() || !file_path.is_file() {
            return Err(Error::GeodeChunkNotFound)
        }

        let mut file_metadata_path = self.files_path.clone();
        file_metadata_path.push(hash_to_string(file_hash));

        let chunk_hashes = match Self::read_metadata(&file_metadata_path).await {
            Ok(v) => v,
            Err(e) => {
                return match e {
                    Error::Io(std::io::ErrorKind::NotFound) => Err(Error::GeodeFileNotFound),
                    _ => Err(Error::GeodeNeedsGc),
                }
            }
        };

        // Locate the chunk's index to derive its byte offset in the file.
        let chunk_index = match chunk_hashes.iter().position(|&h| h == *chunk_hash) {
            Some(index) => index,
            None => return Err(Error::GeodeChunkNotFound),
        };

        let mut file = File::open(file_path).await?;
        let chunk = self.read_chunk(&mut file, &chunk_index).await?;

        if !self.verify_chunk(chunk_hash, &chunk) {
            return Err(Error::GeodeNeedsGc)
        }

        Ok(chunk)
    }
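
    // Usage sketch (hypothetical): serving an incoming chunk request,
    // where `request`, `reply_with` and `reply_not_found` are placeholders
    // for the surrounding network layer:
    //
    //     match geode.get_chunk(&request.chunk_hash, &file_hash, &file_path).await {
    //         Ok(chunk) => reply_with(chunk).await?,
    //         Err(Error::GeodeChunkNotFound) => reply_not_found().await?,
    //         Err(e) => return Err(e),
    //     }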

    /// Read the chunk at `chunk_index` from a seekable stream. The chunk
    /// starts at offset `chunk_index * MAX_CHUNK_SIZE`; the returned
    /// buffer is shorter than `MAX_CHUNK_SIZE` only for the final chunk
    /// of a file.
    pub async fn read_chunk(
        &self,
        mut stream: impl AsyncRead + Unpin + AsyncSeek,
        chunk_index: &usize,
    ) -> Result<Vec<u8>> {
        let position = (*chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
        let mut buf = [0u8; MAX_CHUNK_SIZE];
        stream.seek(SeekFrom::Start(position)).await?;
        let bytes_read = read_until_filled(stream, &mut buf).await?;
        Ok(buf[..bytes_read].to_vec())
    }

    /// Verify that hashing the given chunk hashes in order reproduces the
    /// given file hash.
    pub fn verify_file(&self, file_hash: &blake3::Hash, chunk_hashes: &[blake3::Hash]) -> bool {
        info!(target: "geode::verify_file()", "[Geode] Verifying file metadata for {}", hash_to_string(file_hash));

        let mut file_hasher = blake3::Hasher::new();
        for chunk_hash in chunk_hashes {
            file_hasher.update(chunk_hash.as_bytes());
        }

        *file_hash == file_hasher.finalize()
    }

    /// Verify that a chunk's contents hash to the given chunk hash.
    pub fn verify_chunk(&self, chunk_hash: &blake3::Hash, chunk_slice: &[u8]) -> bool {
        blake3::hash(chunk_slice) == *chunk_hash
    }
}
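
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal smoke-test sketch for the insert()/get() round trip. The
    // temp-directory layout used here is an arbitrary assumption made for
    // the test, not something Geode itself requires.
    #[test]
    fn insert_get_roundtrip() -> Result<()> {
        smol::block_on(async {
            let base = std::env::temp_dir().join("geode_test");
            let geode = Geode::new(&base).await?;

            // Two full chunks plus a short tail chunk.
            let data = vec![7u8; MAX_CHUNK_SIZE * 2 + 100];
            let (file_hash, chunk_hashes) = geode.insert(Cursor::new(&data)).await?;
            assert_eq!(chunk_hashes.len(), 3);

            // Write the actual content to disk so get() can verify chunks.
            let file_path = base.join("content");
            fs::write(&file_path, &data).await?;

            let chunked = geode.get(&file_hash, &file_path).await?;
            assert!(chunked.is_complete());
            assert_eq!(chunked.local_chunks(), 3);

            Ok(())
        })
    }
}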