std/sys/sync/once/queue.rs
// Each `Once` has one word of atomic state, and this state is CAS'd on to
// determine what to do. There are four possible states of a `Once`:
3//
4// * Incomplete - no initialization has run yet, and no thread is currently
5// using the Once.
6// * Poisoned - some thread has previously attempted to initialize the Once, but
7// it panicked, so the Once is now poisoned. There are no other
8// threads currently accessing this Once.
9// * Running - some thread is currently attempting to run initialization. It may
10// succeed, so all future threads need to wait for it to finish.
11// Note that this state is accompanied with a payload, described
12// below.
13// * Complete - initialization has completed and all future calls should finish
14// immediately.
15//
16// With 4 states we need 2 bits to encode this, and we use the remaining bits
17// in the word we have allocated as a queue of threads waiting for the thread
18// responsible for entering the RUNNING state. This queue is just a linked list
19// of Waiter nodes which is monotonically increasing in size. Each node is
20// allocated on the stack, and whenever the running closure finishes it will
21// consume the entire queue and notify all waiters they should try again.
22//
23// You'll find a few more details in the implementation, but that's the gist of
24// it!
25//
26// Futex orderings:
27// When running `Once` we deal with multiple atomics:
28// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
29// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
30// result of the `Once`, and (3) for synchronizing `Waiter` nodes.
31// - At the end of the `call` function we have to make sure the result
32// of the `Once` is acquired. So every load which can be the only one to
33// load COMPLETED must have at least acquire ordering, which means all
34// three of them.
35// - `WaiterQueue::drop` is the only place that may store COMPLETED, and
36// must do so with release ordering to make the result available.
37// - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
38// needs to make the nodes available with release ordering. The load in
39// its `compare_exchange` can be relaxed because it only has to compare
40// the atomic, not to read other data.
41// - `WaiterQueue::drop` must see the `Waiter` nodes, so it must load
42// `state_and_queue` with acquire ordering.
43// - There is just one store where `state_and_queue` is used only as a
44// state flag, without having to synchronize data: switching the state
45// from INCOMPLETE to RUNNING in `call`. This store can be Relaxed,
46// but the read has to be Acquire because of the requirements mentioned
47// above.
48// * `Waiter.signaled` is both used as a flag, and to protect a field with
49// interior mutability in `Waiter`. `Waiter.thread` is changed in
50// `WaiterQueue::drop` which then sets `signaled` with release ordering.
51// After `wait` loads `signaled` with acquire ordering and sees it is true,
52// it needs to see the changes to drop the `Waiter` struct correctly.
53// * There is one place where the two atomics `Once.state_and_queue` and
54// `Waiter.signaled` come together, and might be reordered by the compiler or
55// processor. Because both use acquire ordering such a reordering is not
56// allowed, so no need for `SeqCst`.
57
58use crate::cell::Cell;
59use crate::sync::atomic::Ordering::{AcqRel, Acquire, Release};
60use crate::sync::atomic::{Atomic, AtomicBool, AtomicPtr};
61use crate::sync::once::OnceExclusiveState;
62use crate::thread::{self, Thread};
63use crate::{fmt, ptr, sync as public};
64
// The `Once` state word, stored as a pointer: the low two bits encode the
// state (see the constants below) and, in the RUNNING state, the remaining
// bits hold a pointer to the head of the waiter queue.
type StateAndQueue = *mut ();
66
pub struct Once {
    // A single word packing the two state bits together with, in the RUNNING
    // state, a pointer to the head of the linked list of waiting threads.
    state_and_queue: Atomic<*mut ()>,
}
70
pub struct OnceState {
    // Whether the `Once` had been poisoned when initialization began.
    poisoned: bool,
    // The state the `Once` transitions to when initialization finishes:
    // COMPLETE by default, or POISONED after `poison()` has been called.
    set_state_on_drop_to: Cell<StateAndQueue>,
}
75
// Four states that a Once can be in, encoded into the lower bits of
// `state_and_queue` in the Once structure. By choosing COMPLETE as the all-zero
// state the `is_completed` check can be a bit faster on some platforms.
const INCOMPLETE: usize = 0x3;
const POISONED: usize = 0x2;
const RUNNING: usize = 0x1;
const COMPLETE: usize = 0x0;

