Skip to main content

cargo/sources/registry/
http_remote.rs

1//! Access to a HTTP-based crate registry. See [`HttpRegistry`] for details.
2
3use crate::core::PackageId;
4use crate::core::SourceId;
5use crate::core::global_cache_tracker;
6use crate::sources::registry::LoadResponse;
7use crate::sources::registry::MaybeLock;
8use crate::sources::registry::RegistryConfig;
9use crate::sources::registry::RegistryData;
10use crate::sources::registry::download;
11use crate::util::Filesystem;
12use crate::util::GlobalContext;
13use crate::util::IntoUrl;
14use crate::util::Progress;
15use crate::util::ProgressStyle;
16use crate::util::auth;
17use crate::util::cache_lock::CacheLockMode;
18use crate::util::errors::CargoResult;
19use crate::util::errors::HttpNotSuccessful;
20use crate::util::interning::InternedString;
21use crate::util::network::retry::Retry;
22use crate::util::network::retry::RetryResult;
23use anyhow::Context as _;
24use cargo_credential::Operation;
25use cargo_util::paths;
26use futures::lock::Mutex;
27use http::HeaderName;
28use http::HeaderValue;
29use http::Response;
30use std::cell::Cell;
31use std::cell::RefCell;
32use std::collections::HashSet;
33use std::fs;
34use std::fs::File;
35use std::io::ErrorKind;
36use std::path::Path;
37use std::str;
38use std::time::Duration;
39use tracing::debug;
40use tracing::trace;
41use tracing::warn;
42use url::Url;
43
// Names of the HTTP response headers whose value is recorded as the version
// of a downloaded index file. The same names are used as the key of the
// stored `key: value` index version string.
const ETAG: &str = "etag";
const LAST_MODIFIED: &str = "last-modified";

