Skip to main content

cargo/util/network/
http_async.rs

//! Async wrapper around cURL for managing HTTP requests.
2//!
3//! Requests are executed in parallel using cURL [`Multi`] on
4//! a worker thread that is owned by the Client.
5
6use std::collections::HashMap;
7use std::io::Cursor;
8use std::io::Read;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use std::sync::mpsc;
13use std::sync::mpsc::Receiver;
14use std::sync::mpsc::Sender;
15use std::thread::JoinHandle;
16use std::time::Duration;
17use std::time::Instant;
18
19use curl::easy::Easy2;
20use curl::easy::Handler;
21use curl::easy::InfoType;
22use curl::easy::WriteError;
23use curl::multi::Easy2Handle;
24use curl::multi::Multi;
25use futures::channel::oneshot;
26use portable_atomic::AtomicI64;
27use portable_atomic::AtomicU64;
28use tracing::{debug, error, trace, warn};
29
30use crate::util::network::http::HandleConfiguration;
31use crate::util::network::http::HttpTimeout;
32
/// Response type used throughout this module: an `http` response carrying the raw body bytes.
type Response = http::Response<Vec<u8>>;
/// Request type accepted by [`Client`]: an `http` request with an in-memory body.
type Request = http::Request<Vec<u8>>;
/// Shorthand for results produced by this module's HTTP machinery.
type HttpResult<T> = std::result::Result<T, Error>;
36
/// Errors produced by the async HTTP [`Client`] and its worker thread.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Error reported by the curl multi interface.
    #[error(transparent)]
    Multi(#[from] curl::MultiError),

    /// Error reported by an individual curl easy handle.
    #[error(transparent)]
    Easy(#[from] curl::Error),

    /// The aggregate download rate fell below `low_speed_limit` bytes within
    /// one timeout window; the worker fails all outstanding requests with this.
    #[error(
        "transfer too slow: failed to transfer more than {low_speed_limit} bytes in {}s (transferred {transferred} bytes)",
        timeout_dur.as_secs()
    )]
    TooSlow {
        /// Minimum number of bytes expected per window (from [`HttpTimeout`]).
        low_speed_limit: u32,
        /// Length of the measurement window.
        timeout_dur: Duration,
        /// Bytes actually transferred during the window.
        transferred: u64,
    },

    /// A request header value could not be converted to a string.
    #[error("failed to convert header value of `{name}` to string: {bytes:?}")]
    BadHeader { name: String, bytes: Vec<u8> },
}
59
/// One unit of work sent from [`Client::request`] to the worker thread:
/// a fully configured easy handle plus the channel for delivering the result.
struct Message {
    /// Configured curl easy handle, ready to be added to the `Multi`.
    easy: Easy2<Collector>,
    /// One-shot channel on which the response (or error) is sent back.
    sender: oneshot::Sender<HttpResult<Response>>,
}
64
/// Download statistics shared between the [`Client`] and all transfers.
#[derive(Default)]
struct Stats {
    // Total bytes still expected across all active downloads. Signed because
    // each transfer publishes positive/negative deltas (see `Collector::progress`).
    dl_remaining: AtomicI64,
    // Total bytes downloaded so far across all transfers (see `Collector::write`).
    dl_transferred: AtomicU64,
}
70
/// HTTP Client. Creating a new client spawns a cURL `Multi` and
/// thread that is used for all HTTP requests by this client.
pub struct Client {
    /// Channel for sending work to the worker thread.
    /// Always `Some` until `Drop`, where it is taken to close the channel.
    channel: Option<Sender<Message>>,
    /// Worker thread handle; taken and joined in `Drop`.
    thread_handle: Option<JoinHandle<()>>,
    /// Configuration applied to every easy handle this client creates.
    handle_config: HandleConfiguration,
    /// Transfer statistics shared with the worker thread and collectors.
    stats: Arc<Stats>,
}
79
80impl Client {
81    /// Spawns a new worker thread where HTTP request execute.
82    pub fn new(handle_config: HandleConfiguration) -> Client {
83        let (tx, rx) = mpsc::channel();
84        let stats = Arc::new(Stats::default());
85        let timeout = handle_config.timeout.clone();
86        let worker_stats = stats.clone();
87        let handle = std::thread::spawn(move || {
88            WorkerServer::run(rx, handle_config.multiplexing, timeout, worker_stats)
89        });
90        Client {
91            channel: Some(tx),
92            thread_handle: Some(handle),
93            handle_config,
94            stats,
95        }
96    }
97
98    /// Perform a blocking HTTP request using this client.
99    /// Does not start an async executor.
100    pub fn request_blocking(&self, request: Request) -> HttpResult<Response> {
101        let handle = self.request_helper(request)?;
102        handle.perform()?;
103        Ok(WorkerServer::process_response(handle))
104    }
105
106    /// Perform an HTTP request using this client.
107    pub async fn request(&self, request: Request) -> HttpResult<Response> {
108        let handle = self.request_helper(request)?;
109        let (sender, receiver) = oneshot::channel();
110        let req = Message {
111            easy: handle,
112            sender,
113        };
114        self.channel.as_ref().unwrap().send(req).unwrap();
115        receiver.await.unwrap()
116    }
117
118    fn request_helper(&self, request: Request) -> HttpResult<Easy2<Collector>> {
119        let url = request.uri().to_string();
120        debug!(target: "network::fetch", url);
121        let mut collector = Collector::new(self.stats.clone());
122        let (parts, body) = request.into_parts();
123        let body_len = body.len();
124        collector.request_body = Cursor::new(body);
125        collector.debug = self.handle_config.verbose;
126        let mut handle = curl::easy::Easy2::new(collector);
127        self.handle_config.configure2(&mut handle)?;
128
129        handle.url(&url)?;
130        handle.follow_location(true)?;
131        handle.progress(true)?;
132
133        match parts.method {
134            http::Method::HEAD => handle.nobody(true)?,
135            http::Method::GET => handle.get(true)?,
136            http::Method::POST => {
137                handle.post_field_size(body_len as u64)?;
138                handle.post(true)?;
139            }
140            http::Method::PUT => {
141                handle.in_filesize(body_len as u64)?;
142                handle.put(true)?;
143            }
144            method => {
145                handle.upload(true)?;
146                handle.in_filesize(body_len as u64)?;
147                handle.custom_request(method.as_str())?;
148            }
149        }
150
151        let mut headers = curl::easy::List::new();
152        for (name, value) in parts.headers {
153            if let Some(name) = name {
154                let value: &str = value.to_str().map_err(|_| Error::BadHeader {
155                    name: name.to_string(),
156                    bytes: value.as_bytes().to_owned(),
157                })?;
158                headers.append(&format!("{}: {}", name, value))?;
159            }
160        }
161        handle.http_headers(headers)?;
162
163        Ok(handle)
164    }
165
166    /// Returns the number pending bytes across all active transfers.
167    pub fn bytes_pending(&self) -> u64 {
168        self.stats
169            .dl_remaining
170            .load(Ordering::Acquire)
171            .try_into()
172            .unwrap()
173    }
174}
175
176impl Drop for Client {
177    fn drop(&mut self) {
178        // Close the channel
179        drop(self.channel.take().unwrap());
180        // Join the thread
181        let _ = self.thread_handle.take().unwrap().join();
182    }
183}
184
185impl std::fmt::Debug for Client {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.debug_struct("http_async::Client").finish()
188    }
189}
190
/// Manages the cURL `Multi`. Processes incoming work sent over the
/// channel, and returns responses.
struct WorkerServer {
    /// Channel to receive new work
    incoming_work: Receiver<Message>,
    /// curl multi interface
    multi: Multi,
    /// Map of token to curl handle and response channel
    handles: HashMap<
        usize,
        (
            Easy2Handle<Collector>,
            oneshot::Sender<HttpResult<Response>>,
        ),
    >,
    /// Token most recently assigned to a handle; incremented (wrapping)
    /// before each use in `enqueue_request`.
    token: usize,
    /// Global timeout configuration
    timeout: HttpTimeout,
    /// Global transfer statistics
    stats: Arc<Stats>,
    /// Instant when the current low speed window started
    low_speed_window_start: Instant,
    /// Amount of total bytes transferred when the current low speed window started
    low_speed_window_initial: u64,
}
217
impl WorkerServer {
    /// Entry point for the worker thread: configures the `Multi`, then runs
    /// the work loop until the sending side of `incoming_work` is dropped.
    fn run(
        incoming_work: Receiver<Message>,
        multiplex: bool,
        timeout: HttpTimeout,
        stats: Arc<Stats>,
    ) {
        let mut multi = Multi::new();
        // let's not flood the server with connections
        if let Err(e) = multi.set_max_host_connections(2) {
            error!("failed to set max host connections in curl: {e}");
        }
        // Failures here are logged but non-fatal: curl simply keeps its
        // default connection behavior.
        if let Err(e) = multi.pipelining(false, multiplex) {
            error!("failed to enable multiplexing/pipelining in curl: {e}");
        }

        let mut worker = Self {
            incoming_work,
            multi,
            handles: HashMap::new(),
            token: 0,
            timeout,
            stats,
            low_speed_window_start: Instant::now(),
            low_speed_window_initial: 0,
        };
        worker.worker_loop();
    }

