1use std::mem;
4use std::sync::atomic::Ordering::Relaxed;
5use std::task::Poll;
6use std::time::{Duration, SystemTime};
7
8use rand::RngExt;
9use rand::seq::IteratorRandom;
10use rustc_abi::ExternAbi;
11use rustc_const_eval::CTRL_C_RECEIVED;
12use rustc_data_structures::either::Either;
13use rustc_data_structures::fx::FxHashMap;
14use rustc_hir::def_id::DefId;
15use rustc_index::{Idx, IndexVec};
16use rustc_middle::mir::Mutability;
17use rustc_middle::ty::layout::TyAndLayout;
18use rustc_span::{DUMMY_SP, Span};
19use rustc_target::spec::Os;
20
21use crate::concurrency::GlobalDataRaceHandler;
22use crate::shims::{Epoll, EpollEvalContextExt, FileDescriptionRef, tls};
23use crate::*;
24
25#[derive(Clone, Copy, Debug, PartialEq)]
26enum SchedulingAction {
27 ExecuteStep,
29 SleepAndWaitForIo(Option<Duration>),
34}
35
36#[derive(Clone, Copy, Debug, PartialEq)]
38pub enum TlsAllocAction {
39 Deallocate,
41 Leak,
44}
45
46#[derive(Clone, Copy, Debug, PartialEq)]
48pub enum UnblockKind {
49 Ready,
51 TimedOut,
53}
54
55pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
58
59#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
61pub struct ThreadId(u32);
62
63impl ThreadId {
64 pub fn to_u32(self) -> u32 {
65 self.0
66 }
67
68 pub fn new_unchecked(id: u32) -> Self {
70 Self(id)
71 }
72
73 pub const MAIN_THREAD: ThreadId = ThreadId(0);
74}
75
76impl Idx for ThreadId {
77 fn new(idx: usize) -> Self {
78 ThreadId(u32::try_from(idx).unwrap())
79 }
80
81 fn index(self) -> usize {
82 usize::try_from(self.0).unwrap()
83 }
84}
85
86impl From<ThreadId> for u64 {
87 fn from(t: ThreadId) -> Self {
88 t.0.into()
89 }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
94pub enum BlockReason {
95 Join(ThreadId),
98 Sleep,
100 Mutex,
102 Condvar,
104 RwLock,
106 Futex,
108 InitOnce,
110 Epoll { epfd: FileDescriptionRef<Epoll> },
112 Eventfd,
114 VirtualSocket,
116 IO,
118 Genmc,
121}
122
123enum ThreadState<'tcx> {
125 Enabled,
127 Blocked { reason: BlockReason, deadline: Option<Deadline>, callback: DynUnblockCallback<'tcx> },
129 Terminated,
132}
133
134impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Enabled => write!(f, "Enabled"),
138 Self::Blocked { reason, deadline, .. } =>
139 f.debug_struct("Blocked")
140 .field("reason", reason)
141 .field("deadline", deadline)
142 .finish(),
143 Self::Terminated => write!(f, "Terminated"),
144 }
145 }
146}
147
148impl<'tcx> ThreadState<'tcx> {
149 fn is_enabled(&self) -> bool {
150 matches!(self, ThreadState::Enabled)
151 }
152
153 fn is_terminated(&self) -> bool {
154 matches!(self, ThreadState::Terminated)
155 }
156
157 fn is_blocked_on(&self, reason: &BlockReason) -> bool {
158 matches!(self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
159 }
160}
161
162#[derive(Debug, Copy, Clone, PartialEq, Eq)]
164enum ThreadJoinStatus {
165 Joinable,
167 Detached,
170 Joined,
172}
173
174pub struct Thread<'tcx> {
176 state: ThreadState<'tcx>,
177
178 thread_name: Option<Vec<u8>>,
180
181 stack: Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>>,
183
184 pub(crate) origin_span: Span,
187
188 pub(crate) on_stack_empty: Option<StackEmptyCallback<'tcx>>,
193
194 top_user_relevant_frame: Option<usize>,
199
200 join_status: ThreadJoinStatus,
202
203 pub(crate) unwind_payloads: Vec<ImmTy<'tcx>>,
212
213 pub(crate) last_error: Option<MPlaceTy<'tcx>>,
215}
216
217pub type StackEmptyCallback<'tcx> =
218 Box<dyn FnMut(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, Poll<()>> + 'tcx>;
219
220impl<'tcx> Thread<'tcx> {
221 fn thread_name(&self) -> Option<&[u8]> {
223 self.thread_name.as_deref()
224 }
225
226 pub fn is_enabled(&self) -> bool {
228 self.state.is_enabled()
229 }
230
231 fn thread_display_name(&self, id: ThreadId) -> String {
233 if let Some(ref thread_name) = self.thread_name {
234 String::from_utf8_lossy(thread_name).into_owned()
235 } else {
236 format!("unnamed-{}", id.index())
237 }
238 }
239
240 fn compute_top_user_relevant_frame(&self, skip: usize) -> Option<usize> {
246 let mut best = None;
248 for (idx, frame) in self.stack.iter().enumerate().rev().skip(skip) {
249 let relevance = frame.extra.user_relevance;
250 if relevance == u8::MAX {
251 return Some(idx);
253 }
254 if best.is_none_or(|(_best_idx, best_relevance)| best_relevance < relevance) {
255 best = Some((idx, relevance));
258 }
259 }
260 best.map(|(idx, _relevance)| idx)
261 }
262
263 pub fn recompute_top_user_relevant_frame(&mut self, skip: usize) {
266 self.top_user_relevant_frame = self.compute_top_user_relevant_frame(skip);
267 }
268
269 pub fn set_top_user_relevant_frame(&mut self, frame_idx: usize) {
272 debug_assert_eq!(Some(frame_idx), self.compute_top_user_relevant_frame(0));
273 self.top_user_relevant_frame = Some(frame_idx);
274 }
275
276 pub fn top_user_relevant_frame(&self) -> Option<usize> {
279 self.top_user_relevant_frame.or_else(|| self.stack.len().checked_sub(1))
283 }
284
285 pub fn current_user_relevance(&self) -> u8 {
286 self.top_user_relevant_frame()
287 .map(|frame_idx| self.stack[frame_idx].extra.user_relevance)
288 .unwrap_or(0)
289 }
290
291 pub fn current_user_relevant_span(&self) -> Span {
292 debug_assert_eq!(self.top_user_relevant_frame, self.compute_top_user_relevant_frame(0));
293 self.top_user_relevant_frame()
294 .map(|frame_idx| self.stack[frame_idx].current_span())
295 .unwrap_or(rustc_span::DUMMY_SP)
296 }
297}
298
299impl<'tcx> std::fmt::Debug for Thread<'tcx> {
300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 write!(
302 f,
303 "{}({:?}, {:?})",
304 String::from_utf8_lossy(self.thread_name().unwrap_or(b"<unnamed>")),
305 self.state,
306 self.join_status
307 )
308 }
309}
310
311impl<'tcx> Thread<'tcx> {
312 fn new(name: Option<&str>, on_stack_empty: Option<StackEmptyCallback<'tcx>>) -> Self {
313 Self {
314 state: ThreadState::Enabled,
315 thread_name: name.map(|name| Vec::from(name.as_bytes())),
316 stack: Vec::new(),
317 origin_span: DUMMY_SP,
318 top_user_relevant_frame: None,
319 join_status: ThreadJoinStatus::Joinable,
320 unwind_payloads: Vec::new(),
321 last_error: None,
322 on_stack_empty,
323 }
324 }
325}
326
327impl VisitProvenance for Thread<'_> {
328 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
329 let Thread {
330 unwind_payloads: panic_payload,
331 last_error,
332 stack,
333 origin_span: _,
334 top_user_relevant_frame: _,
335 state: _,
336 thread_name: _,
337 join_status: _,
338 on_stack_empty: _, } = self;
340
341 for payload in panic_payload {
342 payload.visit_provenance(visit);
343 }
344 last_error.visit_provenance(visit);
345 for frame in stack {
346 frame.visit_provenance(visit)
347 }
348 }
349}
350
351impl VisitProvenance for Frame<'_, Provenance, FrameExtra<'_>> {
352 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
353 let return_place = self.return_place();
354 let Frame {
355 locals,
356 extra,
357 ..
359 } = self;
360
361 return_place.visit_provenance(visit);
363 for local in locals.iter() {
365 match local.as_mplace_or_imm() {
366 None => {}
367 Some(Either::Left((ptr, meta))) => {
368 ptr.visit_provenance(visit);
369 meta.visit_provenance(visit);
370 }
371 Some(Either::Right(imm)) => {
372 imm.visit_provenance(visit);
373 }
374 }
375 }
376
377 extra.visit_provenance(visit);
378 }
379}
380
381#[derive(Debug, Copy, Clone)]
383pub enum ThreadLookupError {
384 InvalidId,
386 Terminated(ThreadId),
388}
389
390#[derive(Debug)]
392pub struct ThreadManager<'tcx> {
393 active_thread: ThreadId,
395 threads: IndexVec<ThreadId, Thread<'tcx>>,
399 thread_local_allocs: FxHashMap<(DefId, ThreadId), StrictPointer>,
401 yield_active_thread: bool,
404 fixed_scheduling: bool,
406}
407
408impl VisitProvenance for ThreadManager<'_> {
409 fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
410 let ThreadManager {
411 threads,
412 thread_local_allocs,
413 active_thread: _,
414 yield_active_thread: _,
415 fixed_scheduling: _,
416 } = self;
417
418 for thread in threads {
419 thread.visit_provenance(visit);
420 }
421 for ptr in thread_local_allocs.values() {
422 ptr.visit_provenance(visit);
423 }
424 }
425}
426
427impl<'tcx> ThreadManager<'tcx> {
428 pub(crate) fn new(config: &MiriConfig) -> Self {
429 let mut threads = IndexVec::new();
430 threads.push(Thread::new(Some("main"), None));
432 Self {
433 active_thread: ThreadId::MAIN_THREAD,
434 threads,
435 thread_local_allocs: Default::default(),
436 yield_active_thread: false,
437 fixed_scheduling: config.fixed_scheduling,
438 }
439 }
440
441 pub(crate) fn init(
442 ecx: &mut MiriInterpCx<'tcx>,
443 on_main_stack_empty: StackEmptyCallback<'tcx>,
444 ) {
445 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].on_stack_empty =
446 Some(on_main_stack_empty);
447 if ecx.tcx.sess.target.os != Os::Windows {
448 ecx.machine.threads.threads[ThreadId::MAIN_THREAD].join_status =
450 ThreadJoinStatus::Detached;
451 }
452 }
453
454 pub fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
458 if let Ok(id) = id.try_into()
459 && usize::try_from(id).is_ok_and(|id| id < self.threads.len())
460 {
461 let thread_id = ThreadId(id);
462 if self.threads[thread_id].state.is_terminated() {
463 Err(ThreadLookupError::Terminated(thread_id))
464 } else {
465 Ok(thread_id)
466 }
467 } else {
468 Err(ThreadLookupError::InvalidId)
469 }
470 }
471
472 fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<StrictPointer> {
475 self.thread_local_allocs.get(&(def_id, self.active_thread)).cloned()
476 }
477
478 fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: StrictPointer) {
483 self.thread_local_allocs.try_insert((def_id, self.active_thread), ptr).unwrap();
484 }
485
486 pub fn active_thread_stack(&self) -> &[Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
488 &self.threads[self.active_thread].stack
489 }
490
491 pub fn active_thread_stack_mut(
493 &mut self,
494 ) -> &mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
495 &mut self.threads[self.active_thread].stack
496 }
497
498 pub fn all_blocked_stacks(
499 &self,
500 ) -> impl Iterator<Item = (ThreadId, &[Frame<'tcx, Provenance, FrameExtra<'tcx>>])> {
501 self.threads
502 .iter_enumerated()
503 .filter(|(_id, t)| matches!(t.state, ThreadState::Blocked { .. }))
504 .map(|(id, t)| (id, &t.stack[..]))
505 }
506
507 fn create_thread(&mut self, on_stack_empty: StackEmptyCallback<'tcx>) -> ThreadId {
509 let new_thread_id = ThreadId::new(self.threads.len());
510 self.threads.push(Thread::new(None, Some(on_stack_empty)));
511 new_thread_id
512 }
513
514 fn set_active_thread_id(&mut self, id: ThreadId) -> ThreadId {
516 assert!(id.index() < self.threads.len());
517 info!(
518 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
519 self.get_thread_display_name(id),
520 self.get_thread_display_name(self.active_thread)
521 );
522 std::mem::replace(&mut self.active_thread, id)
523 }
524
525 pub fn active_thread(&self) -> ThreadId {
527 self.active_thread
528 }
529
530 pub fn get_total_thread_count(&self) -> usize {
532 self.threads.len()
533 }
534
535 pub fn get_live_thread_count(&self) -> usize {
538 self.threads.iter().filter(|t| !t.state.is_terminated()).count()
539 }
540
541 fn has_terminated(&self, thread_id: ThreadId) -> bool {
543 self.threads[thread_id].state.is_terminated()
544 }
545
546 fn have_all_terminated(&self) -> bool {
548 self.threads.iter().all(|thread| thread.state.is_terminated())
549 }
550
551 fn enable_thread(&mut self, thread_id: ThreadId) {
553 assert!(self.has_terminated(thread_id));
554 self.threads[thread_id].state = ThreadState::Enabled;
555 }
556
557 pub fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
559 &mut self.threads[self.active_thread]
560 }
561
562 pub fn active_thread_ref(&self) -> &Thread<'tcx> {
564 &self.threads[self.active_thread]
565 }
566
567 pub fn thread_ref(&self, thread_id: ThreadId) -> &Thread<'tcx> {
568 &self.threads[thread_id]
569 }
570
571 fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
580 trace!("detaching {:?}", id);
582
583 let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
584 self.threads[id].join_status == ThreadJoinStatus::Detached
586 } else {
587 self.threads[id].join_status != ThreadJoinStatus::Joinable
588 };
589 if is_ub {
590 throw_ub_format!("trying to detach thread that was already detached or joined");
591 }
592
593 self.threads[id].join_status = ThreadJoinStatus::Detached;
594 interp_ok(())
595 }
596
597 pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
599 self.threads[thread].thread_name = Some(new_thread_name);
600 }
601
602 pub fn get_thread_name(&self, thread: ThreadId) -> Option<&[u8]> {
604 self.threads[thread].thread_name()
605 }
606
607 pub fn get_thread_display_name(&self, thread: ThreadId) -> String {
608 self.threads[thread].thread_display_name(thread)
609 }
610
611 fn block_thread(
613 &mut self,
614 reason: BlockReason,
615 deadline: Option<Deadline>,
616 callback: DynUnblockCallback<'tcx>,
617 ) {
618 let state = &mut self.threads[self.active_thread].state;
619 assert!(state.is_enabled());
620 *state = ThreadState::Blocked { reason, deadline, callback }
621 }
622
623 fn yield_active_thread(&mut self) {
625 self.yield_active_thread = true;
629 }
630}
631
632impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
633trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
634 #[inline]
635 fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
636 let this = self.eval_context_mut();
637 let active_thread = this.active_thread_mut();
638 active_thread.origin_span = DUMMY_SP; let mut callback = active_thread
640 .on_stack_empty
641 .take()
642 .expect("`on_stack_empty` not set up, or already running");
643 let res = callback(this)?;
644 this.active_thread_mut().on_stack_empty = Some(callback);
645 interp_ok(res)
646 }
647
648 fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
657 let this = self.eval_context_mut();
658
659 if this.machine.data_race.as_genmc_ref().is_some() {
661 loop {
662 let genmc_ctx = this.machine.data_race.as_genmc_ref().unwrap();
663 let Some(next_thread_id) = genmc_ctx.schedule_thread(this)? else {
664 return interp_ok(SchedulingAction::ExecuteStep);
665 };
666 if this.machine.threads.threads[next_thread_id]
668 .state
669 .is_blocked_on(&BlockReason::Genmc)
670 {
671 info!(
672 "GenMC: scheduling blocked thread {next_thread_id:?}, so we unblock it now."
673 );
674 this.unblock_thread(next_thread_id, BlockReason::Genmc)?;
675 }
676 let thread_manager = &mut this.machine.threads;
679 if thread_manager.threads[next_thread_id].state.is_enabled() {
680 thread_manager.active_thread = next_thread_id;
682 return interp_ok(SchedulingAction::ExecuteStep);
683 }
684 }
685 }
686
687 let thread_manager = &this.machine.threads;
689 if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
691 && !thread_manager.yield_active_thread
692 {
693 return interp_ok(SchedulingAction::ExecuteStep);
695 }
696
697 if this.machine.communicate() {
701 this.poll_and_unblock(Some(Duration::ZERO))?;
711 }
712
713 let potential_sleep_time = this.unblock_expired_deadlines()?;
719
720 let thread_manager = &mut this.machine.threads;
721 let rng = this.machine.rng.get_mut();
722
723 let mut threads_iter = thread_manager
730 .threads
731 .iter_enumerated()
732 .skip(thread_manager.active_thread.index() + 1)
733 .chain(
734 thread_manager
735 .threads
736 .iter_enumerated()
737 .take(thread_manager.active_thread.index() + 1),
738 )
739 .filter(|(_id, thread)| thread.state.is_enabled());
740 let new_thread = if thread_manager.fixed_scheduling {
742 let next = threads_iter.next();
743 drop(threads_iter);
744 next
745 } else {
746 threads_iter.choose(rng)
747 };
748
749 if let Some((id, _thread)) = new_thread {
750 if thread_manager.active_thread != id {
751 info!(
752 "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
753 thread_manager.get_thread_display_name(id),
754 thread_manager.get_thread_display_name(thread_manager.active_thread)
755 );
756 thread_manager.active_thread = id;
757 }
758 }
759 thread_manager.yield_active_thread = false;
761
762 if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
763 return interp_ok(SchedulingAction::ExecuteStep);
764 }
765
766 let threads = &this.machine.threads.threads;
768
769 if threads.iter().all(|thread| thread.state.is_terminated()) {
770 unreachable!("all threads terminated without the main thread terminating?!");
771 } else if let Some(sleep_time) = potential_sleep_time {
772 interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time)))
776 } else if threads.iter().any(|thread| this.is_thread_blocked_on_host(thread)) {
777 interp_ok(SchedulingAction::SleepAndWaitForIo(None))
781 } else {
782 throw_machine_stop!(TerminationInfo::GlobalDeadlock);
783 }
784 }
785
786 fn is_thread_blocked_on_host(&self, thread: &Thread<'tcx>) -> bool {
790 let this = self.eval_context_ref();
791 match &thread.state {
792 ThreadState::Blocked { reason: BlockReason::IO, .. } => true,
793 ThreadState::Blocked { reason: BlockReason::Epoll { epfd }, .. } =>
794 this.has_epoll_host_interests(epfd),
795 _ => false,
796 }
797 }
798
799 fn unblock_expired_deadlines(&mut self) -> InterpResult<'tcx, Option<Duration>> {
804 let this = self.eval_context_mut();
805 let communicate = this.machine.communicate();
806
807 let mut min_wait_time = Option::<Duration>::None;
808 let mut callbacks = Vec::new();
809
810 for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
811 match &thread.state {
812 ThreadState::Blocked { deadline: Some(deadline), .. } => {
813 let wait_time = match deadline {
814 Deadline::Monotonic(instant) =>
815 instant.duration_since(this.machine.monotonic_clock.now()),
816 Deadline::RealTime(time) => {
817 assert!(communicate, "cannot have `RealTime` timeout with isolation");
818 time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)
819 }
820 };
821
822 if wait_time.is_zero() {
823 let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
825 let ThreadState::Blocked { callback, .. } = old_state else {
826 unreachable!()
827 };
828 callbacks.push((id, callback));
830 } else {
831 min_wait_time = Some(wait_time.min(min_wait_time.unwrap_or(Duration::MAX)));
834 }
835 }
836 _ => {}
837 }
838 }
839
840 for (thread, callback) in callbacks {
841 let old_thread = this.machine.threads.set_active_thread_id(thread);
848 callback.call(this, UnblockKind::TimedOut)?;
849 this.machine.threads.set_active_thread_id(old_thread);
850 }
851
852 interp_ok(min_wait_time)
853 }
854}
855
856impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
858pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
859 #[inline]
860 fn thread_id_try_from(&self, id: impl TryInto<u32>) -> Result<ThreadId, ThreadLookupError> {
861 self.eval_context_ref().machine.threads.thread_id_try_from(id)
862 }
863
864 fn get_or_create_thread_local_alloc(
867 &mut self,
868 def_id: DefId,
869 ) -> InterpResult<'tcx, StrictPointer> {
870 let this = self.eval_context_mut();
871 let tcx = this.tcx;
872 if let Some(old_alloc) = this.machine.threads.get_thread_local_alloc_id(def_id) {
873 interp_ok(old_alloc)
876 } else {
877 if tcx.is_foreign_item(def_id) {
881 throw_unsup_format!("foreign thread-local statics are not supported");
882 }
883 let params = this.machine.get_default_alloc_params();
884 let alloc = this.ctfe_query(|tcx| tcx.eval_static_initializer(def_id))?;
885 let mut alloc = alloc.inner().adjust_from_tcx(
887 &this.tcx,
888 |bytes, align| {
889 interp_ok(MiriAllocBytes::from_bytes(
890 std::borrow::Cow::Borrowed(bytes),
891 align,
892 params,
893 ))
894 },
895 |ptr| this.global_root_pointer(ptr),
896 )?;
897 alloc.mutability = Mutability::Mut;
899 let ptr = this.insert_allocation(alloc, MiriMemoryKind::Tls.into())?;
901 this.machine.threads.set_thread_local_alloc(def_id, ptr);
902 interp_ok(ptr)
903 }
904 }
905
906 #[inline]
908 fn start_regular_thread(
909 &mut self,
910 thread: Option<MPlaceTy<'tcx>>,
911 start_routine: Pointer,
912 start_abi: ExternAbi,
913 func_arg: ImmTy<'tcx>,
914 ret_layout: TyAndLayout<'tcx>,
915 ) -> InterpResult<'tcx, ThreadId> {
916 let this = self.eval_context_mut();
917
918 let current_span = this.machine.current_user_relevant_span();
920 let new_thread_id = this.machine.threads.create_thread({
921 let mut state = tls::TlsDtorsState::default();
922 Box::new(move |m| state.on_stack_empty(m))
923 });
924 match &mut this.machine.data_race {
925 GlobalDataRaceHandler::None => {}
926 GlobalDataRaceHandler::Vclocks(data_race) =>
927 data_race.thread_created(&this.machine.threads, new_thread_id, current_span),
928 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
929 genmc_ctx.handle_thread_create(
930 &this.machine.threads,
931 start_routine,
932 &func_arg,
933 new_thread_id,
934 )?,
935 }
936 if let Some(thread_info_place) = thread {
939 this.write_scalar(
940 Scalar::from_uint(new_thread_id.to_u32(), thread_info_place.layout.size),
941 &thread_info_place,
942 )?;
943 }
944
945 let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
948
949 if let Some(thread_cpu_affinity) = &mut this.machine.thread_cpu_affinity
952 && let Some(cpuset) = thread_cpu_affinity.get(&old_thread_id).cloned()
953 {
954 thread_cpu_affinity.insert(new_thread_id, cpuset);
955 }
956
957 let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
959
960 let ret_place = this.allocate(ret_layout, MiriMemoryKind::Machine.into())?;
964
965 this.call_thread_root_function(
966 instance,
967 start_abi,
968 &[func_arg],
969 Some(&ret_place),
970 current_span,
971 )?;
972
973 this.machine.threads.set_active_thread_id(old_thread_id);
975
976 interp_ok(new_thread_id)
977 }
978
979 fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
984 let this = self.eval_context_mut();
985
986 let thread = this.active_thread_mut();
988 assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
989 thread.state = ThreadState::Terminated;
990
991 let gone_thread = this.active_thread();
993 {
994 let mut free_tls_statics = Vec::new();
995 this.machine.threads.thread_local_allocs.retain(|&(_def_id, thread), &mut alloc_id| {
996 if thread != gone_thread {
997 return true;
999 }
1000 free_tls_statics.push(alloc_id);
1003 false
1004 });
1005 for ptr in free_tls_statics {
1007 match tls_alloc_action {
1008 TlsAllocAction::Deallocate =>
1009 this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
1010 TlsAllocAction::Leak =>
1011 if let Some(alloc) = ptr.provenance.get_alloc_id() {
1012 trace!(
1013 "Thread-local static leaked and stored as static root: {:?}",
1014 alloc
1015 );
1016 this.machine.static_roots.push(alloc);
1017 },
1018 }
1019 }
1020 }
1021
1022 match &mut this.machine.data_race {
1023 GlobalDataRaceHandler::None => {}
1024 GlobalDataRaceHandler::Vclocks(data_race) =>
1025 data_race.thread_terminated(&this.machine.threads),
1026 GlobalDataRaceHandler::Genmc(genmc_ctx) => {
1027 genmc_ctx.handle_thread_finish(&this.machine.threads)
1030 }
1031 }
1032
1033 let unblock_reason = BlockReason::Join(gone_thread);
1035 let threads = &this.machine.threads.threads;
1036 let joining_threads = threads
1037 .iter_enumerated()
1038 .filter(|(_, thread)| thread.state.is_blocked_on(&unblock_reason))
1039 .map(|(id, _)| id)
1040 .collect::<Vec<_>>();
1041 for thread in joining_threads {
1042 this.unblock_thread(thread, unblock_reason.clone())?;
1043 }
1044
1045 interp_ok(())
1046 }
1047
1048 #[inline]
1051 fn block_thread(
1052 &mut self,
1053 reason: BlockReason,
1054 deadline: Option<Deadline>,
1055 callback: DynUnblockCallback<'tcx>,
1056 ) {
1057 let this = self.eval_context_mut();
1058 if deadline.is_some() && this.machine.data_race.as_genmc_ref().is_some() {
1059 panic!("Unimplemented: Timeouts not yet supported in GenMC mode.");
1060 }
1061 if matches!(deadline, Some(Deadline::RealTime(_))) && !this.machine.communicate() {
1062 panic!("cannot have `RealTime` timeout with isolation");
1063 }
1064 this.machine.threads.block_thread(reason, deadline, callback);
1065 }
1066
1067 fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
1070 let this = self.eval_context_mut();
1071 let old_state =
1072 mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
1073 let callback = match old_state {
1074 ThreadState::Blocked { reason: actual_reason, callback, .. } => {
1075 assert_eq!(
1076 reason, actual_reason,
1077 "unblock_thread: thread was blocked for the wrong reason"
1078 );
1079 callback
1080 }
1081 _ => panic!("unblock_thread: thread was not blocked"),
1082 };
1083 let old_thread = this.machine.threads.set_active_thread_id(thread);
1085 callback.call(this, UnblockKind::Ready)?;
1086 this.machine.threads.set_active_thread_id(old_thread);
1087 interp_ok(())
1088 }
1089
1090 #[inline]
1091 fn detach_thread(
1092 &mut self,
1093 thread_id: ThreadId,
1094 allow_terminated_joined: bool,
1095 ) -> InterpResult<'tcx> {
1096 let this = self.eval_context_mut();
1097 this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
1098 }
1099
1100 fn join_thread(
1104 &mut self,
1105 joined_thread_id: ThreadId,
1106 success_retval: Scalar,
1107 return_dest: &MPlaceTy<'tcx>,
1108 ) -> InterpResult<'tcx> {
1109 let this = self.eval_context_mut();
1110 let thread_mgr = &mut this.machine.threads;
1111 if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1112 throw_ub_format!("trying to join a detached thread");
1114 }
1115
1116 fn after_join<'tcx>(
1117 this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1118 joined_thread_id: ThreadId,
1119 success_retval: Scalar,
1120 return_dest: &MPlaceTy<'tcx>,
1121 ) -> InterpResult<'tcx> {
1122 let threads = &this.machine.threads;
1123 match &mut this.machine.data_race {
1124 GlobalDataRaceHandler::None => {}
1125 GlobalDataRaceHandler::Vclocks(data_race) =>
1126 data_race.thread_joined(threads, joined_thread_id),
1127 GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1128 genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1129 }
1130 this.write_scalar(success_retval, return_dest)?;
1131 interp_ok(())
1132 }
1133
1134 thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1137 if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1138 trace!(
1139 "{:?} blocked on {:?} when trying to join",
1140 thread_mgr.active_thread, joined_thread_id
1141 );
1142 if let Some(genmc_ctx) = this.machine.data_race.as_genmc_ref() {
1143 genmc_ctx.handle_thread_join(thread_mgr.active_thread, joined_thread_id)?;
1144 }
1145
1146 let dest = return_dest.clone();
1149 thread_mgr.block_thread(
1150 BlockReason::Join(joined_thread_id),
1151 None,
1152 callback!(
1153 @capture<'tcx> {
1154 joined_thread_id: ThreadId,
1155 dest: MPlaceTy<'tcx>,
1156 success_retval: Scalar,
1157 }
1158 |this, unblock: UnblockKind| {
1159 assert_eq!(unblock, UnblockKind::Ready);
1160 after_join(this, joined_thread_id, success_retval, &dest)
1161 }
1162 ),
1163 );
1164 } else {
1165 after_join(this, joined_thread_id, success_retval, return_dest)?;
1167 }
1168 interp_ok(())
1169 }
1170
1171 fn join_thread_exclusive(
1176 &mut self,
1177 joined_thread_id: ThreadId,
1178 success_retval: Scalar,
1179 return_dest: &MPlaceTy<'tcx>,
1180 ) -> InterpResult<'tcx> {
1181 let this = self.eval_context_mut();
1182 let threads = &this.machine.threads.threads;
1183 if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1184 throw_ub_format!("trying to join an already joined thread");
1185 }
1186
1187 if joined_thread_id == this.machine.threads.active_thread {
1188 throw_ub_format!("trying to join itself");
1189 }
1190
1191 assert!(
1193 threads.iter().all(|thread| {
1194 !thread.state.is_blocked_on(&BlockReason::Join(joined_thread_id))
1195 }),
1196 "this thread already has threads waiting for its termination"
1197 );
1198
1199 this.join_thread(joined_thread_id, success_retval, return_dest)
1200 }
1201
1202 #[inline]
1203 fn active_thread(&self) -> ThreadId {
1204 let this = self.eval_context_ref();
1205 this.machine.threads.active_thread()
1206 }
1207
1208 #[inline]
1209 fn active_thread_mut(&mut self) -> &mut Thread<'tcx> {
1210 let this = self.eval_context_mut();
1211 this.machine.threads.active_thread_mut()
1212 }
1213
1214 #[inline]
1215 fn active_thread_ref(&self) -> &Thread<'tcx> {
1216 let this = self.eval_context_ref();
1217 this.machine.threads.active_thread_ref()
1218 }
1219
1220 #[inline]
1221 fn get_total_thread_count(&self) -> usize {
1222 let this = self.eval_context_ref();
1223 this.machine.threads.get_total_thread_count()
1224 }
1225
1226 #[inline]
1227 fn have_all_terminated(&self) -> bool {
1228 let this = self.eval_context_ref();
1229 this.machine.threads.have_all_terminated()
1230 }
1231
1232 #[inline]
1233 fn enable_thread(&mut self, thread_id: ThreadId) {
1234 let this = self.eval_context_mut();
1235 this.machine.threads.enable_thread(thread_id);
1236 }
1237
1238 #[inline]
1239 fn active_thread_stack<'a>(&'a self) -> &'a [Frame<'tcx, Provenance, FrameExtra<'tcx>>] {
1240 let this = self.eval_context_ref();
1241 this.machine.threads.active_thread_stack()
1242 }
1243
1244 #[inline]
1245 fn active_thread_stack_mut<'a>(
1246 &'a mut self,
1247 ) -> &'a mut Vec<Frame<'tcx, Provenance, FrameExtra<'tcx>>> {
1248 let this = self.eval_context_mut();
1249 this.machine.threads.active_thread_stack_mut()
1250 }
1251
1252 #[inline]
1254 fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
1255 self.eval_context_mut().machine.threads.set_thread_name(thread, new_thread_name);
1256 }
1257
1258 #[inline]
1259 fn get_thread_name<'c>(&'c self, thread: ThreadId) -> Option<&'c [u8]>
1260 where
1261 'tcx: 'c,
1262 {
1263 self.eval_context_ref().machine.threads.get_thread_name(thread)
1264 }
1265
1266 #[inline]
1267 fn yield_active_thread(&mut self) {
1268 self.eval_context_mut().machine.threads.yield_active_thread();
1269 }
1270
1271 #[inline]
1272 fn maybe_preempt_active_thread(&mut self) {
1273 let this = self.eval_context_mut();
1274 if !this.machine.threads.fixed_scheduling
1275 && this.machine.rng.get_mut().random_bool(this.machine.preemption_rate)
1276 {
1277 this.yield_active_thread();
1278 }
1279 }
1280
1281 fn run_threads(&mut self) -> InterpResult<'tcx, !> {
1284 let this = self.eval_context_mut();
1285 loop {
1286 if CTRL_C_RECEIVED.load(Relaxed) {
1287 this.machine.handle_abnormal_termination();
1288 throw_machine_stop!(TerminationInfo::Interrupted);
1289 }
1290 match this.schedule()? {
1291 SchedulingAction::ExecuteStep => {
1292 if !this.step()? {
1293 match this.run_on_stack_empty()? {
1295 Poll::Pending => {} Poll::Ready(()) =>
1297 this.terminate_active_thread(TlsAllocAction::Deallocate)?,
1298 }
1299 }
1300 }
1301 SchedulingAction::SleepAndWaitForIo(duration) => {
1302 if this.machine.communicate() {
1303 this.poll_and_unblock(duration)?;
1308 } else {
1309 let duration = duration.expect(
1310 "Infinite sleep should not be triggered when isolation is enabled",
1311 );
1312 this.machine.monotonic_clock.sleep(duration);
1313 }
1314 }
1315 }
1316 }
1317 }
1318}