// Mask to learn about the state. All other bits are the queue of waiters if
// this is in the RUNNING state. This works because `Waiter` is 4-aligned,
// leaving the two low bits of a node pointer free for the state.
const STATE_MASK: usize = 0b11;
const QUEUE_MASK: usize = !STATE_MASK;
88
// Representation of a node in the linked list of waiters, used while in the
// RUNNING state.
// Note: `Waiter` can't hold a mutable pointer to the next thread, because then
// `wait` would both hand out a mutable reference to its `Waiter` node, and keep
// a shared reference to check `signaled`. Instead we hold shared references and
// use interior mutability.
#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct Waiter {
    // Handle used to unpark this waiter; cloned by `WaiterQueue::drop` before
    // the node may be freed.
    thread: Thread,
    // Set to `true` (with release ordering) by `WaiterQueue::drop` just before
    // unparking; `wait` reparks until it observes `true`.
    signaled: Atomic<bool>,
    // Next node in the intrusive list; null for the tail of the list.
    next: Cell<*const Waiter>,
}
101
// Head of a linked list of waiters.
// Every node is a struct on the stack of a waiting thread.
// Will wake up the waiters when it gets dropped, i.e. also on panic.
struct WaiterQueue<'a> {
    // The `Once` state word this queue lives in.
    state_and_queue: &'a Atomic<*mut ()>,
    // The state stored on drop: initialized to POISONED in `call` so a panic
    // in the init closure poisons the `Once`, and overwritten with the
    // `OnceState` result (normally COMPLETE) when the closure returns.
    set_state_on_drop_to: StateAndQueue,
}
109
110fn to_queue(current: StateAndQueue) -> *const Waiter {
111 current.mask(QUEUE_MASK).cast()
112}
113
114fn to_state(current: StateAndQueue) -> usize {
115 current.addr() & STATE_MASK
116}
117
impl Once {
    /// Creates a `Once` that has not yet run its initialization.
    #[inline]
    pub const fn new() -> Once {
        Once { state_and_queue: AtomicPtr::new(ptr::without_provenance_mut(INCOMPLETE)) }
    }

    /// Creates a `Once` that is already in the COMPLETE state, so `call` and
    /// `wait` return immediately.
    #[inline]
    pub const fn new_complete() -> Once {
        Once { state_and_queue: AtomicPtr::new(ptr::without_provenance_mut(COMPLETE)) }
    }

    /// Fast-path check for whether initialization has finished.
    #[inline]
    pub fn is_completed(&self) -> bool {
        // An `Acquire` load is enough because that makes all the initialization
        // operations visible to us, and, this being a fast path, weaker
        // ordering helps with performance. This `Acquire` synchronizes with
        // `Release` operations on the slow path.
        self.state_and_queue.load(Acquire).addr() == COMPLETE
    }

    /// Reads the state using exclusive access; no atomic ordering is needed
    /// because `&mut self` guarantees no other thread is touching the word.
    #[inline]
    pub(crate) fn state(&mut self) -> OnceExclusiveState {
        match self.state_and_queue.get_mut().addr() {
            INCOMPLETE => OnceExclusiveState::Incomplete,
            POISONED => OnceExclusiveState::Poisoned,
            COMPLETE => OnceExclusiveState::Complete,
            // With exclusive access there can be no waiter queue and no
            // RUNNING thread, so any other value is a broken invariant.
            _ => unreachable!("invalid Once state"),
        }
    }