    /// Fails every outstanding request with a clone of `e` and empties the
    /// handle table. Send failures are ignored (the requester may be gone).
    fn fail_and_drain(&mut self, e: &Error) {
        warn!(
            target: "network",
            "failing all outstanding HTTP requests: {e}"
        );
        for (_token, (_handle, sender)) in self.handles.drain() {
            let _ = sender.send(Err(e.clone()));
        }
    }

    /// Converts a completed easy handle into a [`Response`]: moves the
    /// collected body/headers out of the [`Collector`], fills in the status
    /// code, and attaches curl connection metadata as [`Extensions`].
    fn process_response(mut easy: Easy2<Collector>) -> Response {
        let mut response =
            std::mem::replace(&mut easy.get_mut().response, Response::new(Vec::new()));
        // A status of 0 means curl has no status (e.g. no response received).
        if let Ok(status) = easy.response_code()
            && status != 0
            && let Ok(status) = http::StatusCode::from_u16(status as u16)
        {
            *response.status_mut() = status;
        }
        // Would be nice to set HTTP version via `response.version_mut()`, but `curl` doesn't have it exposed.
        let extensions = Extensions {
            client_ip: easy.primary_ip().ok().flatten().map(str::to_string),
            effective_url: easy.effective_url().ok().flatten().map(str::to_string),
        };
        response.extensions_mut().insert(extensions);
        response
    }

