fud/
rpc.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use async_trait::async_trait;
20use log::error;
21use smol::lock::{Mutex, MutexGuard};
22use std::{
23    collections::{HashMap, HashSet},
24    path::PathBuf,
25    sync::Arc,
26};
27use tinyjson::JsonValue;
28
29use darkfi::{
30    dht::DhtNode,
31    geode::hash_to_string,
32    net::P2pPtr,
33    rpc::{
34        jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber},
35        p2p_method::HandlerP2p,
36        server::RequestHandler,
37    },
38    system::StoppableTaskPtr,
39    util::path::expand_path,
40    Result,
41};
42
43use crate::{util::FileSelection, Fud};
44
45pub struct JsonRpcInterface {
46    fud: Arc<Fud>,
47    rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
48    dnet_sub: JsonSubscriber,
49    event_sub: JsonSubscriber,
50}
51
52#[async_trait]
53impl RequestHandler<()> for JsonRpcInterface {
54    async fn handle_request(&self, req: JsonRequest) -> JsonResult {
55        return match req.method.as_str() {
56            "ping" => self.pong(req.id, req.params).await,
57
58            "put" => self.put(req.id, req.params).await,
59            "get" => self.get(req.id, req.params).await,
60            "subscribe" => self.subscribe(req.id, req.params).await,
61            "remove" => self.remove(req.id, req.params).await,
62            "list_resources" => self.list_resources(req.id, req.params).await,
63            "list_buckets" => self.list_buckets(req.id, req.params).await,
64            "list_seeders" => self.list_seeders(req.id, req.params).await,
65            "verify" => self.verify(req.id, req.params).await,
66
67            "dnet.switch" => self.dnet_switch(req.id, req.params).await,
68            "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
69            "p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
70            _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
71        }
72    }
73
74    async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
75        self.rpc_connections.lock().await
76    }
77}
78
79impl HandlerP2p for JsonRpcInterface {
80    fn p2p(&self) -> P2pPtr {
81        self.fud.p2p.clone()
82    }
83}
84
85/// Fud RPC methods
86impl JsonRpcInterface {
87    pub fn new(fud: Arc<Fud>, dnet_sub: JsonSubscriber, event_sub: JsonSubscriber) -> Self {
88        Self { fud, rpc_connections: Mutex::new(HashSet::new()), dnet_sub, event_sub }
89    }
90
91    // RPCAPI:
92    // Put a file onto the network. Takes a local filesystem path as a parameter.
93    // Returns the file hash that serves as a pointer to the uploaded file.
94    //
95    // --> {"jsonrpc": "2.0", "method": "put", "params": ["/foo.txt"], "id": 42}
96    // <-- {"jsonrpc": "2.0", "result: "df4...3db7", "id": 42}
97    async fn put(&self, id: u16, params: JsonValue) -> JsonResult {
98        let params = params.get::<Vec<JsonValue>>().unwrap();
99        if params.len() != 1 || !params[0].is_string() {
100            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
101        }
102
103        let path = params[0].get::<String>().unwrap();
104        let path = match expand_path(path.as_str()) {
105            Ok(v) => v,
106            Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
107        };
108
109        // A valid path was passed. Let's see if we can read it, and if so,
110        // add it to Geode.
111        let res = self.fud.put(&path).await;
112        if let Err(e) = res {
113            return JsonError::new(ErrorCode::InternalError, Some(format!("{e}")), id).into()
114        }
115
116        JsonResponse::new(JsonValue::String(path.to_string_lossy().to_string()), id).into()
117    }
118
119    // RPCAPI:
120    // Fetch a resource from the network. Takes a hash, path (absolute or relative), and an
121    // optional list of file paths (only used for directories) as parameters.
122    // Returns the path where the resource will be located once downloaded.
123    //
124    // --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd", "~/myfile.jpg", null], "id": 42}
125    // <-- {"jsonrpc": "2.0", "result": "/home/user/myfile.jpg", "id": 42}
126    async fn get(&self, id: u16, params: JsonValue) -> JsonResult {
127        let params = params.get::<Vec<JsonValue>>().unwrap();
128        if params.len() != 3 || !params[0].is_string() || !params[1].is_string() {
129            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
130        }
131
132        let mut hash_buf = vec![];
133        match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
134            Ok(_) => {}
135            Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
136        }
137
138        if hash_buf.len() != 32 {
139            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
140        }
141
142        let mut hash_buf_arr = [0u8; 32];
143        hash_buf_arr.copy_from_slice(&hash_buf);
144
145        let hash = blake3::Hash::from_bytes(hash_buf_arr);
146        let hash_str = hash_to_string(&hash);
147
148        let path = match params[1].get::<String>() {
149            Some(path) => match path.is_empty() {
150                true => match self.fud.hash_to_path(&hash).ok().flatten() {
151                    Some(path) => path,
152                    None => self.fud.downloads_path.join(&hash_str),
153                },
154                false => match PathBuf::from(path).is_absolute() {
155                    true => PathBuf::from(path),
156                    false => self.fud.downloads_path.join(path),
157                },
158            },
159            None => self.fud.downloads_path.join(&hash_str),
160        };
161
162        let files: FileSelection = match &params[2] {
163            JsonValue::Array(files) => files
164                .iter()
165                .filter_map(|v| {
166                    if let JsonValue::String(file) = v {
167                        Some(PathBuf::from(file.clone()))
168                    } else {
169                        None
170                    }
171                })
172                .collect(),
173            JsonValue::Null => FileSelection::All,
174            _ => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
175        };
176
177        // Start downloading the resource
178        if let Err(e) = self.fud.get(&hash, &path, files).await {
179            return JsonError::new(ErrorCode::InternalError, Some(e.to_string()), id).into()
180        }
181
182        JsonResponse::new(JsonValue::String(path.to_string_lossy().to_string()), id).into()
183    }
184
185    // RPCAPI:
186    // Subscribe to fud events.
187    //
188    // --> {"jsonrpc": "2.0", "method": "get", "params": [], "id": 42}
189    // <-- {"jsonrpc": "2.0", "result": `event`, "id": 42}
190    async fn subscribe(&self, _id: u16, _params: JsonValue) -> JsonResult {
191        self.event_sub.clone().into()
192    }
193
194    // RPCAPI:
195    // Activate or deactivate dnet in the P2P stack.
196    // By sending `true`, dnet will be activated, and by sending `false` dnet
197    // will be deactivated. Returns `true` on success.
198    //
199    // --> {"jsonrpc": "2.0", "method": "dnet_switch", "params": [true], "id": 42}
200    // <-- {"jsonrpc": "2.0", "result": true, "id": 42}
201    async fn dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
202        let params = params.get::<Vec<JsonValue>>().unwrap();
203        if params.len() != 1 || !params[0].is_bool() {
204            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
205        }
206
207        let switch = params[0].get::<bool>().unwrap();
208
209        if *switch {
210            self.fud.p2p.dnet_enable();
211        } else {
212            self.fud.p2p.dnet_disable();
213        }
214
215        JsonResponse::new(JsonValue::Boolean(true), id).into()
216    }
217
218    // RPCAPI:
219    // Initializes a subscription to p2p dnet events.
220    // Once a subscription is established, `fud` will send JSON-RPC notifications of
221    // new network events to the subscriber.
222    //
223    // --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1}
224    // <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]}
225    pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
226        let params = params.get::<Vec<JsonValue>>().unwrap();
227        if !params.is_empty() {
228            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
229        }
230
231        self.dnet_sub.clone().into()
232    }
233
234    // RPCAPI:
235    // Returns resources.
236    //
237    // --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1}
238    // <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:13337"]]]], "id": 1}
239    pub async fn list_resources(&self, id: u16, params: JsonValue) -> JsonResult {
240        let params = params.get::<Vec<JsonValue>>().unwrap();
241        if !params.is_empty() {
242            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
243        }
244
245        let resources_read = self.fud.resources.read().await;
246        let mut resources: Vec<JsonValue> = vec![];
247        for (_, resource) in resources_read.iter() {
248            resources.push(resource.clone().into());
249        }
250
251        JsonResponse::new(JsonValue::Array(resources), id).into()
252    }
253
254    // RPCAPI:
255    // Returns the current buckets.
256    //
257    // --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1}
258    // <-- {"jsonrpc": "2.0", "result": [["abcdef", ["tcp://127.0.0.1:13337"]]], "id": 1}
259    pub async fn list_buckets(&self, id: u16, params: JsonValue) -> JsonResult {
260        let params = params.get::<Vec<JsonValue>>().unwrap();
261        if !params.is_empty() {
262            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
263        }
264        let mut buckets = vec![];
265        for bucket in self.fud.dht.buckets.read().await.iter() {
266            let mut nodes = vec![];
267            for node in bucket.nodes.clone() {
268                let mut addresses = vec![];
269                for addr in &node.addresses {
270                    addresses.push(JsonValue::String(addr.to_string()));
271                }
272                nodes.push(JsonValue::Array(vec![
273                    JsonValue::String(hash_to_string(&node.id())),
274                    JsonValue::Array(addresses),
275                ]));
276            }
277            buckets.push(JsonValue::Array(nodes));
278        }
279
280        JsonResponse::new(JsonValue::Array(buckets), id).into()
281    }
282
283    // RPCAPI:
284    // Returns the content of the seeders router.
285    //
286    // --> {"jsonrpc": "2.0", "method": "list_seeders", "params": [], "id": 1}
287    // <-- {"jsonrpc": "2.0", "result": {"seeders": {"abcdefileid": [["abcdef", ["tcp://127.0.0.1:13337"]]]}}, "id": 1}
288    pub async fn list_seeders(&self, id: u16, params: JsonValue) -> JsonResult {
289        let params = params.get::<Vec<JsonValue>>().unwrap();
290        if !params.is_empty() {
291            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
292        }
293        let mut seeders_router: HashMap<String, JsonValue> = HashMap::new();
294        for (hash, items) in self.fud.seeders_router.read().await.iter() {
295            let mut nodes = vec![];
296            for item in items {
297                let mut addresses = vec![];
298                for addr in &item.node.addresses {
299                    addresses.push(JsonValue::String(addr.to_string()));
300                }
301                nodes.push(JsonValue::Array(vec![
302                    JsonValue::String(hash_to_string(&item.node.id())),
303                    JsonValue::Array(addresses),
304                ]));
305            }
306            seeders_router.insert(hash_to_string(hash), JsonValue::Array(nodes));
307        }
308        let mut res: HashMap<String, JsonValue> = HashMap::new();
309        res.insert("seeders".to_string(), JsonValue::Object(seeders_router));
310
311        JsonResponse::new(JsonValue::Object(res), id).into()
312    }
313
314    // RPCAPI:
315    // Removes a resource.
316    //
317    // --> {"jsonrpc": "2.0", "method": "remove", "params": ["1211...abfd"], "id": 1}
318    // <-- {"jsonrpc": "2.0", "result": [], "id": 1}
319    pub async fn remove(&self, id: u16, params: JsonValue) -> JsonResult {
320        let params = params.get::<Vec<JsonValue>>().unwrap();
321        if params.len() != 1 || !params[0].is_string() {
322            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
323        }
324        let mut hash_buf = [0u8; 32];
325        match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
326            Ok(_) => {}
327            Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
328        }
329
330        self.fud.remove(&blake3::Hash::from_bytes(hash_buf)).await;
331
332        JsonResponse::new(JsonValue::Array(vec![]), id).into()
333    }
334
335    // RPCAPI:
336    // Verifies local files. Takes a list of file hashes as parameters.
337    // An empty list means all known files.
338    // Returns the path where the file will be located once downloaded.
339    //
340    // --> {"jsonrpc": "2.0", "method": "verify", "params": ["1211...abfd"], "id": 42}
341    // <-- {"jsonrpc": "2.0", "result": [], "id": 1}
342    async fn verify(&self, id: u16, params: JsonValue) -> JsonResult {
343        let params = params.get::<Vec<JsonValue>>().unwrap();
344        if !params.iter().all(|param| param.is_string()) {
345            return JsonError::new(ErrorCode::InvalidParams, None, id).into()
346        }
347        let hashes = if params.is_empty() {
348            None
349        } else {
350            let hashes_str: Vec<String> =
351                params.iter().map(|param| param.get::<String>().unwrap().clone()).collect();
352            let hashes: Result<Vec<blake3::Hash>> = hashes_str
353                .into_iter()
354                .map(|hash_str| {
355                    let mut buf = [0u8; 32];
356                    bs58::decode(hash_str).onto(&mut buf)?;
357                    Ok(blake3::Hash::from_bytes(buf))
358                })
359                .collect();
360            if hashes.is_err() {
361                return JsonError::new(ErrorCode::InvalidParams, None, id).into();
362            }
363            Some(hashes.unwrap())
364        };
365
366        if let Err(e) = self.fud.verify_resources(hashes).await {
367            error!(target: "fud::verify()", "Could not verify resources: {e}");
368            return JsonError::new(ErrorCode::InternalError, None, id).into();
369        }
370
371        JsonResponse::new(JsonValue::Array(vec![]), id).into()
372    }
373}