1use clap::{Parser, Subcommand};
20use log::error;
21use simplelog::{ColorChoice, TermLogger, TerminalMode};
22use smol::lock::RwLock;
23use std::{
24 collections::HashMap,
25 io::{stdout, Write},
26 sync::Arc,
27};
28use termcolor::{StandardStream, WriteColor};
29use url::Url;
30
31use darkfi::{
32 cli_desc,
33 rpc::{
34 client::RpcClient,
35 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
36 util::JsonValue,
37 },
38 system::{ExecutorPtr, Publisher, StoppableTask},
39 util::cli::{get_log_config, get_log_level},
40 Error, Result,
41};
42
43use fud::{
44 resource::{Resource, ResourceStatus, ResourceType},
45 util::hash_to_string,
46};
47
48mod util;
49use crate::util::{
50 format_bytes, format_duration, format_progress_bytes, optional_value, print_tree,
51 status_to_colorspec, type_to_colorspec, TreeNode,
52};
53
// Command-line arguments of the `fu` binary, parsed by clap.
// clap derives `--help` text from `///` doc comments, so plain `//` comments
// are used here to leave the help output unchanged.
#[derive(Parser)]
#[clap(name = "fu", about = cli_desc!(), version)]
#[clap(arg_required_else_help(true))]
struct Args {
    // Number of times `-v` was given; `main` passes it to `get_log_level`
    // and `get_log_config`.
    #[clap(short, action = clap::ArgAction::Count)]
    verbose: u8,

    // URL handed to `RpcClient::new` (`-e/--endpoint`); defaults to
    // `tcp://127.0.0.1:13336`.
    #[clap(short, long, default_value = "tcp://127.0.0.1:13336")]
    endpoint: Url,

    // Subcommand to execute; `main` dispatches on it.
    #[clap(subcommand)]
    command: Subcmd,
}
69
// Subcommands accepted on the command line. `main` maps each variant to one
// method of `Fu`. Plain `//` comments are used because clap would turn `///`
// doc comments into help text.
#[derive(Subcommand)]
enum Subcmd {
    // Handled by `Fu::get`.
    Get {
        // First parameter of the "get" request; also used to filter notifications.
        hash: String,
        // Second parameter of the "get" request; an empty string is sent when omitted.
        path: Option<String>,
        // Third parameter of the "get" request (`-f/--files`, one or more values);
        // JSON null is sent when omitted.
        #[arg(short, long, num_args = 1..)]
        files: Option<Vec<String>>,
    },

    // Handled by `Fu::put`.
    Put {
        // Only parameter of the "put" request.
        path: String,
    },

    // Handled by `Fu::list_resources`.
    Ls {},

    // Handled by `Fu::watch`.
    Watch {},

    // Handled by `Fu::remove`.
    Rm {
        // Only parameter of the "remove" request.
        hash: String,
    },

    // Handled by `Fu::list_buckets`.
    ListBuckets {},

    // Handled by `Fu::list_seeders`.
    ListSeeders {},

