cargo/core/compiler/job_queue/mod.rs

//! Management of the interaction between the main `cargo` and all spawned jobs.
//!
//! ## Overview
//!
//! This module implements a job queue. A job here represents a unit of work,
//! which is roughly a rustc invocation, a build script run, or just a no-op.
//! The job queue primarily handles the following things:
//!
//! * Spawns concurrent jobs. Depending on its [`Freshness`], a job is
//!     either executed on a spawned thread or run on the same thread to avoid
//!     the threading overhead.
//! * Controls the degree of concurrency. It allocates and manages [`jobserver`]
//!     tokens for each spawned rustc and build script process.
//! * Manages the communication between the main `cargo` process and its
//!     spawned jobs. Those [`Message`]s are sent over a [`Queue`] shared
//!     across threads.
//! * Schedules the execution order of each [`Job`]. Priorities are determined
//!     when calling [`JobQueue::enqueue`] to enqueue a job. The scheduling is
//!     relatively rudimentary and could likely be improved.
//!
//! A rough outline of building a queue and executing jobs (see the sketch
//! below) is:
//!
//! 1. [`JobQueue::new`] to create a queue.
//! 2. [`JobQueue::enqueue`] to add new jobs onto the queue.
//! 3. [`JobQueue::execute`] to consume the queue and execute all jobs.
//!
//! The primary loop happens inside [`JobQueue::execute`], which is effectively
//! [`DrainState::drain_the_queue`]. [`DrainState`] is, as its name suggests,
//! the running state of the job queue while it is being drained.
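//!
//! A minimal sketch of that flow, using the signatures defined in this module
//! (the `bcx`, `build_runner`, and `(unit, job)` pairs are assumed to be
//! prepared by the caller, and error handling is elided):
//!
//! ```text
//! let mut queue = JobQueue::new(bcx);
//! for (unit, job) in units_and_jobs {
//!     // Dependency edges and priorities are derived here.
//!     queue.enqueue(build_runner, &unit, job)?;
//! }
//! // Consumes the queue, spawns workers, and drains all messages.
//! queue.execute(build_runner)?;
//! ```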
//!
//! ## Jobserver
//!
//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
//! relationship with each other. They share a single jobserver amongst what
//! is potentially hundreds of threads of work on many-cored systems.
//! The jobserver could come from either the environment (e.g., from a `make`
//! invocation), or from Cargo creating its own jobserver if there is no
//! jobserver to inherit from.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the `-j=N` parameter). Cargo also must not
//! spawn more than N threads of work: the total number of tokens we have
//! floating around must always be limited to N.
//!
//! It is not really possible to optimally choose which crate should build
//! first or last; nor is it possible to decide whether to give an additional
//! token to a running rustc or to spawn a new crate of work instead. The
//! algorithm in Cargo prioritizes spawning as many crates (i.e., rustc
//! processes) as possible. In short, the jobserver relationship between Cargo
//! and rustc processes is **1 `cargo` to N `rustc`**. Cargo knows nothing
//! beyond rustc processes in terms of parallelism[^parallel-rustc].
//!
//! We integrate with the [jobserver] crate, which implements the GNU make
//! [POSIX jobserver] protocol, to make sure that build scripts which use make
//! to build C code can cooperate with us on the number of tokens in use and
//! avoid oversubscribing the system we're on.
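//!
//! A condensed sketch of what [`JobQueue::execute`] sets up (not the exact
//! code; the `messages` queue is assumed to already exist and errors are
//! elided):
//!
//! ```text
//! // Either inherit a jobserver from the environment or create one with N
//! // tokens, then forward every acquired token to the main loop as a message.
//! let client = jobserver::Client::new(n_jobs)?;
//! let messages = messages.clone();
//! let _helper = client.into_helper_thread(move |token| {
//!     messages.push(Message::Token(token));
//! })?;
//! // The main loop later pairs each received token with a pending job.
//! ```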
//!
//! ## Scheduling
//!
//! The current scheduling algorithm is not really polished. It is simply based
//! on a dependency graph, [`DependencyQueue`]. We keep adding nodes onto the
//! graph until it is finalized. When the graph is finalized, it computes, for
//! each node, the sum of the costs of that node's dependencies, including
//! transitive ones. That sum of dependency costs becomes the cost of the node.
//!
//! For the time being, the cost is just a fixed placeholder passed in
//! [`JobQueue::enqueue`]. In the future, we could explore more possibilities
//! here. For instance, we could start persisting timing information for each
//! build somewhere; a subsequent build could then look into the historical
//! data and perform a PGO-like optimization to prioritize jobs, making the
//! build fully pipelined.
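//!
//! Concretely, today [`JobQueue::enqueue`] hands every unit the same
//! placeholder cost (the `historical_cost` helper below is hypothetical and
//! only meant to show where a smarter cost could plug in):
//!
//! ```text
//! // Current behavior: a fixed cost of 100 for every unit.
//! self.queue.queue(unit.clone(), job, queue_deps, 100);
//!
//! // Possible future behavior: derive the cost from a previous build's
//! // timing data (hypothetical helper), falling back to the placeholder.
//! let cost = historical_cost(&unit).unwrap_or(100);
//! self.queue.queue(unit.clone(), job, queue_deps, cost);
//! ```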
//!
//! ## Message queue
//!
//! Each spawned thread running a process uses the message queue [`Queue`] to
//! send messages back to the main thread (the one running `cargo`).
//! The main thread coordinates everything and handles printing output.
//!
//! It is important to be careful about which messages use [`push`] vs
//! [`push_bounded`]. `push` is for priority messages (like tokens, or
//! "finished") where the sender shouldn't block. We want to handle those
//! quickly so real work can proceed ASAP.
//!
//! `push_bounded` is only for messages being printed to stdout/stderr. Being
//! bounded prevents a flood of messages from consuming a large amount of
//! memory.
//!
//! `push` also avoids blocking, which helps avoid deadlocks. For example, when
//! the diagnostic server is dropped, the drop waits for its thread to exit.
//! But if that thread is blocked on a full queue and there is a critical
//! error, the drop will deadlock. This should be fixed at some point in the
//! future. The jobserver thread has a similar problem, though it will time
//! out after 1 second.
//!
//! To access the message queue, each running `Job` is given its own [`JobState`],
//! containing everything it needs to communicate with the main thread.
//!
//! See [`Message`] for all available message kinds.
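//!
//! A small sketch of the convention (the `messages`, `id`, and `line`
//! bindings are assumed to exist; this is not a verbatim excerpt):
//!
//! ```text
//! // Priority message: the sender must never block on this.
//! messages.push(Message::Finish(id, Artifact::All, Ok(())));
//!
//! // Process output: bounded so a flood of output cannot consume
//! // unbounded memory.
//! messages.push_bounded(Message::Stdout(line));
//! ```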
//!
//! [^parallel-rustc]: In fact, the `jobserver` Cargo uses also manages the
//!     allocation of tokens to rustc beyond the implicit token each rustc owns
//!     (i.e., the ones used for parallel LLVM work and parallel rustc threads).
//!     See also ["Rust Compiler Development Guide: Parallel Compilation"]
//!     and [this comment][rustc-codegen] in rust-lang/rust.
//!
//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
//! [jobserver]: https://docs.rs/jobserver
//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
//! [`push`]: Queue::push
//! [`push_bounded`]: Queue::push_bounded

113mod job;
114mod job_state;
115
116use std::cell::RefCell;
117use std::collections::{HashMap, HashSet};
118use std::fmt::Write as _;
119use std::path::{Path, PathBuf};
120use std::sync::Arc;
121use std::thread::{self, Scope};
122use std::time::Duration;
123use std::{env, io};
124
125use anyhow::{Context as _, format_err};
126use jobserver::{Acquired, HelperThread};
127use semver::Version;
128use tracing::{debug, trace};
129
130pub use self::job::Freshness::{self, Dirty, Fresh};
131pub use self::job::{Job, Work};
132pub use self::job_state::JobState;
133use super::BuildContext;
134use super::BuildRunner;
135use super::CompileMode;
136use super::Unit;
137use super::UnitIndex;
138use super::custom_build::Severity;
139use super::timings::SectionTiming;
140use super::timings::Timings;
141use super::unused_deps::UnusedDepState;
142use crate::core::compiler::descriptive_pkg_name;
143use crate::core::compiler::future_incompat::{
144    self, FutureBreakageItem, FutureIncompatReportPackage,
145};
146use crate::core::resolver::ResolveBehavior;
147use crate::core::{PackageId, TargetKind};
148use crate::util::CargoResult;
149use crate::util::context::WarningHandling;
150use crate::util::diagnostic_server::{self, DiagnosticPrinter};
151use crate::util::errors::AlreadyPrintedError;
152use crate::util::interning::InternedString;
153use crate::util::machine_message::{self, Message as _};
154use crate::util::{self, internal};
155use crate::util::{DependencyQueue, GlobalContext, Progress, ProgressStyle, Queue};
156use cargo_util_terminal::Shell;
157
158/// This structure is backed by the `DependencyQueue` type and manages the
159/// queueing of compilation steps for each package. Packages enqueue units of
160/// work and then later on the entire graph is converted to `DrainState` and
161/// executed.
162pub struct JobQueue<'gctx> {
163    queue: DependencyQueue<Unit, Artifact, Job>,
164    counts: HashMap<PackageId, usize>,
165    timings: Timings<'gctx>,
166}
167
168/// This structure is backed by the `DependencyQueue` type and manages the
169/// actual compilation step of each package. Packages enqueue units of work and
170/// then later on the entire graph is processed and compiled.
171///
172/// It is created from `JobQueue` when we have fully assembled the crate graph
173/// (i.e., all package dependencies are known).
174struct DrainState<'gctx> {
175    // This is the length of the DependencyQueue when starting out
176    total_units: usize,
177
178    queue: DependencyQueue<Unit, Artifact, Job>,
179    messages: Arc<Queue<Message>>,
180    /// Diagnostic deduplication support.
181    diag_dedupe: DiagDedupe<'gctx>,
182    /// Count of warnings, used to print a summary after the job succeeds
183    warning_count: HashMap<JobId, WarningCount>,
184    active: HashMap<JobId, Unit>,
185    compiled: HashSet<PackageId>,
186    documented: HashSet<PackageId>,
187    scraped: HashSet<PackageId>,
188    counts: HashMap<PackageId, usize>,
189    progress: Progress<'gctx>,
190    next_id: u32,
191    timings: Timings<'gctx>,
192    unused_dep_state: UnusedDepState,
193
194    /// Map from unit index to unit, for looking up dependency information.
195    index_to_unit: HashMap<UnitIndex, Unit>,
196
197    /// Tokens that are currently owned by this Cargo, and may be "associated"
198    /// with a rustc process. They may also be unused, though if so will be
199    /// dropped on the next loop iteration.
200    ///
201    /// Note that the length of this may be zero, but we will still spawn work,
202    /// as we share the implicit token given to this Cargo process with a
203    /// single rustc process.
204    tokens: Vec<Acquired>,
205
206    /// The list of jobs that we have not yet started executing, but have
207    /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
208    /// allow us to request jobserver tokens pretty early.
209    pending_queue: Vec<(Unit, Job, usize)>,
210    print: DiagnosticPrinter<'gctx>,
211
212    /// How many jobs we've finished
213    finished: usize,
214    per_package_future_incompat_reports: Vec<FutureIncompatReportPackage>,
215}
216
217/// Count of warnings, used to print a summary after the job succeeds
218#[derive(Default, Clone)]
219pub struct WarningCount {
220    /// total number of warnings
221    pub total: usize,
222    /// number of lint warnings
223    pub lints: usize,
224    /// number of warnings that were suppressed because they
225    /// were duplicates of a previous warning
226    pub duplicates: usize,
    /// number of fixable warnings; set to `NotAllowed`
    /// if any errors have been seen for the current target
    pub fixable: FixableWarnings,
231}
232
233impl WarningCount {
234    /// If an error is seen this should be called
235    /// to set `fixable` to `NotAllowed`
236    fn disallow_fixable(&mut self) {
237        self.fixable = FixableWarnings::NotAllowed;
238    }
239
    /// Checks if fixable warnings are allowed.
    /// Fixable warnings are allowed if no errors have
    /// been seen for the current target. If an error
    /// was seen, `fixable` will be `NotAllowed`.
245    fn fixable_allowed(&self) -> bool {
246        match &self.fixable {
247            FixableWarnings::NotAllowed => false,
248            _ => true,
249        }
250    }
251}
252
253/// Used to keep track of how many fixable warnings there are
254/// and if fixable warnings are allowed
255#[derive(Default, Copy, Clone)]
256pub enum FixableWarnings {
257    NotAllowed,
258    #[default]
259    Zero,
260    Positive(usize),
261}
262
263pub struct ErrorsDuringDrain {
264    pub count: usize,
265}
266
267struct ErrorToHandle {
268    error: anyhow::Error,
269
270    /// This field is true for "interesting" errors and false for "mundane"
271    /// errors. If false, we print the above error only if it's the first one
272    /// encountered so far while draining the job queue.
273    ///
    /// At most places where an error is propagated, we set this to false to
    /// avoid scenarios where Cargo might end up spewing tons of redundant error
276    /// messages. For example if an i/o stream got closed somewhere, we don't
277    /// care about individually reporting every thread that it broke; just the
278    /// first is enough.
279    ///
280    /// The exception where `print_always` is true is that we do report every
281    /// instance of a rustc invocation that failed with diagnostics. This
282    /// corresponds to errors from `Message::Finish`.
283    print_always: bool,
284}
285
286impl<E> From<E> for ErrorToHandle
287where
288    anyhow::Error: From<E>,
289{
290    fn from(error: E) -> Self {
291        ErrorToHandle {
292            error: anyhow::Error::from(error),
293            print_always: false,
294        }
295    }
296}
297
298#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
299pub struct JobId(pub u32);
300
301impl std::fmt::Display for JobId {
302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303        write!(f, "{}", self.0)
304    }
305}
306
307/// Handler for deduplicating diagnostics.
308struct DiagDedupe<'gctx> {
309    seen: RefCell<HashSet<u64>>,
310    gctx: &'gctx GlobalContext,
311}
312
313impl<'gctx> DiagDedupe<'gctx> {
314    fn new(gctx: &'gctx GlobalContext) -> Self {
315        DiagDedupe {
316            seen: RefCell::new(HashSet::new()),
317            gctx,
318        }
319    }
320
321    /// Emits a diagnostic message.
322    ///
323    /// Returns `true` if the message was emitted, or `false` if it was
324    /// suppressed for being a duplicate.
325    fn emit_diag(&self, diag: &str) -> CargoResult<bool> {
326        let h = util::hash_u64(diag);
327        if !self.seen.borrow_mut().insert(h) {
328            return Ok(false);
329        }
330        let mut shell = self.gctx.shell();
331        shell.print_ansi_stderr(diag.as_bytes())?;
332        shell.err().write_all(b"\n")?;
333        Ok(true)
334    }
335}
336
337/// Possible artifacts that can be produced by compilations, used as edge values
338/// in the dependency graph.
339///
340/// As edge values we can have multiple kinds of edges depending on one node,
341/// for example some units may only depend on the metadata for an rlib while
342/// others depend on the full rlib. This `Artifact` enum is used to distinguish
343/// this case and track the progress of compilations as they proceed.
344#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
345enum Artifact {
346    /// A generic placeholder for "depends on everything run by a step" and
347    /// means that we can't start the next compilation until the previous has
348    /// finished entirely.
349    All,
350
351    /// A node indicating that we only depend on the metadata of a compilation,
352    /// but the compilation is typically also producing an rlib. We can start
353    /// our step, however, before the full rlib is available.
354    Metadata,
355}
356
357enum Message {
358    Run(JobId, String),
359    Stdout(String),
360    Stderr(String),
361
    // This is for diagnostics (e.g. rendered rustc warnings and errors)
    // captured from subprocess stderr output
363    Diagnostic {
364        id: JobId,
365        level: String,
366        diag: String,
367        lint: bool,
368        fixable: bool,
369    },
370    // This handles duplicate output that is suppressed, for showing
371    // only a count of duplicate messages instead
372    WarningCount {
373        id: JobId,
374        lint: bool,
375        emitted: bool,
376        fixable: bool,
377    },
378    // This is for warnings generated by Cargo's interpretation of the
379    // subprocess output, e.g. scrape-examples prints a warning if a
380    // unit fails to be scraped
381    Warning {
382        id: JobId,
383        warning: String,
384    },
385
386    FixDiagnostic(diagnostic_server::Message),
387    Token(io::Result<Acquired>),
388    Finish(JobId, Artifact, CargoResult<()>),
389    FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
390    SectionTiming(JobId, SectionTiming),
391    UnusedExterns(JobId, std::collections::BTreeSet<InternedString>),
392}
393
394impl<'gctx> JobQueue<'gctx> {
395    pub fn new(bcx: &BuildContext<'_, 'gctx>) -> JobQueue<'gctx> {
396        JobQueue {
397            queue: DependencyQueue::new(),
398            counts: HashMap::new(),
399            timings: Timings::new(bcx),
400        }
401    }
402
403    pub fn enqueue(
404        &mut self,
405        build_runner: &BuildRunner<'_, 'gctx>,
406        unit: &Unit,
407        job: Job,
408    ) -> CargoResult<()> {
409        let dependencies = build_runner.unit_deps(unit);
410        let mut queue_deps = dependencies
411            .iter()
412            .filter(|dep| {
413                // Binaries aren't actually needed to *compile* tests, just to run
414                // them, so we don't include this dependency edge in the job graph.
415                // But we shouldn't filter out dependencies being scraped for Rustdoc.
416                (!dep.unit.target.is_test() && !dep.unit.target.is_bin())
417                    || dep.unit.artifact.is_true()
418                    || dep.unit.mode.is_doc_scrape()
419            })
420            .map(|dep| {
421                // Handle the case here where our `unit -> dep` dependency may
422                // only require the metadata, not the full compilation to
423                // finish. Use the tables in `build_runner` to figure out what
424                // kind of artifact is associated with this dependency.
425                let artifact = if build_runner.only_requires_rmeta(unit, &dep.unit) {
426                    Artifact::Metadata
427                } else {
428                    Artifact::All
429                };
430                (dep.unit.clone(), artifact)
431            })
432            .collect::<HashMap<_, _>>();
433
434        // This is somewhat tricky, but we may need to synthesize some
435        // dependencies for this target if it requires full upstream
436        // compilations to have completed. Because of pipelining, some
437        // dependency edges may be `Metadata` due to the above clause (as
438        // opposed to everything being `All`). For example consider:
439        //
440        //    a (binary)
441        //    └ b (lib)
442        //        └ c (lib)
443        //
444        // Here the dependency edge from B to C will be `Metadata`, and the
445        // dependency edge from A to B will be `All`. For A to be compiled,
446        // however, it currently actually needs the full rlib of C. This means
447        // that we need to synthesize a dependency edge for the dependency graph
448        // from A to C. That's done here.
449        //
450        // This will walk all dependencies of the current target, and if any of
451        // *their* dependencies are `Metadata` then we depend on the `All` of
452        // the target as well. This should ensure that edges changed to
453        // `Metadata` propagate upwards `All` dependencies to anything that
454        // transitively contains the `Metadata` edge.
455        if unit.requires_upstream_objects() {
456            for dep in dependencies {
457                depend_on_deps_of_deps(build_runner, &mut queue_deps, dep.unit.clone());
458            }
459
460            fn depend_on_deps_of_deps(
461                build_runner: &BuildRunner<'_, '_>,
462                deps: &mut HashMap<Unit, Artifact>,
463                unit: Unit,
464            ) {
465                for dep in build_runner.unit_deps(&unit) {
466                    if deps.insert(dep.unit.clone(), Artifact::All).is_none() {
467                        depend_on_deps_of_deps(build_runner, deps, dep.unit.clone());
468                    }
469                }
470            }
471        }
472
473        // For now we use a fixed placeholder value for the cost of each unit, but
474        // in the future this could be used to allow users to provide hints about
475        // relative expected costs of units, or this could be automatically set in
476        // a smarter way using timing data from a previous compilation.
477        self.queue.queue(unit.clone(), job, queue_deps, 100);
478        *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
479        Ok(())
480    }
481
482    /// Executes all jobs necessary to build the dependency graph.
483    ///
484    /// This function will spawn off `config.jobs()` workers to build all of the
485    /// necessary dependencies, in order. Freshness is propagated as far as
486    /// possible along each dependency chain.
487    #[tracing::instrument(skip_all)]
488    pub fn execute(mut self, build_runner: &mut BuildRunner<'_, '_>) -> CargoResult<()> {
489        self.queue.queue_finished();
490
491        let progress =
492            Progress::with_style("Building", ProgressStyle::Ratio, build_runner.bcx.gctx);
493        let state = DrainState {
494            total_units: self.queue.len(),
495            queue: self.queue,
496            // 100 here is somewhat arbitrary. It is a few screenfulls of
497            // output, and hopefully at most a few megabytes of memory for
498            // typical messages. If you change this, please update the test
499            // caching_large_output, too.
500            messages: Arc::new(Queue::new(100)),
501            diag_dedupe: DiagDedupe::new(build_runner.bcx.gctx),
502            warning_count: HashMap::new(),
503            active: HashMap::new(),
504            compiled: HashSet::new(),
505            documented: HashSet::new(),
506            scraped: HashSet::new(),
507            counts: self.counts,
508            progress,
509            next_id: 0,
510            timings: self.timings,
511            unused_dep_state: UnusedDepState::new(build_runner),
512            index_to_unit: build_runner
513                .bcx
514                .unit_to_index
515                .iter()
516                .map(|(unit, &index)| (index, unit.clone()))
517                .collect(),
518            tokens: Vec::new(),
519            pending_queue: Vec::new(),
520            print: DiagnosticPrinter::new(
521                build_runner.bcx.gctx,
522                &build_runner.bcx.rustc().workspace_wrapper,
523            ),
524            finished: 0,
525            per_package_future_incompat_reports: Vec::new(),
526        };
527
528        // Create a helper thread for acquiring jobserver tokens
529        let messages = state.messages.clone();
530        let helper = build_runner
531            .jobserver
532            .clone()
533            .into_helper_thread(move |token| {
534                messages.push(Message::Token(token));
535            })
536            .context("failed to create helper thread for jobserver management")?;
537
538        // Create a helper thread to manage the diagnostics for rustfix if
539        // necessary.
540        let messages = state.messages.clone();
541        // It is important that this uses `push` instead of `push_bounded` for
542        // now. If someone wants to fix this to be bounded, the `drop`
543        // implementation needs to be changed to avoid possible deadlocks.
544        let _diagnostic_server = build_runner
545            .bcx
546            .build_config
547            .rustfix_diagnostic_server
548            .borrow_mut()
549            .take()
550            .map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));
551
552        thread::scope(move |scope| {
553            let (result,) = state.drain_the_queue(build_runner, scope, &helper);
554            result
555        })
556    }
557}
558
559impl<'gctx> DrainState<'gctx> {
560    fn spawn_work_if_possible<'s>(
561        &mut self,
562        build_runner: &mut BuildRunner<'_, '_>,
563        jobserver_helper: &HelperThread,
564        scope: &'s Scope<'s, '_>,
565    ) -> CargoResult<()> {
566        // Dequeue as much work as we can, learning about everything
567        // possible that can run. Note that this is also the point where we
568        // start requesting job tokens. Each job after the first needs to
569        // request a token.
570        while let Some((unit, job, priority)) = self.queue.dequeue() {
            // We want to keep the pieces of work in the `pending_queue` sorted
            // by their priorities, and insert the current job at its correctly
            // sorted position: after the lower priority jobs, and after the
            // ones with the same priority (they were dequeued before the
            // current one, so we keep that relative order).
576            let idx = self
577                .pending_queue
578                .partition_point(|&(_, _, p)| p <= priority);
579            self.pending_queue.insert(idx, (unit, job, priority));
580            if self.active.len() + self.pending_queue.len() > 1 {
581                jobserver_helper.request_token();
582            }
583        }
584
        // Now that we've learned of all possible work that we can execute,
        // try to spawn it so long as we've got a jobserver token which says
        // we're able to perform some parallel work.
588        // The `pending_queue` is sorted in ascending priority order, and we
589        // remove items from its end to schedule the highest priority items
590        // sooner.
591        while self.has_extra_tokens() && !self.pending_queue.is_empty() {
592            let (unit, job, _) = self.pending_queue.pop().unwrap();
593            *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
594            // Print out some nice progress information.
595            // NOTE: An error here will drop the job without starting it.
596            // That should be OK, since we want to exit as soon as
597            // possible during an error.
598            self.note_working_on(
599                build_runner.bcx.gctx,
600                build_runner.bcx.ws.root(),
601                &unit,
602                job.freshness(),
603            )?;
604            self.run(&unit, job, build_runner, scope);
605        }
606
607        Ok(())
608    }
609
610    fn has_extra_tokens(&self) -> bool {
611        self.active.len() < self.tokens.len() + 1
612    }
613
614    fn handle_event(
615        &mut self,
616        build_runner: &mut BuildRunner<'_, '_>,
617        event: Message,
618    ) -> Result<(), ErrorToHandle> {
619        let warning_handling = build_runner.bcx.gctx.warning_handling()?;
620        match event {
621            Message::Run(id, cmd) => {
622                build_runner
623                    .bcx
624                    .gctx
625                    .shell()
626                    .verbose(|c| c.status("Running", &cmd))?;
627                self.timings
628                    .unit_start(build_runner, id, self.active[&id].clone());
629            }
630            Message::Stdout(out) => {
631                writeln!(build_runner.bcx.gctx.shell().out(), "{}", out)?;
632            }
633            Message::Stderr(err) => {
634                let mut shell = build_runner.bcx.gctx.shell();
635                shell.print_ansi_stderr(err.as_bytes())?;
636                shell.err().write_all(b"\n")?;
637            }
638            Message::Diagnostic {
639                id,
640                level,
641                diag,
642                lint,
643                fixable,
644            } => {
645                let emitted = self.diag_dedupe.emit_diag(&diag)?;
646                if level == "warning" {
647                    self.bump_warning_count(id, lint, emitted, fixable);
648                }
649                if level == "error" {
650                    let count = self.warning_count.entry(id).or_default();
651                    // If there is an error, the `cargo fix` message should not show
652                    count.disallow_fixable();
653                }
654            }
655            Message::Warning { id, warning } => {
656                build_runner.bcx.gctx.shell().warn(warning)?;
657                let lint = false;
658                let emitted = true;
659                let fixable = false;
660                self.bump_warning_count(id, lint, emitted, fixable);
661            }
662            Message::WarningCount {
663                id,
664                lint,
665                emitted,
666                fixable,
667            } => {
668                self.bump_warning_count(id, lint, emitted, fixable);
669            }
670            Message::FixDiagnostic(msg) => {
671                self.print.print(&msg)?;
672            }
673            Message::Finish(id, artifact, mut result) => {
674                let unit = match artifact {
675                    // If `id` has completely finished we remove it
676                    // from the `active` map ...
677                    Artifact::All => {
678                        trace!("end: {:?}", id);
679                        self.finished += 1;
680                        let unit = self.active.remove(&id).unwrap();
                        // An error could add an entry for a `Unit` with 0
                        // warnings but with fixable warnings disallowed
684                        let count = self
685                            .warning_count
686                            .get(&id)
687                            .filter(|count| 0 < count.total)
688                            .cloned();
689                        if let Some(count) = count {
690                            let denied_warnings =
691                                warning_handling == WarningHandling::Deny && 0 < count.lints;
692                            self.report_warning_count(
693                                build_runner,
694                                &unit,
695                                &count,
696                                &build_runner.bcx.rustc().workspace_wrapper,
697                                denied_warnings,
698                            );
699                            let stop_on_warnings =
700                                denied_warnings && !build_runner.bcx.build_config.keep_going;
701                            if stop_on_warnings {
702                                result = Err(anyhow::format_err!(
703                                    "warnings are denied by `build.warnings` configuration"
704                                ))
705                            }
706                        }
707                        unit
708                    }
709                    // ... otherwise if it hasn't finished we leave it
710                    // in there as we'll get another `Finish` later on.
711                    Artifact::Metadata => {
712                        trace!("end (meta): {:?}", id);
713                        self.active[&id].clone()
714                    }
715                };
716                debug!("end ({:?}): {:?}", unit, result);
717                match result {
718                    Ok(()) => self.finish(id, &unit, artifact, build_runner)?,
719                    Err(_) if build_runner.bcx.unit_can_fail_for_docscraping(&unit) => {
720                        build_runner
721                            .failed_scrape_units
722                            .lock()
723                            .unwrap()
724                            .insert(build_runner.files().metadata(&unit).unit_id());
725                        self.queue.finish(&unit, &artifact);
726                    }
727                    Err(error) => {
728                        let show_warnings = true;
729                        self.emit_log_messages(&unit, build_runner, show_warnings)?;
730                        self.back_compat_notice(build_runner, &unit)?;
731                        return Err(ErrorToHandle {
732                            error,
733                            print_always: true,
734                        });
735                    }
736                }
737            }
738            Message::FutureIncompatReport(id, items) => {
739                let unit = &self.active[&id];
740                let package_id = unit.pkg.package_id();
741                let is_local = unit.is_local();
742                self.per_package_future_incompat_reports
743                    .push(FutureIncompatReportPackage {
744                        package_id,
745                        is_local,
746                        items,
747                    });
748            }
749            Message::UnusedExterns(id, unused_externs) => {
750                let unit = &self.active[&id];
751                self.unused_dep_state
752                    .record_unused_externs_for_unit(unit, unused_externs);
753            }
754            Message::Token(acquired_token) => {
755                let token = acquired_token.context("failed to acquire jobserver token")?;
756                self.tokens.push(token);
757            }
758            Message::SectionTiming(id, section) => {
759                self.timings.unit_section_timing(build_runner, id, &section);
760            }
761        }
762
763        Ok(())
764    }
765
766    // This will also tick the progress bar as appropriate
767    fn wait_for_events(&mut self) -> Vec<Message> {
        // Drain all events at once to avoid displaying the progress bar
        // unnecessarily. If there are no events we actually block waiting for
        // an event, but we keep a "heartbeat" going so that `record_cpu`
        // (via `tick_progress`) can keep calculating CPU usage over time. To
        // do this we listen for a message with a timeout, and on timeout we
        // run the earlier parts of the loop again.
774        let mut events = self.messages.try_pop_all();
775        if events.is_empty() {
776            loop {
777                self.tick_progress();
778                self.tokens.truncate(self.active.len() - 1);
779                match self.messages.pop(Duration::from_millis(500)) {
780                    Some(message) => {
781                        events.push(message);
782                        break;
783                    }
784                    None => continue,
785                }
786            }
787        }
788        events
789    }
790
791    /// This is the "main" loop, where Cargo does all work to run the
792    /// compiler.
793    ///
794    /// This returns a tuple of `Result` to prevent the use of `?` on
795    /// `Result` types because it is important for the loop to
796    /// carefully handle errors.
797    fn drain_the_queue<'s>(
798        mut self,
799        build_runner: &mut BuildRunner<'_, '_>,
800        scope: &'s Scope<'s, '_>,
801        jobserver_helper: &HelperThread,
802    ) -> (Result<(), anyhow::Error>,) {
803        trace!("queue: {:#?}", self.queue);
804
805        // Iteratively execute the entire dependency graph. Each turn of the
806        // loop starts out by scheduling as much work as possible (up to the
807        // maximum number of parallel jobs we have tokens for). A local queue
808        // is maintained separately from the main dependency queue as one
809        // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
810        // in one package).
811        //
        // After a job has finished, we update our internal state if it was
        // successful; otherwise, if it failed, we wait for pending work to
        // finish and then immediately return (or keep going, if requested by
        // the build config).
816        let mut errors = ErrorsDuringDrain { count: 0 };
817        // CAUTION! Do not use `?` or break out of the loop early. Every error
818        // must be handled in such a way that the loop is still allowed to
819        // drain event messages.
820        loop {
821            if errors.count == 0 || build_runner.bcx.build_config.keep_going {
822                if let Err(e) = self.spawn_work_if_possible(build_runner, jobserver_helper, scope) {
823                    self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
824                }
825            }
826
827            // If after all that we're not actually running anything then we're
828            // done!
829            if self.active.is_empty() {
830                break;
831            }
832
833            // And finally, before we block waiting for the next event, drop any
834            // excess tokens we may have accidentally acquired. Due to how our
835            // jobserver interface is architected we may acquire a token that we
836            // don't actually use, and if this happens just relinquish it back
837            // to the jobserver itself.
838            for event in self.wait_for_events() {
839                if let Err(event_err) = self.handle_event(build_runner, event) {
840                    self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, event_err);
841                }
842            }
843        }
844        self.progress.clear();
845
846        if build_runner.bcx.gctx.cli_unstable().cargo_lints {
847            let mut warn_count = 0;
848            let mut error_count = 0;
849            drop(self.unused_dep_state.emit_unused_warnings(
850                &mut warn_count,
851                &mut error_count,
852                build_runner,
853            ));
854            errors.count += error_count;
855            build_runner.compilation.lint_warning_count += warn_count;
856        }
857
858        let profile_name = build_runner.bcx.build_config.requested_profile;
859        // NOTE: this may be a bit inaccurate, since this may not display the
860        // profile for what was actually built. Profile overrides can change
861        // these settings, and in some cases different targets are built with
862        // different profiles. To be accurate, it would need to collect a
863        // list of Units built, and maybe display a list of the different
864        // profiles used. However, to keep it simple and compatible with old
865        // behavior, we just display what the base profile is.
866        let profile = build_runner.bcx.profiles.base_profile();
867        let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
868            "unoptimized"
869        } else {
870            "optimized"
871        });
872        if profile.debuginfo.is_turned_on() {
873            opt_type += " + debuginfo";
874        }
875
876        let time_elapsed = util::elapsed(build_runner.bcx.gctx.creation_time().elapsed());
877        if let Err(e) = self
878            .timings
879            .finished(build_runner, &errors.to_error())
880            .context("failed to render timing report")
881        {
882            self.handle_error(&mut build_runner.bcx.gctx.shell(), &mut errors, e);
883        }
884        if build_runner.bcx.build_config.emit_json() {
885            let mut shell = build_runner.bcx.gctx.shell();
886            let msg = machine_message::BuildFinished {
887                success: errors.count == 0,
888            }
889            .to_json_string();
890            if let Err(e) = writeln!(shell.out(), "{}", msg) {
891                self.handle_error(&mut shell, &mut errors, e);
892            }
893        }
894
895        if let Some(error) = errors.to_error() {
896            // Any errors up to this point have already been printed via the
897            // `display_error` inside `handle_error`.
898            (Err(anyhow::Error::new(AlreadyPrintedError::new(error))),)
899        } else if self.queue.is_empty() && self.pending_queue.is_empty() {
900            let profile_link = build_runner.bcx.gctx.shell().err_hyperlink(
901                "https://doc.rust-lang.org/cargo/reference/profiles.html#default-profiles",
902            );
903            let message = format!(
904                "{profile_link}`{profile_name}` profile [{opt_type}]{profile_link:#} target(s) in {time_elapsed}",
905            );
906            // It doesn't really matter if this fails.
907            let _ = build_runner.bcx.gctx.shell().status("Finished", message);
908            future_incompat::save_and_display_report(
909                build_runner.bcx,
910                &self.per_package_future_incompat_reports,
911            );
912
913            (Ok(()),)
914        } else {
915            debug!("queue: {:#?}", self.queue);
916            (Err(internal("finished with jobs still left in the queue")),)
917        }
918    }
919
920    fn handle_error(
921        &mut self,
922        shell: &mut Shell,
923        err_state: &mut ErrorsDuringDrain,
924        new_err: impl Into<ErrorToHandle>,
925    ) {
926        let new_err = new_err.into();
927        if new_err.print_always || err_state.count == 0 {
928            crate::display_error(&new_err.error, shell);
929            if err_state.count == 0 && !self.active.is_empty() {
930                self.progress.indicate_error();
931                let _ = shell.warn("build failed, waiting for other jobs to finish...");
932            }
933            err_state.count += 1;
934        } else {
935            tracing::warn!("{:?}", new_err.error);
936        }
937    }
938
939    // This also records CPU usage and marks concurrency; we roughly want to do
940    // this as often as we spin on the events receiver (at least every 500ms or
941    // so).
942    fn tick_progress(&mut self) {
943        // Record some timing information if `--timings` is enabled, and
944        // this'll end up being a noop if we're not recording this
945        // information.
946        self.timings.record_cpu();
947
948        let active_names = self
949            .active
950            .values()
951            .map(|u| self.name_for_progress(u))
952            .collect::<Vec<_>>();
953        let _ = self.progress.tick_now(
954            self.finished,
955            self.total_units,
956            &format!(": {}", active_names.join(", ")),
957        );
958    }
959
960    fn name_for_progress(&self, unit: &Unit) -> String {
961        let pkg_name = unit.pkg.name();
962        let target_name = unit.target.name();
963        match unit.mode {
964            CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
965            CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
966            CompileMode::Test | CompileMode::Check { test: true } => match unit.target.kind() {
967                TargetKind::Lib(_) => format!("{}(test)", target_name),
968                TargetKind::CustomBuild => panic!("cannot test build script"),
969                TargetKind::Bin => format!("{}(bin test)", target_name),
970                TargetKind::Test => format!("{}(test)", target_name),
971                TargetKind::Bench => format!("{}(bench)", target_name),
972                TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
973                    format!("{}(example test)", target_name)
974                }
975            },
976            _ => match unit.target.kind() {
977                TargetKind::Lib(_) => pkg_name.to_string(),
978                TargetKind::CustomBuild => format!("{}(build.rs)", pkg_name),
979                TargetKind::Bin => format!("{}(bin)", target_name),
980                TargetKind::Test => format!("{}(test)", target_name),
981                TargetKind::Bench => format!("{}(bench)", target_name),
982                TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
983                    format!("{}(example)", target_name)
984                }
985            },
986        }
987    }
988
989    /// Executes a job.
990    ///
991    /// Fresh jobs block until finished (which should be very fast!), Dirty
992    /// jobs will spawn a thread in the background and return immediately.
993    fn run<'s>(
994        &mut self,
995        unit: &Unit,
996        job: Job,
997        build_runner: &BuildRunner<'_, '_>,
998        scope: &'s Scope<'s, '_>,
999    ) {
1000        let id = JobId(self.next_id);
1001        self.next_id = self.next_id.checked_add(1).unwrap();
1002
1003        debug!("start {}: {:?}", id, unit);
1004
1005        assert!(self.active.insert(id, unit.clone()).is_none());
1006
1007        let messages = self.messages.clone();
1008        let is_fresh = job.freshness().is_fresh();
1009        let rmeta_required = build_runner.rmeta_required(unit);
1010        let lock_manager = build_runner.lock_manager.clone();
1011        let warning_handling = build_runner.bcx.gctx.warning_handling().unwrap_or_default();
1012
1013        let doit = move |diag_dedupe| {
1014            let state = JobState::new(
1015                id,
1016                messages,
1017                diag_dedupe,
1018                rmeta_required,
1019                lock_manager,
1020                warning_handling,
1021            );
1022            state.run_to_finish(job);
1023        };
1024
1025        match is_fresh {
1026            true => {
1027                // Running a fresh job on the same thread is often much faster than spawning a new
1028                // thread to run the job.
1029                doit(Some(&self.diag_dedupe));
1030            }
1031            false => {
1032                scope.spawn(move || doit(None));
1033            }
1034        }
1035    }
1036
1037    fn emit_log_messages(
1038        &self,
1039        unit: &Unit,
1040        build_runner: &mut BuildRunner<'_, '_>,
1041        show_warnings: bool,
1042    ) -> CargoResult<()> {
1043        let outputs = build_runner.build_script_outputs.lock().unwrap();
1044        let Some(metadata_vec) = build_runner.find_build_script_metadatas(unit) else {
1045            return Ok(());
1046        };
1047        let bcx = &mut build_runner.bcx;
1048        for metadata in metadata_vec {
1049            if let Some(output) = outputs.get(metadata) {
1050                if !output.log_messages.is_empty()
1051                    && (show_warnings
1052                        || output
1053                            .log_messages
1054                            .iter()
1055                            .any(|(severity, _)| *severity == Severity::Error))
1056                {
1057                    let msg_with_package =
1058                        |msg: &str| format!("{}@{}: {}", unit.pkg.name(), unit.pkg.version(), msg);
1059
1060                    for (severity, message) in output.log_messages.iter() {
1061                        match severity {
1062                            Severity::Error => {
1063                                bcx.gctx.shell().error(msg_with_package(message))?;
1064                            }
1065                            Severity::Warning => {
1066                                bcx.gctx.shell().warn(msg_with_package(message))?;
1067                            }
1068                        }
1069                    }
1070                }
1071            }
1072        }
1073
1074        Ok(())
1075    }
1076
1077    fn bump_warning_count(&mut self, id: JobId, lint: bool, emitted: bool, fixable: bool) {
1078        let count = self.warning_count.entry(id).or_default();
1079        count.total += 1;
1080        if lint {
1081            let unit = self.active.get(&id).unwrap();
1082            // If this is an upstream dep but we *do* want warnings, make sure that they
1083            // don't fail compilation.
1084            if unit.is_local() {
1085                count.lints += 1;
1086            }
1087        }
1088        if !emitted {
1089            count.duplicates += 1;
        // Don't add to fixable if an identical warning was already emitted
        // (i.e., this one was suppressed as a duplicate)
1091        } else if fixable {
1092            // Do not add anything to the fixable warning count if
1093            // is `NotAllowed` since that indicates there was an
1094            // error while building this `Unit`
1095            if count.fixable_allowed() {
1096                count.fixable = match count.fixable {
1097                    FixableWarnings::NotAllowed => FixableWarnings::NotAllowed,
1098                    FixableWarnings::Zero => FixableWarnings::Positive(1),
1099                    FixableWarnings::Positive(fixable) => FixableWarnings::Positive(fixable + 1),
1100                };
1101            }
1102        }
1103    }
1104
1105    /// Displays a final report of the warnings emitted by a particular job.
1106    fn report_warning_count(
1107        &mut self,
1108        runner: &mut BuildRunner<'_, '_>,
1109        unit: &Unit,
1110        count: &WarningCount,
1111        rustc_workspace_wrapper: &Option<PathBuf>,
1112        denied_warnings: bool,
1113    ) {
1114        let gctx = runner.bcx.gctx;
1115        runner.compilation.lint_warning_count += count.lints;
1116        let mut message = descriptive_pkg_name(&unit.pkg.name(), &unit.target, &unit.mode);
1117        message.push_str(" generated ");
1118        match count.total {
1119            1 => message.push_str("1 warning"),
1120            n => {
1121                let _ = write!(message, "{} warnings", n);
1122            }
1123        };
1124        match count.duplicates {
1125            0 => {}
1126            1 => message.push_str(" (1 duplicate)"),
1127            n => {
1128                let _ = write!(message, " ({} duplicates)", n);
1129            }
1130        }
        // Only show the `cargo fix` message if it's a local `Unit`
1132        if unit.is_local() {
1133            // Do not show this if there are any errors or no fixable warnings
1134            if let FixableWarnings::Positive(fixable) = count.fixable {
1135                // `cargo fix` doesn't have an option for custom builds
1136                if !unit.target.is_custom_build() {
1137                    // To make sure the correct command is shown for `clippy` we
1138                    // check if `RUSTC_WORKSPACE_WRAPPER` is set and pointing towards
1139                    // `clippy-driver`.
1140                    let clippy = std::ffi::OsStr::new("clippy-driver");
1141                    let is_clippy = rustc_workspace_wrapper.as_ref().and_then(|x| x.file_stem())
1142                        == Some(clippy);
1143
1144                    let command = if is_clippy {
1145                        "cargo clippy --fix"
1146                    } else {
1147                        "cargo fix"
1148                    };
1149
1150                    let mut args =
1151                        format!("{} -p {}", unit.target.description_named(), unit.pkg.name());
1152                    if unit.mode.is_rustc_test()
1153                        && !(unit.target.is_test() || unit.target.is_bench())
1154                    {
1155                        args.push_str(" --tests");
1156                    }
1157                    let mut suggestions = format!("{} suggestion", fixable);
1158                    if fixable > 1 {
1159                        suggestions.push_str("s")
1160                    }
1161
1162                    #[expect(clippy::disallowed_methods, reason = "consistency with clippy")]
1163                    let _ = write!(
1164                        message,
1165                        " (run `{command} --{args}{}` to apply {suggestions})",
1166                        if let Some(cli_lints_os) = env::var_os("CLIPPY_ARGS")
1167                            && let Ok(cli_lints) = cli_lints_os.into_string()
1168                            && is_clippy
1169                        {
1170                            // Clippy can take lints through the CLI, each lint flag is separated by "__CLIPPY_HACKERY__".
1171                            let cli_lints = cli_lints.replace("__CLIPPY_HACKERY__", " ");
1172                            let cli_lints = cli_lints.trim_ascii_end(); // Remove that last space left by __CLIPPY_HACKERY__
1173                            format!(" -- {cli_lints}")
1174                        } else {
1175                            "".to_owned()
1176                        }
1177                    );
1178                }
1179            }
1180        }
1181        // Errors are ignored here because it is tricky to handle them
1182        // correctly, and they aren't important.
1183        let _ = if denied_warnings {
1184            gctx.shell().error(message)
1185        } else {
1186            gctx.shell().warn(message)
1187        };
1188    }
1189
1190    fn finish(
1191        &mut self,
1192        id: JobId,
1193        unit: &Unit,
1194        artifact: Artifact,
1195        build_runner: &mut BuildRunner<'_, '_>,
1196    ) -> CargoResult<()> {
1197        if unit.mode.is_run_custom_build() {
1198            self.emit_log_messages(
1199                unit,
1200                build_runner,
1201                unit.show_warnings(build_runner.bcx.gctx),
1202            )?;
1203        }
1204        let unblocked = self.queue.finish(unit, &artifact);
1205        match artifact {
1206            Artifact::All => self.timings.unit_finished(build_runner, id, unblocked),
1207            Artifact::Metadata => self
1208                .timings
1209                .unit_rmeta_finished(build_runner, id, unblocked),
1210        }
1211        Ok(())
1212    }
1213
1214    // This isn't super trivial because we don't want to print loads and
1215    // loads of information to the console, but we also want to produce a
1216    // faithful representation of what's happening. This is somewhat nuanced
1217    // as a package can start compiling *very* early on because of custom
1218    // build commands and such.
1219    //
1220    // In general, we try to print "Compiling" for the first nontrivial task
1221    // run for a package, regardless of when that is. We then don't print
1222    // out any more information for a package after we've printed it once.
1223    fn note_working_on(
1224        &mut self,
1225        gctx: &GlobalContext,
1226        ws_root: &Path,
1227        unit: &Unit,
1228        fresh: &Freshness,
1229    ) -> CargoResult<()> {
1230        if (self.compiled.contains(&unit.pkg.package_id())
1231            && !unit.mode.is_doc()
1232            && !unit.mode.is_doc_scrape())
1233            || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
1234            || (self.scraped.contains(&unit.pkg.package_id()) && unit.mode.is_doc_scrape())
1235        {
1236            return Ok(());
1237        }
1238
1239        match fresh {
1240            // Any dirty stage which runs at least one command gets printed as
1241            // being a compiled package.
1242            Dirty(dirty_reason) => {
1243                if !dirty_reason.is_fresh_build() {
1244                    gctx.shell().verbose(|shell| {
1245                        dirty_reason.present_to(shell, unit, ws_root, &self.index_to_unit)
1246                    })?;
1247                }
1248
1249                if unit.mode.is_doc() {
1250                    self.documented.insert(unit.pkg.package_id());
1251                    gctx.shell().status("Documenting", &unit.pkg)?;
1252                } else if unit.mode.is_doc_test() {
1253                    // Skip doc test.
1254                } else if unit.mode.is_doc_scrape() {
1255                    self.scraped.insert(unit.pkg.package_id());
1256                    gctx.shell().status("Scraping", &unit.pkg)?;
1257                } else {
1258                    self.compiled.insert(unit.pkg.package_id());
1259                    if unit.mode.is_check() {
1260                        gctx.shell().status("Checking", &unit.pkg)?;
1261                    } else {
1262                        gctx.shell().status("Compiling", &unit.pkg)?;
1263                    }
1264                }
1265            }
1266            Fresh => {
                // If doc tests are last, only print "Fresh" if nothing has been printed.
1268                if self.counts[&unit.pkg.package_id()] == 0
1269                    && !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
1270                {
1271                    self.compiled.insert(unit.pkg.package_id());
1272                    gctx.shell().verbose(|c| c.status("Fresh", &unit.pkg))?;
1273                }
1274            }
1275        }
1276        Ok(())
1277    }
1278
1279    fn back_compat_notice(
1280        &self,
1281        build_runner: &BuildRunner<'_, '_>,
1282        unit: &Unit,
1283    ) -> CargoResult<()> {
1284        if unit.pkg.name() != "diesel"
1285            || unit.pkg.version() >= &Version::new(1, 4, 8)
1286            || build_runner.bcx.ws.resolve_behavior() == ResolveBehavior::V1
1287            || !unit.pkg.package_id().source_id().is_registry()
1288            || !unit.features.is_empty()
1289        {
1290            return Ok(());
1291        }
1292        if !build_runner
1293            .bcx
1294            .unit_graph
1295            .keys()
1296            .any(|unit| unit.pkg.name() == "diesel" && !unit.features.is_empty())
1297        {
1298            return Ok(());
1299        }
1300        build_runner.bcx.gctx.shell().note(
1301            "\
1302This error may be due to an interaction between diesel and Cargo's new
1303feature resolver. Try updating to diesel 1.4.8 to fix this error.
1304",
1305        )?;
1306        Ok(())
1307    }
1308}
1309
1310impl ErrorsDuringDrain {
1311    fn to_error(&self) -> Option<anyhow::Error> {
1312        match self.count {
1313            0 => None,
1314            1 => Some(format_err!("1 job failed")),
1315            n => Some(format_err!("{} jobs failed", n)),
1316        }
1317    }
1318}