1use std::collections::HashMap;
7use std::io::Cursor;
8use std::io::Read;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use std::sync::mpsc;
13use std::sync::mpsc::Receiver;
14use std::sync::mpsc::Sender;
15use std::thread::JoinHandle;
16use std::time::Duration;
17use std::time::Instant;
18
19use curl::easy::Easy2;
20use curl::easy::Handler;
21use curl::easy::InfoType;
22use curl::easy::WriteError;
23use curl::multi::Easy2Handle;
24use curl::multi::Multi;
25use futures::channel::oneshot;
26use portable_atomic::AtomicI64;
27use portable_atomic::AtomicU64;
28use tracing::{debug, error, trace, warn};
29
30use crate::util::network::http::HandleConfiguration;
31use crate::util::network::http::HttpTimeout;
32
33type Response = http::Response<Vec<u8>>;
34type Request = http::Request<Vec<u8>>;
35type HttpResult<T> = std::result::Result<T, Error>;
36
37#[derive(Debug, Clone, thiserror::Error)]
38#[non_exhaustive]
39pub enum Error {
40 #[error(transparent)]
41 Multi(#[from] curl::MultiError),
42
43 #[error(transparent)]
44 Easy(#[from] curl::Error),
45
46 #[error(
47 "transfer too slow: failed to transfer more than {low_speed_limit} bytes in {}s (transferred {transferred} bytes)",
48 timeout_dur.as_secs()
49 )]
50 TooSlow {
51 low_speed_limit: u32,
52 timeout_dur: Duration,
53 transferred: u64,
54 },
55
56 #[error("failed to convert header value of `{name}` to string: {bytes:?}")]
57 BadHeader { name: String, bytes: Vec<u8> },
58}
59
60struct Message {
61 easy: Easy2<Collector>,
62 sender: oneshot::Sender<HttpResult<Response>>,
63}
64
65#[derive(Default)]
66struct Stats {
67 dl_remaining: AtomicI64,
68 dl_transferred: AtomicU64,
69}
70
71pub struct Client {
74 channel: Option<Sender<Message>>,
75 thread_handle: Option<JoinHandle<()>>,
76 handle_config: HandleConfiguration,
77 stats: Arc<Stats>,
78}
79
80impl Client {
81 pub fn new(handle_config: HandleConfiguration) -> Client {
83 let (tx, rx) = mpsc::channel();
84 let stats = Arc::new(Stats::default());
85 let timeout = handle_config.timeout.clone();
86 let worker_stats = stats.clone();
87 let handle = std::thread::spawn(move || {
88 WorkerServer::run(rx, handle_config.multiplexing, timeout, worker_stats)
89 });
90 Client {
91 channel: Some(tx),
92 thread_handle: Some(handle),
93 handle_config,
94 stats,
95 }
96 }
97
98 pub fn request_blocking(&self, request: Request) -> HttpResult<Response> {
101 let handle = self.request_helper(request)?;
102 handle.perform()?;
103 Ok(WorkerServer::process_response(handle))
104 }
105
106 pub async fn request(&self, request: Request) -> HttpResult<Response> {
108 let handle = self.request_helper(request)?;
109 let (sender, receiver) = oneshot::channel();
110 let req = Message {
111 easy: handle,
112 sender,
113 };
114 self.channel.as_ref().unwrap().send(req).unwrap();
115 receiver.await.unwrap()
116 }
117
118 fn request_helper(&self, request: Request) -> HttpResult<Easy2<Collector>> {
119 let url = request.uri().to_string();
120 debug!(target: "network::fetch", url);
121 let mut collector = Collector::new(self.stats.clone());
122 let (parts, body) = request.into_parts();
123 let body_len = body.len();
124 collector.request_body = Cursor::new(body);
125 collector.debug = self.handle_config.verbose;
126 let mut handle = curl::easy::Easy2::new(collector);
127 self.handle_config.configure2(&mut handle)?;
128
129 handle.url(&url)?;
130 handle.follow_location(true)?;
131 handle.progress(true)?;
132
133 match parts.method {
134 http::Method::HEAD => handle.nobody(true)?,
135 http::Method::GET => handle.get(true)?,
136 http::Method::POST => {
137 handle.post_field_size(body_len as u64)?;
138 handle.post(true)?;
139 }
140 http::Method::PUT => {
141 handle.in_filesize(body_len as u64)?;
142 handle.put(true)?;
143 }
144 method => {
145 handle.upload(true)?;
146 handle.in_filesize(body_len as u64)?;
147 handle.custom_request(method.as_str())?;
148 }
149 }
150
151 let mut headers = curl::easy::List::new();
152 for (name, value) in parts.headers {
153 if let Some(name) = name {
154 let value: &str = value.to_str().map_err(|_| Error::BadHeader {
155 name: name.to_string(),
156 bytes: value.as_bytes().to_owned(),
157 })?;
158 headers.append(&format!("{}: {}", name, value))?;
159 }
160 }
161 handle.http_headers(headers)?;
162
163 Ok(handle)
164 }
165
166 pub fn bytes_pending(&self) -> u64 {
168 self.stats
169 .dl_remaining
170 .load(Ordering::Acquire)
171 .try_into()
172 .unwrap()
173 }
174}
175
176impl Drop for Client {
177 fn drop(&mut self) {
178 drop(self.channel.take().unwrap());
180 let _ = self.thread_handle.take().unwrap().join();
182 }
183}
184
185impl std::fmt::Debug for Client {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 f.debug_struct("http_async::Client").finish()
188 }
189}
190
191struct WorkerServer {
194 incoming_work: Receiver<Message>,
196 multi: Multi,
198 handles: HashMap<
200 usize,
201 (
202 Easy2Handle<Collector>,
203 oneshot::Sender<HttpResult<Response>>,
204 ),
205 >,
206 token: usize,
208 timeout: HttpTimeout,
210 stats: Arc<Stats>,
212 low_speed_window_start: Instant,
214 low_speed_window_initial: u64,
216}
217
218impl WorkerServer {
219 fn run(
220 incoming_work: Receiver<Message>,
221 multiplex: bool,
222 timeout: HttpTimeout,
223 stats: Arc<Stats>,
224 ) {
225 let mut multi = Multi::new();
226 if let Err(e) = multi.set_max_host_connections(2) {
228 error!("failed to set max host connections in curl: {e}");
229 }
230 if let Err(e) = multi.pipelining(false, multiplex) {
231 error!("failed to enable multiplexing/pipelining in curl: {e}");
232 }
233
234 let mut worker = Self {
235 incoming_work,
236 multi,
237 handles: HashMap::new(),
238 token: 0,
239 timeout,
240 stats,
241 low_speed_window_start: Instant::now(),
242 low_speed_window_initial: 0,
243 };
244 worker.worker_loop();
245 }
246
247 fn fail_and_drain(&mut self, e: &Error) {
248 warn!(
249 target: "network",
250 "failing all outstanding HTTP requests: {e}"
251 );
252 for (_token, (_handle, sender)) in self.handles.drain() {
253 let _ = sender.send(Err(e.clone()));
254 }
255 }
256
257 fn process_response(mut easy: Easy2<Collector>) -> Response {
258 let mut response =
259 std::mem::replace(&mut easy.get_mut().response, Response::new(Vec::new()));
260 if let Ok(status) = easy.response_code()
261 && status != 0
262 && let Ok(status) = http::StatusCode::from_u16(status as u16)
263 {
264 *response.status_mut() = status;
265 }
266 let extensions = Extensions {
268 client_ip: easy.primary_ip().ok().flatten().map(str::to_string),
269 effective_url: easy.effective_url().ok().flatten().map(str::to_string),
270 };
271 response.extensions_mut().insert(extensions);
272 response
273 }
274
275 fn reset_low_speed_timeout(&mut self) {
277 self.low_speed_window_start = Instant::now();
278 self.low_speed_window_initial = self.stats.dl_transferred.load(Ordering::Acquire);
279 }
280
281 fn check_low_speed_timeout(&mut self) -> Option<Error> {
284 if Instant::now().duration_since(self.low_speed_window_start) < self.timeout.dur {
286 return None;
287 }
288
289 let current = self.stats.dl_transferred.load(Ordering::Acquire);
291 let transferred = current.saturating_sub(self.low_speed_window_initial);
292 self.reset_low_speed_timeout();
293 if transferred < self.timeout.low_speed_limit.into() {
294 Some(Error::TooSlow {
295 low_speed_limit: self.timeout.low_speed_limit,
296 timeout_dur: self.timeout.dur,
297 transferred,
298 })
299 } else {
300 None
301 }
302 }
303
304 fn worker_loop(&mut self) {
305 const INITIAL_DELAY: Duration = Duration::from_millis(1);
306 let mut wait_backoff = INITIAL_DELAY;
307 loop {
308 while let Ok(msg) = self.incoming_work.try_recv() {
310 self.enqueue_request(msg);
311 wait_backoff = INITIAL_DELAY;
312 }
313
314 match self.multi.perform() {
315 Err(e) if e.is_call_perform() => {
316 }
318 Err(e) => {
319 self.fail_and_drain(&Error::Multi(e));
320 }
321 Ok(running) => {
322 self.multi.messages(|msg| {
323 let t = msg.token().expect("all handles have tokens");
324 trace!(token = t, "finish");
325 let Some((handle, sender)) = self.handles.remove(&t) else {
326 error!("missing entry {t} in handle table");
327 return;
328 };
329 let result = msg.result_for2(&handle).expect("handle must have a result");
330 let easy = self.multi.remove2(handle).expect("handle must be in multi");
331 let response = Self::process_response(easy);
332 let _ = sender.send(result.map(|()| response).map_err(Into::into));
333 });
334
335 if running > 0 {
336 if let Some(timeout_error) = self.check_low_speed_timeout() {
338 self.fail_and_drain(&timeout_error);
339 continue;
340 }
341
342 let max_timeout = Duration::from_millis(1000);
343 let mut timeout = self
344 .multi
345 .get_timeout()
346 .ok()
347 .flatten()
348 .unwrap_or(max_timeout)
349 .min(max_timeout);
350 if timeout.is_zero() {
351 continue;
353 }
354 if wait_backoff < timeout {
366 wait_backoff *= 2;
367 timeout = wait_backoff
368 }
369 trace!(
370 pending = self.handles.len(),
371 timeout = timeout.as_millis(),
372 "curl wait"
373 );
374 if let Err(e) = self.multi.wait(&mut [], timeout) {
375 self.fail_and_drain(&Error::Multi(e));
376 }
377 } else {
378 trace!("all work completed");
380 match self.incoming_work.recv() {
381 Ok(msg) => {
382 trace!("resuming work");
383 self.reset_low_speed_timeout();
384 self.enqueue_request(msg);
385 wait_backoff = INITIAL_DELAY;
386 }
387 Err(_) => {
388 break;
390 }
391 }
392 }
393 }
394 }
395 }
396 }
397
398 fn enqueue_request(&mut self, message: Message) {
400 match self.multi.add2(message.easy) {
401 Ok(mut handle) => {
402 self.token = self.token.wrapping_add(1);
403 handle.set_token(self.token).ok();
404 self.handles.insert(self.token, (handle, message.sender));
405 }
406 Err(e) => {
407 let _ = message.sender.send(Err(e.into()));
408 }
409 }
410 }
411}
412
413struct Collector {
415 response: Response,
417 request_body: Cursor<Vec<u8>>,
419 debug: bool,
421 global_stats: Arc<Stats>,
423 dl_remaining_delta: i64,
425}
426
427impl Collector {
428 fn new(stats: Arc<Stats>) -> Self {
429 Collector {
430 response: Response::new(Vec::new()),
431 request_body: Cursor::new(Vec::new()),
432 debug: false,
433 global_stats: stats,
434 dl_remaining_delta: 0,
435 }
436 }
437}
438
439impl Handler for Collector {
440 fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
441 self.response.body_mut().extend_from_slice(data);
442 self.global_stats
443 .dl_transferred
444 .fetch_add(data.len() as u64, Ordering::Release);
445 Ok(data.len())
446 }
447
448 fn header(&mut self, data: &[u8]) -> bool {
449 if let Some((name, value)) = handle_http_header(data)
450 && let Ok(name) = http::HeaderName::from_str(name)
451 && let Ok(value) = http::HeaderValue::from_str(value)
452 {
453 self.response.headers_mut().append(name, value);
454 }
455 true
456 }
457
458 fn read(&mut self, data: &mut [u8]) -> Result<usize, curl::easy::ReadError> {
459 Ok(self.request_body.read(data).unwrap())
460 }
461
462 fn debug(&mut self, kind: InfoType, data: &[u8]) {
463 if self.debug {
464 super::http::debug(kind, data);
465 }
466 }
467
468 fn progress(&mut self, dltotal: f64, dlnow: f64, _ultotal: f64, _ulnow: f64) -> bool {
469 if dlnow > dltotal {
470 return true;
471 }
472 let dl_total = dltotal as i64;
473 let dl_current = dlnow as i64;
474
475 let remaining = dl_total - dl_current;
476
477 self.global_stats
478 .dl_remaining
479 .fetch_add(remaining - self.dl_remaining_delta, Ordering::Release);
480 self.dl_remaining_delta = remaining;
481 true
482 }
483}
484
485impl Drop for Collector {
486 fn drop(&mut self) {
487 self.global_stats
489 .dl_remaining
490 .fetch_add(-self.dl_remaining_delta, Ordering::Release);
491 }
492}
493
/// Extra transfer details stored in a response's extension map.
#[derive(Clone)]
struct Extensions {
    /// IP address curl reported via `primary_ip`, if any.
    client_ip: Option<String>,
    /// URL curl reported via `effective_url`, if any.
    effective_url: Option<String>,
}
500
/// Access to the transfer details this module attaches to responses.
pub trait ResponsePartsExtensions {
    /// Returns the recorded IP address, or `None` if none was attached.
    fn client_ip(&self) -> Option<&str>;

