use std::{collections::HashSet, fs, path::PathBuf};
use async_std::sync::Arc;
use async_trait::async_trait;
use darkfi_serial::serialize;
use log::{debug, error, info, warn};
use serde_json::{json, Value};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;
use darkfi::{
async_daemonize, cli_desc,
dht::{waiting_for_response, Dht, DhtPtr},
net,
rpc::{
jsonrpc::{
ErrorCode::{InvalidParams, MethodNotFound},
JsonError, JsonRequest, JsonResponse, JsonResult,
},
server::{listen_and_serve, RequestHandler},
},
util::path::expand_path,
Result,
};
mod error;
use error::{server_error, RpcError};
// Name of the fud configuration file.
// NOTE(review): neither constant is referenced directly in this file; presumably both are
// consumed by the `async_daemonize!` expansion — confirm.
const CONFIG_FILE: &str = "fud_config.toml";
// Contents of `../fud_config.toml`, embedded into the binary at compile time by `include_str!`.
const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml");
// Command-line / config-file arguments of the fud daemon.
//
// Plain `//` comments are used on purpose: structopt turns `///` doc comments into help
// text, so adding those would change the program's `--help` output.
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "fud", about = cli_desc!())]
struct Args {
// Optional configuration file path (`-c` / `--config`).
// NOTE(review): not read in this file; presumably handled by `async_daemonize!` — confirm.
#[structopt(short, long)]
config: Option<String>,
// Folder whose files are served; expanded with `expand_path` and handed to `Fud::new`.
#[structopt(long, default_value = "~/.config/darkfi/fud")]
folder: String,
// Endpoint passed to `listen_and_serve` for the JSON-RPC server.
#[structopt(long, default_value = "tcp://127.0.0.1:13336")]
rpc_listen: Url,
// Becomes `net::Settings::inbound`.
#[structopt(long)]
p2p_accept: Vec<Url>,
// Becomes `net::Settings::external_addr`.
#[structopt(long)]
p2p_external: Vec<Url>,
// Becomes `net::Settings::outbound_connections`.
#[structopt(long, default_value = "8")]
slots: u32,
// Becomes `net::Settings::seeds`.
#[structopt(long)]
seeds: Vec<Url>,
// Becomes `net::Settings::peers`.
#[structopt(long)]
peers: Vec<Url>,
// Converted by `net::settings::get_outbound_transports` into
// `net::Settings::outbound_transports`.
#[structopt(long)]
transports: Vec<String>,
// Becomes `net::Settings::localnet`.
#[structopt(long)]
localnet: bool,
// Becomes `net::Settings::channel_log`.
#[structopt(long)]
channel_log: bool,
// Number of times `-v` was given on the command line.
// NOTE(review): not read in this file; presumably sets the log level inside
// `async_daemonize!` — confirm.
#[structopt(short, parse(from_occurrences))]
verbose: u8,
}
/// Daemon state behind the JSON-RPC methods: a DHT handle plus the local folder whose
/// files are published into that DHT.
pub struct Fud {
/// Shared DHT handle; accessed through `read()` / `write()` locks.
dht: DhtPtr,
/// Directory whose entries are hashed by file name and inserted into the DHT
/// (see `init` and `sync`); files fetched by `get` are written here as well.
folder: PathBuf,
}
impl Fud {
pub async fn new(dht: DhtPtr, folder: PathBuf) -> Result<Self> {
Ok(Self { dht, folder })
}
async fn init(&self) -> Result<()> {
info!("Initializing fud dht state for folder: {:?}", self.folder);
if !self.folder.exists() {
fs::create_dir_all(&self.folder)?;
}
let entries = fs::read_dir(&self.folder).unwrap();
{
let mut lock = self.dht.write().await;
if let Err(e) = lock.sync_lookup_map().await {
error!("Failed to sync lookup map: {}", e);
}
for entry in entries {
let e = entry.unwrap();
let name = String::from(e.file_name().to_str().unwrap());
info!("Entry: {}", name);
let key_hash = blake3::hash(&serialize(&name));
let value: Vec<u8> = std::fs::read(e.path()).unwrap();
if let Err(e) = lock.insert(key_hash, value).await {
error!("Failed to insert key: {}", e);
}
}
}
Ok(())
}
async fn disconnect(&self) -> Result<()> {
debug!("Peer disconnecting, signaling network");
{
let mut lock = self.dht.write().await;
let records = lock.map.clone();
for key in records.keys() {
let result = lock.remove(*key).await;
match result {
Ok(option) => match option {
Some(k) => {
debug!("Hash key removed: {}", k);
}
None => {
warn!("Did not find key: {}", key);
}
},
Err(e) => {
error!("Failed to remove key: {}", e);
}
}
}
}
Ok(())
}
pub async fn list(&self, id: Value, _params: &[Value]) -> JsonResult {
let mut content = HashSet::new();
let mut new = HashSet::new();
let mut deleted = HashSet::new();
let entries = fs::read_dir(&self.folder).unwrap();
let records = self.dht.read().await.map.clone();
let mut entries_hashes = HashSet::new();
for entry in entries {
let e = entry.unwrap();
let name = String::from(e.file_name().to_str().unwrap());
let key_hash = blake3::hash(&serialize(&name));
entries_hashes.insert(key_hash);
if records.contains_key(&key_hash) {
content.insert(name.clone());
} else {
new.insert(name);
}
}
for key in records.keys() {
if entries_hashes.contains(key) {
continue
}
deleted.insert(key.to_string());
}
JsonResponse::new(json!((content, new, deleted)), id).into()
}
pub async fn sync(&self, id: Value, _params: &[Value]) -> JsonResult {
info!("Sync process started");
let entries = fs::read_dir(&self.folder).unwrap();
{
let mut lock = self.dht.write().await;
let records = lock.map.clone();
let mut entries_hashes = HashSet::new();
for entry in entries {
let e = entry.unwrap();
let name = String::from(e.file_name().to_str().unwrap());
info!("Entry: {}", name);
let key_hash = blake3::hash(&serialize(&name));
entries_hashes.insert(key_hash);
if records.contains_key(&key_hash) {
continue
}
let value: Vec<u8> = std::fs::read(e.path()).unwrap();
if let Err(e) = lock.insert(key_hash, value).await {
error!("Failed to insert key: {}", e);
return server_error(RpcError::KeyInsertFail, id)
}
}
let records = lock.map.clone();
for key in records.keys() {
if entries_hashes.contains(key) {
continue
}
let result = lock.remove(*key).await;
match result {
Ok(option) => match option {
Some(k) => {
debug!("Hash key removed: {}", k);
}
None => {
warn!("Did not find key: {}", key);
}
},
Err(e) => {
error!("Failed to remove key: {}", e);
return server_error(RpcError::KeyRemoveFail, id)
}
}
}
}
JsonResponse::new(json!(true), id).into()
}
async fn get(&self, id: Value, params: &[Value]) -> JsonResult {
if params.len() != 1 || !params[0].is_string() {
return JsonError::new(InvalidParams, None, id).into()
}
let key = params[0].as_str().unwrap().to_string();
let key_hash = blake3::hash(&serialize(&key));
let exists = self.dht.read().await.contains_key(key_hash);
if exists.is_none() {
info!("Did not find key: {}", key);
return server_error(RpcError::UnknownKey, id)
}
let path = self.folder.join(key.clone());
let local = exists.unwrap();
if local {
match self.dht.read().await.get(key_hash) {
Some(_) => return JsonResponse::new(json!(path), id).into(),
None => {
info!("Did not find key: {}", key);
return server_error(RpcError::UnknownKey, id)
}
}
}
info!("Key doesn't exist locally, querring network...");
if let Err(e) = self.dht.read().await.request_key(key_hash).await {
error!("Failed to query key: {}", e);
return server_error(RpcError::QueryFailed, id)
}
info!("Waiting response...");
match waiting_for_response(self.dht.clone()).await {
Ok(response) => {
match response {
Some(resp) => {
info!("Key found!");
if let Err(e) =
self.dht.write().await.insert(resp.key, resp.value.clone()).await
{
error!("Failed to insert key: {}", e);
return server_error(RpcError::KeyInsertFail, id)
}
if let Err(e) = std::fs::write(path.clone(), resp.value) {
error!("Failed to generate file for key: {}", e);
return server_error(RpcError::FileGenerationFail, id)
}
JsonResponse::new(json!(path), id).into()
}
None => {
info!("Did not find key: {}", key);
server_error(RpcError::UnknownKey, id)
}
}
}
Err(e) => {
error!("Error while waiting network response: {}", e);
server_error(RpcError::WaitingNetworkError, id)
}
}
}
async fn pong(&self, id: Value, _params: &[Value]) -> JsonResult {
JsonResponse::new(json!("pong"), id).into()
}
async fn get_info(&self, id: Value, _params: &[Value]) -> JsonResult {
let resp = self.dht.read().await.p2p.get_info().await;
JsonResponse::new(resp, id).into()
}
}
#[async_trait]
impl RequestHandler for Fud {
    /// Dispatch a JSON-RPC request to the matching `Fud` method.
    ///
    /// Replies `InvalidParams` when `params` is not an array and `MethodNotFound` when
    /// the method is missing, not a string, or not one of `list`, `sync`, `get`, `ping`
    /// and `get_info`.
    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
        let params = match req.params.as_array() {
            Some(params) => params,
            None => return JsonError::new(InvalidParams, None, req.id).into(),
        };

