darkfi/rpc/
common.rs

1/* This file is part of DarkFi (https://dark.fi)
2 *
3 * Copyright (C) 2020-2025 Dyne.org foundation
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU Affero General Public License as
7 * published by the Free Software Foundation, either version 3 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU Affero General Public License for more details.
14 *
15 * You should have received a copy of the GNU Affero General Public License
16 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
17 */
18
19use std::{io, time::Duration};
20
21use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
22use tracing::error;
23
24use super::jsonrpc::*;
25use crate::net::transport::PtStream;
26
/// Amount of buffer space, in bytes, that the stream read functions in this
/// module set aside when they start filling a destination buffer (4 KiB).
27pub(super) const INIT_BUF_SIZE: usize = 4096; // 4K
/// Hard upper bound, in bytes, on how much the read functions in this module
/// will accumulate from a stream, and on the accepted HTTP `Content-Length`
/// (16 MiB).
28pub(super) const MAX_BUF_SIZE: usize = 1024 * 1024 * 16; // 16M
/// Thirty-second duration. NOTE(review): not referenced in the visible part of
/// this file; presumably the timeout callers apply to stream reads -- confirm
/// at the call sites.
29pub(super) const READ_TIMEOUT: Duration = Duration::from_secs(30);
30
31/// Internal read function that reads from the active stream into a buffer.
32/// Performs HTTP POST request parsing. Returns the request body length.
33pub(super) async fn http_read_from_stream_request(
34    reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
35    buf: &mut Vec<u8>,
36) -> io::Result<usize> {
37    let mut total_read = 0;
38
39    // Intermediate buffer we use to read byte-by-byte.
40    let mut tmpbuf = [0_u8];
41
42    while total_read < MAX_BUF_SIZE {
43        buf.resize(total_read + INIT_BUF_SIZE, 0u8);
44
45        match reader.read(&mut tmpbuf).await {
46            Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
47            Ok(0) => break, // Finished reading
48            Ok(_) => {
49                // Copy the read byte to the destination buffer.
50                buf[total_read] = tmpbuf[0];
51                total_read += 1;
52
53                // In HTTP, when we reach '\r\n\r\n' we know we've read the headers.
54                // The rest is the body. Headers should contain Content-Length which
55                // tells us the remaining amount of bytes to read.
56                if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
57                {
58                    break
59                }
60            }
61
62            Err(e) => return Err(e),
63        }
64    }
65
66    // Here we parse the HTTP for correctness and find Content-Length
67    let mut headers = [httparse::EMPTY_HEADER; 8];
68    let mut req = httparse::Request::new(&mut headers);
69    let _body_offset = match req.parse(buf) {
70        Ok(v) => v.unwrap(), // TODO: This should check httparse::Status::is_partial()
71        Err(e) => {
72            error!("[RPC] Failed parsing HTTP request: {e}");
73            return Err(io::ErrorKind::InvalidData.into())
74        }
75    };
76
77    let mut content_length: usize = 0;
78    for header in headers {
79        if header.name.to_lowercase() == "content-length" {
80            let s = String::from_utf8_lossy(header.value);
81            content_length = match s.parse() {
82                Ok(v) => v,
83                Err(_) => return Err(io::ErrorKind::InvalidData.into()),
84            };
85        }
86    }
87
88    if content_length == 0 || content_length > MAX_BUF_SIZE {
89        return Err(io::ErrorKind::InvalidData.into())
90    }
91
92    // Now we know the request body size. Read it into the buffer.
93    buf.clear();
94    buf.resize(content_length, 0_u8);
95    reader.read(buf).await?;
96
97    assert!(buf.len() == content_length);
98    Ok(content_length)
99}
100
101/// Internal read function that reads from the active stream into a buffer.
102/// Performs HTTP POST response parsing. Returns the response body length.
103pub(super) async fn http_read_from_stream_response(
104    reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
105    buf: &mut Vec<u8>,
106) -> io::Result<usize> {
107    let mut total_read = 0;
108
109    // Intermediate buffer we use to read byte-by-byte.
110    let mut tmpbuf = [0_u8];
111
112    while total_read < MAX_BUF_SIZE {
113        buf.resize(total_read + INIT_BUF_SIZE, 0u8);
114
115        match reader.read(&mut tmpbuf).await {
116            Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
117            Ok(0) => break, // Finished reading
118            Ok(_) => {
119                // Copy the read byte to the destination buffer.
120                buf[total_read] = tmpbuf[0];
121                total_read += 1;
122
123                // In HTTP, when we reach '\r\n\r\n' we know we've read the headers.
124                // The rest is the body. Headers should contain Content-Length which
125                // tells us the remaining amount of bytes to read.
126                if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
127                {
128                    break
129                }
130            }
131
132            Err(e) => return Err(e),
133        }
134    }
135
136    // Here we parse the HTTP for correctness and find Content-Length
137    let mut headers = [httparse::EMPTY_HEADER; 8];
138    let mut resp = httparse::Response::new(&mut headers);
139    let _body_offset = match resp.parse(buf) {
140        Ok(v) => v.unwrap(), // TODO: This should check httparse::Status::is_partial()
141        Err(e) => {
142            error!("[RPC] Failed parsing HTTP response: {e}");
143            return Err(io::ErrorKind::InvalidData.into())
144        }
145    };
146
147    let mut content_length: usize = 0;
148    for header in headers {
149        if header.name.to_lowercase() == "content-length" {
150            let s = String::from_utf8_lossy(header.value);
151            content_length = match s.parse() {
152                Ok(v) => v,
153                Err(_) => return Err(io::ErrorKind::InvalidData.into()),
154            };
155        }
156    }
157
158    if content_length == 0 || content_length > MAX_BUF_SIZE {
159        return Err(io::ErrorKind::InvalidData.into())
160    }
161
162    // Now we know the response body size. Read it into the buffer.
163    buf.clear();
164    buf.resize(content_length, 0_u8);
165    reader.read(buf).await?;
166
167    assert!(buf.len() == content_length);
168    Ok(content_length)
169}
170
171/// Internal read function that reads from the active stream into a buffer.
172/// Reading stops upon reaching CRLF or LF, or when `MAX_BUF_SIZE` is reached.
173pub(super) async fn read_from_stream(
174    reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
175    buf: &mut Vec<u8>,
176) -> io::Result<usize> {
177    let mut total_read = 0;
178
179    // Intermediate buffer we use to read byte-by-byte.
180    let mut tmpbuf = [0_u8];
181
182    while total_read < MAX_BUF_SIZE {
183        buf.resize(total_read + INIT_BUF_SIZE, 0u8);
184
185        match reader.read(&mut tmpbuf).await {
186            Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
187            Ok(0) => break, // Finished reading
188            Ok(_) => {
189                // When we reach '\n', pop a possible '\r' from the buffer and bail.
190                if tmpbuf[0] == b'\n' {
191                    if buf[total_read - 1] == b'\r' {
192                        buf.pop();
193                        total_read -= 1;
194                    }
195                    break
196                }
197
198                // Copy the read byte to the destination buffer.
199                buf[total_read] = tmpbuf[0];
200                total_read += 1;
201            }
202
203            Err(e) => return Err(e),
204        }
205    }
206
207    // Truncate buffer to actual data size
208    buf.truncate(total_read);
209    Ok(total_read)
210}
211
212/// Internal write function that writes a JSON-RPC object to the active stream.
213/// Sent as an HTTP response.
214pub(super) async fn http_write_to_stream(
215    writer: &mut WriteHalf<Box<dyn PtStream>>,
216    object: &JsonResult,
217) -> io::Result<()> {
218    let (status_line, object_str) = match object {
219        JsonResult::Notification(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
220        JsonResult::Response(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
221        JsonResult::Error(v) => ("HTTP/1.1 400 Bad Request", v.stringify().unwrap()),
222        JsonResult::Request(v) => ("POST /json_rpc HTTP/1.1", v.stringify().unwrap()),
223        _ => unreachable!(),
224    };
225
226    let length = object_str.len();
227    let data = format!("{status_line}\r\nContent-Length: {length}\r\nContent-Type: application/json\r\n\r\n{object_str}");
228
229    writer.write_all(data.as_bytes()).await?;
230    writer.flush().await?;
231
232    Ok(())
233}
234
235/// Internal write function that writes a JSON-RPC object to the active stream.
236pub(super) async fn write_to_stream(
237    writer: &mut WriteHalf<Box<dyn PtStream>>,
238    object: &JsonResult,
239) -> io::Result<()> {
240    let object_str = match object {
241        JsonResult::Notification(v) => v.stringify().unwrap(),
242        JsonResult::Response(v) => v.stringify().unwrap(),
243        JsonResult::Error(v) => v.stringify().unwrap(),
244        JsonResult::Request(v) => v.stringify().unwrap(),
245        _ => unreachable!(),
246    };
247
248    // As we're a line-based protocol, we append CRLF to the end of the JSON string.
249    for i in [object_str.as_bytes(), b"\r\n"] {
250        writer.write_all(i).await?
251    }
252
253    writer.flush().await?;
254
255    Ok(())
256}