    /// Returns the recorded effective URL, or `None` if none was attached.
    fn effective_url(&self) -> Option<&str>;
}
505
506impl ResponsePartsExtensions for http::response::Parts {
507 fn client_ip(&self) -> Option<&str> {
508 self.extensions
509 .get::<Extensions>()
510 .and_then(|extensions| extensions.client_ip.as_deref())
511 }
512
513 fn effective_url(&self) -> Option<&str> {
514 self.extensions
515 .get::<Extensions>()
516 .and_then(|extensions| extensions.effective_url.as_deref())
517 }
518}
519
520impl ResponsePartsExtensions for Response {
521 fn client_ip(&self) -> Option<&str> {
522 self.extensions()
523 .get::<Extensions>()
524 .and_then(|extensions| extensions.client_ip.as_deref())
525 }
526
527 fn effective_url(&self) -> Option<&str> {
528 self.extensions()
529 .get::<Extensions>()
530 .and_then(|extensions| extensions.effective_url.as_deref())
531 }
532}
533
/// Splits one raw header line into its name and value.
///
/// Trailing whitespace (including the line terminator) is removed from the
/// line, and the value is trimmed on both sides; the name is returned as
/// found before the first `:`.
///
/// Returns `None` when the input is empty, is not valid UTF-8, still
/// contains a newline after trimming, or has no `:` separator.
fn handle_http_header(buf: &[u8]) -> Option<(&str, &str)> {
    if buf.is_empty() {
        return None;
    }
    let line = std::str::from_utf8(buf).ok()?.trim_end();
    match line.split_once(':') {
        // An embedded newline would smuggle a second line into one header.
        Some((tag, value)) if !line.contains('\n') => Some((tag, value.trim())),
        _ => None,
    }
}