fu/
main.rs

/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2025 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
18
19use clap::{Parser, Subcommand};
20use log::error;
21use simplelog::{ColorChoice, TermLogger, TerminalMode};
22use smol::lock::RwLock;
23use std::{
24    collections::HashMap,
25    io::{stdout, Write},
26    sync::Arc,
27};
28use termcolor::{StandardStream, WriteColor};
29use url::Url;
30
31use darkfi::{
32    cli_desc,
33    rpc::{
34        client::RpcClient,
35        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
36        util::JsonValue,
37    },
38    system::{ExecutorPtr, Publisher, StoppableTask},
39    util::cli::{get_log_config, get_log_level},
40    Error, Result,
41};
42
43use fud::{
44    resource::{Resource, ResourceStatus, ResourceType},
45    util::hash_to_string,
46};
47
48mod util;
49use crate::util::{
50    format_bytes, format_duration, format_progress_bytes, optional_value, print_tree,
51    status_to_colorspec, type_to_colorspec, TreeNode,
52};
53
54#[derive(Parser)]
55#[clap(name = "fu", about = cli_desc!(), version)]
56#[clap(arg_required_else_help(true))]
57struct Args {
58    #[clap(short, action = clap::ArgAction::Count)]
59    /// Increase verbosity (-vvv supported)
60    verbose: u8,
61
62    #[clap(short, long, default_value = "tcp://127.0.0.1:13336")]
63    /// fud JSON-RPC endpoint
64    endpoint: Url,
65
66    #[clap(subcommand)]
67    command: Subcmd,
68}
69
70#[derive(Subcommand)]
71enum Subcmd {
72    /// Retrieve provided resource from the fud network
73    Get {
74        /// Resource hash
75        hash: String,
76        /// Download path (relative or absolute)
77        path: Option<String>,
78        /// Optional list of files you want to download (only used for directories)
79        #[arg(short, long, num_args = 1..)]
80        files: Option<Vec<String>>,
81    },
82
83    /// Put a file or directory onto the fud network
84    Put {
85        /// File path or directory path
86        path: String,
87    },
88
89    /// List resources
90    Ls {},
91
92    /// Watch
93    Watch {},
94
95    /// Remove a resource from fud
96    Rm {
97        /// Resource hash
98        hash: String,
99    },
100
101    /// Get the current node buckets
102    ListBuckets {},
103
104    /// Get the router state
105    ListSeeders {},
106
107    /// Verify local files
108    Verify {
109        /// File hashes
110        files: Option<Vec<String>>,
111    },
112}
113
114struct Fu {
115    pub rpc_client: Arc<RpcClient>,
116    pub endpoint: Url,
117}
118
119impl Fu {
120    async fn get(
121        &self,
122        hash: String,
123        path: Option<String>,
124        files: Option<Vec<String>>,
125        ex: ExecutorPtr,
126    ) -> Result<()> {
127        let publisher = Publisher::new();
128        let subscription = Arc::new(publisher.clone().subscribe().await);
129        let subscriber_task = StoppableTask::new();
130        let hash_ = hash.clone();
131        let publisher_ = publisher.clone();
132        let rpc_client_ = self.rpc_client.clone();
133        subscriber_task.clone().start(
134            async move {
135                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
136                rpc_client_.subscribe(req, publisher).await
137            },
138            move |res| async move {
139                match res {
140                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
141                    Err(e) => {
142                        error!("{e}");
143                        publisher_
144                            .notify(JsonResult::Error(JsonError::new(
145                                ErrorCode::InternalError,
146                                None,
147                                0,
148                            )))
149                            .await;
150                    }
151                }
152            },
153            Error::DetachedTaskStopped,
154            ex.clone(),
155        );
156
157        let progress_bar_width = 20;
158        let mut started = false;
159        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
160
161        let mut print_progress = |info: &HashMap<String, JsonValue>| {
162            started = true;
163            let rs: Resource = info.get("resource").unwrap().clone().into();
164
165            print!("\x1B[2K\r"); // Clear current line
166
167            // Progress bar
168            let percent = if rs.target_bytes_downloaded > rs.target_bytes_size {
169                1.0
170            } else if rs.target_bytes_size > 0 {
171                rs.target_bytes_downloaded as f64 / rs.target_bytes_size as f64
172            } else {
173                0.0
174            };
175            let completed = (percent * progress_bar_width as f64) as usize;
176            let remaining = match progress_bar_width > completed {
177                true => progress_bar_width - completed,
178                false => 0,
179            };
180            let bar = "=".repeat(completed) + &" ".repeat(remaining);
181            print!("[{bar}] {:.1}% | ", percent * 100.0);
182
183            // Downloaded / Total (in bytes)
184            if rs.target_bytes_size > 0 {
185                if rs.target_bytes_downloaded == rs.target_bytes_size {
186                    print!("{} | ", format_bytes(rs.target_bytes_size));
187                } else {
188                    print!(
189                        "{} | ",
190                        format_progress_bytes(rs.target_bytes_downloaded, rs.target_bytes_size)
191                    );
192                }
193            }
194
195            // Download speed (in bytes/sec)
196            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
197                print!("{}/s | ", format_bytes(*rs.speeds.last().unwrap() as u64));
198            }
199
200            // Downloaded / Total (in chunks)
201            if rs.target_chunks_count > 0 {
202                let s = if rs.target_chunks_count > 1 { "s" } else { "" };
203                if rs.target_chunks_downloaded == rs.target_chunks_count {
204                    print!("{} chunk{s} | ", rs.target_chunks_count);
205                } else {
206                    print!(
207                        "{}/{} chunk{s} | ",
208                        rs.target_chunks_downloaded, rs.target_chunks_count
209                    );
210                }
211            }
212
213            // ETA
214            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
215                print!("ETA: {} | ", format_duration(rs.get_eta()));
216            }
217
218            // Status
219            let is_done = rs.target_chunks_downloaded == rs.target_chunks_count &&
220                rs.status.as_str() == "incomplete";
221            let status = if is_done { ResourceStatus::Seeding } else { rs.status };
222            tstdout.set_color(&status_to_colorspec(&status)).unwrap();
223            print!(
224                "{}",
225                if let ResourceStatus::Seeding = status { "done" } else { status.as_str() }
226            );
227            tstdout.reset().unwrap();
228            stdout().flush().unwrap();
229        };
230
231        let req = JsonRequest::new(
232            "get",
233            JsonValue::Array(vec![
234                JsonValue::String(hash_.clone()),
235                JsonValue::String(path.unwrap_or_default()),
236                match files {
237                    Some(files) => {
238                        JsonValue::Array(files.into_iter().map(JsonValue::String).collect())
239                    }
240                    None => JsonValue::Null,
241                },
242            ]),
243        );
244        // Create a RPC client to send the `get` request
245        let rpc_client_getter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
246        let _ = rpc_client_getter.request(req).await?;
247
248        loop {
249            match subscription.receive().await {
250                JsonResult::Notification(n) => {
251                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
252                    let info =
253                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
254                    let hash = match info.get("hash") {
255                        Some(hash_value) => hash_value.get::<String>().unwrap(),
256                        None => continue,
257                    };
258                    if *hash != hash_ {
259                        continue;
260                    }
261                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
262                        "download_started" |
263                        "metadata_download_completed" |
264                        "chunk_download_completed" |
265                        "resource_updated" => {
266                            print_progress(info);
267                        }
268                        "download_completed" => {
269                            let resource_json = info
270                                .get("resource")
271                                .unwrap()
272                                .get::<HashMap<String, JsonValue>>()
273                                .unwrap();
274                            let path = resource_json.get("path").unwrap().get::<String>().unwrap();
275                            print_progress(info);
276                            println!("\nDownload completed:\n{path}");
277                            return Ok(());
278                        }
279                        "metadata_not_found" => {
280                            println!();
281                            return Err(Error::Custom(format!("Could not find {hash}")));
282                        }
283                        "chunk_not_found" => {
284                            // A seeder does not have a chunk we are looking for,
285                            // we will try another seeder so there is nothing to do
286                        }
287                        "missing_chunks" => {
288                            // We tried all seeders and some chunks are still missing
289                            println!();
290                            return Err(Error::Custom("Missing chunks".to_string()));
291                        }
292                        "download_error" => {
293                            // An error that caused the download to be unsuccessful
294                            if started {
295                                println!();
296                            }
297                            return Err(Error::Custom(
298                                info.get("error").unwrap().get::<String>().unwrap().to_string(),
299                            ));
300                        }
301                        _ => {}
302                    }
303                }
304
305                JsonResult::Error(e) => {
306                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
307                }
308
309                x => {
310                    return Err(Error::UnexpectedJsonRpc(format!(
311                        "Got unexpected data from JSON-RPC: {x:?}"
312                    )))
313                }
314            }
315        }
316    }
317
318    async fn put(&self, path: String, ex: ExecutorPtr) -> Result<()> {
319        let publisher = Publisher::new();
320        let subscription = Arc::new(publisher.clone().subscribe().await);
321        let subscriber_task = StoppableTask::new();
322        let publisher_ = publisher.clone();
323        let rpc_client_ = self.rpc_client.clone();
324        subscriber_task.clone().start(
325            async move {
326                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
327                rpc_client_.subscribe(req, publisher).await
328            },
329            move |res| async move {
330                match res {
331                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
332                    Err(e) => {
333                        error!("{e}");
334                        publisher_
335                            .notify(JsonResult::Error(JsonError::new(
336                                ErrorCode::InternalError,
337                                None,
338                                0,
339                            )))
340                            .await;
341                    }
342                }
343            },
344            Error::DetachedTaskStopped,
345            ex.clone(),
346        );
347
348        let rpc_client_putter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
349        let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(path)]));
350        let rep = rpc_client_putter.request(req).await?;
351        let path_str = rep.get::<String>().unwrap().clone();
352
353        loop {
354            match subscription.receive().await {
355                JsonResult::Notification(n) => {
356                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
357                    let info =
358                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
359                    let path = match info.get("path") {
360                        Some(path) => path.get::<String>().unwrap(),
361                        None => continue,
362                    };
363                    if *path != path_str {
364                        continue;
365                    }
366
367                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
368                        "insert_completed" => {
369                            let id = info.get("hash").unwrap().get::<String>().unwrap().to_string();
370                            println!("{id}");
371                            break Ok(())
372                        }
373                        "insert_error" => {
374                            return Err(Error::Custom(
375                                info.get("error").unwrap().get::<String>().unwrap().to_string(),
376                            ));
377                        }
378                        _ => {}
379                    }
380                }
381
382                JsonResult::Error(e) => {
383                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
384                }
385
386                x => {
387                    return Err(Error::UnexpectedJsonRpc(format!(
388                        "Got unexpected data from JSON-RPC: {x:?}"
389                    )))
390                }
391            }
392        }
393    }
394
395    async fn list_resources(&self) -> Result<()> {
396        let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
397        let rep = self.rpc_client.request(req).await?;
398
399        let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
400        let resources: Vec<Resource> = resources_json.into_iter().map(|v| v.into()).collect();
401
402        for resource in resources.iter() {
403            let tree: Vec<TreeNode<&str>> = vec![
404                TreeNode::kv("ID", hash_to_string(&resource.hash)),
405                TreeNode::kvc(
406                    "Type",
407                    resource.rtype.as_str().to_string(),
408                    type_to_colorspec(&resource.rtype),
409                ),
410                TreeNode::kvc(
411                    "Status",
412                    resource.status.as_str().to_string(),
413                    status_to_colorspec(&resource.status),
414                ),
415                TreeNode::kv("Chunks", {
416                    if let ResourceType::Directory = resource.rtype {
417                        format!(
418                            "{}/{} ({}/{})",
419                            resource.total_chunks_downloaded,
420                            optional_value!(resource.total_chunks_count),
421                            resource.target_chunks_downloaded,
422                            optional_value!(resource.target_chunks_count)
423                        )
424                    } else {
425                        format!(
426                            "{}/{}",
427                            resource.total_chunks_downloaded,
428                            optional_value!(resource.total_chunks_count)
429                        )
430                    }
431                }),
432                TreeNode::kv("Bytes", {
433                    if let ResourceType::Directory = resource.rtype {
434                        format!(
435                            "{} ({})",
436                            optional_value!(resource.total_bytes_size, |x: u64| {
437                                format_progress_bytes(resource.total_bytes_downloaded, x)
438                            }),
439                            optional_value!(resource.target_bytes_size, |x: u64| {
440                                format_progress_bytes(resource.target_bytes_downloaded, x)
441                            })
442                        )
443                    } else {
444                        optional_value!(resource.total_bytes_size, |x: u64| format_progress_bytes(
445                            resource.total_bytes_downloaded,
446                            x
447                        ))
448                    }
449                }),
450            ];
451            print_tree(&resource.path.to_string_lossy(), &tree);
452        }
453
454        Ok(())
455    }
456
457    async fn list_buckets(&self) -> Result<()> {
458        let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
459        let rep = self.rpc_client.request(req).await?;
460        let buckets: Vec<JsonValue> = rep.try_into().unwrap();
461        let mut empty = true;
462        for (bucket_i, bucket) in buckets.into_iter().enumerate() {
463            let nodes: Vec<JsonValue> = bucket.try_into().unwrap();
464            if nodes.is_empty() {
465                continue
466            }
467            empty = false;
468
469            let tree: Vec<TreeNode<String>> = nodes
470                .into_iter()
471                .map(|n| {
472                    let node: Vec<JsonValue> = n.try_into().unwrap();
473                    let node_id: JsonValue = node[0].clone();
474                    let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
475
476                    let addresses_vec: Vec<String> = addresses
477                        .into_iter()
478                        .map(|addr| TryInto::<String>::try_into(addr).unwrap())
479                        .collect();
480
481                    let node_id_string: String = node_id.try_into().unwrap();
482
483                    TreeNode {
484                        key: node_id_string,
485                        value: None,
486                        color: None,
487                        children: addresses_vec
488                            .into_iter()
489                            .map(|addr| TreeNode::key(addr.clone()))
490                            .collect(),
491                    }
492                })
493                .collect();
494
495            print_tree(format!("Bucket {bucket_i}").as_str(), &tree);
496        }
497
498        if empty {
499            println!("All buckets are empty");
500        }
501
502        Ok(())
503    }
504
505    async fn list_seeders(&self) -> Result<()> {
506        let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
507        let rep = self.rpc_client.request(req).await?;
508
509        let resources: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
510
511        if resources.is_empty() {
512            println!("No known seeders");
513            return Ok(())
514        }
515
516        for (hash, nodes) in resources {
517            let nodes: Vec<JsonValue> = nodes.try_into().unwrap();
518            let tree: Vec<TreeNode<String>> = nodes
519                .into_iter()
520                .map(|n| {
521                    let node: Vec<JsonValue> = n.try_into().unwrap();
522                    let node_id: JsonValue = node[0].clone();
523                    let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
524
525                    let addresses_vec: Vec<String> = addresses
526                        .into_iter()
527                        .map(|addr| TryInto::<String>::try_into(addr).unwrap())
528                        .collect();
529
530                    let node_id_string: String = node_id.try_into().unwrap();
531
532                    TreeNode {
533                        key: node_id_string,
534                        value: None,
535                        color: None,
536                        children: addresses_vec
537                            .into_iter()
538                            .map(|addr| TreeNode::key(addr.clone()))
539                            .collect(),
540                    }
541                })
542                .collect();
543
544            print_tree(&hash, &tree);
545        }
546
547        Ok(())
548    }
549
550    async fn watch(&self, ex: ExecutorPtr) -> Result<()> {
551        let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
552        let rep = self.rpc_client.request(req).await?;
553
554        let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
555        let resources: Arc<RwLock<Vec<Resource>>> = Arc::new(RwLock::new(vec![]));
556
557        let publisher = Publisher::new();
558        let subscription = Arc::new(publisher.clone().subscribe().await);
559        let subscriber_task = StoppableTask::new();
560        let publisher_ = publisher.clone();
561        let rpc_client_ = self.rpc_client.clone();
562        subscriber_task.clone().start(
563            async move {
564                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
565                rpc_client_.subscribe(req, publisher).await
566            },
567            move |res| async move {
568                match res {
569                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
570                    Err(e) => {
571                        error!("{e}");
572                        publisher_
573                            .notify(JsonResult::Error(JsonError::new(
574                                ErrorCode::InternalError,
575                                None,
576                                0,
577                            )))
578                            .await;
579                    }
580                }
581            },
582            Error::DetachedTaskStopped,
583            ex,
584        );
585
586        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
587
588        let mut update_resource = async |resource: &Resource| {
589            let mut resources_write = resources.write().await;
590            let i = match resources_write.iter().position(|r| r.hash == resource.hash) {
591                Some(i) => {
592                    resources_write.remove(i);
593                    resources_write.insert(i, resource.clone());
594                    i
595                }
596                None => {
597                    resources_write.push(resource.clone());
598                    resources_write.len() - 1
599                }
600            };
601
602            // Move the cursor to the i-th line and clear it
603            print!("\x1b[{};1H\x1B[2K", i + 2);
604
605            // Hash
606            print!("\r{:>44} ", hash_to_string(&resource.hash));
607
608            // Type
609            tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
610            print!(
611                "{:>4} ",
612                match resource.rtype.as_str() {
613                    "unknown" => "?",
614                    "directory" => "dir",
615                    _ => resource.rtype.as_str(),
616                }
617            );
618            tstdout.reset().unwrap();
619
620            // Status
621            tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
622            print!("{:>11} ", resource.status.as_str());
623            tstdout.reset().unwrap();
624
625            // Downloaded / Total (in bytes)
626            match resource.total_bytes_size {
627                0 => {
628                    print!("{:>5.1} {:>16} ", 0.0, "?");
629                }
630                _ => {
631                    let percent = resource.total_bytes_downloaded as f64 /
632                        resource.total_bytes_size as f64 *
633                        100.0;
634                    if resource.total_bytes_downloaded == resource.total_bytes_size {
635                        print!("{:>5.1} {:>16} ", percent, format_bytes(resource.total_bytes_size));
636                    } else {
637                        print!(
638                            "{:>5.1} {:>16} ",
639                            percent,
640                            format_progress_bytes(
641                                resource.total_bytes_downloaded,
642                                resource.total_bytes_size
643                            )
644                        );
645                    }
646                }
647            };
648
649            // Downloaded / Total (in chunks)
650            match resource.total_chunks_count {
651                0 => {
652                    print!("{:>9} ", format!("{}/?", resource.total_chunks_downloaded));
653                }
654                _ => {
655                    if resource.total_chunks_downloaded == resource.total_chunks_count {
656                        print!("{:>9} ", resource.total_chunks_count.to_string());
657                    } else {
658                        print!(
659                            "{:>9} ",
660                            format!(
661                                "{}/{}",
662                                resource.total_chunks_downloaded, resource.total_chunks_count
663                            )
664                        );
665                    }
666                }
667            };
668
669            // Download speed (in bytes/sec)
670            let speed_available = resource.total_bytes_downloaded < resource.total_bytes_size &&
671                resource.status.as_str() == "downloading" &&
672                !resource.speeds.is_empty();
673            print!(
674                "{:>12} ",
675                match speed_available {
676                    false => "-".to_string(),
677                    true => format!("{}/s", format_bytes(*resource.speeds.last().unwrap() as u64)),
678                }
679            );
680
681            // ETA
682            let eta = resource.get_eta();
683            print!(
684                "{:>6}",
685                match eta {
686                    0 => "-".to_string(),
687                    _ => format_duration(eta),
688                }
689            );
690
691            println!();
692
693            // Move the cursor to end
694            print!("\x1b[{};1H", resources_write.len() + 2);
695            stdout().flush().unwrap();
696        };
697
698        let print_begin = async || {
699            // Clear
700            print!("\x1B[2J\x1B[1;1H");
701
702            // Print column headers
703            println!(
704                "\x1b[4m{:>44} {:>4} {:>11} {:>5} {:>16} {:>9} {:>12} {:>6}\x1b[0m",
705                "Hash", "Type", "Status", "%", "Bytes", "Chunks", "Speed", "ETA"
706            );
707        };
708
709        print_begin().await;
710        if resources_json.is_empty() {
711            println!("No known resources");
712        } else {
713            for resource in resources_json.iter() {
714                let rs: Resource = resource.clone().into();
715                update_resource(&rs).await;
716            }
717        }
718
719        loop {
720            match subscription.receive().await {
721                JsonResult::Notification(n) => {
722                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
723                    let info =
724                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
725                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
726                        "download_started" |
727                        "metadata_download_completed" |
728                        "chunk_download_completed" |
729                        "download_completed" |
730                        "missing_chunks" |
731                        "metadata_not_found" |
732                        "resource_updated" => {
733                            let resource: Resource = info.get("resource").unwrap().clone().into();
734                            update_resource(&resource).await;
735                        }
736                        "resource_removed" => {
737                            {
738                                let hash = info.get("hash").unwrap().get::<String>().unwrap();
739                                let mut resources_write = resources.write().await;
740                                let i = resources_write
741                                    .iter()
742                                    .position(|r| hash_to_string(&r.hash) == *hash);
743                                if let Some(i) = i {
744                                    resources_write.remove(i);
745                                }
746                            }
747
748                            let r = resources.read().await.clone();
749                            print_begin().await;
750                            for resource in r.iter() {
751                                update_resource(resource).await;
752                            }
753                        }
754                        "download_error" => {
755                            // An error that caused the download to be unsuccessful
756                        }
757                        _ => {}
758                    }
759                }
760
761                JsonResult::Error(e) => {
762                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
763                }
764
765                x => {
766                    return Err(Error::UnexpectedJsonRpc(format!(
767                        "Got unexpected data from JSON-RPC: {x:?}"
768                    )))
769                }
770            }
771        }
772    }
773
774    async fn remove(&self, hash: String) -> Result<()> {
775        let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)]));
776        self.rpc_client.request(req).await?;
777        Ok(())
778    }
779
780    async fn verify(&self, files: Option<Vec<String>>) -> Result<()> {
781        let files = files.unwrap_or_default().into_iter().map(JsonValue::String).collect();
782        let req = JsonRequest::new("verify", JsonValue::Array(files));
783        self.rpc_client.request(req).await?;
784        Ok(())
785    }
786}
787
788fn main() -> Result<()> {
789    let args = Args::parse();
790
791    let log_level = get_log_level(args.verbose);
792    let log_config = get_log_config(args.verbose);
793    TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
794
795    let ex = Arc::new(smol::Executor::new());
796    smol::block_on(async {
797        ex.run(async {
798            let rpc_client = Arc::new(RpcClient::new(args.endpoint.clone(), ex.clone()).await?);
799            let fu = Fu { rpc_client, endpoint: args.endpoint.clone() };
800
801            match args.command {
802                Subcmd::Get { hash, path, files } => fu.get(hash, path, files, ex.clone()).await,
803                Subcmd::Put { path } => fu.put(path, ex.clone()).await,
804                Subcmd::Ls {} => fu.list_resources().await,
805                Subcmd::Watch {} => fu.watch(ex.clone()).await,
806                Subcmd::Rm { hash } => fu.remove(hash).await,
807                Subcmd::ListBuckets {} => fu.list_buckets().await,
808                Subcmd::ListSeeders {} => fu.list_seeders().await,
809                Subcmd::Verify { files } => fu.verify(files).await,
810            }?;
811
812            Ok(())
813        })
814        .await
815    })
816}