// Index version recorded when the response carries neither header above.
const UNKNOWN: &str = "Unknown";
49
50/// A registry served by the HTTP-based registry API.
51///
52/// This type is primarily accessed through the [`RegistryData`] trait.
53///
54/// `HttpRegistry` implements the HTTP-based registry API outlined in [RFC 2789]. Read the RFC for
55/// the complete protocol, but _roughly_ the implementation loads each index file (e.g.,
56/// config.json or re/ge/regex) from an HTTP service rather than from a locally cloned git
57/// repository. The remote service can more or less be a static file server that simply serves the
58/// contents of the origin git repository.
59///
60/// Implemented naively, this leads to a significant amount of network traffic, as a lookup of any
61/// index file would need to check with the remote backend if the index file has changed. This
62/// cost is somewhat mitigated by the use of HTTP conditional fetches (`If-Modified-Since` and
63/// `If-None-Match` for `ETag`s) which can be efficiently handled by HTTP/2.
64///
65/// [RFC 2789]: https://github.com/rust-lang/rfcs/pull/2789
66pub struct HttpRegistry<'gctx> {
67    /// The name of this source, a unique string (across all sources) used as
68    /// the directory name where its cached content is stored.
69    name: InternedString,
70
71    /// Cached registry configuration.
72    registry_config: Mutex<Option<RegistryConfig>>,
73
74    /// Backend used for making network requests.
75    inner: HttpBackend<'gctx>,
76}
77
78impl<'gctx> HttpRegistry<'gctx> {
79    /// Creates a HTTP-rebased remote registry for `source_id`.
80    ///
81    /// * `name` --- Name of a path segment where `.crate` tarballs and the
82    ///   registry index are stored. Expect to be unique.
83    pub fn new(
84        source_id: SourceId,
85        gctx: &'gctx GlobalContext,
86        name: &str,
87    ) -> CargoResult<HttpRegistry<'gctx>> {
88        Ok(HttpRegistry {
89            name: name.into(),
90            registry_config: Mutex::new(None),
91            inner: HttpBackend::new(source_id, gctx, name)?,
92        })
93    }
94
95    fn inner(&self) -> &HttpBackend<'gctx> {
96        &self.inner
97    }
98
99    /// Get the registry configuration from either cache or remote.
100    async fn config(&self) -> CargoResult<RegistryConfig> {
101        let Some(config) = self.config_opt().await? else {
102            return Err(anyhow::anyhow!("config.json not found"));
103        };
104        Ok(config)
105    }
106
107    /// Get the registry configuration from either cache or remote.
108    /// Returns None if the config is not available.
109    async fn config_opt(&self) -> CargoResult<Option<RegistryConfig>> {
110        let mut config = self.registry_config.lock().await;
111        if let Some(config) = &*config
112            && self.inner().is_fresh(RegistryConfig::NAME)
113        {
114            Ok(Some(config.clone()))
115        } else {
116            let result = self.config_opt_inner().await?;
117            *config = result.clone();
118            Ok(result)
119        }
120    }
121
122    async fn config_opt_inner(&self) -> CargoResult<Option<RegistryConfig>> {
123        debug!("loading config");
124        let index_path = self.assert_index_locked(&self.inner().index_cache_path);
125        let config_json_path = index_path.join(RegistryConfig::NAME);
126        if self.inner().is_fresh(RegistryConfig::NAME)
127            && let Some(config) = self.config_from_filesystem()
128        {
129            return Ok(Some(config.clone()));
130        }
131
132        // Check if there's a cached config that says auth is required.
133        // This allows avoiding the initial unauthenticated request to probe.
134        if let Some(c) = self.config_from_filesystem() {
135            self.inner().auth_required.update(|v| v || c.auth_required);
136        }
137
138        let response = self
139            .inner()
140            .fetch_uncached(RegistryConfig::NAME, None)
141            .await;
142        let response = match response {
143            Err(e)
144                if !self.inner().auth_required.get()
145                    && e.downcast_ref::<HttpNotSuccessful>()
146                        .map(|e| e.code == 401)
147                        .unwrap_or_default() =>
148            {
149                self.inner().auth_required.set(true);
150                debug!(target: "network", "re-attempting request for config.json with authorization included.");
151                self.inner()
152                    .fetch_uncached(RegistryConfig::NAME, None)
153                    .await
154            }
155            resp => resp,
156        }?;
157
158        match response {
159            LoadResponse::Data {
160                raw_data,
161                index_version: _,
162            } => {
163                trace!("config loaded");
164                let config = Some(serde_json::from_slice(&raw_data)?);
165                if paths::create_dir_all(&config_json_path.parent().unwrap()).is_ok() {
166                    if let Err(e) = fs::write(&config_json_path, &raw_data) {
167                        tracing::debug!("failed to write config.json cache: {}", e);
168                    }
169                }
170                Ok(config)
171            }
172            LoadResponse::NotFound => Ok(None),
173            LoadResponse::CacheValid => Err(crate::util::internal(
174                "config.json is never stored in the index cache",
175            )),
176        }
177    }
178
179    /// Get the cached registry configuration from the filesystem, if it exists.
180    fn config_from_filesystem(&self) -> Option<RegistryConfig> {
181        let config_json_path = self
182            .assert_index_locked(&self.inner().index_cache_path)
183            .join(RegistryConfig::NAME);
184        match fs::read(&config_json_path) {
185            Ok(raw_data) => match serde_json::from_slice(&raw_data) {
186                Ok(json) => return Some(json),
187                Err(e) => tracing::debug!("failed to decode cached config.json: {}", e),
188            },
189            Err(e) => {
190                if e.kind() != ErrorKind::NotFound {
191                    tracing::debug!("failed to read config.json cache: {}", e)
192                }
193            }
194        }
195        None
196    }
197
198    async fn sparse_fetch(
199        &self,
200        path: &str,
201        index_version: Option<&str>,
202    ) -> CargoResult<LoadResponse> {
203        if let Some(index_version) = index_version {
204            trace!("local cache of {path} is available at version `{index_version}`",);
205            if self.inner().is_fresh(&path) {
206                return Ok(LoadResponse::CacheValid);
207            }
208        } else if self.inner().fresh.borrow().contains(path) {
209            // We have no cached copy of this file, and we already downloaded it.
210            debug!("cache did not contain previously downloaded file {path}",);
211            return Ok(LoadResponse::NotFound);
212        }
213
214        // If we have a cached copy of the file, include IF_NONE_MATCH or IF_MODIFIED_SINCE header.
215        let index_version =
216            index_version
217                .and_then(|v| v.split_once(':'))
218                .and_then(|(key, value)| match key {
219                    ETAG => Some((
220                        http::header::IF_NONE_MATCH,
221                        HeaderValue::from_str(value.trim()).ok()?,
222                    )),
223                    LAST_MODIFIED => Some((
224                        http::header::IF_MODIFIED_SINCE,
225                        HeaderValue::from_str(value.trim()).ok()?,
226                    )),
227                    _ => {
228                        debug!("unexpected index version: {}", index_version.unwrap());
229                        None
230                    }
231                });
232        let index_version = index_version.as_ref().map(|(k, v)| (k, v));
233        self.inner().fetch_uncached(&path, index_version).await
234    }
235}
236
237#[async_trait::async_trait(?Send)]
238impl<'gctx> RegistryData for HttpRegistry<'gctx> {
239    fn prepare(&self) -> CargoResult<()> {
240        self.inner()
241            .gctx
242            .deferred_global_last_use()?
243            .mark_registry_index_used(global_cache_tracker::RegistryIndex {
244                encoded_registry_name: self.name,
245            });
246        Ok(())
247    }
248
249    fn index_path(&self) -> &Filesystem {
250        &self.inner().index_cache_path
251    }
252
253    fn cache_path(&self) -> &Filesystem {
254        &self.inner().crate_cache_path
255    }
256
257    fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path {
258        self.inner()
259            .gctx
260            .assert_package_cache_locked(CacheLockMode::DownloadExclusive, path)
261    }
262
263    fn is_updated(&self) -> bool {
264        self.inner().requested_update.get()
265    }
266
267    async fn load(
268        &self,
269        _root: &Path,
270        path: &Path,
271        index_version: Option<&str>,
272    ) -> CargoResult<LoadResponse> {
273        // Ensure the config is loaded.
274        let Some(config) = self.config_opt().await? else {
275            return Ok(LoadResponse::NotFound);
276        };
277        self.inner()
278            .auth_required
279            .update(|v| v || config.auth_required);
280
281        let path = path
282            .to_str()
283            .ok_or_else(|| anyhow::anyhow!("non UTF8 path: {}", path.display()))?;
284        self.sparse_fetch(path, index_version).await
285    }
286
287    async fn config(&self) -> CargoResult<Option<RegistryConfig>> {
288        Ok(Some(self.config().await?))
289    }
290
291    fn invalidate_cache(&self) {
292        // Actually updating the index is more or less a no-op for this implementation.
293        // All it does is ensure that a subsequent load will double-check files with the
294        // server rather than rely on a locally cached copy of the index files.
295        debug!("invalidated index cache");
296        self.inner().fresh.borrow_mut().clear();
297        self.inner().requested_update.set(true);
298    }
299
300    fn set_quiet(&mut self, quiet: bool) {
301        self.inner().quiet.set(quiet);
302        self.inner().progress.replace(None);
303    }
304
305    async fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult<MaybeLock> {
306        let registry_config = self.config().await?;
307        download::download(
308            &self.inner().crate_cache_path,
309            &self.inner().gctx,
310            self.name.clone(),
311            pkg,
312            checksum,
313            registry_config,
314        )
315    }
316
317    async fn finish_download(
318        &self,
319        pkg: PackageId,
320        checksum: &str,
321        data: &[u8],
322    ) -> CargoResult<File> {
323        download::finish_download(
324            &self.inner().crate_cache_path,
325            &self.inner().gctx,
326            self.name.clone(),
327            pkg,
328            checksum,
329            data,
330        )
331    }
332
333    fn is_crate_downloaded(&self, pkg: PackageId) -> bool {
334        download::is_crate_downloaded(&self.inner().crate_cache_path, &self.inner().gctx, pkg)
335    }
336}
337
338struct HttpBackend<'gctx> {
339    /// Path to the registry index (`$CARGO_HOME/registry/index/$REG-HASH`).
340    index_cache_path: Filesystem,
341
342    /// Path to the cache of `.crate` files (`$CARGO_HOME/registry/cache/$REG-HASH`).
343    crate_cache_path: Filesystem,
344
345    /// The unique identifier of this registry source.
346    source_id: SourceId,
347    gctx: &'gctx GlobalContext,
348
349    /// Store the server URL without the protocol prefix (sparse+)
350    url: Url,
351
352    /// Has the client requested a cache update?
353    ///
354    /// Only if they have do we double-check the freshness of each locally-stored index file.
355    requested_update: Cell<bool>,
356
357    /// Progress bar for transfers.
358    progress: RefCell<Option<Progress<'gctx>>>,
359
360    /// Number of in-flight requests.
361    pending: Cell<usize>,
362
363    /// What paths have we already fetched since the last index update?
364    ///
365    /// We do not need to double-check any of these index files since we have already done so.
366    fresh: RefCell<HashSet<String>>,
367
368    /// Have we started to download any index files?
369    fetch_started: Cell<bool>,
370
371    /// Should we include the authorization header?
372    auth_required: Cell<bool>,
373
374    /// Url to get a token for the registry.
375    login_url: RefCell<Option<Url>>,
376
377    /// Headers received with an HTTP 401.
378    auth_error_headers: RefCell<Vec<String>>,
379
380    /// Disables status messages.
381    quiet: Cell<bool>,
382}
383
384impl<'gctx> HttpBackend<'gctx> {
385    pub fn new(
386        source_id: SourceId,
387        gctx: &'gctx GlobalContext,
388        name: &str,
389    ) -> CargoResult<HttpBackend<'gctx>> {
390        let url = source_id.url().as_str();
391        // Ensure the url ends with a slash so we can concatenate paths.
392        if !url.ends_with('/') {
393            anyhow::bail!("sparse registry url must end in a slash `/`: {url}")
394        }
395        assert!(source_id.is_sparse());
396        let url = url
397            .strip_prefix("sparse+")
398            .expect("sparse registry needs sparse+ prefix")
399            .into_url()
400            .expect("a url with the sparse+ stripped should still be valid");
401
402        let index_cache_path = gctx.registry_index_path().join(name);
403        Ok(HttpBackend {
404            index_cache_path: index_cache_path.clone(),
405            crate_cache_path: gctx.registry_cache_path().join(name),
406            source_id,
407            gctx,
408            url,
409            progress: RefCell::new(Some(Progress::with_style(
410                "Fetch",
411                ProgressStyle::Indeterminate,
412                gctx,
413            ))),
414            fresh: RefCell::new(HashSet::new()),
415            requested_update: Cell::new(false),
416            fetch_started: Cell::new(false),
417            auth_required: Cell::new(false),
418            login_url: RefCell::new(None),
419            auth_error_headers: RefCell::new(vec![]),
420            quiet: Cell::new(false),
421            pending: Cell::new(0),
422        })
423    }
424
425    /// Constructs the full URL to download a index file.
426    fn full_url(&self, path: &str) -> String {
427        // self.url always ends with a slash.
428        format!("{}{}", self.url, path)
429    }
430
431    /// Setup the necessary works before the first fetch gets started.
432    ///
433    /// This is a no-op if called more than one time.
434    fn start_fetch(&self) -> CargoResult<()> {
435        if self.fetch_started.get() {
436            // We only need to run the setup code once.
437            return Ok(());
438        }
439        self.fetch_started.set(true);
440
441        if !self.quiet.get() {
442            self.gctx
443                .shell()
444                .status("Updating", self.source_id.display_index())?;
445        }
446
447        Ok(())
448    }
449
450    /// Are we in offline mode?
451    ///
452    /// Return NotFound in offline mode when the file doesn't exist in the cache.
453    /// If this results in resolution failure, the resolver will suggest
454    /// removing the --offline flag.
455    fn offline(&self) -> bool {
456        !self.gctx.network_allowed() || self.gctx.cli_unstable().no_index_update
457    }
458
459    /// Check if an index file of `path` is up-to-date.
460    fn is_fresh(&self, path: &str) -> bool {
461        if !self.requested_update.get() {
462            trace!("using local {path} as user did not request update",);
463            true
464        } else if self.offline() {
465            trace!("using local {path} in offline mode");
466            true
467        } else if self.fresh.borrow().contains(path) {
468            trace!("using local {path} as it was already fetched");
469            true
470        } else {
471            debug!("checking freshness of {path}");
472            false
473        }
474    }
475
476    async fn fetch_uncached(
477        &self,
478        path: &str,
479        extra_header: Option<(&HeaderName, &HeaderValue)>,
480    ) -> CargoResult<LoadResponse> {
481        if self.offline() {
482            return Ok(LoadResponse::NotFound);
483        }
484        let mut r = Retry::new(self.gctx)?;
485        self.pending.update(|v| v + 1);
486        let response = loop {
487            let response = self.fetch_uncached_no_retry(path, extra_header).await;
488            match r.r#try(|| response) {
489                RetryResult::Success(result) => break Ok(result),
490                RetryResult::Err(error) => break Err(error),
491                RetryResult::Retry(delay_ms) => {
492                    futures_timer::Delay::new(Duration::from_millis(delay_ms)).await;
493                }
494            }
495        };
496        self.pending.update(|v| v - 1);
497        if !self.fresh.borrow_mut().insert(path.to_string()) {
498            warn!("downloaded the index file `{path}` twice");
499        }
500        response
501    }
502
503    async fn fetch_uncached_no_retry(
504        &self,
505        path: &str,
506        extra_header: Option<(&HeaderName, &HeaderValue)>,
507    ) -> CargoResult<LoadResponse> {
508        trace!("load: {path}");
509        self.start_fetch()?;
510        let full_url = self.full_url(path);
511        let mut request = http::Request::get(&full_url);
512
513        // Include a header to identify the protocol. This allows the server to
514        // know that Cargo is attempting to use the sparse protocol.
515        request = request.header("cargo-protocol", "version=1");
516        request = request.header(http::header::ACCEPT, "text/plain");
517
518        if let Some((k, v)) = extra_header {
519            request = request.header(k, v);
520        }
521
522        if self.auth_required.get() {
523            let authorization = auth::auth_token(
524                self.gctx,
525                &self.source_id,
526                self.login_url.borrow().clone().as_ref(),
527                Operation::Read,
528                self.auth_error_headers.borrow().clone(),
529                true,
530            )?;
531            request = request.header(http::header::AUTHORIZATION, authorization);
532            trace!(target: "network", "including authorization for {}", full_url);
533        }
534
535        let response = self
536            .gctx
537            .http_async()?
538            .request(request.body(Vec::new())?)
539            .await
540            .with_context(|| format!("download of {path} failed"))?;
541
542        self.tick()?;
543
544        let (response, body) = response.into_parts();
545
546        match response.status {
547            http::StatusCode::OK => {
548                let response_index_version =
549                    if let Some(etag) = response.headers.get(http::header::ETAG) {
550                        format!("{}: {}", ETAG, etag.to_str().unwrap())
551                    } else if let Some(lm) = response.headers.get(http::header::LAST_MODIFIED) {
552                        format!("{}: {}", LAST_MODIFIED, lm.to_str().unwrap())
553                    } else {
554                        UNKNOWN.to_string()
555                    };
556                trace!("index file version: {}", response_index_version);
557                Ok(LoadResponse::Data {
558                    raw_data: body,
559                    index_version: Some(response_index_version),
560                })
561            }
562            http::StatusCode::NOT_MODIFIED => {
563                // Not Modified: the data in the cache is still the latest.
564                Ok(LoadResponse::CacheValid)
565            }
566            http::StatusCode::NOT_FOUND => {
567                // The crate was not found or deleted from the registry.
568                return Ok(LoadResponse::NotFound);
569            }
570            http::StatusCode::UNAUTHORIZED => {
571                // Store the headers for later error reporting if needed.
572                self.auth_error_headers.replace(
573                    response
574                        .headers
575                        .iter()
576                        .map(|(name, value)| {
577                            format!("{}: {}", name.as_str(), value.to_str().unwrap_or_default())
578                        })
579                        .collect(),
580                );
581
582                // Look for a `www-authenticate` header with the `Cargo` scheme.
583                for value in &response.headers.get_all(http::header::WWW_AUTHENTICATE) {
584                    for challenge in
585                        http_auth::ChallengeParser::new(value.to_str().unwrap_or_default())
586                    {
587                        match challenge {
588                            Ok(challenge) if challenge.scheme.eq_ignore_ascii_case("Cargo") => {
589                                // Look for the `login_url` parameter.
590                                for (param, value) in challenge.params {
591                                    if param.eq_ignore_ascii_case("login_url") {
592                                        self.login_url
593                                            .replace(Some(value.to_unescaped().into_url()?));
594                                    }
595                                }
596                            }
597                            Ok(challenge) => {
598                                debug!(target: "network", "ignoring non-Cargo challenge: {}", challenge.scheme)
599                            }
600                            Err(e) => {
601                                debug!(target: "network", "failed to parse challenge: {}", e)
602                            }
603                        }
604                    }
605                }
606
607                let mut err = Err(HttpNotSuccessful::new_from_response(
608                    Response::from_parts(response, body),
609                    &full_url,
610                )
611                .into());
612                if self.auth_required.get() {
613                    let auth_error = auth::AuthorizationError::new(
614                        self.gctx,
615                        self.source_id,
616                        self.login_url.borrow().clone(),
617                        auth::AuthorizationErrorReason::TokenRejected,
618                    )?;
619                    err = err.context(auth_error)
620                }
621                err
622            }
623            _ => Err(HttpNotSuccessful::new_from_response(
624                Response::from_parts(response, body),
625                &full_url,
626            )
627            .into()),
628        }
629    }
630
631    /// Updates the state of the progress bar for downloads.
632    fn tick(&self) -> CargoResult<()> {
633        let mut progress = self.progress.borrow_mut();
634        let Some(progress) = progress.as_mut() else {
635            return Ok(());
636        };
637
638        if progress.update_allowed() {
639            let complete = self.fresh.borrow().len();
640            let pending = self.pending.get();
641            progress.print_now(&format!("{complete} complete; {pending} pending"))?;
642        }
643        Ok(())
644    }
645}