    // Handled by `Fu::verify`.
    Verify {
        // Parameters of the "verify" request; an empty array is sent when omitted.
        files: Option<Vec<String>>,
    },
}
113
/// State shared by all subcommand handlers.
struct Fu {
    /// Client used for plain request/reply calls and for the "subscribe" call
    /// issued by `get`, `put` and `watch`.
    pub rpc_client: Arc<RpcClient>,
    /// Endpoint URL, kept so that `get` and `put` can open an additional
    /// `RpcClient` for their own request.
    pub endpoint: Url,
}
118
119impl Fu {
    /// Download the resource identified by `hash` while drawing a one-line
    /// progress display on stdout.
    ///
    /// A background task issues a "subscribe" call through `self.rpc_client`
    /// and the notifications are read from `subscription`. The "get" request
    /// itself is sent through a second `RpcClient` opened on `self.endpoint`.
    /// Notifications whose `info.hash` is absent or differs from `hash` are
    /// skipped.
    ///
    /// * `hash`  - first "get" parameter, also used to filter notifications.
    /// * `path`  - second "get" parameter; an empty string is sent when `None`.
    /// * `files` - third "get" parameter as an array of strings; JSON null when `None`.
    /// * `ex`    - executor given to the subscriber task and to the second client.
    ///
    /// Returns `Ok(())` once a `download_completed` event is received.
    /// Returns `Error::Custom` on `metadata_not_found`, `missing_chunks` and
    /// `download_error`, and `Error::UnexpectedJsonRpc` when anything other
    /// than a notification arrives on the subscription.
    ///
    /// NOTE(review): the `unwrap()` calls on notification fields panic when a
    /// field is missing or has an unexpected type.
    /// NOTE(review): `subscriber_task` is never stopped in this function.
    async fn get(
        &self,
        hash: String,
        path: Option<String>,
        files: Option<Vec<String>>,
        ex: ExecutorPtr,
    ) -> Result<()> {
        let publisher = Publisher::new();
        let subscription = Arc::new(publisher.clone().subscribe().await);
        let subscriber_task = StoppableTask::new();
        let hash_ = hash.clone();
        let publisher_ = publisher.clone();
        let rpc_client_ = self.rpc_client.clone();
        subscriber_task.clone().start(
            // Task body: hand `publisher` to the "subscribe" call; the loop
            // below reads what arrives on `subscription`.
            async move {
                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
                rpc_client_.subscribe(req, publisher).await
            },
            // Runs with the task's result: a failure is logged and pushed to
            // the subscription as a JSON-RPC error so the loop below returns.
            move |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* nothing to report */ }
                    Err(e) => {
                        error!("{e}");
                        publisher_
                            .notify(JsonResult::Error(JsonError::new(
                                ErrorCode::InternalError,
                                None,
                                0,
                            )))
                            .await;
                    }
                }
            },
            Error::DetachedTaskStopped,
            ex.clone(),
        );

        // Width of the progress bar in characters
        let progress_bar_width = 20;
        // Set once the progress line has been drawn at least once; the
        // `download_error` branch uses it to decide whether to end that line.
        let mut started = false;
        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);

        // Redraws the progress line from the "resource" entry of `info`.
        let mut print_progress = |info: &HashMap<String, JsonValue>| {
            started = true;
            let rs: Resource = info.get("resource").unwrap().clone().into();

            // Erase the current line and return the cursor to its start
            print!("\x1B[2K\r");

            // Fraction downloaded, capped at 1.0; 0.0 while the size is still 0
            let percent = if rs.target_bytes_downloaded > rs.target_bytes_size {
                1.0
            } else if rs.target_bytes_size > 0 {
                rs.target_bytes_downloaded as f64 / rs.target_bytes_size as f64
            } else {
                0.0
            };
            let completed = (percent * progress_bar_width as f64) as usize;
            // Guarded subtraction so `remaining` never underflows
            let remaining = match progress_bar_width > completed {
                true => progress_bar_width - completed,
                false => 0,
            };
            let bar = "=".repeat(completed) + &" ".repeat(remaining);
            print!("[{bar}] {:.1}% | ", percent * 100.0);

            // Bytes: only the total once everything is downloaded
            if rs.target_bytes_size > 0 {
                if rs.target_bytes_downloaded == rs.target_bytes_size {
                    print!("{} | ", format_bytes(rs.target_bytes_size));
                } else {
                    print!(
                        "{} | ",
                        format_progress_bytes(rs.target_bytes_downloaded, rs.target_bytes_size)
                    );
                }
            }

            // Speed: last entry of `speeds`, shown only while chunks are still missing
            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
                print!("{}/s | ", format_bytes(*rs.speeds.last().unwrap() as u64));
            }

            // Chunks: only the total once every chunk is downloaded
            if rs.target_chunks_count > 0 {
                let s = if rs.target_chunks_count > 1 { "s" } else { "" };
                if rs.target_chunks_downloaded == rs.target_chunks_count {
                    print!("{} chunk{s} | ", rs.target_chunks_count);
                } else {
                    print!(
                        "{}/{} chunk{s} | ",
                        rs.target_chunks_downloaded, rs.target_chunks_count
                    );
                }
            }

            // ETA: same visibility condition as the speed
            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
                print!("ETA: {} | ", format_duration(rs.get_eta()));
            }

            // An "incomplete" resource whose target chunks are all downloaded
            // is displayed as "done", using the colour of the seeding status.
            let is_done = rs.target_chunks_downloaded == rs.target_chunks_count &&
                rs.status.as_str() == "incomplete";
            let status = if is_done { ResourceStatus::Seeding } else { rs.status };
            tstdout.set_color(&status_to_colorspec(&status)).unwrap();
            print!(
                "{}",
                if let ResourceStatus::Seeding = status { "done" } else { status.as_str() }
            );
            tstdout.reset().unwrap();
            stdout().flush().unwrap();
        };

        let req = JsonRequest::new(
            "get",
            JsonValue::Array(vec![
                JsonValue::String(hash_.clone()),
                JsonValue::String(path.unwrap_or_default()),
                match files {
                    Some(files) => {
                        JsonValue::Array(files.into_iter().map(JsonValue::String).collect())
                    }
                    None => JsonValue::Null,
                },
            ]),
        );
        // The request goes through a dedicated client rather than `self.rpc_client`
        let rpc_client_getter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
        // The reply value is not used; only a failure matters here
        let _ = rpc_client_getter.request(req).await?;

        loop {
            match subscription.receive().await {
                JsonResult::Notification(n) => {
                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
                    let info =
                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
                    // Skip notifications without a hash or about another resource
                    let hash = match info.get("hash") {
                        Some(hash_value) => hash_value.get::<String>().unwrap(),
                        None => continue,
                    };
                    if *hash != hash_ {
                        continue;
                    }
                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
                        "download_started" |
                        "metadata_download_completed" |
                        "chunk_download_completed" |
                        "resource_updated" => {
                            print_progress(info);
                        }
                        "download_completed" => {
                            let resource_json = info
                                .get("resource")
                                .unwrap()
                                .get::<HashMap<String, JsonValue>>()
                                .unwrap();
                            let path = resource_json.get("path").unwrap().get::<String>().unwrap();
                            // Draw the final state before ending the progress line
                            print_progress(info);
                            println!("\nDownload completed:\n{path}");
                            return Ok(());
                        }
                        "metadata_not_found" => {
                            // End the progress line before the error is reported
                            println!();
                            return Err(Error::Custom(format!("Could not find {hash}")));
                        }
                        "chunk_not_found" => {
                            // Ignored: the loop keeps waiting for further events
                        }
                        "missing_chunks" => {
                            // End the progress line before the error is reported
                            println!();
                            return Err(Error::Custom("Missing chunks".to_string()));
                        }
                        "download_error" => {
                            // Only end the progress line if one was drawn
                            if started {
                                println!();
                            }
                            return Err(Error::Custom(
                                info.get("error").unwrap().get::<String>().unwrap().to_string(),
                            ));
                        }
                        _ => {}
                    }
                }

                JsonResult::Error(e) => {
                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
                }

                x => {
                    return Err(Error::UnexpectedJsonRpc(format!(
                        "Got unexpected data from JSON-RPC: {x:?}"
                    )))
                }
            }
        }
    }