        match req.method.as_str() {
            Some("list") => self.list(req.id, params).await,
            Some("sync") => self.sync(req.id, params).await,
            Some("get") => self.get(req.id, params).await,
            Some("ping") => self.pong(req.id, params).await,
            Some("get_info") => self.get_info(req.id, params).await,
            _ => JsonError::new(MethodNotFound, None, req.id).into(),
        }
    }
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'_>>) -> Result<()> {
let (signal, shutdown) = smol::channel::bounded::<()>(1);
ctrlc::set_handler(move || {
async_std::task::block_on(signal.send(())).unwrap();
})
.unwrap();
let network_settings = net::Settings {
inbound: args.p2p_accept,
outbound_connections: args.slots,
external_addr: args.p2p_external,
peers: args.peers.clone(),
seeds: args.seeds.clone(),
outbound_transports: net::settings::get_outbound_transports(args.transports),
localnet: args.localnet,
channel_log: args.channel_log,
..Default::default()
};
let p2p = net::P2p::new(network_settings).await;
let dht = Dht::new(None, p2p.clone(), shutdown.clone(), ex.clone()).await?;
let folder = expand_path(&args.folder)?;
let fud = Fud::new(dht.clone(), folder).await?;
let fud = Arc::new(fud);
info!("Starting JSON-RPC server");
let _ex = ex.clone();
ex.spawn(listen_and_serve(args.rpc_listen, fud.clone(), _ex)).detach();
info!("Starting sync P2P network");
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
let _p2p = p2p.clone();
ex.spawn(async move {
if let Err(e) = _p2p.run(_ex).await {
error!("Failed starting P2P network: {}", e);
}
})
.detach();
info!("Waiting for P2P outbound connections");
p2p.wait_for_outbound(ex).await?;
fud.init().await?;
shutdown.recv().await?;
print!("\r");
info!("Caught termination signal, cleaning up and exiting...");
fud.disconnect().await?;
Ok(())
}