    /// Overwrites the state using exclusive access (see `state` for why no
    /// atomic ordering is required).
    #[inline]
    pub(crate) fn set_state(&mut self, new_state: OnceExclusiveState) {
        *self.state_and_queue.get_mut() = match new_state {
            OnceExclusiveState::Incomplete => ptr::without_provenance_mut(INCOMPLETE),
            OnceExclusiveState::Poisoned => ptr::without_provenance_mut(POISONED),
            OnceExclusiveState::Complete => ptr::without_provenance_mut(COMPLETE),
        };
    }

    /// Blocks the calling thread until the `Once` reaches the COMPLETE state
    /// (or POISONED, if `ignore_poisoning` is set). Panics on a poisoned
    /// `Once` when `ignore_poisoning` is false.
    #[cold]
    #[track_caller]
    pub fn wait(&self, ignore_poisoning: bool) {
        let mut current = self.state_and_queue.load(Acquire);
        loop {
            let state = to_state(current);
            match state {
                COMPLETE => return,
                POISONED if !ignore_poisoning => {
                    // Panic to propagate the poison.
                    panic!("Once instance has previously been poisoned");
                }
                _ => {
                    // INCOMPLETE, RUNNING, or ignored POISONED: park on the
                    // waiter queue until woken, then re-examine the state.
                    current = wait(&self.state_and_queue, current, !ignore_poisoning);
                }
            }
        }
    }