317
318 async fn put(&self, path: String, ex: ExecutorPtr) -> Result<()> {
319 let publisher = Publisher::new();
320 let subscription = Arc::new(publisher.clone().subscribe().await);
321 let subscriber_task = StoppableTask::new();
322 let publisher_ = publisher.clone();
323 let rpc_client_ = self.rpc_client.clone();
324 subscriber_task.clone().start(
325 async move {
326 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
327 rpc_client_.subscribe(req, publisher).await
328 },
329 move |res| async move {
330 match res {
331 Ok(()) | Err(Error::DetachedTaskStopped) => { }
332 Err(e) => {
333 error!("{e}");
334 publisher_
335 .notify(JsonResult::Error(JsonError::new(
336 ErrorCode::InternalError,
337 None,
338 0,
339 )))
340 .await;
341 }
342 }
343 },
344 Error::DetachedTaskStopped,
345 ex.clone(),
346 );
347
348 let rpc_client_putter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
349 let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(path)]));
350 let rep = rpc_client_putter.request(req).await?;
351 let path_str = rep.get::<String>().unwrap().clone();
352
353 loop {
354 match subscription.receive().await {
355 JsonResult::Notification(n) => {
356 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
357 let info =
358 params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
359 let path = match info.get("path") {
360 Some(path) => path.get::<String>().unwrap(),
361 None => continue,
362 };
363 if *path != path_str {
364 continue;
365 }
366
367 match params.get("event").unwrap().get::<String>().unwrap().as_str() {
368 "insert_completed" => {
369 let id = info.get("hash").unwrap().get::<String>().unwrap().to_string();
370 println!("{id}");
371 break Ok(())
372 }
373 "insert_error" => {
374 return Err(Error::Custom(
375 info.get("error").unwrap().get::<String>().unwrap().to_string(),
376 ));
377 }
378 _ => {}
379 }
380 }
381
382 JsonResult::Error(e) => {
383 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
384 }
385
386 x => {
387 return Err(Error::UnexpectedJsonRpc(format!(
388 "Got unexpected data from JSON-RPC: {x:?}"
389 )))
390 }
391 }
392 }
393 }
394
395 async fn list_resources(&self) -> Result<()> {
396 let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
397 let rep = self.rpc_client.request(req).await?;
398
399 let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
400 let resources: Vec<Resource> = resources_json.into_iter().map(|v| v.into()).collect();
401
402 for resource in resources.iter() {
403 let tree: Vec<TreeNode<&str>> = vec![
404 TreeNode::kv("ID", hash_to_string(&resource.hash)),
405 TreeNode::kvc(
406 "Type",
407 resource.rtype.as_str().to_string(),
408 type_to_colorspec(&resource.rtype),
409 ),
410 TreeNode::kvc(
411 "Status",
412 resource.status.as_str().to_string(),
413 status_to_colorspec(&resource.status),
414 ),
415 TreeNode::kv("Chunks", {
416 if let ResourceType::Directory = resource.rtype {
417 format!(
418 "{}/{} ({}/{})",
419 resource.total_chunks_downloaded,
420 optional_value!(resource.total_chunks_count),
421 resource.target_chunks_downloaded,
422 optional_value!(resource.target_chunks_count)
423 )
424 } else {
425 format!(
426 "{}/{}",
427 resource.total_chunks_downloaded,
428 optional_value!(resource.total_chunks_count)
429 )
430 }
431 }),
432 TreeNode::kv("Bytes", {
433 if let ResourceType::Directory = resource.rtype {
434 format!(
435 "{} ({})",
436 optional_value!(resource.total_bytes_size, |x: u64| {
437 format_progress_bytes(resource.total_bytes_downloaded, x)
438 }),
439 optional_value!(resource.target_bytes_size, |x: u64| {
440 format_progress_bytes(resource.target_bytes_downloaded, x)
441 })
442 )
443 } else {
444 optional_value!(resource.total_bytes_size, |x: u64| format_progress_bytes(
445 resource.total_bytes_downloaded,
446 x
447 ))
448 }
449 }),
450 ];
451 print_tree(&resource.path.to_string_lossy(), &tree);
452 }
453
454 Ok(())
455 }
456
457 async fn list_buckets(&self) -> Result<()> {
458 let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
459 let rep = self.rpc_client.request(req).await?;
460 let buckets: Vec<JsonValue> = rep.try_into().unwrap();
461 let mut empty = true;
462 for (bucket_i, bucket) in buckets.into_iter().enumerate() {
463 let nodes: Vec<JsonValue> = bucket.try_into().unwrap();
464 if nodes.is_empty() {
465 continue
466 }
467 empty = false;
468
469 let tree: Vec<TreeNode<String>> = nodes
470 .into_iter()
471 .map(|n| {
472 let node: Vec<JsonValue> = n.try_into().unwrap();
473 let node_id: JsonValue = node[0].clone();
474 let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
475
476 let addresses_vec: Vec<String> = addresses
477 .into_iter()
478 .map(|addr| TryInto::<String>::try_into(addr).unwrap())
479 .collect();
480
481 let node_id_string: String = node_id.try_into().unwrap();
482
483 TreeNode {
484 key: node_id_string,
485 value: None,
486 color: None,
487 children: addresses_vec
488 .into_iter()
489 .map(|addr| TreeNode::key(addr.clone()))
490 .collect(),
491 }
492 })
493 .collect();
494
495 print_tree(format!("Bucket {bucket_i}").as_str(), &tree);
496 }
497
498 if empty {
499 println!("All buckets are empty");
500 }
501
502 Ok(())
503 }
504
505 async fn list_seeders(&self) -> Result<()> {
506 let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
507 let rep = self.rpc_client.request(req).await?;
508
509 let resources: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
510
511 if resources.is_empty() {
512 println!("No known seeders");
513 return Ok(())
514 }
515
516 for (hash, nodes) in resources {
517 let nodes: Vec<JsonValue> = nodes.try_into().unwrap();
518 let tree: Vec<TreeNode<String>> = nodes
519 .into_iter()
520 .map(|n| {
521 let node: Vec<JsonValue> = n.try_into().unwrap();
522 let node_id: JsonValue = node[0].clone();
523 let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
524
525 let addresses_vec: Vec<String> = addresses
526 .into_iter()
527 .map(|addr| TryInto::<String>::try_into(addr).unwrap())
528 .collect();
529
530 let node_id_string: String = node_id.try_into().unwrap();
531
532 TreeNode {
533 key: node_id_string,
534 value: None,
535 color: None,
536 children: addresses_vec
537 .into_iter()
538 .map(|addr| TreeNode::key(addr.clone()))
539 .collect(),
540 }
541 })
542 .collect();
543
544 print_tree(&hash, &tree);
545 }
546
547 Ok(())
548 }
549
    /// Show a table of resources on the terminal and keep it updated from
    /// daemon notifications.
    ///
    /// The initial rows come from a "list_resources" request. A background
    /// task then issues a "subscribe" call through `self.rpc_client`, and each
    /// notification either redraws the row of one resource or, on
    /// `resource_removed`, redraws the whole table.
    ///
    /// * `ex` - executor given to the subscriber task.
    ///
    /// The loop only ends with an error: `Error::UnexpectedJsonRpc` is returned
    /// when anything other than a notification arrives on the subscription.
    ///
    /// NOTE(review): the `unwrap()` calls on the reply and on notification
    /// fields panic when a field is missing or has an unexpected type.
    async fn watch(&self, ex: ExecutorPtr) -> Result<()> {
        let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
        let rep = self.rpc_client.request(req).await?;

        let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
        // Rows currently displayed, in display order; starts empty and is
        // filled by `update_resource`.
        let resources: Arc<RwLock<Vec<Resource>>> = Arc::new(RwLock::new(vec![]));

        let publisher = Publisher::new();
        let subscription = Arc::new(publisher.clone().subscribe().await);
        let subscriber_task = StoppableTask::new();
        let publisher_ = publisher.clone();
        let rpc_client_ = self.rpc_client.clone();
        subscriber_task.clone().start(
            async move {
                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
                rpc_client_.subscribe(req, publisher).await
            },
            // A failed subscription is logged and pushed to the subscription
            // as a JSON-RPC error so the loop below returns.
            move |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* nothing to report */ }
                    Err(e) => {
                        error!("{e}");
                        publisher_
                            .notify(JsonResult::Error(JsonError::new(
                                ErrorCode::InternalError,
                                None,
                                0,
                            )))
                            .await;
                    }
                }
            },
            Error::DetachedTaskStopped,
            ex,
        );

        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);

        // Stores `resource` in `resources` (replacing the entry with the same
        // hash, or appending it) and redraws its row.
        let mut update_resource = async |resource: &Resource| {
            let mut resources_write = resources.write().await;
            let i = match resources_write.iter().position(|r| r.hash == resource.hash) {
                Some(i) => {
                    resources_write.remove(i);
                    resources_write.insert(i, resource.clone());
                    i
                }
                None => {
                    resources_write.push(resource.clone());
                    resources_write.len() - 1
                }
            };

            // Move the cursor to this resource's row and erase it. Row 1 holds
            // the header printed by `print_begin`, so entry `i` is on row `i + 2`.
            print!("\x1b[{};1H\x1B[2K", i + 2);

            // Hash column
            print!("\r{:>44} ", hash_to_string(&resource.hash));

            // Type column, with shortened labels for "unknown" and "directory"
            tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
            print!(
                "{:>4} ",
                match resource.rtype.as_str() {
                    "unknown" => "?",
                    "directory" => "dir",
                    _ => resource.rtype.as_str(),
                }
            );
            tstdout.reset().unwrap();

            // Status column
            tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
            print!("{:>11} ", resource.status.as_str());
            tstdout.reset().unwrap();

            // Percentage and bytes columns; a total size of 0 is shown as "?"
            match resource.total_bytes_size {
                0 => {
                    print!("{:>5.1} {:>16} ", 0.0, "?");
                }
                _ => {
                    let percent = resource.total_bytes_downloaded as f64 /
                        resource.total_bytes_size as f64 *
                        100.0;
                    if resource.total_bytes_downloaded == resource.total_bytes_size {
                        print!("{:>5.1} {:>16} ", percent, format_bytes(resource.total_bytes_size));
                    } else {
                        print!(
                            "{:>5.1} {:>16} ",
                            percent,
                            format_progress_bytes(
                                resource.total_bytes_downloaded,
                                resource.total_bytes_size
                            )
                        );
                    }
                }
            };

            // Chunks column; a total count of 0 is shown as "?"
            match resource.total_chunks_count {
                0 => {
                    print!("{:>9} ", format!("{}/?", resource.total_chunks_downloaded));
                }
                _ => {
                    if resource.total_chunks_downloaded == resource.total_chunks_count {
                        print!("{:>9} ", resource.total_chunks_count.to_string());
                    } else {
                        print!(
                            "{:>9} ",
                            format!(
                                "{}/{}",
                                resource.total_chunks_downloaded, resource.total_chunks_count
                            )
                        );
                    }
                }
            };

            // Speed column: shown only for an unfinished resource in the
            // "downloading" status that has at least one speed sample
            let speed_available = resource.total_bytes_downloaded < resource.total_bytes_size &&
                resource.status.as_str() == "downloading" &&
                !resource.speeds.is_empty();
            print!(
                "{:>12} ",
                match speed_available {
                    false => "-".to_string(),
                    true => format!("{}/s", format_bytes(*resource.speeds.last().unwrap() as u64)),
                }
            );

            // ETA column; an ETA of 0 is shown as "-"
            let eta = resource.get_eta();
            print!(
                "{:>6}",
                match eta {
                    0 => "-".to_string(),
                    _ => format_duration(eta),
                }
            );

            println!();

            // Park the cursor on the row below the last resource
            print!("\x1b[{};1H", resources_write.len() + 2);
            stdout().flush().unwrap();
        };

        // Clears the screen and prints the table header on the first row.
        let print_begin = async || {
            // Clear the screen and move the cursor to row 1, column 1
            print!("\x1B[2J\x1B[1;1H");

            // Header line, wrapped in the underline and reset sequences
            println!(
                "\x1b[4m{:>44} {:>4} {:>11} {:>5} {:>16} {:>9} {:>12} {:>6}\x1b[0m",
                "Hash", "Type", "Status", "%", "Bytes", "Chunks", "Speed", "ETA"
            );
        };

        print_begin().await;
        if resources_json.is_empty() {
            println!("No known resources");
        } else {
            for resource in resources_json.iter() {
                let rs: Resource = resource.clone().into();
                update_resource(&rs).await;
            }
        }

        loop {
            match subscription.receive().await {
                JsonResult::Notification(n) => {
                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
                    let info =
                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
                        // Every event in this group is handled by redrawing the
                        // row from the "resource" entry of `info`
                        "download_started" |
                        "metadata_download_completed" |
                        "chunk_download_completed" |
                        "download_completed" |
                        "missing_chunks" |
                        "metadata_not_found" |
                        "resource_updated" => {
                            let resource: Resource = info.get("resource").unwrap().clone().into();
                            update_resource(&resource).await;
                        }
                        "resource_removed" => {
                            // Inner scope: the write guard is dropped before
                            // `update_resource` takes the lock again below
                            {
                                let hash = info.get("hash").unwrap().get::<String>().unwrap();
                                let mut resources_write = resources.write().await;
                                let i = resources_write
                                    .iter()
                                    .position(|r| hash_to_string(&r.hash) == *hash);
                                if let Some(i) = i {
                                    resources_write.remove(i);
                                }
                            }

                            // Redraw the whole table from a snapshot of the list
                            let r = resources.read().await.clone();
                            print_begin().await;
                            for resource in r.iter() {
                                update_resource(resource).await;
                            }
                        }
                        "download_error" => {
                            // Ignored: the table is left unchanged
                        }
                        _ => {}
                    }
                }

                JsonResult::Error(e) => {
                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
                }

                x => {
                    return Err(Error::UnexpectedJsonRpc(format!(
                        "Got unexpected data from JSON-RPC: {x:?}"
                    )))
                }
            }
        }
    }
