fu/
main.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use clap::{Parser, Subcommand};
20use smol::lock::RwLock;
21use std::{
22    collections::HashMap,
23    io::{stdout, Write},
24    sync::Arc,
25};
26use termcolor::{ColorChoice, StandardStream, WriteColor};
27use tracing::error;
28use url::Url;
29
30use darkfi::{
31    cli_desc,
32    rpc::{
33        client::RpcClient,
34        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
35        util::JsonValue,
36    },
37    system::{ExecutorPtr, Publisher, StoppableTask},
38    util::logger::setup_logging,
39    Error, Result,
40};
41
42use fud::{
43    resource::{Resource, ResourceStatus, ResourceType},
44    util::hash_to_string,
45};
46
47mod util;
48use crate::util::{
49    format_bytes, format_duration, format_progress_bytes, optional_value, print_tree,
50    status_to_colorspec, type_to_colorspec, TreeNode,
51};
52
// Top-level command line of the `fu` binary.
// NOTE: the `///` lines inside this struct are picked up by the clap derive
// as user-visible help text, so maintainer notes are written as plain `//`.
#[derive(Parser)]
#[clap(name = "fu", about = cli_desc!(), version)]
// Running `fu` with no arguments prints the help instead of failing.
#[clap(arg_required_else_help(true))]
struct Args {
    // Counter flag: each occurrence of the short flag increments the value.
    #[clap(short, action = clap::ArgAction::Count)]
    /// Increase verbosity (-vvv supported)
    verbose: u8,

    // Parsed into a `Url`; defaults to a local TCP endpoint on port 13336.
    #[clap(short, long, default_value = "tcp://127.0.0.1:13336")]
    /// fud JSON-RPC endpoint
    endpoint: Url,

    // The action to perform; see `Subcmd`.
    #[clap(subcommand)]
    command: Subcmd,
}
68
// Subcommands of the `fu` CLI; the fields of each variant are its arguments.
// NOTE: the `///` lines below are consumed by the clap derive as user-visible
// help text, so maintainer notes are written as plain `//` comments.
#[derive(Subcommand)]
enum Subcmd {
    /// Retrieve provided resource from the fud network
    // Same argument set as `Fu::get` (hash, optional path, optional files).
    Get {
        /// Resource hash
        hash: String,
        /// Download path (relative or absolute)
        path: Option<String>,
        /// Optional list of files you want to download (only used for directories)
        // `num_args = 1..` means the flag takes one or more values.
        #[arg(short, long, num_args = 1..)]
        files: Option<Vec<String>>,
    },

    /// Put a file or directory onto the fud network
    Put {
        /// File path or directory path
        path: String,
    },

    /// List resources
    Ls {},

    /// Watch
    // Presumably maps to `Fu::watch` (live resource table) — dispatch code is
    // not visible here; TODO confirm.
    Watch {},

    /// Remove a resource from fud
    Rm {
        /// Resource hash
        hash: String,
    },

    /// Get the current node buckets
    Buckets {},

    /// Get the router state
    // Presumably maps to `Fu::seeders`, which prints the `seeders` map of the
    // `list_seeders` reply — TODO confirm against the dispatch code.
    Seeders {},

    /// Verify local files
    Verify {
        /// File hashes
        // Optional positional list; `Fu::verify` sends an empty array when absent.
        files: Option<Vec<String>>,
    },

