std\sys\process\windows/child_pipe.rs
1use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
2use crate::ops::Neg;
3use crate::os::windows::prelude::*;
4use crate::sync::atomic::Atomic;
5use crate::sync::atomic::Ordering::Relaxed;
6use crate::sys::handle::Handle;
7use crate::sys::{FromInner, IntoInner, api, c};
8use crate::{mem, ptr};
9
10pub struct ChildPipe {
11 inner: Handle,
12}
13
14impl IntoInner<Handle> for ChildPipe {
15 fn into_inner(self) -> Handle {
16 self.inner
17 }
18}
19
20impl FromInner<Handle> for ChildPipe {
21 fn from_inner(inner: Handle) -> ChildPipe {
22 Self { inner }
23 }
24}
25
26pub(super) struct Pipes {
27 pub ours: ChildPipe,
28 pub theirs: ChildPipe,
29}
30
31/// Creates an anonymous pipe suitable for communication with a child process.
32///
33/// Windows unfortunately does not have a way of performing asynchronous operations
34/// on a handle originally created for synchronous operation. As `read_output` can
35/// only be correctly implemented with asynchronous reads but the pipe created by
36/// `CreatePipe` is synchronous, we cannot use it (and thus [`io::pipe`]) to create
37/// a pipe for communicating with a child. Instead, this function uses the NT API
38/// to create a pipe where one pipe handle (`ours`) is asynchronous and one is
39/// synchronous and can be inherited by a child for use as a console handle
40/// (`theirs`).
41///
42/// The ours/theirs pipes are *not* specifically readable or writable. Each
43/// one only supports a read or a write, but which is which depends on the
44/// boolean flag given. If `ours_readable` is `true`, then `ours` is readable and
45/// `theirs` is writable. Conversely, if `ours_readable` is `false`, then `ours`
46/// is writable and `theirs` is readable.
47///
48/// Also note that the `ours` pipe is always a handle opened up in overlapped
49/// mode. This means that technically speaking it should only ever be used
50/// with `OVERLAPPED` instances, but also works out ok if it's only ever used
51/// once at a time (which we do indeed guarantee).
52// FIXME(joboet): No, we don't guarantee that? E.g. `&Stdout` is both `Read`
53// and `Sync`, so there could be multiple operations at the same
54// time. All the functions below that forward to the inner handle
55// methods could abort if used concurrently.
56pub(super) fn child_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
57 // A 64kb pipe capacity is the same as a typical Linux default.
58 const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
59
60 // Note that we specifically do *not* use `CreatePipe` here because
61 // unfortunately the anonymous pipes returned do not support overlapped
62 // operations. Instead, we use `NtCreateNamedPipeFile` to create the
63 // anonymous pipe with overlapped support.
64 //
65 // Once we do this, we connect to it via `NtOpenFile`, and then
66 // we return those reader/writer halves. Note that the `ours` pipe return
67 // value is always the named pipe, whereas `theirs` is just the normal file.
68 // This should hopefully shield us from child processes which assume their
69 // stdout is a named pipe, which would indeed be odd!
70 unsafe {
71 let mut io_status = c::IO_STATUS_BLOCK::default();
72 let mut object_attributes = c::OBJECT_ATTRIBUTES::default();
73 object_attributes.Length = size_of::<c::OBJECT_ATTRIBUTES>() as u32;
74
75 // Open a handle to the pipe filesystem (`\Device\NamedPipe\`).
76 // This will be used when creating a new anonymous pipe.
77 //
78 // We cache the handle once so we can reuse it without needing to reopen it each time.
79 // NOTE: this means the handle may appear to be leaked but that's fine because
80 // it's only one handle and the OS will clean it up when the process exits.
81 static PIPE_FS: Atomic<c::HANDLE> = Atomic::<c::HANDLE>::new(ptr::null_mut());
82 let pipe_fs = if let handle = PIPE_FS.load(Relaxed)
83 && !handle.is_null()
84 {
85 handle
86 } else {
87 let path = api::unicode_str!(r"\Device\NamedPipe\");
88 object_attributes.ObjectName = path.as_ptr();
89 let mut pipe_fs = ptr::null_mut();
90 let status = c::NtOpenFile(
91 &mut pipe_fs,
92 c::SYNCHRONIZE | c::GENERIC_READ,
93 &object_attributes,
94 &mut io_status,
95 c::FILE_SHARE_READ | c::FILE_SHARE_WRITE,
96 c::FILE_SYNCHRONOUS_IO_NONALERT, // synchronous access
97 );
98 if c::nt_success(status) {
99 match PIPE_FS.compare_exchange(ptr::null_mut(), pipe_fs, Relaxed, Relaxed) {
100 Ok(_) => pipe_fs,
101 Err(existing) => {
102 c::CloseHandle(pipe_fs);
103 existing
104 }
105 }
106 } else {
107 return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
108 }
109 };
110
111 // From now on we're using handles instead of paths to create and open pipes.
112 // So set the `ObjectName` to a zero length string.
113 // As a (perhaps overzealous) mitigation for #143078, we use the null pointer
114 // for empty.Buffer instead of unicode_str!("").
115 // There's no difference to the OS itself but it's possible that third party
116 // DLLs which hook in to processes could be relying on the exact form of this string.
117 let empty = c::UNICODE_STRING::default();
118 object_attributes.ObjectName = &raw const empty;
119
120 // Create our side of the pipe for async access.
121 let ours = {
122 // Use the pipe filesystem as the root directory.
123 // With no name provided, an anonymous pipe will be created.
124 object_attributes.RootDirectory = pipe_fs;
125
126 // A negative timeout value is a relative time (rather than an absolute time).
127 // The time is given in 100's of nanoseconds so this is 50 milliseconds.
128 // This value was chosen to be consistent with the default timeout set by `CreateNamedPipeW`
129 // See: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createnamedpipew
130 let timeout = (50_i64 * 10000).neg() as u64;
131
132 let mut ours = ptr::null_mut();
133 let status = c::NtCreateNamedPipeFile(
134 &mut ours,
135 c::SYNCHRONIZE | if ours_readable { c::GENERIC_READ } else { c::GENERIC_WRITE },
136 &object_attributes,
137 &mut io_status,
138 if ours_readable { c::FILE_SHARE_WRITE } else { c::FILE_SHARE_READ },
139 c::FILE_CREATE,
140 0,
141 c::FILE_PIPE_BYTE_STREAM_TYPE,
142 c::FILE_PIPE_BYTE_STREAM_MODE,
143 c::FILE_PIPE_QUEUE_OPERATION,
144 // only allow one client pipe
145 1,
146 PIPE_BUFFER_CAPACITY,
147 PIPE_BUFFER_CAPACITY,
148 &timeout,
149 );
150 if c::nt_success(status) {
151 Handle::from_raw_handle(ours)
152 } else {
153 return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
154 }
155 };
156
157 // Open their side of the pipe for synchronous access.
158 let theirs = {
159 // We can reopen the anonymous pipe without a name by setting
160 // RootDirectory to the pipe handle and not setting a path name,
161 object_attributes.RootDirectory = ours.as_raw_handle();
162
163 if their_handle_inheritable {
164 object_attributes.Attributes |= c::OBJ_INHERIT;
165 }
166 let mut theirs = ptr::null_mut();
167 let status = c::NtOpenFile(
168 &mut theirs,
169 c::SYNCHRONIZE
170 | if ours_readable {
171 c::GENERIC_WRITE | c::FILE_READ_ATTRIBUTES
172 } else {
173 c::GENERIC_READ
174 },
175 &object_attributes,
176 &mut io_status,
177 0,
178 c::FILE_NON_DIRECTORY_FILE | c::FILE_SYNCHRONOUS_IO_NONALERT,
179 );
180 if c::nt_success(status) {
181 Handle::from_raw_handle(theirs)
182 } else {
183 return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
184 }
185 };
186
187 Ok(Pipes { ours: ChildPipe { inner: ours }, theirs: ChildPipe { inner: theirs } })
188 }
189}
190
191/// Takes an asynchronous source pipe and returns a synchronous pipe suitable
192/// for sending to a child process.
193///
194/// This is achieved by creating a new set of pipes and spawning a thread that
195/// relays messages between the source and the synchronous pipe.
196pub(super) fn spawn_pipe_relay(
197 source: &ChildPipe,
198 ours_readable: bool,
199 their_handle_inheritable: bool,
200) -> io::Result<ChildPipe> {
201 // We need this handle to live for the lifetime of the thread spawned below.
202 let source = source.try_clone()?;
203
204 // create a new pair of anon pipes.
205 let Pipes { theirs, ours } = child_pipe(ours_readable, their_handle_inheritable)?;
206
207 // Spawn a thread that passes messages from one pipe to the other.
208 // Any errors will simply cause the thread to exit.
209 let (reader, writer) = if ours_readable { (ours, source) } else { (source, ours) };
210 crate::thread::spawn(move || {
211 let mut buf = [0_u8; 4096];
212 'reader: while let Ok(len) = reader.read(&mut buf) {
213 if len == 0 {
214 break;
215 }
216 let mut start = 0;
217 while let Ok(written) = writer.write(&buf[start..len]) {
218 start += written;
219 if start == len {
220 continue 'reader;
221 }
222 }
223 break;
224 }
225 });
226
227 // Return the pipe that should be sent to the child process.
228 Ok(theirs)
229}
230
231impl ChildPipe {
232 pub fn handle(&self) -> &Handle {
233 &self.inner
234 }
235 pub fn into_handle(self) -> Handle {
236 self.inner
237 }
238
239 pub fn try_clone(&self) -> io::Result<Self> {
240 self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| ChildPipe { inner })
241 }
242
243 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
244 let result = unsafe {
245 let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
246 let ptr = buf.as_mut_ptr();
247 self.alertable_io_internal(|overlapped, callback| {
248 c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
249 })
250 };
251
252 match result {
253 // The special treatment of BrokenPipe is to deal with Windows
254 // pipe semantics, which yields this error when *reading* from
255 // a pipe after the other end has closed; we interpret that as
256 // EOF on the pipe.
257 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
258 _ => result,
259 }
260 }
261
262 pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
263 let result = unsafe {
264 let len = crate::cmp::min(buf.capacity(), u32::MAX as usize) as u32;
265 let ptr = buf.as_mut().as_mut_ptr().cast::<u8>();
266 self.alertable_io_internal(|overlapped, callback| {
267 c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
268 })
269 };
270
271 match result {
272 // The special treatment of BrokenPipe is to deal with Windows
273 // pipe semantics, which yields this error when *reading* from
274 // a pipe after the other end has closed; we interpret that as
275 // EOF on the pipe.
276 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
277 Err(e) => Err(e),
278 Ok(n) => {
279 unsafe {
280 buf.advance(n);
281 }
282 Ok(())
283 }
284 }
285 }
286
287 pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
288 self.inner.read_vectored(bufs)
289 }
290
291 #[inline]
292 pub fn is_read_vectored(&self) -> bool {
293 self.inner.is_read_vectored()
294 }
295
296 pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
297 self.handle().read_to_end(buf)
298 }
299
300 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
301 unsafe {
302 let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
303 self.alertable_io_internal(|overlapped, callback| {
304 c::WriteFileEx(self.inner.as_raw_handle(), buf.as_ptr(), len, overlapped, callback)
305 })
306 }
307 }
308
309 pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
310 self.inner.write_vectored(bufs)
311 }
312
313 #[inline]
314 pub fn is_write_vectored(&self) -> bool {
315 self.inner.is_write_vectored()
316 }
317
318 /// Synchronizes asynchronous reads or writes using our anonymous pipe.
319 ///
320 /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
321 /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
322 ///
323 /// Note: This should not be used for handles we don't create.
324 ///
325 /// # Safety
326 ///
327 /// `buf` must be a pointer to a buffer that's valid for reads or writes
328 /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
329 ///
330 /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
331 /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
332 /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
333 unsafe fn alertable_io_internal(
334 &self,
335 io: impl FnOnce(&mut c::OVERLAPPED, c::LPOVERLAPPED_COMPLETION_ROUTINE) -> c::BOOL,
336 ) -> io::Result<usize> {
337 // Use "alertable I/O" to synchronize the pipe I/O.
338 // This has four steps.
339 //
340 // STEP 1: Start the asynchronous I/O operation.
341 // This simply calls either `ReadFileEx` or `WriteFileEx`,
342 // giving it a pointer to the buffer and callback function.
343 //
344 // STEP 2: Enter an alertable state.
345 // The callback set in step 1 will not be called until the thread
346 // enters an "alertable" state. This can be done using `SleepEx`.
347 //
348 // STEP 3: The callback
349 // Once the I/O is complete and the thread is in an alertable state,
350 // the callback will be run on the same thread as the call to
351 // `ReadFileEx` or `WriteFileEx` done in step 1.
352 // In the callback we simply set the result of the async operation.
353 //
354 // STEP 4: Return the result.
355 // At this point we'll have a result from the callback function
356 // and can simply return it. Note that we must not return earlier,
357 // while the I/O is still in progress.
358
359 // The result that will be set from the asynchronous callback.
360 let mut async_result: Option<AsyncResult> = None;
361 struct AsyncResult {
362 error: u32,
363 transferred: u32,
364 }
365
366 // STEP 3: The callback.
367 unsafe extern "system" fn callback(
368 error: u32,
369 transferred: u32,
370 overlapped: *mut c::OVERLAPPED,
371 ) {
372 // Set `async_result` using a pointer smuggled through `hEvent`.
373 // SAFETY:
374 // At this point, the OVERLAPPED struct will have been written to by the OS,
375 // except for our `hEvent` field which we set to a valid AsyncResult pointer (see below)
376 unsafe {
377 let result = AsyncResult { error, transferred };
378 *(*overlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
379 }
380 }
381
382 // STEP 1: Start the I/O operation.
383 let mut overlapped: c::OVERLAPPED = unsafe { crate::mem::zeroed() };
384 // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
385 // Therefore the documentation suggests using it to smuggle a pointer to the callback.
386 overlapped.hEvent = (&raw mut async_result) as *mut _;
387
388 // Asynchronous read of the pipe.
389 // If successful, `callback` will be called once it completes.
390 let result = io(&mut overlapped, Some(callback));
391 if result == c::FALSE {
392 // We can return here because the call failed.
393 // After this we must not return until the I/O completes.
394 return Err(io::Error::last_os_error());
395 }
396
397 // Wait indefinitely for the result.
398 let result = loop {
399 // STEP 2: Enter an alertable state.
400 // The second parameter of `SleepEx` is used to make this sleep alertable.
401 unsafe { c::SleepEx(c::INFINITE, c::TRUE) };
402 if let Some(result) = async_result {
403 break result;
404 }
405 };
406 // STEP 4: Return the result.
407 // `async_result` is always `Some` at this point
408 match result.error {
409 c::ERROR_SUCCESS => Ok(result.transferred as usize),
410 error => Err(io::Error::from_raw_os_error(error as _)),
411 }
412 }
413}
414
415pub fn read_output(
416 p1: ChildPipe,
417 v1: &mut Vec<u8>,
418 p2: ChildPipe,
419 v2: &mut Vec<u8>,
420) -> io::Result<()> {
421 let p1 = p1.into_handle();
422 let p2 = p2.into_handle();
423
424 let mut p1 = AsyncPipe::new(p1, v1)?;
425 let mut p2 = AsyncPipe::new(p2, v2)?;
426 let objs = [p1.event.as_raw_handle(), p2.event.as_raw_handle()];
427
428 // In a loop we wait for either pipe's scheduled read operation to complete.
429 // If the operation completes with 0 bytes, that means EOF was reached, in
430 // which case we just finish out the other pipe entirely.
431 //
432 // Note that overlapped I/O is in general super unsafe because we have to
433 // be careful to ensure that all pointers in play are valid for the entire
434 // duration of the I/O operation (where tons of operations can also fail).
435 // The destructor for `AsyncPipe` ends up taking care of most of this.
436 loop {
437 let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) };
438 if res == c::WAIT_OBJECT_0 {
439 if !p1.result()? || !p1.schedule_read()? {
440 return p2.finish();
441 }
442 } else if res == c::WAIT_OBJECT_0 + 1 {
443 if !p2.result()? || !p2.schedule_read()? {
444 return p1.finish();
445 }
446 } else {
447 return Err(io::Error::last_os_error());
448 }
449 }
450}
451
452struct AsyncPipe<'a> {
453 pipe: Handle,
454 event: Handle,
455 overlapped: Box<c::OVERLAPPED>, // needs a stable address
456 dst: &'a mut Vec<u8>,
457 state: State,
458}
459
460#[derive(PartialEq, Debug)]
461enum State {
462 NotReading,
463 Reading,
464 Read(usize),
465}
466
467impl<'a> AsyncPipe<'a> {
468 fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
469 // Create an event which we'll use to coordinate our overlapped
470 // operations, this event will be used in WaitForMultipleObjects
471 // and passed as part of the OVERLAPPED handle.
472 //
473 // Note that we do a somewhat clever thing here by flagging the
474 // event as being manually reset and setting it initially to the
475 // signaled state. This means that we'll naturally fall through the
476 // WaitForMultipleObjects call above for pipes created initially,
477 // and the only time an even will go back to "unset" will be once an
478 // I/O operation is successfully scheduled (what we want).
479 let event = Handle::new_event(true, true)?;
480 let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) };
481 overlapped.hEvent = event.as_raw_handle();
482 Ok(AsyncPipe { pipe, overlapped, event, dst, state: State::NotReading })
483 }
484
485 /// Executes an overlapped read operation.
486 ///
487 /// Must not currently be reading, and returns whether the pipe is currently
488 /// at EOF or not. If the pipe is not at EOF then `result()` must be called
489 /// to complete the read later on (may block), but if the pipe is at EOF
490 /// then `result()` should not be called as it will just block forever.
491 fn schedule_read(&mut self) -> io::Result<bool> {
492 assert_eq!(self.state, State::NotReading);
493 let amt = unsafe {
494 if self.dst.capacity() == self.dst.len() {
495 let additional = if self.dst.capacity() == 0 { 16 } else { 1 };
496 self.dst.reserve(additional);
497 }
498 self.pipe.read_overlapped(self.dst.spare_capacity_mut(), &mut *self.overlapped)?
499 };
500
501 // If this read finished immediately then our overlapped event will
502 // remain signaled (it was signaled coming in here) and we'll progress
503 // down to the method below.
504 //
505 // Otherwise the I/O operation is scheduled and the system set our event
506 // to not signaled, so we flag ourselves into the reading state and move
507 // on.
508 self.state = match amt {
509 Some(0) => return Ok(false),
510 Some(amt) => State::Read(amt),
511 None => State::Reading,
512 };
513 Ok(true)
514 }
515
516 /// Wait for the result of the overlapped operation previously executed.
517 ///
518 /// Takes a parameter `wait` which indicates if this pipe is currently being
519 /// read whether the function should block waiting for the read to complete.
520 ///
521 /// Returns values:
522 ///
523 /// * `true` - finished any pending read and the pipe is not at EOF (keep
524 /// going)
525 /// * `false` - finished any pending read and pipe is at EOF (stop issuing
526 /// reads)
527 fn result(&mut self) -> io::Result<bool> {
528 let amt = match self.state {
529 State::NotReading => return Ok(true),
530 State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?,
531 State::Read(amt) => amt,
532 };
533 self.state = State::NotReading;
534 unsafe {
535 let len = self.dst.len();
536 self.dst.set_len(len + amt);
537 }
538 Ok(amt != 0)
539 }
540
541 /// Finishes out reading this pipe entirely.
542 ///
543 /// Waits for any pending and schedule read, and then calls `read_to_end`
544 /// if necessary to read all the remaining information.
545 fn finish(&mut self) -> io::Result<()> {
546 while self.result()? && self.schedule_read()? {
547 // ...
548 }
549 Ok(())
550 }
551}
552
553impl<'a> Drop for AsyncPipe<'a> {
554 fn drop(&mut self) {
555 let State::Reading = self.state else { return };
556
557 // If we have a pending read operation, then we have to make sure that
558 // it's *done* before we actually drop this type. The kernel requires
559 // that the `OVERLAPPED` and buffer pointers are valid for the entire
560 // I/O operation.
561 //
562 // To do that, we call `CancelIo` to cancel any pending operation, and
563 // if that succeeds we wait for the overlapped result.
564 //
565 // If anything here fails, there's not really much we can do, so we leak
566 // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
567 if self.pipe.cancel_io().is_err() || self.result().is_err() {
568 let buf = mem::take(self.dst);
569 let overlapped = Box::new(unsafe { mem::zeroed() });
570 let overlapped = mem::replace(&mut self.overlapped, overlapped);
571 mem::forget((buf, overlapped));
572 }
573 }
574}