    /// Marks the start of a new timeout window.
    fn reset_low_speed_timeout(&mut self) {
        self.low_speed_window_start = Instant::now();
        self.low_speed_window_initial = self.stats.dl_transferred.load(Ordering::Acquire);
    }

    /// Return an error if we're at the end of a timeout window and we
    /// haven't made enough progress.
    fn check_low_speed_timeout(&mut self) -> Option<Error> {
        // Make sure we've waited for the timeout duration
        if Instant::now().duration_since(self.low_speed_window_start) < self.timeout.dur {
            return None;
        }

        // Calculate how much we've transferred since the last check.
        let current = self.stats.dl_transferred.load(Ordering::Acquire);
        let transferred = current.saturating_sub(self.low_speed_window_initial);
        self.reset_low_speed_timeout();
        if transferred < self.timeout.low_speed_limit.into() {
            Some(Error::TooSlow {
                low_speed_limit: self.timeout.low_speed_limit,
                timeout_dur: self.timeout.dur,
                transferred,
            })
        } else {
            None
        }
    }

    /// Main loop: drains the work channel, drives the `Multi`, dispatches
    /// completed transfers, and sleeps with exponential backoff between
    /// iterations. Exits when the work channel is closed and idle.
    fn worker_loop(&mut self) {
        const INITIAL_DELAY: Duration = Duration::from_millis(1);
        let mut wait_backoff = INITIAL_DELAY;
        loop {
            // Start any pending work.
            while let Ok(msg) = self.incoming_work.try_recv() {
                self.enqueue_request(msg);
                wait_backoff = INITIAL_DELAY;
            }

            match self.multi.perform() {
                Err(e) if e.is_call_perform() => {
                    // cURL states if you receive `is_call_perform`, this means that you should call `perform` again.
                }
                Err(e) => {
                    self.fail_and_drain(&Error::Multi(e));
                }
                Ok(running) => {
                    // Dispatch results for all transfers curl reports as done.
                    self.multi.messages(|msg| {
                        let t = msg.token().expect("all handles have tokens");
                        trace!(token = t, "finish");
                        let Some((handle, sender)) = self.handles.remove(&t) else {
                            error!("missing entry {t} in handle table");
                            return;
                        };
                        let result = msg.result_for2(&handle).expect("handle must have a result");
                        let easy = self.multi.remove2(handle).expect("handle must be in multi");
                        let response = Self::process_response(easy);
                        // The requester may have dropped its receiver; ignore send errors.
                        let _ = sender.send(result.map(|()| response).map_err(Into::into));
                    });

                    if running > 0 {
                        // Check for low speed timeout.
                        if let Some(timeout_error) = self.check_low_speed_timeout() {
                            self.fail_and_drain(&timeout_error);
                            continue;
                        }

                        let max_timeout = Duration::from_millis(1000);
                        let mut timeout = self
                            .multi
                            .get_timeout()
                            .ok()
                            .flatten()
                            .unwrap_or(max_timeout)
                            .min(max_timeout);
                        if timeout.is_zero() {
                            // curl said not to wait.
                            continue;
                        }
                        // Ideally we would use `Multi::poll` + a `MultiWaker` instead of `Multi::wait`
                        // to wake the thread when new work is queued. But it requires curl 7.68+,
                        // which is not available everywhere we support.
                        //
                        // Instead, we use an exponential backoff approach so that as long as requests
                        // are being queued, we poll quickly to allow the requests to be added sooner.
                        // Without this, we end up sitting in `Multi::wait` too long while new work is
                        // added to the channel.
                        //
                        // `get_timeout` says we should wait *at most* the timeout amount, so reducing
                        // the wait time is fine.
                        if wait_backoff < timeout {
                            wait_backoff *= 2;
                            timeout = wait_backoff
                        }
                        trace!(
                            pending = self.handles.len(),
                            timeout = timeout.as_millis(),
                            "curl wait"
                        );
                        if let Err(e) = self.multi.wait(&mut [], timeout) {
                            self.fail_and_drain(&Error::Multi(e));
                        }
                    } else {
                        // Block, waiting for more work
                        trace!("all work completed");
                        match self.incoming_work.recv() {
                            Ok(msg) => {
                                trace!("resuming work");
                                // Restart the low-speed window so idle time
                                // isn't counted against the new transfer.
                                self.reset_low_speed_timeout();
                                self.enqueue_request(msg);
                                wait_backoff = INITIAL_DELAY;
                            }
                            Err(_) => {
                                // The sending channel is closed. Shut down the worker.
                                break;
                            }
                        }
                    }
                }
            }
        }
    }