    // This is a non-generic function to reduce the monomorphization cost of
    // using `call_once` (this isn't exactly a trivial or small implementation).
    //
    // Additionally, this is tagged with `#[cold]` as it should indeed be cold
    // and it helps let LLVM know that calls to this function should be off the
    // fast path. Essentially, this should help generate more straight line code
    // in LLVM.
    //
    // Finally, this takes an `FnMut` instead of a `FnOnce` because there's
    // currently no way to take an `FnOnce` and call it via virtual dispatch
    // without some allocation overhead.
    #[cold]
    #[track_caller]
    pub fn call(&self, ignore_poisoning: bool, init: &mut dyn FnMut(&public::OnceState)) {
        let mut current = self.state_and_queue.load(Acquire);
        loop {
            let state = to_state(current);
            match state {
                COMPLETE => break,
                POISONED if !ignore_poisoning => {
                    // Panic to propagate the poison.
                    panic!("Once instance has previously been poisoned");
                }
                POISONED | INCOMPLETE => {
                    // Try to register this thread as the one RUNNING. The queue
                    // bits are preserved because threads may already have
                    // enqueued themselves via `wait` while the state was
                    // INCOMPLETE or POISONED.
                    if let Err(new) = self.state_and_queue.compare_exchange_weak(
                        current,
                        current.mask(QUEUE_MASK).wrapping_byte_add(RUNNING),
                        Acquire,
                        Acquire,
                    ) {
                        current = new;
                        continue;
                    }

                    // `waiter_queue` will manage other waiting threads, and
                    // wake them up on drop. Starting from POISONED means a
                    // panic in `init` leaves the `Once` poisoned again.
                    let mut waiter_queue = WaiterQueue {
                        state_and_queue: &self.state_and_queue,
                        set_state_on_drop_to: ptr::without_provenance_mut(POISONED),
                    };
                    // Run the initialization function, letting it know if we're
                    // poisoned or not.
                    let init_state = public::OnceState {
                        inner: OnceState {
                            poisoned: state == POISONED,
                            set_state_on_drop_to: Cell::new(ptr::without_provenance_mut(COMPLETE)),
                        },
                    };
                    init(&init_state);
                    // On success, drop `waiter_queue` with the state the
                    // closure chose (COMPLETE unless it called `poison`).
                    waiter_queue.set_state_on_drop_to = init_state.inner.set_state_on_drop_to.get();
                    return;
                }
                _ => {
                    // All other values must be RUNNING with possibly a
                    // pointer to the waiter queue in the more significant bits.
                    assert!(state == RUNNING);
                    current = wait(&self.state_and_queue, current, true);
                }
            }
        }
    }
}
239
/// Enqueues the current thread on the waiter list stored in `state_and_queue`
/// and parks it until the thread running the initialization signals it, or
/// until waiting is no longer necessary (COMPLETE, or POISONED when
/// `return_on_poisoned` is set).
///
/// Returns the most recently observed value of `state_and_queue`.
fn wait(
    state_and_queue: &Atomic<*mut ()>,
    mut current: StateAndQueue,
    return_on_poisoned: bool,
) -> StateAndQueue {
    // The node lives on this thread's stack; `WaiterQueue::drop` must signal
    // us before we may return and invalidate it.
    let node = &Waiter {
        thread: thread::current_or_unnamed(),
        signaled: AtomicBool::new(false),
        next: Cell::new(ptr::null()),
    };

    loop {
        let state = to_state(current);
        let queue = to_queue(current);

        // If initialization has finished, return.
        if state == COMPLETE || (return_on_poisoned && state == POISONED) {
            return current;
        }

        // Update the node for our current thread.
        node.next.set(queue);

        // Try to slide in the node at the head of the linked list, making sure
        // that another thread didn't just replace the head of the linked list.
        // Release makes the node's contents visible to `WaiterQueue::drop`;
        // the state bits are re-added so the state is left unchanged.
        if let Err(new) = state_and_queue.compare_exchange_weak(
            current,
            ptr::from_ref(node).wrapping_byte_add(state) as StateAndQueue,
            Release,
            Acquire,
        ) {
            current = new;
            continue;
        }

        // We have enqueued ourselves, now lets wait.
        // It is important not to return before being signaled, otherwise we
        // would drop our `Waiter` node and leave a hole in the linked list
        // (and a dangling reference). Guard against spurious wakeups by
        // reparking ourselves until we are signaled.
        while !node.signaled.load(Acquire) {
            // If the managing thread happens to signal and unpark us before we
            // can park ourselves, the result could be this thread never gets
            // unparked. Luckily `park` comes with the guarantee that if it got
            // an `unpark` just before on an unparked thread it does not park.
            // Crucially, we know the `unpark` must have happened between the
            // `compare_exchange_weak` above and here, and there's no other
            // `park` in that code that could steal our token.
            // SAFETY: we retrieved this handle on the current thread above.
            unsafe { node.thread.park() }
        }

        // Signaled: reload the state so the caller can re-examine it.
        return state_and_queue.load(Acquire);
    }
}
294
295#[stable(feature = "std_debug", since = "1.16.0")]
296impl fmt::Debug for Once {
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 f.debug_struct("Once").finish_non_exhaustive()
299 }
300}
301
impl Drop for WaiterQueue<'_> {
    // Publishes the final state (release half of `AcqRel`) and wakes every
    // queued waiter. Runs on both success and panic of the init closure.
    fn drop(&mut self) {
        // Swap out our state with however we finished. The acquire half of
        // `AcqRel` makes the `Waiter` nodes enqueued by `wait` visible to us.
        let current = self.state_and_queue.swap(self.set_state_on_drop_to, AcqRel);

        // We should only ever see an old state which was RUNNING.
        assert_eq!(current.addr() & STATE_MASK, RUNNING);

        // Walk the entire linked list of waiters and wake them up (in lifo
        // order, last to register is first to wake up).
        unsafe {
            // Right after setting `node.signaled = true` the other thread may
            // free `node` if there happens to be a spurious wakeup.
            // So we have to take out the `thread` field and copy the pointer to
            // `next` first.
            let mut queue = to_queue(current);
            while !queue.is_null() {
                let next = (*queue).next.get();
                let thread = (*queue).thread.clone();
                // Release pairs with the acquire load of `signaled` in `wait`,
                // making the handoff of the node's memory sound.
                (*queue).signaled.store(true, Release);
                thread.unpark();
                queue = next;
            }
        }
    }
}
328
329impl OnceState {
330 #[inline]
331 pub fn is_poisoned(&self) -> bool {
332 self.poisoned
333 }
334
335 #[inline]
336 pub fn poison(&self) {
337 self.set_state_on_drop_to.set(ptr::without_provenance_mut(POISONED));
338 }
339}