Skip to main content

cargo/core/compiler/job_queue/
job_state.rs

1//! See [`JobState`].
2
3use std::{cell::Cell, marker, sync::Arc};
4
5use cargo_util::ProcessBuilder;
6
7use crate::core::compiler::future_incompat::FutureBreakageItem;
8use crate::core::compiler::locking::LockKey;
9use crate::core::compiler::timings::SectionTiming;
10use crate::util::Queue;
11use crate::util::context::WarningHandling;
12use crate::util::interning::InternedString;
13use crate::{CargoResult, core::compiler::locking::LockManager};
14
15use super::{Artifact, DiagDedupe, Job, JobId, Message};
16
17/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
18/// necessary to communicate between the main thread and the execution of the job.
19///
20/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
21/// main thread, the `output` field must be set to prevent a deadlock.
22pub struct JobState<'a, 'gctx> {
23    /// Channel back to the main thread to coordinate messages and such.
24    ///
25    /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
26    /// the message queue to prevent a deadlock.
27    messages: Arc<Queue<Message>>,
28
29    /// Normally output is sent to the job queue with backpressure. When the job is fresh
30    /// however we need to immediately display the output to prevent a deadlock as the
31    /// output messages are processed on the same thread as they are sent from. `output`
32    /// defines where to output in this case.
33    ///
34    /// Currently the [`Shell`] inside [`GlobalContext`] is wrapped in a `RefCell` and thus can't
35    /// be passed between threads. This means that it isn't possible for multiple output messages
36    /// to be interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
37    /// interleaving is still prevented as the lock would be held for the whole printing of an
38    /// output message.
39    ///
40    /// [`Shell`]: cargo_util_terminal::Shell
41    /// [`GlobalContext`]: crate::GlobalContext
42    output: Option<&'a DiagDedupe<'gctx>>,
43
44    /// The job id that this state is associated with, used when sending
45    /// messages back to the main thread.
46    id: JobId,
47
48    /// Whether or not we're expected to have a call to `rmeta_produced`. Once
49    /// that method is called this is dynamically set to `false` to prevent
50    /// sending a double message later on.
51    rmeta_required: Cell<bool>,
52
53    /// Manages locks for build units when fine grain locking is enabled.
54    lock_manager: Arc<LockManager>,
55
56    warning_handling: WarningHandling,
57
58    // Historical versions of Cargo made use of the `'a` argument here, so to
59    // leave the door open to future refactorings keep it here.
60    _marker: marker::PhantomData<&'a ()>,
61}
62
63impl<'a, 'gctx> JobState<'a, 'gctx> {
64    pub(super) fn new(
65        id: JobId,
66        messages: Arc<Queue<Message>>,
67        output: Option<&'a DiagDedupe<'gctx>>,
68        rmeta_required: bool,
69        lock_manager: Arc<LockManager>,
70        warning_handling: WarningHandling,
71    ) -> Self {
72        Self {
73            id,
74            messages,
75            output,
76            rmeta_required: Cell::new(rmeta_required),
77            lock_manager,
78            warning_handling,
79            _marker: marker::PhantomData,
80        }
81    }
82
83    pub fn running(&self, cmd: &ProcessBuilder) {
84        self.messages.push(Message::Run(self.id, cmd.to_string()));
85    }
86
87    pub fn stdout(&self, stdout: String) -> CargoResult<()> {
88        if let Some(dedupe) = self.output {
89            writeln!(dedupe.gctx.shell().out(), "{}", stdout)?;
90        } else {
91            self.messages.push_bounded(Message::Stdout(stdout));
92        }
93        Ok(())
94    }
95
96    pub fn stderr(&self, stderr: String) -> CargoResult<()> {
97        if let Some(dedupe) = self.output {
98            let mut shell = dedupe.gctx.shell();
99            shell.print_ansi_stderr(stderr.as_bytes())?;
100            shell.err().write_all(b"\n")?;
101        } else {
102            self.messages.push_bounded(Message::Stderr(stderr));
103        }
104        Ok(())
105    }
106
107    /// See [`Message::Diagnostic`] and [`Message::WarningCount`].
108    pub fn emit_diag(
109        &self,
110        level: &str,
111        diag: String,
112        lint: bool,
113        fixable: bool,
114    ) -> CargoResult<()> {
115        if level == "warning" && lint && self.warning_handling == WarningHandling::Allow {
116            tracing::warn!("{diag}");
117        } else if let Some(dedupe) = self.output {
118            let emitted = dedupe.emit_diag(&diag)?;
119            if level == "warning" {
120                self.messages.push(Message::WarningCount {
121                    id: self.id,
122                    lint,
123                    emitted,
124                    fixable,
125                });
126            }
127        } else {
128            self.messages.push_bounded(Message::Diagnostic {
129                id: self.id,
130                level: level.to_string(),
131                diag,
132                lint,
133                fixable,
134            });
135        }
136        Ok(())
137    }
138
139    /// See [`Message::Warning`].
140    pub fn warning(&self, warning: String) {
141        self.messages.push_bounded(Message::Warning {
142            id: self.id,
143            warning,
144        });
145    }
146
147    /// A method used to signal to the coordinator thread that the rmeta file
148    /// for an rlib has been produced. This is only called for some rmeta
149    /// builds when required, and can be called at any time before a job ends.
150    /// This should only be called once because a metadata file can only be
151    /// produced once!
152    pub fn rmeta_produced(&self) {
153        self.rmeta_required.set(false);
154        self.messages
155            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
156    }
157
158    pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> {
159        self.lock_manager.lock(lock)
160    }
161
162    pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> {
163        self.lock_manager.downgrade_to_shared(lock)
164    }
165
166    pub fn on_section_timing_emitted(&self, section: SectionTiming) {
167        self.messages.push(Message::SectionTiming(self.id, section));
168    }
169
170    /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
171    /// sent even if our job panics.
172    pub(super) fn run_to_finish(self, job: Job) {
173        let mut sender = FinishOnDrop {
174            messages: &self.messages,
175            id: self.id,
176            result: None,
177        };
178        sender.result = Some(job.run(&self));
179
180        // If the `rmeta_required` wasn't consumed but it was set
181        // previously, then we either have:
182        //
183        // 1. The `job` didn't do anything because it was "fresh".
184        // 2. The `job` returned an error and didn't reach the point where
185        //    it called `rmeta_produced`.
186        // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
187        //
188        // Ruling out the third, the other two are pretty common for 2
189        // we'll just naturally abort the compilation operation but for 1
190        // we need to make sure that the metadata is flagged as produced so
191        // send a synthetic message here.
192        if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
193            self.messages
194                .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
195        }
196
197        // Use a helper struct with a `Drop` implementation to guarantee
198        // that a `Finish` message is sent even if our job panics. We
199        // shouldn't panic unless there's a bug in Cargo, so we just need
200        // to make sure nothing hangs by accident.
201        struct FinishOnDrop<'a> {
202            messages: &'a Queue<Message>,
203            id: JobId,
204            result: Option<CargoResult<()>>,
205        }
206
207        impl Drop for FinishOnDrop<'_> {
208            fn drop(&mut self) {
209                let result = self
210                    .result
211                    .take()
212                    .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
213                self.messages
214                    .push(Message::Finish(self.id, Artifact::All, result));
215            }
216        }
217    }
218
219    pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
220        self.messages
221            .push(Message::FutureIncompatReport(self.id, report));
222    }
223
224    /// The rustc emitted the list of unused `--extern` args.
225    ///
226    /// This is useful for checking unused dependencies.
227    /// Should only be called once, as the compiler only emits it once per compilation.
228    pub fn unused_externs(&self, unused_externs: std::collections::BTreeSet<InternedString>) {
229        self.messages
230            .push(Message::UnusedExterns(self.id, unused_externs));
231    }
232}