    /// Lookup seeders of a resource from the network
    Lookup {
        /// Resource hash
        hash: String,
    },
}
118
/// State shared by the `fu` subcommand handlers implemented on this type.
struct Fu {
    /// Long-lived JSON-RPC client. The handlers use it for plain requests
    /// (`list_resources`, `list_buckets`, `remove`, ...) and for the
    /// `subscribe` notification stream.
    pub rpc_client: Arc<RpcClient>,
    /// Endpoint the client talks to. `get`, `put` and `lookup` open an extra
    /// `RpcClient` on this URL to send their request while `rpc_client` is
    /// occupied by the subscription.
    pub endpoint: Url,
}
123
124impl Fu {
125    async fn get(
126        &self,
127        hash: String,
128        path: Option<String>,
129        files: Option<Vec<String>>,
130        ex: ExecutorPtr,
131    ) -> Result<()> {
132        let publisher = Publisher::new();
133        let subscription = Arc::new(publisher.clone().subscribe().await);
134        let subscriber_task = StoppableTask::new();
135        let hash_ = hash.clone();
136        let publisher_ = publisher.clone();
137        let rpc_client_ = self.rpc_client.clone();
138        subscriber_task.clone().start(
139            async move {
140                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
141                rpc_client_.subscribe(req, publisher).await
142            },
143            move |res| async move {
144                match res {
145                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
146                    Err(e) => {
147                        error!("{e}");
148                        publisher_
149                            .notify(JsonResult::Error(JsonError::new(
150                                ErrorCode::InternalError,
151                                None,
152                                0,
153                            )))
154                            .await;
155                    }
156                }
157            },
158            Error::DetachedTaskStopped,
159            ex.clone(),
160        );
161
162        let progress_bar_width = 20;
163        let mut started = false;
164        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
165
166        let mut print_progress = |info: &HashMap<String, JsonValue>| {
167            started = true;
168            let rs: Resource = info.get("resource").unwrap().clone().into();
169
170            print!("\x1B[2K\r"); // Clear current line
171
172            // Progress bar
173            let percent = if rs.target_bytes_downloaded > rs.target_bytes_size {
174                1.0
175            } else if rs.target_bytes_size > 0 {
176                rs.target_bytes_downloaded as f64 / rs.target_bytes_size as f64
177            } else {
178                0.0
179            };
180            let completed = (percent * progress_bar_width as f64) as usize;
181            let remaining = match progress_bar_width > completed {
182                true => progress_bar_width - completed,
183                false => 0,
184            };
185            let bar = "=".repeat(completed) + &" ".repeat(remaining);
186            print!("[{bar}] {:.1}% | ", percent * 100.0);
187
188            // Downloaded / Total (in bytes)
189            if rs.target_bytes_size > 0 {
190                if rs.target_bytes_downloaded == rs.target_bytes_size {
191                    print!("{} | ", format_bytes(rs.target_bytes_size));
192                } else {
193                    print!(
194                        "{} | ",
195                        format_progress_bytes(rs.target_bytes_downloaded, rs.target_bytes_size)
196                    );
197                }
198            }
199
200            // Download speed (in bytes/sec)
201            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
202                print!("{}/s | ", format_bytes(*rs.speeds.last().unwrap() as u64));
203            }
204
205            // Downloaded / Total (in chunks)
206            if rs.target_chunks_count > 0 {
207                let s = if rs.target_chunks_count > 1 { "s" } else { "" };
208                if rs.target_chunks_downloaded == rs.target_chunks_count {
209                    print!("{} chunk{s} | ", rs.target_chunks_count);
210                } else {
211                    print!(
212                        "{}/{} chunk{s} | ",
213                        rs.target_chunks_downloaded, rs.target_chunks_count
214                    );
215                }
216            }
217
218            // ETA
219            if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
220                print!("ETA: {} | ", format_duration(rs.get_eta()));
221            }
222
223            // Status
224            let is_done = rs.target_chunks_downloaded == rs.target_chunks_count &&
225                rs.status.as_str() == "incomplete";
226            let status = if is_done { ResourceStatus::Seeding } else { rs.status };
227            tstdout.set_color(&status_to_colorspec(&status)).unwrap();
228            print!(
229                "{}",
230                if let ResourceStatus::Seeding = status { "done" } else { status.as_str() }
231            );
232            tstdout.reset().unwrap();
233            stdout().flush().unwrap();
234        };
235
236        let req = JsonRequest::new(
237            "get",
238            JsonValue::Array(vec![
239                JsonValue::String(hash_.clone()),
240                JsonValue::String(path.unwrap_or_default()),
241                match files {
242                    Some(files) => {
243                        JsonValue::Array(files.into_iter().map(JsonValue::String).collect())
244                    }
245                    None => JsonValue::Null,
246                },
247            ]),
248        );
249        // Create a RPC client to send the `get` request
250        let rpc_client_getter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
251        let _ = rpc_client_getter.request(req).await?;
252
253        loop {
254            match subscription.receive().await {
255                JsonResult::Notification(n) => {
256                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
257                    let info = params.get("info");
258                    if info.is_none() {
259                        continue
260                    }
261                    let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
262
263                    let hash = match info.get("hash") {
264                        Some(hash_value) => hash_value.get::<String>().unwrap(),
265                        None => continue,
266                    };
267                    if *hash != hash_ {
268                        continue;
269                    }
270                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
271                        "download_started" |
272                        "metadata_download_completed" |
273                        "chunk_download_completed" |
274                        "resource_updated" => {
275                            print_progress(info);
276                        }
277                        "download_completed" => {
278                            let resource_json = info
279                                .get("resource")
280                                .unwrap()
281                                .get::<HashMap<String, JsonValue>>()
282                                .unwrap();
283                            let path = resource_json.get("path").unwrap().get::<String>().unwrap();
284                            print_progress(info);
285                            println!("\nDownload completed:\n{path}");
286                            return Ok(());
287                        }
288                        "metadata_not_found" => {
289                            println!();
290                            return Err(Error::Custom(format!("Could not find {hash}")));
291                        }
292                        "chunk_not_found" => {
293                            // A seeder does not have a chunk we are looking for,
294                            // we will try another seeder so there is nothing to do
295                        }
296                        "missing_chunks" => {
297                            // We tried all seeders and some chunks are still missing
298                            println!();
299                            return Err(Error::Custom("Missing chunks".to_string()));
300                        }
301                        "download_error" => {
302                            // An error that caused the download to be unsuccessful
303                            if started {
304                                println!();
305                            }
306                            return Err(Error::Custom(
307                                info.get("error").unwrap().get::<String>().unwrap().to_string(),
308                            ));
309                        }
310                        _ => {}
311                    }
312                }
313
314                JsonResult::Error(e) => {
315                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
316                }
317
318                x => {
319                    return Err(Error::UnexpectedJsonRpc(format!(
320                        "Got unexpected data from JSON-RPC: {x:?}"
321                    )))
322                }
323            }
324        }
325    }
326
327    async fn put(&self, path: String, ex: ExecutorPtr) -> Result<()> {
328        let publisher = Publisher::new();
329        let subscription = Arc::new(publisher.clone().subscribe().await);
330        let subscriber_task = StoppableTask::new();
331        let publisher_ = publisher.clone();
332        let rpc_client_ = self.rpc_client.clone();
333        subscriber_task.clone().start(
334            async move {
335                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
336                rpc_client_.subscribe(req, publisher).await
337            },
338            move |res| async move {
339                match res {
340                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
341                    Err(e) => {
342                        error!("{e}");
343                        publisher_
344                            .notify(JsonResult::Error(JsonError::new(
345                                ErrorCode::InternalError,
346                                None,
347                                0,
348                            )))
349                            .await;
350                    }
351                }
352            },
353            Error::DetachedTaskStopped,
354            ex.clone(),
355        );
356
357        let rpc_client_putter = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
358        let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(path)]));
359        let rep = rpc_client_putter.request(req).await?;
360        let path_str = rep.get::<String>().unwrap().clone();
361
362        loop {
363            match subscription.receive().await {
364                JsonResult::Notification(n) => {
365                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
366                    let info =
367                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
368                    let path = match info.get("path") {
369                        Some(path) => path.get::<String>().unwrap(),
370                        None => continue,
371                    };
372                    if *path != path_str {
373                        continue;
374                    }
375
376                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
377                        "insert_completed" => {
378                            let id = info.get("hash").unwrap().get::<String>().unwrap().to_string();
379                            println!("{id}");
380                            break Ok(())
381                        }
382                        "insert_error" => {
383                            return Err(Error::Custom(
384                                info.get("error").unwrap().get::<String>().unwrap().to_string(),
385                            ));
386                        }
387                        _ => {}
388                    }
389                }
390
391                JsonResult::Error(e) => {
392                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
393                }
394
395                x => {
396                    return Err(Error::UnexpectedJsonRpc(format!(
397                        "Got unexpected data from JSON-RPC: {x:?}"
398                    )))
399                }
400            }
401        }
402    }
403
404    async fn list_resources(&self) -> Result<()> {
405        let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
406        let rep = self.rpc_client.request(req).await?;
407
408        let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
409        let resources: Vec<Resource> = resources_json.into_iter().map(|v| v.into()).collect();
410
411        for resource in resources.iter() {
412            let tree: Vec<TreeNode<&str>> = vec![
413                TreeNode::kv("ID", hash_to_string(&resource.hash)),
414                TreeNode::kvc(
415                    "Type",
416                    resource.rtype.as_str().to_string(),
417                    type_to_colorspec(&resource.rtype),
418                ),
419                TreeNode::kvc(
420                    "Status",
421                    resource.status.as_str().to_string(),
422                    status_to_colorspec(&resource.status),
423                ),
424                TreeNode::kv("Chunks", {
425                    if let ResourceType::Directory = resource.rtype {
426                        format!(
427                            "{}/{} ({}/{})",
428                            resource.total_chunks_downloaded,
429                            optional_value!(resource.total_chunks_count),
430                            resource.target_chunks_downloaded,
431                            optional_value!(resource.target_chunks_count)
432                        )
433                    } else {
434                        format!(
435                            "{}/{}",
436                            resource.total_chunks_downloaded,
437                            optional_value!(resource.total_chunks_count)
438                        )
439                    }
440                }),
441                TreeNode::kv("Bytes", {
442                    if let ResourceType::Directory = resource.rtype {
443                        format!(
444                            "{} ({})",
445                            optional_value!(resource.total_bytes_size, |x: u64| {
446                                format_progress_bytes(resource.total_bytes_downloaded, x)
447                            }),
448                            optional_value!(resource.target_bytes_size, |x: u64| {
449                                format_progress_bytes(resource.target_bytes_downloaded, x)
450                            })
451                        )
452                    } else {
453                        optional_value!(resource.total_bytes_size, |x: u64| format_progress_bytes(
454                            resource.total_bytes_downloaded,
455                            x
456                        ))
457                    }
458                }),
459            ];
460            print_tree(&resource.path.to_string_lossy(), &tree);
461        }
462
463        Ok(())
464    }
465
466    async fn buckets(&self) -> Result<()> {
467        let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
468        let rep = self.rpc_client.request(req).await?;
469        let buckets: Vec<JsonValue> = rep.try_into().unwrap();
470        let mut empty = true;
471        for (bucket_i, bucket) in buckets.into_iter().enumerate() {
472            let nodes: Vec<JsonValue> = bucket.try_into().unwrap();
473            if nodes.is_empty() {
474                continue
475            }
476            empty = false;
477
478            let tree: Vec<TreeNode<String>> = nodes
479                .into_iter()
480                .map(|n| {
481                    let node: Vec<JsonValue> = n.try_into().unwrap();
482                    let node_id: JsonValue = node[0].clone();
483                    let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
484
485                    let addresses_vec: Vec<String> = addresses
486                        .into_iter()
487                        .map(|addr| TryInto::<String>::try_into(addr).unwrap())
488                        .collect();
489
490                    let node_id_string: String = node_id.try_into().unwrap();
491
492                    TreeNode {
493                        key: node_id_string,
494                        value: None,
495                        color: None,
496                        children: addresses_vec
497                            .into_iter()
498                            .map(|addr| TreeNode::key(addr.clone()))
499                            .collect(),
500                    }
501                })
502                .collect();
503
504            print_tree(format!("Bucket {bucket_i}").as_str(), &tree);
505        }
506
507        if empty {
508            println!("All buckets are empty");
509        }
510
511        Ok(())
512    }
513
514    async fn seeders(&self) -> Result<()> {
515        let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
516        let rep = self.rpc_client.request(req).await?;
517
518        let resources: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
519
520        if resources.is_empty() {
521            println!("No known seeders");
522            return Ok(())
523        }
524
525        for (hash, nodes) in resources {
526            let nodes: Vec<JsonValue> = nodes.try_into().unwrap();
527            let tree: Vec<TreeNode<String>> = nodes
528                .into_iter()
529                .map(|n| {
530                    let node: Vec<JsonValue> = n.try_into().unwrap();
531                    let node_id: JsonValue = node[0].clone();
532                    let addresses: Vec<JsonValue> = node[1].clone().try_into().unwrap();
533
534                    let addresses_vec: Vec<String> = addresses
535                        .into_iter()
536                        .map(|addr| TryInto::<String>::try_into(addr).unwrap())
537                        .collect();
538
539                    let node_id_string: String = node_id.try_into().unwrap();
540
541                    TreeNode {
542                        key: node_id_string,
543                        value: None,
544                        color: None,
545                        children: addresses_vec
546                            .into_iter()
547                            .map(|addr| TreeNode::key(addr.clone()))
548                            .collect(),
549                    }
550                })
551                .collect();
552
553            print_tree(&hash, &tree);
554        }
555
556        Ok(())
557    }
558
    /// Show a live table of fud resources.
    ///
    /// Fetches the current resources with `list_resources`, draws one table row
    /// per resource, then keeps the rows up to date from the `subscribe`
    /// notification stream. The receive loop has no success exit: the function
    /// only returns with an error, when the stream yields a JSON-RPC error or
    /// anything other than a notification.
    async fn watch(&self, ex: ExecutorPtr) -> Result<()> {
        let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
        let rep = self.rpc_client.request(req).await?;

        let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
        // Rows currently displayed, in display order. Starts empty and is
        // filled by `update_resource` below.
        let resources: Arc<RwLock<Vec<Resource>>> = Arc::new(RwLock::new(vec![]));

        // Background task relaying daemon notifications into `publisher`.
        let publisher = Publisher::new();
        let subscription = Arc::new(publisher.clone().subscribe().await);
        let subscriber_task = StoppableTask::new();
        let publisher_ = publisher.clone();
        let rpc_client_ = self.rpc_client.clone();
        subscriber_task.clone().start(
            async move {
                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
                rpc_client_.subscribe(req, publisher).await
            },
            move |res| async move {
                match res {
                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                    Err(e) => {
                        // Forward the failure so the receive loop below stops.
                        error!("{e}");
                        publisher_
                            .notify(JsonResult::Error(JsonError::new(
                                ErrorCode::InternalError,
                                None,
                                0,
                            )))
                            .await;
                    }
                }
            },
            Error::DetachedTaskStopped,
            ex,
        );

        let mut tstdout = StandardStream::stdout(ColorChoice::Auto);

        // Inserts or replaces `resource` in the row list (matched by hash) and
        // redraws its row. The write lock is held for the whole redraw.
        let mut update_resource = async |resource: &Resource| {
            let mut resources_write = resources.write().await;
            let i = match resources_write.iter().position(|r| r.hash == resource.hash) {
                Some(i) => {
                    // Known resource: replace it in place, keeping its row.
                    resources_write.remove(i);
                    resources_write.insert(i, resource.clone());
                    i
                }
                None => {
                    // New resource: it gets the next row at the bottom.
                    resources_write.push(resource.clone());
                    resources_write.len() - 1
                }
            };

            // Move the cursor to the i-th line and clear it
            // (`i + 2` because the row argument is offset past the header line
            // that `print_begin` prints first)
            print!("\x1b[{};1H\x1B[2K", i + 2);

            // Hash
            print!("\r{:>44} ", hash_to_string(&resource.hash));

            // Type
            tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
            print!(
                "{:>4} ",
                // Shortened labels so the value fits the 4-character column
                match resource.rtype.as_str() {
                    "unknown" => "?",
                    "directory" => "dir",
                    _ => resource.rtype.as_str(),
                }
            );
            tstdout.reset().unwrap();

            // Status
            tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
            print!("{:>11} ", resource.status.as_str());
            tstdout.reset().unwrap();

            // Downloaded / Total (in bytes)
            match resource.total_bytes_size {
                // A size of 0 is displayed as unknown
                0 => {
                    print!("{:>5.1} {:>16} ", 0.0, "?");
                }
                _ => {
                    let percent = resource.total_bytes_downloaded as f64 /
                        resource.total_bytes_size as f64 *
                        100.0;
                    if resource.total_bytes_downloaded == resource.total_bytes_size {
                        print!("{:>5.1} {:>16} ", percent, format_bytes(resource.total_bytes_size));
                    } else {
                        print!(
                            "{:>5.1} {:>16} ",
                            percent,
                            format_progress_bytes(
                                resource.total_bytes_downloaded,
                                resource.total_bytes_size
                            )
                        );
                    }
                }
            };

            // Downloaded / Total (in chunks)
            match resource.total_chunks_count {
                // A chunk count of 0 is displayed as unknown
                0 => {
                    print!("{:>9} ", format!("{}/?", resource.total_chunks_downloaded));
                }
                _ => {
                    if resource.total_chunks_downloaded == resource.total_chunks_count {
                        print!("{:>9} ", resource.total_chunks_count.to_string());
                    } else {
                        print!(
                            "{:>9} ",
                            format!(
                                "{}/{}",
                                resource.total_chunks_downloaded, resource.total_chunks_count
                            )
                        );
                    }
                }
            };

            // Download speed (in bytes/sec)
            // Shown only for an unfinished resource in the "downloading" state
            // that has at least one speed sample.
            let speed_available = resource.total_bytes_downloaded < resource.total_bytes_size &&
                resource.status.as_str() == "downloading" &&
                !resource.speeds.is_empty();
            print!(
                "{:>12} ",
                match speed_available {
                    false => "-".to_string(),
                    true => format!("{}/s", format_bytes(*resource.speeds.last().unwrap() as u64)),
                }
            );

            // ETA
            // An ETA of 0 is displayed as "-"
            let eta = resource.get_eta();
            print!(
                "{:>6}",
                match eta {
                    0 => "-".to_string(),
                    _ => format_duration(eta),
                }
            );

            println!();

            // Move the cursor to end
            print!("\x1b[{};1H", resources_write.len() + 2);
            stdout().flush().unwrap();
        };

        // Clears the screen and prints the table header on the first line.
        let print_begin = async || {
            // Clear
            print!("\x1B[2J\x1B[1;1H");

            // Print column headers
            // (widths match the per-column widths used in `update_resource`)
            println!(
                "\x1b[4m{:>44} {:>4} {:>11} {:>5} {:>16} {:>9} {:>12} {:>6}\x1b[0m",
                "Hash", "Type", "Status", "%", "Bytes", "Chunks", "Speed", "ETA"
            );
        };

        // Initial draw from the `list_resources` reply.
        print_begin().await;
        if resources_json.is_empty() {
            println!("No known resources");
        } else {
            for resource in resources_json.iter() {
                let rs: Resource = resource.clone().into();
                update_resource(&rs).await;
            }
        }

        loop {
            match subscription.receive().await {
                JsonResult::Notification(n) => {
                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
                    // Notifications without an `info` payload are skipped
                    let info = params.get("info");
                    if info.is_none() {
                        continue
                    }
                    let info = info.unwrap().get::<HashMap<String, JsonValue>>().unwrap();
                    match params.get("event").unwrap().get::<String>().unwrap().as_str() {
                        // These events are read for their `resource` entry,
                        // which is used to redraw the matching row.
                        "download_started" |
                        "metadata_download_completed" |
                        "chunk_download_completed" |
                        "download_completed" |
                        "missing_chunks" |
                        "metadata_not_found" |
                        "resource_updated" => {
                            let resource: Resource = info.get("resource").unwrap().clone().into();
                            update_resource(&resource).await;
                        }
                        "resource_removed" => {
                            // Drop the row inside its own scope so the write
                            // guard is released before the read below and
                            // before `update_resource` locks again.
                            {
                                let hash = info.get("hash").unwrap().get::<String>().unwrap();
                                let mut resources_write = resources.write().await;
                                let i = resources_write
                                    .iter()
                                    .position(|r| hash_to_string(&r.hash) == *hash);
                                if let Some(i) = i {
                                    resources_write.remove(i);
                                }
                            }

                            // Redraw the whole table from a snapshot, since
                            // the rows after the removed one shift up.
                            let r = resources.read().await.clone();
                            print_begin().await;
                            for resource in r.iter() {
                                update_resource(resource).await;
                            }
                        }
                        "download_error" => {
                            // An error that caused the download to be unsuccessful
                        }
                        _ => {}
                    }
                }

                JsonResult::Error(e) => {
                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
                }

                x => {
                    return Err(Error::UnexpectedJsonRpc(format!(
                        "Got unexpected data from JSON-RPC: {x:?}"
                    )))
                }
            }
        }
    }