773
774 async fn remove(&self, hash: String) -> Result<()> {
775 let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)]));
776 self.rpc_client.request(req).await?;
777 Ok(())
778 }
779
780 async fn verify(&self, files: Option<Vec<String>>) -> Result<()> {
781 let files = files.unwrap_or_default().into_iter().map(JsonValue::String).collect();
782 let req = JsonRequest::new("verify", JsonValue::Array(files));
783 self.rpc_client.request(req).await?;
784 Ok(())
785 }
786}
787
788fn main() -> Result<()> {
789 let args = Args::parse();
790
791 let log_level = get_log_level(args.verbose);
792 let log_config = get_log_config(args.verbose);
793 TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
794
795 let ex = Arc::new(smol::Executor::new());
796 smol::block_on(async {
797 ex.run(async {
798 let rpc_client = Arc::new(RpcClient::new(args.endpoint.clone(), ex.clone()).await?);
799 let fu = Fu { rpc_client, endpoint: args.endpoint.clone() };
800
801 match args.command {
802 Subcmd::Get { hash, path, files } => fu.get(hash, path, files, ex.clone()).await,
803 Subcmd::Put { path } => fu.put(path, ex.clone()).await,
804 Subcmd::Ls {} => fu.list_resources().await,
805 Subcmd::Watch {} => fu.watch(ex.clone()).await,
806 Subcmd::Rm { hash } => fu.remove(hash).await,
807 Subcmd::ListBuckets {} => fu.list_buckets().await,
808 Subcmd::ListSeeders {} => fu.list_seeders().await,
809 Subcmd::Verify { files } => fu.verify(files).await,
810 }?;
811
812 Ok(())
813 })
814 .await
815 })
816}