
std/sys/sync/once/queue.rs

// Each `Once` has one word of atomic state, and this state is CAS'd on to
// determine what to do. There are four possible states of a `Once`:
//
// * Incomplete - no initialization has run yet, and no thread is currently
//                using the Once.
// * Poisoned - some thread has previously attempted to initialize the Once, but
//              it panicked, so the Once is now poisoned. There are no other
//              threads currently accessing this Once.
// * Running - some thread is currently attempting to run initialization. It may
//             succeed, so all future threads need to wait for it to finish.
//             Note that this state is accompanied by a payload, described
//             below.
// * Complete - initialization has completed and all future calls should finish
//              immediately.
//
// With 4 states we need 2 bits to encode this, and we use the remaining bits
// in the word we have allocated as a queue of threads waiting for the thread
// responsible for entering the RUNNING state. This queue is just a linked list
// of Waiter nodes which is monotonically increasing in size. Each node is
// allocated on the stack, and whenever the running closure finishes it will
// consume the entire queue and notify all waiters that they should try again.
//
// You'll find a few more details in the implementation, but that's the gist of
// it!
//
// Atomic orderings:
// When running `Once` we deal with multiple atomics:
// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
//   result of the `Once`, and (3) for synchronizing `Waiter` nodes.
//     - At the end of the `call` function we have to make sure the result
//       of the `Once` is acquired. So every load which can be the only one to
//       load COMPLETED must have at least acquire ordering, which means all
//       three of them.
//     - `WaiterQueue::drop` is the only place that may store COMPLETED, and
//       must do so with release ordering to make the result available.
//     - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
//       needs to make the nodes available with release ordering. The load in
//       its `compare_exchange` can be relaxed because it only has to compare
//       the atomic, not to read other data.
//     - `WaiterQueue::drop` must see the `Waiter` nodes, so it must load
//       `state_and_queue` with acquire ordering.
//     - There is just one store where `state_and_queue` is used only as a
//       state flag, without having to synchronize data: switching the state
//       from INCOMPLETE to RUNNING in `call`. This store can be Relaxed,
//       but the read has to be Acquire because of the requirements mentioned
//       above.
// * `Waiter.signaled` is both used as a flag and to protect a field with
//   interior mutability in `Waiter`. `Waiter.thread` is changed in
//   `WaiterQueue::drop`, which then sets `signaled` with release ordering.
//   After `wait` loads `signaled` with acquire ordering and sees it is true,
//   it needs to see those changes to drop the `Waiter` struct correctly.
// * There is one place where the two atomics `Once.state_and_queue` and
//   `Waiter.signaled` come together, and might be reordered by the compiler or
//   processor. Because both use acquire ordering such a reordering is not
//   allowed, so there is no need for `SeqCst`. (The release/acquire pairing is
//   sketched in isolation after the imports below.)

use crate::cell::Cell;
use crate::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use crate::sync::atomic::{Atomic, AtomicBool, AtomicPtr};
use crate::sync::once::OnceExclusiveState;
use crate::thread::{self, Thread};
use crate::{fmt, ptr, sync as public};
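
// A minimal, freestanding sketch (not used by this module; module and names
// are hypothetical) of the release/acquire pairing relied on above: a writer
// publishes data with a `Release` store and a reader observes it with an
// `Acquire` load, mirroring how `WaiterQueue::drop` publishes the
// initialization result that `call` and `is_completed` then acquire.
#[cfg(any())] // illustrative only; never compiled
mod release_acquire_sketch {
    use crate::sync::atomic::Ordering::{Acquire, Release};
    use crate::sync::atomic::{Atomic, AtomicBool};

    static READY: Atomic<bool> = AtomicBool::new(false);
    static mut DATA: usize = 0;

    fn writer() {
        // Write the data first, then publish it: the `Release` store makes
        // the write to DATA visible to any thread whose `Acquire` load of
        // READY observes `true`.
        unsafe { DATA = 42 };
        READY.store(true, Release);
    }

    fn reader() -> Option<usize> {
        // This `Acquire` load synchronizes with the `Release` store above,
        // so reading DATA afterwards is not a data race.
        if READY.load(Acquire) { Some(unsafe { DATA }) } else { None }
    }
}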

type StateAndQueue = *mut ();

pub struct Once {
    state_and_queue: Atomic<*mut ()>,
}

pub struct OnceState {
    poisoned: bool,
    set_state_on_drop_to: Cell<StateAndQueue>,
}

// Four states that a Once can be in, encoded into the lower bits of
// `state_and_queue` in the Once structure. By choosing COMPLETE as the all-zero
// state the `is_completed` check can be a bit faster on some platforms.
const INCOMPLETE: usize = 0x3;
const POISONED: usize = 0x2;
const RUNNING: usize = 0x1;
const COMPLETE: usize = 0x0;

// Mask to learn about the state. All other bits are the queue of waiters if
// this is in the RUNNING state.
const STATE_MASK: usize = 0b11;
const QUEUE_MASK: usize = !STATE_MASK;
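
// A compile-time sanity sketch (an illustrative addition, not in the original
// file): every state constant must fit in the two bits covered by STATE_MASK,
// leaving the bits of QUEUE_MASK free for the `Waiter` pointer.
const _: () = assert!(
    INCOMPLETE & QUEUE_MASK == 0
        && POISONED & QUEUE_MASK == 0
        && RUNNING & QUEUE_MASK == 0
        && COMPLETE & QUEUE_MASK == 0
);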

// Representation of a node in the linked list of waiters, used while in the
// RUNNING state.
// Note: `Waiter` can't hold a mutable pointer to the next waiter node, because
// then `wait` would both hand out a mutable reference to its `Waiter` node, and
// keep a shared reference to check `signaled`. Instead we hold shared
// references and use interior mutability (sketched after the struct below).
#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct Waiter {
    thread: Thread,
    signaled: Atomic<bool>,
    next: Cell<*const Waiter>,
}
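
// An isolated sketch of the aliasing rule described above (the helper name is
// hypothetical): thanks to `Cell`, a node can be linked into the list through
// a shared reference, the same kind of reference `wait` keeps for checking
// `signaled`.
#[allow(dead_code)]
fn link_front_sketch(node: &Waiter, head: *const Waiter) -> *const Waiter {
    // `Cell::set` mutates `next` without requiring `&mut Waiter`.
    node.next.set(head);
    ptr::from_ref(node)
}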

// Head of a linked list of waiters.
// Every node is a struct on the stack of a waiting thread.
// Will wake up the waiters when it gets dropped, i.e. also on panic.
struct WaiterQueue<'a> {
    state_and_queue: &'a Atomic<*mut ()>,
    set_state_on_drop_to: StateAndQueue,
}

fn to_queue(current: StateAndQueue) -> *const Waiter {
    current.mask(QUEUE_MASK).cast()
}

fn to_state(current: StateAndQueue) -> usize {
    current.addr() & STATE_MASK
}
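
// A sketch (hypothetical helper, mirroring what `call` and `wait` do below)
// of packing a 4-byte-aligned `Waiter` pointer together with a state into a
// single word, then recovering both halves with the helpers above.
#[allow(dead_code)]
fn pack_roundtrip_sketch(node: &Waiter) {
    // Tag the two free low bits of the node pointer with the RUNNING state.
    let packed = ptr::from_ref(node).wrapping_byte_add(RUNNING) as StateAndQueue;
    // The mask-based helpers recover the original halves.
    assert_eq!(to_state(packed), RUNNING);
    assert_eq!(to_queue(packed), ptr::from_ref(node));
}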

impl Once {
    #[inline]
    pub const fn new() -> Once {
        Once { state_and_queue: AtomicPtr::new(ptr::without_provenance_mut(INCOMPLETE)) }
    }

    #[inline]
    pub const fn new_complete() -> Once {
        Once { state_and_queue: AtomicPtr::new(ptr::without_provenance_mut(COMPLETE)) }
    }

    #[inline]
    pub fn is_completed(&self) -> bool {
        // An `Acquire` load is enough because that makes all the initialization
        // operations visible to us, and, this being a fast path, weaker
        // ordering helps with performance. This `Acquire` synchronizes with
        // `Release` operations on the slow path.
        self.state_and_queue.load(Acquire).addr() == COMPLETE
    }

    #[inline]
    pub(crate) fn state(&mut self) -> OnceExclusiveState {
        match self.state_and_queue.get_mut().addr() {
            INCOMPLETE => OnceExclusiveState::Incomplete,
            POISONED => OnceExclusiveState::Poisoned,
            COMPLETE => OnceExclusiveState::Complete,
            _ => unreachable!("invalid Once state"),
        }
    }

    #[inline]
    pub(crate) fn set_state(&mut self, new_state: OnceExclusiveState) {
        *self.state_and_queue.get_mut() = match new_state {
            OnceExclusiveState::Incomplete => ptr::without_provenance_mut(INCOMPLETE),
            OnceExclusiveState::Poisoned => ptr::without_provenance_mut(POISONED),
            OnceExclusiveState::Complete => ptr::without_provenance_mut(COMPLETE),
        };
    }

    #[cold]
    #[track_caller]
    pub fn wait(&self, ignore_poisoning: bool) {
        let mut current = self.state_and_queue.load(Acquire);
        loop {
            let state = to_state(current);
            match state {
                COMPLETE => return,
                POISONED if !ignore_poisoning => {
                    // Panic to propagate the poison.
                    panic!("Once instance has previously been poisoned");
                }
                _ => {
                    current = wait(&self.state_and_queue, current, !ignore_poisoning);
                }
            }
        }
    }

    // This is a non-generic function to reduce the monomorphization cost of
    // using `call_once` (this isn't exactly a trivial or small implementation).
    //
    // Additionally, this is tagged with `#[cold]` as it should indeed be cold
    // and it helps let LLVM know that calls to this function should be off the
    // fast path. Essentially, this should help generate more straight-line code
    // in LLVM.
    //
    // Finally, this takes an `FnMut` instead of an `FnOnce` because there's
    // currently no way to take an `FnOnce` and call it via virtual dispatch
    // without some allocation overhead. (This shim pattern is sketched after
    // the `impl` block below.)
    #[cold]
    #[track_caller]
    pub fn call(&self, ignore_poisoning: bool, init: &mut dyn FnMut(&public::OnceState)) {
        let mut current = self.state_and_queue.load(Acquire);
        loop {
            let state = to_state(current);
            match state {
                COMPLETE => break,
                POISONED if !ignore_poisoning => {
                    // Panic to propagate the poison.
                    panic!("Once instance has previously been poisoned");
                }
                POISONED | INCOMPLETE => {
                    // Try to register this thread as the one RUNNING.
                    if let Err(new) = self.state_and_queue.compare_exchange_weak(
                        current,
                        current.mask(QUEUE_MASK).wrapping_byte_add(RUNNING),
                        Acquire,
                        Acquire,
                    ) {
                        current = new;
                        continue;
                    }

                    // `waiter_queue` will manage other waiting threads, and
                    // wake them up on drop.
                    let mut waiter_queue = WaiterQueue {
                        state_and_queue: &self.state_and_queue,
                        set_state_on_drop_to: ptr::without_provenance_mut(POISONED),
                    };
                    // Run the initialization function, letting it know if we're
                    // poisoned or not.
                    let init_state = public::OnceState {
                        inner: OnceState {
                            poisoned: state == POISONED,
                            set_state_on_drop_to: Cell::new(ptr::without_provenance_mut(COMPLETE)),
                        },
                    };
                    init(&init_state);
                    waiter_queue.set_state_on_drop_to = init_state.inner.set_state_on_drop_to.get();
                    return;
                }
                _ => {
                    // All other values must be RUNNING with possibly a
                    // pointer to the waiter queue in the more significant bits.
                    assert!(state == RUNNING);
                    current = wait(&self.state_and_queue, current, true);
                }
            }
        }
    }
}
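
// The `FnMut`-through-a-vtable pattern described above `call`, sketched in
// isolation (this shim is hypothetical; the real shims live in the public
// `Once` wrapper): a small generic function converts the caller's `FnOnce`
// into an `&mut dyn FnMut` so that the large body of `call` is compiled only
// once rather than per closure type.
#[allow(dead_code)]
fn call_once_sketch<F: FnOnce()>(once: &Once, f: F) {
    let mut f = Some(f);
    // The closure is invoked at most once, so the `Option` is never empty
    // when `take` runs.
    once.call(false, &mut |_state: &public::OnceState| (f.take().unwrap())());
}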

fn wait(
    state_and_queue: &Atomic<*mut ()>,
    mut current: StateAndQueue,
    return_on_poisoned: bool,
) -> StateAndQueue {
    let node = &Waiter {
        thread: thread::current_or_unnamed(),
        signaled: AtomicBool::new(false),
        next: Cell::new(ptr::null()),
    };

    loop {
        let state = to_state(current);
        let queue = to_queue(current);

        // If initialization has finished, return.
        if state == COMPLETE || (return_on_poisoned && state == POISONED) {
            return current;
        }

        // Update the node for our current thread.
        node.next.set(queue);

        // Try to slide in the node at the head of the linked list, making sure
        // that another thread didn't just replace the head of the linked list.
        if let Err(new) = state_and_queue.compare_exchange_weak(
            current,
            ptr::from_ref(node).wrapping_byte_add(state) as StateAndQueue,
            Release,
            Acquire,
        ) {
            current = new;
            continue;
        }

        // We have enqueued ourselves, now let's wait.
        // It is important not to return before being signaled, otherwise we
        // would drop our `Waiter` node and leave a hole in the linked list
        // (and a dangling reference). Guard against spurious wakeups by
        // reparking ourselves until we are signaled.
        while !node.signaled.load(Acquire) {
            // If the managing thread happens to signal and unpark us before we
            // can park ourselves, the result could be this thread never gets
            // unparked. Luckily `park` comes with the guarantee that if it got
            // an `unpark` just before on an unparked thread, it does not park.
            // Crucially, we know the `unpark` must have happened between the
            // `compare_exchange_weak` above and here, and there is no other
            // `park` in that code that could steal our token. (This token
            // guarantee is sketched after this function.)
            // SAFETY: we retrieved this handle on the current thread above.
            unsafe { node.thread.park() }
        }

        return state_and_queue.load(Acquire);
    }
}
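
// The park/unpark token guarantee relied on in the loop above, sketched in
// isolation (illustrative only; the function name is hypothetical): an
// `unpark` delivered to a thread that is not parked stores a token, and the
// next `park` consumes it and returns immediately instead of blocking.
#[cfg(any())] // illustrative only; never compiled
fn park_token_sketch() {
    let me = thread::current();
    // Deliver the token before parking, as the managing thread might.
    me.unpark();
    // Returns at once because the token stored by `unpark` is consumed.
    thread::park();
}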

#[stable(feature = "std_debug", since = "1.16.0")]
impl fmt::Debug for Once {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Once").finish_non_exhaustive()
    }
}

impl Drop for WaiterQueue<'_> {
    fn drop(&mut self) {
        // Swap out our state with however we finished.
        let current = self.state_and_queue.swap(self.set_state_on_drop_to, AcqRel);

        // We should only ever see an old state which was RUNNING.
        assert_eq!(current.addr() & STATE_MASK, RUNNING);

        // Walk the entire linked list of waiters and wake them up (in LIFO
        // order, last to register is first to wake up).
        unsafe {
            // Right after setting `node.signaled = true` the other thread may
            // free `node` if there happens to be a spurious wakeup. So we have
            // to copy the `thread` field and the pointer to `next` out first.
            let mut queue = to_queue(current);
            while !queue.is_null() {
                let next = (*queue).next.get();
                let thread = (*queue).thread.clone();
                (*queue).signaled.store(true, Release);
                thread.unpark();
                queue = next;
            }
        }
    }
}

impl OnceState {
    #[inline]
    pub fn is_poisoned(&self) -> bool {
        self.poisoned
    }

    #[inline]
    pub fn poison(&self) {
        self.set_state_on_drop_to.set(ptr::without_provenance_mut(POISONED));
    }
}
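
// How this state surfaces to users, sketched against the public wrapper
// (illustrative only; the function name is hypothetical):
#[cfg(any())] // illustrative only; never compiled
fn poison_recovery_sketch() {
    static INIT: crate::sync::Once = crate::sync::Once::new();
    // `call_once_force` runs even on a poisoned `Once` and exposes the
    // poison status through the public `OnceState`.
    INIT.call_once_force(|state| {
        if state.is_poisoned() {
            // A previous initializer panicked; run recovery logic here.
        }
    });
}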
339}