785
786    async fn remove(&self, hash: String) -> Result<()> {
787        let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)]));
788        self.rpc_client.request(req).await?;
789        Ok(())
790    }
791
792    async fn verify(&self, files: Option<Vec<String>>) -> Result<()> {
793        let files = files.unwrap_or_default().into_iter().map(JsonValue::String).collect();
794        let req = JsonRequest::new("verify", JsonValue::Array(files));
795        self.rpc_client.request(req).await?;
796        Ok(())
797    }
798
799    async fn lookup(&self, hash: String, ex: ExecutorPtr) -> Result<()> {
800        let publisher = Publisher::new();
801        let subscription = Arc::new(publisher.clone().subscribe().await);
802        let subscriber_task = StoppableTask::new();
803        let publisher_ = publisher.clone();
804        let rpc_client_ = self.rpc_client.clone();
805        subscriber_task.clone().start(
806            async move {
807                let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
808                rpc_client_.subscribe(req, publisher).await
809            },
810            move |res| async move {
811                match res {
812                    Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
813                    Err(e) => {
814                        error!("{e}");
815                        publisher_
816                            .notify(JsonResult::Error(JsonError::new(
817                                ErrorCode::InternalError,
818                                None,
819                                0,
820                            )))
821                            .await;
822                    }
823                }
824            },
825            Error::DetachedTaskStopped,
826            ex.clone(),
827        );
828        let req =
829            JsonRequest::new("lookup", JsonValue::Array(vec![JsonValue::String(hash.clone())]));
830        let rpc_client_lookup = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
831        rpc_client_lookup.request(req).await?;
832
833        let print_seeders = |info: &HashMap<String, JsonValue>| {
834            let seeders = info.get("seeders").unwrap().get::<Vec<JsonValue>>().unwrap();
835            for seeder in seeders {
836                let seeder = seeder.get::<HashMap<String, JsonValue>>().unwrap();
837                let node: HashMap<String, JsonValue> =
838                    seeder.get("node").unwrap().clone().try_into().unwrap();
839                let node_id: String = node.get("id").unwrap().clone().try_into().unwrap();
840                let addresses: Vec<JsonValue> =
841                    node.get("addresses").unwrap().clone().try_into().unwrap();
842                let tree: Vec<_> = addresses
843                    .into_iter()
844                    .map(|addr| TreeNode::key(TryInto::<String>::try_into(addr).unwrap()))
845                    .collect();
846
847                print_tree(node_id.as_str(), &tree);
848            }
849        };
850
851        loop {
852            match subscription.receive().await {
853                JsonResult::Notification(n) => {
854                    let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
855                    let info =
856                        params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
857                    let hash_ = match info.get("hash") {
858                        Some(hash_value) => hash_value.get::<String>().unwrap(),
859                        None => continue,
860                    };
861                    if hash != *hash_ {
862                        continue;
863                    }
864
865                    if params.get("event").unwrap().get::<String>().unwrap().as_str() ==
866                        "seeders_found"
867                    {
868                        print_seeders(info);
869                        break
870                    }
871                }
872
873                JsonResult::Error(e) => {
874                    return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
875                }
876
877                x => {
878                    return Err(Error::UnexpectedJsonRpc(format!(
879                        "Got unexpected data from JSON-RPC: {x:?}"
880                    )))
881                }
882            }
883        }
884        Ok(())
885    }
886}
887
888fn main() -> Result<()> {
889    let args = Args::parse();
890
891    setup_logging(args.verbose, None)?;
892
893    let ex = Arc::new(smol::Executor::new());
894    smol::block_on(async {
895        ex.run(async {
896            let rpc_client = Arc::new(RpcClient::new(args.endpoint.clone(), ex.clone()).await?);
897            let fu = Fu { rpc_client, endpoint: args.endpoint.clone() };
898
899            match args.command {
900                Subcmd::Get { hash, path, files } => fu.get(hash, path, files, ex.clone()).await,
901                Subcmd::Put { path } => fu.put(path, ex.clone()).await,
902                Subcmd::Ls {} => fu.list_resources().await,
903                Subcmd::Watch {} => fu.watch(ex.clone()).await,
904                Subcmd::Rm { hash } => fu.remove(hash).await,
905                Subcmd::Buckets {} => fu.buckets().await,
906                Subcmd::Seeders {} => fu.seeders().await,
907                Subcmd::Verify { files } => fu.verify(files).await,
908                Subcmd::Lookup { hash } => fu.lookup(hash, ex.clone()).await,
909            }?;
910
911            Ok(())
912        })
913        .await
914    })
915}