1use clap::{Parser, Subcommand};
20use smol::lock::RwLock;
21use std::{
22 collections::HashMap,
23 io::{stdout, Write},
24 sync::Arc,
25};
26use termcolor::{ColorChoice, StandardStream, WriteColor};
27use tracing::error;
28use url::Url;
29
30use darkfi::{
31 cli_desc,
32 rpc::{
33 client::RpcClient,
34 jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
35 util::JsonValue,
36 },
37 system::{ExecutorPtr, Publisher, StoppableTask},
38 util::logger::setup_logging,
39 Error, Result,
40};
41
42use fud::{
43 resource::{Resource, ResourceStatus, ResourceType},
44 util::hash_to_string,
45};
46
47mod util;
48use crate::util::{
49 format_bytes, format_duration, format_progress_bytes, optional_value, print_tree,
50 status_to_colorspec, type_to_colorspec, TreeNode,
51};
52
53#[derive(Parser)]
54#[clap(name = "fu", about = cli_desc!(), version)]
55#[clap(arg_required_else_help(true))]
56struct Args {
57 #[clap(short, action = clap::ArgAction::Count)]
58 verbose: u8,
60
61 #[clap(short, long, default_value = "tcp://127.0.0.1:13336")]
62 endpoint: Url,
64
65 #[clap(subcommand)]
66 command: Subcmd,
67}
68
69#[derive(Subcommand)]
70enum Subcmd {
71 Get {
73 hash: String,
75 path: Option<String>,
77 #[arg(short, long, num_args = 1..)]
79 files: Option<Vec<String>>,
80 },
81
82 Put {
84 path: String,
86 },
87
88 Ls {},
90
91 Watch {},
93
94 Rm {
96 hash: String,
98 },
99
100 Buckets {},
102
103 Seeders {},
105
106 Verify {
108 files: Option<Vec<String>>,
110 },
111
112 Lookup {
114 hash: String,
116 },
117}
118
119struct Fu {
120 pub rpc_client: Arc<RpcClient>,
121 pub endpoint: Url,
122}
123
124impl Fu {
125 async fn get(
126 &self,
127 hash: String,
128 path: Option<String>,
129 files: Option<Vec<String>>,
130 ex: ExecutorPtr,
131 ) -> Result<()> {
132 let publisher = Publisher::new();
133 let subscription = Arc::new(publisher.clone().subscribe().await);
134 let subscriber_task = StoppableTask::new();
135 let hash_ = hash.clone();
136 let publisher_ = publisher.clone();
137 let rpc_client_ = self.rpc_client.clone();
138 subscriber_task.clone().start(
139 async move {
140 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
141 rpc_client_.subscribe(req, publisher).await
142 },
143 move |res| async move {
144 match res {
145 Ok(()) | Err(Error::DetachedTaskStopped) => { }
146 Err(e) => {
147 error!("{e}");
148 publisher_
149 .notify(JsonResult::Error(JsonError::new(
150 ErrorCode::InternalError,
151 None,
152 0,
153 )))
154 .await;
155 }
156 }
157 },
158 Error::DetachedTaskStopped,
159 ex.clone(),
160 );
161
162 let progress_bar_width = 20;
163 let mut started = false;
164 let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
165
166 let mut print_progress = |info: &HashMap<String, JsonValue>| {
167 started = true;
168 let rs: Resource = info.get("resource").unwrap().clone().into();
169
170 print!("\x1B[2K\r"); let percent = if rs.target_bytes_downloaded > rs.target_bytes_size {
174 1.0
175 } else if rs.target_bytes_size > 0 {
176 rs.target_bytes_downloaded as f64 / rs.target_bytes_size as f64
177 } else {
178 0.0
179 };
180 let completed = (percent * progress_bar_width as f64) as usize;
181 let remaining = match progress_bar_width > completed {
182 true => progress_bar_width - completed,
183 false => 0,
184 };
185 let bar = "=".repeat(completed) + &" ".repeat(remaining);
186 print!("[{bar}] {:.1}% | ", percent * 100.0);
187
188 if rs.target_bytes_size > 0 {
190 if rs.target_bytes_downloaded == rs.target_bytes_size {
191 print!("{} | ", format_bytes(rs.target_bytes_size));
192 } else {
193 print!(
194 "{} | ",
195 format_progress_bytes(rs.target_bytes_downloaded, rs.target_bytes_size)
196 );
197 }
198 }
199
200 if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
202 print!("{}/s | ", format_bytes(*rs.speeds.last().unwrap() as u64));
203 }
204
205 if rs.target_chunks_count > 0 {
207 let s = if rs.target_chunks_count > 1 { "s" } else { "" };
208 if rs.target_chunks_downloaded == rs.target_chunks_count {
209 print!("{} chunk{s} | ", rs.target_chunks_count);
210 } else {
211 print!(
212 "{}/{} chunk{s} | ",
213 rs.target_chunks_downloaded, rs.target_chunks_count
214 );
215 }
216 }
217
218 if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
220 print!("ETA: {} | ", format_duration(rs.get_eta()));
221 }
222
223 let is_done = rs.target_chunks_downloaded == rs.target_chunks_count &&
225 rs.status.as_str() == "incomplete";
226 let status = if is_done { ResourceStatus::Seeding } else { rs.status };
227 tstdout.set_color(&status_to_colorspec(&status)).unwrap();
228 print!(
229 "{}",
230 if let ResourceStatus::Seeding = status { "done" } else { status.as_str() }
231 );
232 tstdout.reset().unwrap();
233 stdout().flush().unwrap();
234 };
235
236 let req = JsonRequest::new(
237 "get",
238 JsonValue::Array(vec![
239 JsonValue::String(hash_.clone()),
240 JsonValue::String(path.unwrap_or_default()),
241 match files {
242 Some(files) => {
243 JsonValue::Array(files.into_iter().map(JsonValue::String).collect())
244 }
245 None => JsonValue::Null,
246 },
247 ]),
248 );
249 let rpc_client_getter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
251 let _ = rpc_client_getter.request(req).await?;
252
253 loop {
254 match subscription.receive().await {
255 JsonResult::Notification(n) => {
256 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
257 let info = params.get("info");
258 if info.is_none() {
259 continue
260 }
261 let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
262
263 let hash = match info.get("hash") {
264 Some(hash_value) => hash_value.get::<String>().unwrap(),
265 None => continue,
266 };
267 if *hash != hash_ {
268 continue;
269 }
270 match params.get("event").unwrap().get::<String>().unwrap().as_str() {
271 "download_started" |
272 "metadata_download_completed" |
273 "chunk_download_completed" |
274 "resource_updated" => {
275 print_progress(info);
276 }
277 "download_completed" => {
278 let resource_json = info
279 .get("resource")
280 .unwrap()
281 .get::<HashMap<String, JsonValue>>()
282 .unwrap();
283 let path = resource_json.get("path").unwrap().get::<String>().unwrap();
284 print_progress(info);
285 println!("\nDownload completed:\n{path}");
286 return Ok(());
287 }
288 "metadata_not_found" => {
289 println!();
290 return Err(Error::Custom(format!("Could not find {hash}")));
291 }
292 "chunk_not_found" => {
293 }
296 "missing_chunks" => {
297 println!();
299 return Err(Error::Custom("Missing chunks".to_string()));
300 }
301 "download_error" => {
302 if started {
304 println!();
305 }
306 return Err(Error::Custom(
307 info.get("error").unwrap().get::<String>().unwrap().to_string(),
308 ));
309 }
310 _ => {}
311 }
312 }
313
314 JsonResult::Error(e) => {
315 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
316 }
317
318 x => {
319 return Err(Error::UnexpectedJsonRpc(format!(
320 "Got unexpected data from JSON-RPC: {x:?}"
321 )))
322 }
323 }
324 }
325 }
326
327 async fn put(&self, path: String, ex: ExecutorPtr) -> Result<()> {
328 let publisher = Publisher::new();
329 let subscription = Arc::new(publisher.clone().subscribe().await);
330 let subscriber_task = StoppableTask::new();
331 let publisher_ = publisher.clone();
332 let rpc_client_ = self.rpc_client.clone();
333 subscriber_task.clone().start(
334 async move {
335 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
336 rpc_client_.subscribe(req, publisher).await
337 },
338 move |res| async move {
339 match res {
340 Ok(()) | Err(Error::DetachedTaskStopped) => { }
341 Err(e) => {
342 error!("{e}");
343 publisher_
344 .notify(JsonResult::Error(JsonError::new(
345 ErrorCode::InternalError,
346 None,
347 0,
348 )))
349 .await;
350 }
351 }
352 },
353 Error::DetachedTaskStopped,
354 ex.clone(),
355 );
356
357 let rpc_client_putter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
358 let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(path)]));
359 let rep = rpc_client_putter.request(req).await?;
360 let path_str = rep.get::<String>().unwrap().clone();
361
362 loop {
363 match subscription.receive().await {
364 JsonResult::Notification(n) => {
365 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
366 let info =
367 params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
368 let path = match info.get("path") {
369 Some(path) => path.get::<String>().unwrap(),
370 None => continue,
371 };
372 if *path != path_str {
373 continue;
374 }
375
376 match params.get("event").unwrap().get::<String>().unwrap().as_str() {
377 "insert_completed" => {
378 let id = info.get("hash").unwrap().get::<String>().unwrap().to_string();
379 println!("{id}");
380 break Ok(())
381 }
382 "insert_error" => {
383 return Err(Error::Custom(
384 info.get("error").unwrap().get::<String>().unwrap().to_string(),
385 ));
386 }
387 _ => {}
388 }
389 }
390
391 JsonResult::Error(e) => {
392 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
393 }
394
395 x => {
396 return Err(Error::UnexpectedJsonRpc(format!(
397 "Got unexpected data from JSON-RPC: {x:?}"
398 )))
399 }
400 }
401 }
402 }
403
404 async fn list_resources(&self) -> Result<()> {
405 let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
406 let rep = self.rpc_client.request(req).await?;
407
408 let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
409 let resources: Vec<Resource> = resources_json.into_iter().map(|v| v.into()).collect();
410
411 for resource in resources.iter() {
412 let tree: Vec<TreeNode<&str>> = vec![
413 TreeNode::kv("ID", hash_to_string(&resource.hash)),
414 TreeNode::kvc(
415 "Type",
416 resource.rtype.as_str().to_string(),
417 type_to_colorspec(&resource.rtype),
418 ),
419 TreeNode::kvc(
420 "Status",
421 resource.status.as_str().to_string(),
422 status_to_colorspec(&resource.status),
423 ),
424 TreeNode::kv("Chunks", {
425 if let ResourceType::Directory = resource.rtype {
426 format!(
427 "{}/{} ({}/{})",
428 resource.total_chunks_downloaded,
429 optional_value!(resource.total_chunks_count),
430 resource.target_chunks_downloaded,
431 optional_value!(resource.target_chunks_count)
432 )
433 } else {
434 format!(
435 "{}/{}",
436 resource.total_chunks_downloaded,
437 optional_value!(resource.total_chunks_count)
438 )
439 }
440 }),
441 TreeNode::kv("Bytes", {
442 if let ResourceType::Directory = resource.rtype {
443 format!(
444 "{} ({})",
445 optional_value!(resource.total_bytes_size, |x: u64| {
446 format_progress_bytes(resource.total_bytes_downloaded, x)
447 }),
448 optional_value!(resource.target_bytes_size, |x: u64| {
449 format_progress_bytes(resource.target_bytes_downloaded, x)
450 })
451 )
452 } else {
453 optional_value!(resource.total_bytes_size, |x: u64| format_progress_bytes(
454 resource.total_bytes_downloaded,
455 x
456 ))
457 }
458 }),
459 ];
460 print_tree(&resource.path.to_string_lossy(), &tree);
461 }
462
463 Ok(())
464 }
465
466 async fn buckets(&self) -> Result<()> {
467 let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
468 let rep = self.rpc_client.request(req).await?;
469 let buckets: Vec<JsonValue> = rep.try_into().unwrap();
470 let mut empty = true;
471 for (bucket_i, bucket) in buckets.into_iter().enumerate() {
472 let nodes: Vec<JsonValue> = bucket.try_into().unwrap();
473 if nodes.is_empty() {
474 continue
475 }
476 empty = false;
477
478 let tree: Vec<TreeNode<String>> = nodes
479 .into_iter()
480 .map(|n| {
481 let node: Vec<JsonValue> = n.try_into().unwrap();
482 let node_id: JsonValue = node[0].clone();
483 let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
484
485 let addresses_vec: Vec<String> = addresses
486 .into_iter()
487 .map(|addr| TryInto::<String>::try_into(addr).unwrap())
488 .collect();
489
490 let node_id_string: String = node_id.try_into().unwrap();
491
492 TreeNode {
493 key: node_id_string,
494 value: None,
495 color: None,
496 children: addresses_vec
497 .into_iter()
498 .map(|addr| TreeNode::key(addr.clone()))
499 .collect(),
500 }
501 })
502 .collect();
503
504 print_tree(format!("Bucket {bucket_i}").as_str(), &tree);
505 }
506
507 if empty {
508 println!("All buckets are empty");
509 }
510
511 Ok(())
512 }
513
514 async fn seeders(&self) -> Result<()> {
515 let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
516 let rep = self.rpc_client.request(req).await?;
517
518 let resources: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
519
520 if resources.is_empty() {
521 println!("No known seeders");
522 return Ok(())
523 }
524
525 for (hash, nodes) in resources {
526 let nodes: Vec<JsonValue> = nodes.try_into().unwrap();
527 let tree: Vec<TreeNode<String>> = nodes
528 .into_iter()
529 .map(|n| {
530 let node: Vec<JsonValue> = n.try_into().unwrap();
531 let node_id: JsonValue = node[0].clone();
532 let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
533
534 let addresses_vec: Vec<String> = addresses
535 .into_iter()
536 .map(|addr| TryInto::<String>::try_into(addr).unwrap())
537 .collect();
538
539 let node_id_string: String = node_id.try_into().unwrap();
540
541 TreeNode {
542 key: node_id_string,
543 value: None,
544 color: None,
545 children: addresses_vec
546 .into_iter()
547 .map(|addr| TreeNode::key(addr.clone()))
548 .collect(),
549 }
550 })
551 .collect();
552
553 print_tree(&hash, &tree);
554 }
555
556 Ok(())
557 }
558
559 async fn watch(&self, ex: ExecutorPtr) -> Result<()> {
560 let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
561 let rep = self.rpc_client.request(req).await?;
562
563 let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
564 let resources: Arc<RwLock<Vec<Resource>>> = Arc::new(RwLock::new(vec![]));
565
566 let publisher = Publisher::new();
567 let subscription = Arc::new(publisher.clone().subscribe().await);
568 let subscriber_task = StoppableTask::new();
569 let publisher_ = publisher.clone();
570 let rpc_client_ = self.rpc_client.clone();
571 subscriber_task.clone().start(
572 async move {
573 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
574 rpc_client_.subscribe(req, publisher).await
575 },
576 move |res| async move {
577 match res {
578 Ok(()) | Err(Error::DetachedTaskStopped) => { }
579 Err(e) => {
580 error!("{e}");
581 publisher_
582 .notify(JsonResult::Error(JsonError::new(
583 ErrorCode::InternalError,
584 None,
585 0,
586 )))
587 .await;
588 }
589 }
590 },
591 Error::DetachedTaskStopped,
592 ex,
593 );
594
595 let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
596
597 let mut update_resource = async |resource: &Resource| {
598 let mut resources_write = resources.write().await;
599 let i = match resources_write.iter().position(|r| r.hash == resource.hash) {
600 Some(i) => {
601 resources_write.remove(i);
602 resources_write.insert(i, resource.clone());
603 i
604 }
605 None => {
606 resources_write.push(resource.clone());
607 resources_write.len() - 1
608 }
609 };
610
611 print!("\x1b[{};1H\x1B[2K", i + 2);
613
614 print!("\r{:>44} ", hash_to_string(&resource.hash));
616
617 tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
619 print!(
620 "{:>4} ",
621 match resource.rtype.as_str() {
622 "unknown" => "?",
623 "directory" => "dir",
624 _ => resource.rtype.as_str(),
625 }
626 );
627 tstdout.reset().unwrap();
628
629 tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
631 print!("{:>11} ", resource.status.as_str());
632 tstdout.reset().unwrap();
633
634 match resource.total_bytes_size {
636 0 => {
637 print!("{:>5.1} {:>16} ", 0.0, "?");
638 }
639 _ => {
640 let percent = resource.total_bytes_downloaded as f64 /
641 resource.total_bytes_size as f64 *
642 100.0;
643 if resource.total_bytes_downloaded == resource.total_bytes_size {
644 print!("{:>5.1} {:>16} ", percent, format_bytes(resource.total_bytes_size));
645 } else {
646 print!(
647 "{:>5.1} {:>16} ",
648 percent,
649 format_progress_bytes(
650 resource.total_bytes_downloaded,
651 resource.total_bytes_size
652 )
653 );
654 }
655 }
656 };
657
658 match resource.total_chunks_count {
660 0 => {
661 print!("{:>9} ", format!("{}/?", resource.total_chunks_downloaded));
662 }
663 _ => {
664 if resource.total_chunks_downloaded == resource.total_chunks_count {
665 print!("{:>9} ", resource.total_chunks_count.to_string());
666 } else {
667 print!(
668 "{:>9} ",
669 format!(
670 "{}/{}",
671 resource.total_chunks_downloaded, resource.total_chunks_count
672 )
673 );
674 }
675 }
676 };
677
678 let speed_available = resource.total_bytes_downloaded < resource.total_bytes_size &&
680 resource.status.as_str() == "downloading" &&
681 !resource.speeds.is_empty();
682 print!(
683 "{:>12} ",
684 match speed_available {
685 false => "-".to_string(),
686 true => format!("{}/s", format_bytes(*resource.speeds.last().unwrap() as u64)),
687 }
688 );
689
690 let eta = resource.get_eta();
692 print!(
693 "{:>6}",
694 match eta {
695 0 => "-".to_string(),
696 _ => format_duration(eta),
697 }
698 );
699
700 println!();
701
702 print!("\x1b[{};1H", resources_write.len() + 2);
704 stdout().flush().unwrap();
705 };
706
707 let print_begin = async || {
708 print!("\x1B[2J\x1B[1;1H");
710
711 println!(
713 "\x1b[4m{:>44} {:>4} {:>11} {:>5} {:>16} {:>9} {:>12} {:>6}\x1b[0m",
714 "Hash", "Type", "Status", "%", "Bytes", "Chunks", "Speed", "ETA"
715 );
716 };
717
718 print_begin().await;
719 if resources_json.is_empty() {
720 println!("No known resources");
721 } else {
722 for resource in resources_json.iter() {
723 let rs: Resource = resource.clone().into();
724 update_resource(&rs).await;
725 }
726 }
727
728 loop {
729 match subscription.receive().await {
730 JsonResult::Notification(n) => {
731 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
732 let info = params.get("info");
733 if info.is_none() {
734 continue
735 }
736 let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
737 match params.get("event").unwrap().get::<String>().unwrap().as_str() {
738 "download_started" |
739 "metadata_download_completed" |
740 "chunk_download_completed" |
741 "download_completed" |
742 "missing_chunks" |
743 "metadata_not_found" |
744 "resource_updated" => {
745 let resource: Resource = info.get("resource").unwrap().clone().into();
746 update_resource(&resource).await;
747 }
748 "resource_removed" => {
749 {
750 let hash = info.get("hash").unwrap().get::<String>().unwrap();
751 let mut resources_write = resources.write().await;
752 let i = resources_write
753 .iter()
754 .position(|r| hash_to_string(&r.hash) == *hash);
755 if let Some(i) = i {
756 resources_write.remove(i);
757 }
758 }
759
760 let r = resources.read().await.clone();
761 print_begin().await;
762 for resource in r.iter() {
763 update_resource(resource).await;
764 }
765 }
766 "download_error" => {
767 }
769 _ => {}
770 }
771 }
772
773 JsonResult::Error(e) => {
774 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
775 }
776
777 x => {
778 return Err(Error::UnexpectedJsonRpc(format!(
779 "Got unexpected data from JSON-RPC: {x:?}"
780 )))
781 }
782 }
783 }
784 }
785
786 async fn remove(&self, hash: String) -> Result<()> {
787 let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)]));
788 self.rpc_client.request(req).await?;
789 Ok(())
790 }
791
792 async fn verify(&self, files: Option<Vec<String>>) -> Result<()> {
793 let files = files.unwrap_or_default().into_iter().map(JsonValue::String).collect();
794 let req = JsonRequest::new("verify", JsonValue::Array(files));
795 self.rpc_client.request(req).await?;
796 Ok(())
797 }
798
799 async fn lookup(&self, hash: String, ex: ExecutorPtr) -> Result<()> {
800 let publisher = Publisher::new();
801 let subscription = Arc::new(publisher.clone().subscribe().await);
802 let subscriber_task = StoppableTask::new();
803 let publisher_ = publisher.clone();
804 let rpc_client_ = self.rpc_client.clone();
805 subscriber_task.clone().start(
806 async move {
807 let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
808 rpc_client_.subscribe(req, publisher).await
809 },
810 move |res| async move {
811 match res {
812 Ok(()) | Err(Error::DetachedTaskStopped) => { }
813 Err(e) => {
814 error!("{e}");
815 publisher_
816 .notify(JsonResult::Error(JsonError::new(
817 ErrorCode::InternalError,
818 None,
819 0,
820 )))
821 .await;
822 }
823 }
824 },
825 Error::DetachedTaskStopped,
826 ex.clone(),
827 );
828 let req =
829 JsonRequest::new("lookup", JsonValue::Array(vec![JsonValue::String(hash.clone())]));
830 let rpc_client_lookup = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
831 rpc_client_lookup.request(req).await?;
832
833 let print_seeders = |info: &HashMap<String, JsonValue>| {
834 let seeders = info.get("seeders").unwrap().get::<Vec<JsonValue>>().unwrap();
835 for seeder in seeders {
836 let seeder = seeder.get::<HashMap<String, JsonValue>>().unwrap();
837 let node: HashMap<String, JsonValue> =
838 seeder.get("node").unwrap().clone().try_into().unwrap();
839 let node_id: String = node.get("id").unwrap().clone().try_into().unwrap();
840 let addresses: Vec<JsonValue> =
841 node.get("addresses").unwrap().clone().try_into().unwrap();
842 let tree: Vec<_> = addresses
843 .into_iter()
844 .map(|addr| TreeNode::key(TryInto::<String>::try_into(addr).unwrap()))
845 .collect();
846
847 print_tree(node_id.as_str(), &tree);
848 }
849 };
850
851 loop {
852 match subscription.receive().await {
853 JsonResult::Notification(n) => {
854 let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
855 let info =
856 params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
857 let hash_ = match info.get("hash") {
858 Some(hash_value) => hash_value.get::<String>().unwrap(),
859 None => continue,
860 };
861 if hash != *hash_ {
862 continue;
863 }
864
865 if params.get("event").unwrap().get::<String>().unwrap().as_str() ==
866 "seeders_found"
867 {
868 print_seeders(info);
869 break
870 }
871 }
872
873 JsonResult::Error(e) => {
874 return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
875 }
876
877 x => {
878 return Err(Error::UnexpectedJsonRpc(format!(
879 "Got unexpected data from JSON-RPC: {x:?}"
880 )))
881 }
882 }
883 }
884 Ok(())
885 }
886}
887
888fn main() -> Result<()> {
889 let args = Args::parse();
890
891 setup_logging(args.verbose, None)?;
892
893 let ex = Arc::new(smol::Executor::new());
894 smol::block_on(async {
895 ex.run(async {
896 let rpc_client = Arc::new(RpcClient::new(args.endpoint.clone(), ex.clone()).await?);
897 let fu = Fu { rpc_client, endpoint: args.endpoint.clone() };
898
899 match args.command {
900 Subcmd::Get { hash, path, files } => fu.get(hash, path, files, ex.clone()).await,
901 Subcmd::Put { path } => fu.put(path, ex.clone()).await,
902 Subcmd::Ls {} => fu.list_resources().await,
903 Subcmd::Watch {} => fu.watch(ex.clone()).await,
904 Subcmd::Rm { hash } => fu.remove(hash).await,
905 Subcmd::Buckets {} => fu.buckets().await,
906 Subcmd::Seeders {} => fu.seeders().await,
907 Subcmd::Verify { files } => fu.verify(files).await,
908 Subcmd::Lookup { hash } => fu.lookup(hash, ex.clone()).await,
909 }?;
910
911 Ok(())
912 })
913 .await
914 })
915}