1use std::{io, time::Duration};
20
21use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
22use tracing::error;
23
24use super::jsonrpc::*;
25use crate::net::transport::PtStream;
26
27pub(super) const INIT_BUF_SIZE: usize = 4096; pub(super) const MAX_BUF_SIZE: usize = 1024 * 1024 * 16; pub(super) const READ_TIMEOUT: Duration = Duration::from_secs(30);
30
31pub(super) async fn http_read_from_stream_request(
34 reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
35 buf: &mut Vec<u8>,
36) -> io::Result<usize> {
37 let mut total_read = 0;
38
39 let mut tmpbuf = [0_u8];
41
42 while total_read < MAX_BUF_SIZE {
43 buf.resize(total_read + INIT_BUF_SIZE, 0u8);
44
45 match reader.read(&mut tmpbuf).await {
46 Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
47 Ok(0) => break, Ok(_) => {
49 buf[total_read] = tmpbuf[0];
51 total_read += 1;
52
53 if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
57 {
58 break
59 }
60 }
61
62 Err(e) => return Err(e),
63 }
64 }
65
66 let mut headers = [httparse::EMPTY_HEADER; 8];
68 let mut req = httparse::Request::new(&mut headers);
69 let _body_offset = match req.parse(buf) {
70 Ok(v) => v.unwrap(), Err(e) => {
72 error!("[RPC] Failed parsing HTTP request: {e}");
73 return Err(io::ErrorKind::InvalidData.into())
74 }
75 };
76
77 let mut content_length: usize = 0;
78 for header in headers {
79 if header.name.to_lowercase() == "content-length" {
80 let s = String::from_utf8_lossy(header.value);
81 content_length = match s.parse() {
82 Ok(v) => v,
83 Err(_) => return Err(io::ErrorKind::InvalidData.into()),
84 };
85 }
86 }
87
88 if content_length == 0 || content_length > MAX_BUF_SIZE {
89 return Err(io::ErrorKind::InvalidData.into())
90 }
91
92 buf.clear();
94 buf.resize(content_length, 0_u8);
95 reader.read(buf).await?;
96
97 assert!(buf.len() == content_length);
98 Ok(content_length)
99}
100
101pub(super) async fn http_read_from_stream_response(
104 reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
105 buf: &mut Vec<u8>,
106) -> io::Result<usize> {
107 let mut total_read = 0;
108
109 let mut tmpbuf = [0_u8];
111
112 while total_read < MAX_BUF_SIZE {
113 buf.resize(total_read + INIT_BUF_SIZE, 0u8);
114
115 match reader.read(&mut tmpbuf).await {
116 Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
117 Ok(0) => break, Ok(_) => {
119 buf[total_read] = tmpbuf[0];
121 total_read += 1;
122
123 if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
127 {
128 break
129 }
130 }
131
132 Err(e) => return Err(e),
133 }
134 }
135
136 let mut headers = [httparse::EMPTY_HEADER; 8];
138 let mut resp = httparse::Response::new(&mut headers);
139 let _body_offset = match resp.parse(buf) {
140 Ok(v) => v.unwrap(), Err(e) => {
142 error!("[RPC] Failed parsing HTTP response: {e}");
143 return Err(io::ErrorKind::InvalidData.into())
144 }
145 };
146
147 let mut content_length: usize = 0;
148 for header in headers {
149 if header.name.to_lowercase() == "content-length" {
150 let s = String::from_utf8_lossy(header.value);
151 content_length = match s.parse() {
152 Ok(v) => v,
153 Err(_) => return Err(io::ErrorKind::InvalidData.into()),
154 };
155 }
156 }
157
158 if content_length == 0 || content_length > MAX_BUF_SIZE {
159 return Err(io::ErrorKind::InvalidData.into())
160 }
161
162 buf.clear();
164 buf.resize(content_length, 0_u8);
165 reader.read(buf).await?;
166
167 assert!(buf.len() == content_length);
168 Ok(content_length)
169}
170
171pub(super) async fn read_from_stream(
174 reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
175 buf: &mut Vec<u8>,
176) -> io::Result<usize> {
177 let mut total_read = 0;
178
179 let mut tmpbuf = [0_u8];
181
182 while total_read < MAX_BUF_SIZE {
183 buf.resize(total_read + INIT_BUF_SIZE, 0u8);
184
185 match reader.read(&mut tmpbuf).await {
186 Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
187 Ok(0) => break, Ok(_) => {
189 if tmpbuf[0] == b'\n' {
191 if buf[total_read - 1] == b'\r' {
192 buf.pop();
193 total_read -= 1;
194 }
195 break
196 }
197
198 buf[total_read] = tmpbuf[0];
200 total_read += 1;
201 }
202
203 Err(e) => return Err(e),
204 }
205 }
206
207 buf.truncate(total_read);
209 Ok(total_read)
210}
211
212pub(super) async fn http_write_to_stream(
215 writer: &mut WriteHalf<Box<dyn PtStream>>,
216 object: &JsonResult,
217) -> io::Result<()> {
218 let (status_line, object_str) = match object {
219 JsonResult::Notification(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
220 JsonResult::Response(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
221 JsonResult::Error(v) => ("HTTP/1.1 400 Bad Request", v.stringify().unwrap()),
222 JsonResult::Request(v) => ("POST /json_rpc HTTP/1.1", v.stringify().unwrap()),
223 _ => unreachable!(),
224 };
225
226 let length = object_str.len();
227 let data = format!("{status_line}\r\nContent-Length: {length}\r\nContent-Type: application/json\r\n\r\n{object_str}");
228
229 writer.write_all(data.as_bytes()).await?;
230 writer.flush().await?;
231
232 Ok(())
233}
234
235pub(super) async fn write_to_stream(
237 writer: &mut WriteHalf<Box<dyn PtStream>>,
238 object: &JsonResult,
239) -> io::Result<()> {
240 let object_str = match object {
241 JsonResult::Notification(v) => v.stringify().unwrap(),
242 JsonResult::Response(v) => v.stringify().unwrap(),
243 JsonResult::Error(v) => v.stringify().unwrap(),
244 JsonResult::Request(v) => v.stringify().unwrap(),
245 _ => unreachable!(),
246 };
247
248 for i in [object_str.as_bytes(), b"\r\n"] {
250 writer.write_all(i).await?
251 }
252
253 writer.flush().await?;
254
255 Ok(())
256}