    /// Adds the request to the `Multi`, or sends an error back through the channel.
    fn enqueue_request(&mut self, message: Message) {
        match self.multi.add2(message.easy) {
            Ok(mut handle) => {
                // Tokens wrap on overflow; collisions are not expected in practice.
                self.token = self.token.wrapping_add(1);
                handle.set_token(self.token).ok();
                self.handles.insert(self.token, (handle, message.sender));
            }
            Err(e) => {
                let _ = message.sender.send(Err(e.into()));
            }
        }
    }
}
412
/// Interface that cURL (`Easy2`) uses to make progress.
///
/// One `Collector` is attached to each easy handle; curl invokes its
/// [`Handler`] callbacks as the transfer proceeds.
struct Collector {
    /// The response being built
    response: Response,
    /// The body to transmit
    request_body: Cursor<Vec<u8>>,
    /// Whether we're in debug mode
    debug: bool,
    /// Global transfer statistics.
    global_stats: Arc<Stats>,
    /// How much has this particular transfer added to global `dl_remaining` stats.
    /// Subtracted back out on `Drop` so the global counter stays accurate.
    dl_remaining_delta: i64,
}
426
427impl Collector {
428    fn new(stats: Arc<Stats>) -> Self {
429        Collector {
430            response: Response::new(Vec::new()),
431            request_body: Cursor::new(Vec::new()),
432            debug: false,
433            global_stats: stats,
434            dl_remaining_delta: 0,
435        }
436    }
437}
438
impl Handler for Collector {
    /// Receives a chunk of response body from curl: appends it to the
    /// response being built and counts it toward the global download total.
    fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
        self.response.body_mut().extend_from_slice(data);
        self.global_stats
            .dl_transferred
            .fetch_add(data.len() as u64, Ordering::Release);
        Ok(data.len())
    }

    /// Receives one raw response header line from curl. Lines that don't
    /// parse as a valid name/value pair are silently skipped; returning
    /// `true` tells curl to continue the transfer either way.
    fn header(&mut self, data: &[u8]) -> bool {
        if let Some((name, value)) = handle_http_header(data)
            && let Ok(name) = http::HeaderName::from_str(name)
            && let Ok(value) = http::HeaderValue::from_str(value)
        {
            self.response.headers_mut().append(name, value);
        }
        true
    }

    /// Supplies request body bytes to curl from the in-memory cursor.
    fn read(&mut self, data: &mut [u8]) -> Result<usize, curl::easy::ReadError> {
        // Reading from an in-memory `Cursor<Vec<u8>>` cannot fail.
        Ok(self.request_body.read(data).unwrap())
    }

    /// Forwards curl's debug output when verbose mode was requested.
    fn debug(&mut self, kind: InfoType, data: &[u8]) {
        if self.debug {
            super::http::debug(kind, data);
        }
    }

    /// Progress callback: tracks how many download bytes remain for this
    /// transfer and folds only the *change* since the last callback into the
    /// shared `dl_remaining` counter, so the global value stays equal to the
    /// sum across all active transfers. Always returns `true` to continue.
    fn progress(&mut self, dltotal: f64, dlnow: f64, _ultotal: f64, _ulnow: f64) -> bool {
        // Skip inconsistent reports (presumably possible before curl knows
        // the total size — TODO confirm); keeps `remaining` non-negative.
        if dlnow > dltotal {
            return true;
        }
        let dl_total = dltotal as i64;
        let dl_current = dlnow as i64;

        let remaining = dl_total - dl_current;

        self.global_stats
            .dl_remaining
            .fetch_add(remaining - self.dl_remaining_delta, Ordering::Release);
        self.dl_remaining_delta = remaining;
        true
    }
}
484
485impl Drop for Collector {
486    fn drop(&mut self) {
487        // Zero out this transfer's contribution to the global dl_remaining.
488        self.global_stats
489            .dl_remaining
490            .fetch_add(-self.dl_remaining_delta, Ordering::Release);
491    }
492}
493
/// Additional fields on an [`http::Response`].
///
/// Populated from the curl easy handle after a transfer completes and
/// stored in the response's extension map.
#[derive(Clone)]
struct Extensions {
    /// IP address curl reports for the connection (`Easy2::primary_ip`), if any.
    client_ip: Option<String>,
    /// Final URL curl reports after redirects (`Easy2::effective_url`), if any.
    effective_url: Option<String>,
}
500
/// Accessors for the curl-derived metadata stored in a response's
/// extension map (see [`Extensions`]).
pub trait ResponsePartsExtensions {
    /// IP address recorded for the connection, if available.
    fn client_ip(&self) -> Option<&str>;
    /// Final URL after any redirects, if available.
    fn effective_url(&self) -> Option<&str>;
}
505
506impl ResponsePartsExtensions for http::response::Parts {
507    fn client_ip(&self) -> Option<&str> {
508        self.extensions
509            .get::<Extensions>()
510            .and_then(|extensions| extensions.client_ip.as_deref())
511    }
512
513    fn effective_url(&self) -> Option<&str> {
514        self.extensions
515            .get::<Extensions>()
516            .and_then(|extensions| extensions.effective_url.as_deref())
517    }
518}
519
520impl ResponsePartsExtensions for Response {
521    fn client_ip(&self) -> Option<&str> {
522        self.extensions()
523            .get::<Extensions>()
524            .and_then(|extensions| extensions.client_ip.as_deref())
525    }
526
527    fn effective_url(&self) -> Option<&str> {
528        self.extensions()
529            .get::<Extensions>()
530            .and_then(|extensions| extensions.effective_url.as_deref())
531    }
532}
533
/// Splits HTTP `HEADER: VALUE` to a tuple.
///
/// Returns `None` for empty input, non-UTF-8 bytes, lines without a `:`
/// (such as the status line), or anything with an embedded newline.
fn handle_http_header(buf: &[u8]) -> Option<(&str, &str)> {
    if buf.is_empty() {
        return None;
    }
    let line = std::str::from_utf8(buf).ok()?.trim_end();
    // Don't let server sneak extra lines anywhere.
    if line.contains('\n') {
        return None;
    }
    let (name, rest) = line.split_once(':')?;
    Some((name, rest.trim()))
}