Skip to main content

std\sys\process\windows/
child_pipe.rs

1use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
2use crate::ops::Neg;
3use crate::os::windows::prelude::*;
4use crate::sync::atomic::Atomic;
5use crate::sync::atomic::Ordering::Relaxed;
6use crate::sys::handle::Handle;
7use crate::sys::{FromInner, IntoInner, api, c};
8use crate::{mem, ptr};
9
/// One end of a pipe used to communicate with a child process.
///
/// This is a thin wrapper around an owned [`Handle`]; the pipe is created by
/// `child_pipe` or wrapped from an existing handle via `FromInner`.
pub struct ChildPipe {
    // The owned pipe handle that every read/write method forwards to.
    inner: Handle,
}
13
14impl IntoInner<Handle> for ChildPipe {
15    fn into_inner(self) -> Handle {
16        self.inner
17    }
18}
19
20impl FromInner<Handle> for ChildPipe {
21    fn from_inner(inner: Handle) -> ChildPipe {
22        Self { inner }
23    }
24}
25
/// The two ends of an anonymous pipe as returned by `child_pipe`.
pub(super) struct Pipes {
    /// The end kept by this process (created by `child_pipe` for asynchronous access).
    pub ours: ChildPipe,
    /// The end meant for the child process (opened by `child_pipe` for synchronous access).
    pub theirs: ChildPipe,
}
30
/// Creates an anonymous pipe suitable for communication with a child process.
///
/// Windows unfortunately does not have a way of performing asynchronous operations
/// on a handle originally created for synchronous operation. As `read_output` can
/// only be correctly implemented with asynchronous reads but the pipe created by
/// `CreatePipe` is synchronous, we cannot use it (and thus [`io::pipe`]) to create
/// a pipe for communicating with a child. Instead, this function uses the NT API
/// to create a pipe where one pipe handle (`ours`) is asynchronous and one is
/// synchronous and can be inherited by a child for use as a console handle
/// (`theirs`).
///
/// The ours/theirs pipes are *not* specifically readable or writable. Each
/// one only supports a read or a write, but which is which depends on the
/// boolean flag given. If `ours_readable` is `true`, then `ours` is readable and
/// `theirs` is writable. Conversely, if `ours_readable` is `false`, then `ours`
/// is writable and `theirs` is readable.
///
/// If `their_handle_inheritable` is `true`, the `theirs` handle is opened with
/// the `OBJ_INHERIT` attribute set.
///
/// Also note that the `ours` pipe is always a handle opened up in overlapped
/// mode. This means that technically speaking it should only ever be used
/// with `OVERLAPPED` instances, but also works out ok if it's only ever used
/// once at a time (which we do indeed guarantee).
///
/// # Errors
///
/// If opening the pipe filesystem, creating `ours` or opening `theirs` fails,
/// the failing `NTSTATUS` is converted with `RtlNtStatusToDosError` and
/// returned as an OS error.
// FIXME(joboet): No, we don't guarantee that? E.g. `&Stdout` is both `Read`
//                and `Sync`, so there could be multiple operations at the same
//                time. All the functions below that forward to the inner handle
//                methods could abort if used concurrently.
pub(super) fn child_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
    // A 64kb pipe capacity is the same as a typical Linux default.
    const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;

    // Note that we specifically do *not* use `CreatePipe` here because
    // unfortunately the anonymous pipes returned do not support overlapped
    // operations. Instead, we use `NtCreateNamedPipeFile` to create the
    // anonymous pipe with overlapped support.
    //
    // Once we do this, we connect to it via `NtOpenFile`, and then
    // we return those reader/writer halves. Note that the `ours` pipe return
    // value is always the named pipe, whereas `theirs` is just the normal file.
    // This should hopefully shield us from child processes which assume their
    // stdout is a named pipe, which would indeed be odd!
    unsafe {
        // `io_status` and `object_attributes` are reused by all three NT calls
        // below; `object_attributes` is adjusted in place before each call.
        let mut io_status = c::IO_STATUS_BLOCK::default();
        let mut object_attributes = c::OBJECT_ATTRIBUTES::default();
        object_attributes.Length = size_of::<c::OBJECT_ATTRIBUTES>() as u32;

        // Open a handle to the pipe filesystem (`\Device\NamedPipe\`).
        // This will be used when creating a new anonymous pipe.
        //
        // We cache the handle once so we can reuse it without needing to reopen it each time.
        // NOTE: this means the handle may appear to be leaked but that's fine because
        // it's only one handle and the OS will clean it up when the process exits.
        static PIPE_FS: Atomic<c::HANDLE> = Atomic::<c::HANDLE>::new(ptr::null_mut());
        let pipe_fs = if let handle = PIPE_FS.load(Relaxed)
            && !handle.is_null()
        {
            // Fast path: a previous call already opened and cached the handle.
            handle
        } else {
            let path = api::unicode_str!(r"\Device\NamedPipe\");
            object_attributes.ObjectName = path.as_ptr();
            let mut pipe_fs = ptr::null_mut();
            let status = c::NtOpenFile(
                &mut pipe_fs,
                c::SYNCHRONIZE | c::GENERIC_READ,
                &object_attributes,
                &mut io_status,
                c::FILE_SHARE_READ | c::FILE_SHARE_WRITE,
                c::FILE_SYNCHRONOUS_IO_NONALERT, // synchronous access
            );
            if c::nt_success(status) {
                // Publish our handle only if the cache is still empty. If another
                // thread won the race, close ours and use the cached one so that
                // exactly one handle stays open.
                match PIPE_FS.compare_exchange(ptr::null_mut(), pipe_fs, Relaxed, Relaxed) {
                    Ok(_) => pipe_fs,
                    Err(existing) => {
                        c::CloseHandle(pipe_fs);
                        existing
                    }
                }
            } else {
                return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
            }
        };

        // From now on we're using handles instead of paths to create and open pipes.
        // So set the `ObjectName` to a zero length string.
        // As a (perhaps overzealous) mitigation for #143078, we use the null pointer
        // for empty.Buffer instead of unicode_str!("").
        // There's no difference to the OS itself but it's possible that third party
        // DLLs which hook in to processes could be relying on the exact form of this string.
        let empty = c::UNICODE_STRING::default();
        object_attributes.ObjectName = &raw const empty;

        // Create our side of the pipe for async access.
        let ours = {
            // Use the pipe filesystem as the root directory.
            // With no name provided, an anonymous pipe will be created.
            object_attributes.RootDirectory = pipe_fs;

            // A negative timeout value is a relative time (rather than an absolute time).
            // The time is given in 100's of nanoseconds so this is 50 milliseconds.
            // This value was chosen to be consistent with the default timeout set by `CreateNamedPipeW`
            // See: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createnamedpipew
            let timeout = (50_i64 * 10000).neg() as u64;

            let mut ours = ptr::null_mut();
            let status = c::NtCreateNamedPipeFile(
                &mut ours,
                // Our end is opened for exactly one direction.
                c::SYNCHRONIZE | if ours_readable { c::GENERIC_READ } else { c::GENERIC_WRITE },
                &object_attributes,
                &mut io_status,
                // Share the opposite direction so that `theirs` can be opened below.
                if ours_readable { c::FILE_SHARE_WRITE } else { c::FILE_SHARE_READ },
                c::FILE_CREATE,
                // NOTE(review): presumably the create options; no synchronous-I/O
                // option is passed here, unlike the `NtOpenFile` calls — confirm
                // against the `NtCreateNamedPipeFile` parameter list.
                0,
                c::FILE_PIPE_BYTE_STREAM_TYPE,
                c::FILE_PIPE_BYTE_STREAM_MODE,
                c::FILE_PIPE_QUEUE_OPERATION,
                // only allow one client pipe
                1,
                // NOTE(review): presumably the inbound and outbound buffer quotas.
                PIPE_BUFFER_CAPACITY,
                PIPE_BUFFER_CAPACITY,
                &timeout,
            );
            if c::nt_success(status) {
                Handle::from_raw_handle(ours)
            } else {
                return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
            }
        };

        // Open their side of the pipe for synchronous access.
        let theirs = {
            // We can reopen the anonymous pipe without a name by setting
            // RootDirectory to the pipe handle and not setting a path name.
            object_attributes.RootDirectory = ours.as_raw_handle();

            if their_handle_inheritable {
                object_attributes.Attributes |= c::OBJ_INHERIT;
            }
            let mut theirs = ptr::null_mut();
            let status = c::NtOpenFile(
                &mut theirs,
                // Their end gets the direction opposite to ours.
                c::SYNCHRONIZE
                    | if ours_readable {
                        c::GENERIC_WRITE | c::FILE_READ_ATTRIBUTES
                    } else {
                        c::GENERIC_READ
                    },
                &object_attributes,
                &mut io_status,
                // NOTE(review): presumably the share-access argument (none requested).
                0,
                c::FILE_NON_DIRECTORY_FILE | c::FILE_SYNCHRONOUS_IO_NONALERT,
            );
            if c::nt_success(status) {
                Handle::from_raw_handle(theirs)
            } else {
                // `ours` is an owned `Handle` at this point, so returning early
                // drops it rather than leaking it.
                return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
            }
        };

        Ok(Pipes { ours: ChildPipe { inner: ours }, theirs: ChildPipe { inner: theirs } })
    }
}
190
191/// Takes an asynchronous source pipe and returns a synchronous pipe suitable
192/// for sending to a child process.
193///
194/// This is achieved by creating a new set of pipes and spawning a thread that
195/// relays messages between the source and the synchronous pipe.
196pub(super) fn spawn_pipe_relay(
197    source: &ChildPipe,
198    ours_readable: bool,
199    their_handle_inheritable: bool,
200) -> io::Result<ChildPipe> {
201    // We need this handle to live for the lifetime of the thread spawned below.
202    let source = source.try_clone()?;
203
204    // create a new pair of anon pipes.
205    let Pipes { theirs, ours } = child_pipe(ours_readable, their_handle_inheritable)?;
206
207    // Spawn a thread that passes messages from one pipe to the other.
208    // Any errors will simply cause the thread to exit.
209    let (reader, writer) = if ours_readable { (ours, source) } else { (source, ours) };
210    crate::thread::spawn(move || {
211        let mut buf = [0_u8; 4096];
212        'reader: while let Ok(len) = reader.read(&mut buf) {
213            if len == 0 {
214                break;
215            }
216            let mut start = 0;
217            while let Ok(written) = writer.write(&buf[start..len]) {
218                start += written;
219                if start == len {
220                    continue 'reader;
221                }
222            }
223            break;
224        }
225    });
226
227    // Return the pipe that should be sent to the child process.
228    Ok(theirs)
229}
230
impl ChildPipe {
    /// Borrows the underlying [`Handle`].
    pub fn handle(&self) -> &Handle {
        &self.inner
    }
    /// Consumes the pipe and returns the underlying [`Handle`].
    pub fn into_handle(self) -> Handle {
        self.inner
    }

    /// Duplicates the underlying handle into a new `ChildPipe`.
    ///
    /// The duplicate is requested with `DUPLICATE_SAME_ACCESS`.
    // NOTE(review): the `false` argument is presumably the "inheritable" flag of
    // `Handle::duplicate` — confirm against its signature.
    pub fn try_clone(&self) -> io::Result<Self> {
        self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| ChildPipe { inner })
    }

    /// Reads from the pipe into `buf`, returning the number of bytes read.
    ///
    /// The read is started with `ReadFileEx` and completed through
    /// `alertable_io_internal`. At most `u32::MAX` bytes are requested per call.
    /// A `BrokenPipe` error is reported as `Ok(0)`, i.e. EOF.
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        let result = unsafe {
            // `ReadFileEx` takes a 32-bit length, so clamp larger buffers.
            let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
            let ptr = buf.as_mut_ptr();
            self.alertable_io_internal(|overlapped, callback| {
                c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
            })
        };

        match result {
            // The special treatment of BrokenPipe is to deal with Windows
            // pipe semantics, which yields this error when *reading* from
            // a pipe after the other end has closed; we interpret that as
            // EOF on the pipe.
            Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
            _ => result,
        }
    }

    /// Reads from the pipe into the unfilled part of `buf` and advances the
    /// cursor by the number of bytes read.
    ///
    /// Behaves like [`ChildPipe::read`]: at most `u32::MAX` bytes are requested
    /// and a `BrokenPipe` error is reported as success with nothing read (EOF).
    pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
        let result = unsafe {
            // `ReadFileEx` takes a 32-bit length, so clamp larger cursors.
            let len = crate::cmp::min(buf.capacity(), u32::MAX as usize) as u32;
            let ptr = buf.as_mut().as_mut_ptr().cast::<u8>();
            self.alertable_io_internal(|overlapped, callback| {
                c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
            })
        };

        match result {
            // The special treatment of BrokenPipe is to deal with Windows
            // pipe semantics, which yields this error when *reading* from
            // a pipe after the other end has closed; we interpret that as
            // EOF on the pipe.
            Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
            Err(e) => Err(e),
            Ok(n) => {
                // `n` is the byte count reported for the completed read, which
                // was started with at most `buf.capacity()` bytes requested.
                unsafe {
                    buf.advance(n);
                }
                Ok(())
            }
        }
    }

    /// Vectored read; forwards directly to the underlying [`Handle`].
    pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        self.inner.read_vectored(bufs)
    }

    /// Reports whether the underlying [`Handle`] says it supports vectored reads.
    #[inline]
    pub fn is_read_vectored(&self) -> bool {
        self.inner.is_read_vectored()
    }

    /// Reads until EOF into `buf`; forwards directly to the underlying [`Handle`].
    pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
        self.handle().read_to_end(buf)
    }

    /// Writes `buf` to the pipe, returning the number of bytes written.
    ///
    /// The write is started with `WriteFileEx` and completed through
    /// `alertable_io_internal`. At most `u32::MAX` bytes are submitted per call.
    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        unsafe {
            // `WriteFileEx` takes a 32-bit length, so clamp larger buffers.
            let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
            self.alertable_io_internal(|overlapped, callback| {
                c::WriteFileEx(self.inner.as_raw_handle(), buf.as_ptr(), len, overlapped, callback)
            })
        }
    }

    /// Vectored write; forwards directly to the underlying [`Handle`].
    pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        self.inner.write_vectored(bufs)
    }

    /// Reports whether the underlying [`Handle`] says it supports vectored writes.
    #[inline]
    pub fn is_write_vectored(&self) -> bool {
        self.inner.is_write_vectored()
    }

    /// Synchronizes asynchronous reads or writes using our anonymous pipe.
    ///
    /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
    /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
    ///
    /// The `io` closure receives the `OVERLAPPED` structure and the completion
    /// routine to pass on to the I/O call and returns that call's `BOOL` result.
    /// On success, the number of bytes transferred is returned.
    ///
    /// Note: This should not be used for handles we don't create.
    ///
    /// # Safety
    ///
    /// The `io` closure must start the operation with either `ReadFileEx` or
    /// `WriteFileEx`, passing on the `OVERLAPPED` and completion routine it is
    /// given. The buffer handed to that call must be valid for reads or writes
    /// of the requested length until this function returns.
    ///
    /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
    /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
    /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
    unsafe fn alertable_io_internal(
        &self,
        io: impl FnOnce(&mut c::OVERLAPPED, c::LPOVERLAPPED_COMPLETION_ROUTINE) -> c::BOOL,
    ) -> io::Result<usize> {
        // Use "alertable I/O" to synchronize the pipe I/O.
        // This has four steps.
        //
        // STEP 1: Start the asynchronous I/O operation.
        //         This simply calls either `ReadFileEx` or `WriteFileEx`,
        //         giving it a pointer to the buffer and callback function.
        //
        // STEP 2: Enter an alertable state.
        //         The callback set in step 1 will not be called until the thread
        //         enters an "alertable" state. This can be done using `SleepEx`.
        //
        // STEP 3: The callback
        //         Once the I/O is complete and the thread is in an alertable state,
        //         the callback will be run on the same thread as the call to
        //         `ReadFileEx` or `WriteFileEx` done in step 1.
        //         In the callback we simply set the result of the async operation.
        //
        // STEP 4: Return the result.
        //         At this point we'll have a result from the callback function
        //         and can simply return it. Note that we must not return earlier,
        //         while the I/O is still in progress.

        // The result that will be set from the asynchronous callback.
        let mut async_result: Option<AsyncResult> = None;
        struct AsyncResult {
            error: u32,
            transferred: u32,
        }

        // STEP 3: The callback.
        unsafe extern "system" fn callback(
            error: u32,
            transferred: u32,
            overlapped: *mut c::OVERLAPPED,
        ) {
            // Set `async_result` using a pointer smuggled through `hEvent`.
            // SAFETY:
            // At this point, the OVERLAPPED struct will have been written to by the OS,
            // except for our `hEvent` field which we set to a valid AsyncResult pointer (see below)
            unsafe {
                let result = AsyncResult { error, transferred };
                *(*overlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
            }
        }

        // STEP 1: Start the I/O operation.
        let mut overlapped: c::OVERLAPPED = unsafe { crate::mem::zeroed() };
        // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
        // Therefore the documentation suggests using it to smuggle a pointer to the callback.
        overlapped.hEvent = (&raw mut async_result) as *mut _;

        // Start the asynchronous read or write of the pipe.
        // If successful, `callback` will be called once it completes.
        let result = io(&mut overlapped, Some(callback));
        if result == c::FALSE {
            // We can return here because the call failed.
            // After this we must not return until the I/O completes.
            return Err(io::Error::last_os_error());
        }

        // Wait indefinitely for the result.
        let result = loop {
            // STEP 2: Enter an alertable state.
            // The second parameter of `SleepEx` is used to make this sleep alertable.
            unsafe { c::SleepEx(c::INFINITE, c::TRUE) };
            // The sleep may return without our callback having run, so keep
            // sleeping until the callback has stored a result.
            if let Some(result) = async_result {
                break result;
            }
        };
        // STEP 4: Return the result.
        // `async_result` is always `Some` at this point
        match result.error {
            c::ERROR_SUCCESS => Ok(result.transferred as usize),
            error => Err(io::Error::from_raw_os_error(error as _)),
        }
    }
}
414
415pub fn read_output(
416    p1: ChildPipe,
417    v1: &mut Vec<u8>,
418    p2: ChildPipe,
419    v2: &mut Vec<u8>,
420) -> io::Result<()> {
421    let p1 = p1.into_handle();
422    let p2 = p2.into_handle();
423
424    let mut p1 = AsyncPipe::new(p1, v1)?;
425    let mut p2 = AsyncPipe::new(p2, v2)?;
426    let objs = [p1.event.as_raw_handle(), p2.event.as_raw_handle()];
427
428    // In a loop we wait for either pipe's scheduled read operation to complete.
429    // If the operation completes with 0 bytes, that means EOF was reached, in
430    // which case we just finish out the other pipe entirely.
431    //
432    // Note that overlapped I/O is in general super unsafe because we have to
433    // be careful to ensure that all pointers in play are valid for the entire
434    // duration of the I/O operation (where tons of operations can also fail).
435    // The destructor for `AsyncPipe` ends up taking care of most of this.
436    loop {
437        let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) };
438        if res == c::WAIT_OBJECT_0 {
439            if !p1.result()? || !p1.schedule_read()? {
440                return p2.finish();
441            }
442        } else if res == c::WAIT_OBJECT_0 + 1 {
443            if !p2.result()? || !p2.schedule_read()? {
444                return p1.finish();
445            }
446        } else {
447            return Err(io::Error::last_os_error());
448        }
449    }
450}
451
/// State for driving overlapped reads from one pipe into a `Vec<u8>`.
///
/// Used by `read_output` to read from two pipes at the same time.
struct AsyncPipe<'a> {
    // The pipe handle that overlapped reads are issued on.
    pipe: Handle,
    // The event stored in `overlapped.hEvent`; `read_output` waits on it.
    event: Handle,
    overlapped: Box<c::OVERLAPPED>, // needs a stable address
    // Destination buffer; reads target its spare capacity.
    dst: &'a mut Vec<u8>,
    // Whether a read is outstanding or completed; see `State`.
    state: State,
}
459
/// Progress of the current overlapped read on an `AsyncPipe`.
#[derive(PartialEq, Debug)]
enum State {
    /// No read has been scheduled, or the last one has been fully accounted for.
    NotReading,
    /// A read has been scheduled and has not completed yet.
    Reading,
    /// A read completed immediately with this many bytes, which have not yet
    /// been added to the destination's length.
    Read(usize),
}
466
467impl<'a> AsyncPipe<'a> {
468    fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
469        // Create an event which we'll use to coordinate our overlapped
470        // operations, this event will be used in WaitForMultipleObjects
471        // and passed as part of the OVERLAPPED handle.
472        //
473        // Note that we do a somewhat clever thing here by flagging the
474        // event as being manually reset and setting it initially to the
475        // signaled state. This means that we'll naturally fall through the
476        // WaitForMultipleObjects call above for pipes created initially,
477        // and the only time an even will go back to "unset" will be once an
478        // I/O operation is successfully scheduled (what we want).
479        let event = Handle::new_event(true, true)?;
480        let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) };
481        overlapped.hEvent = event.as_raw_handle();
482        Ok(AsyncPipe { pipe, overlapped, event, dst, state: State::NotReading })
483    }
484
485    /// Executes an overlapped read operation.
486    ///
487    /// Must not currently be reading, and returns whether the pipe is currently
488    /// at EOF or not. If the pipe is not at EOF then `result()` must be called
489    /// to complete the read later on (may block), but if the pipe is at EOF
490    /// then `result()` should not be called as it will just block forever.
491    fn schedule_read(&mut self) -> io::Result<bool> {
492        assert_eq!(self.state, State::NotReading);
493        let amt = unsafe {
494            if self.dst.capacity() == self.dst.len() {
495                let additional = if self.dst.capacity() == 0 { 16 } else { 1 };
496                self.dst.reserve(additional);
497            }
498            self.pipe.read_overlapped(self.dst.spare_capacity_mut(), &mut *self.overlapped)?
499        };
500
501        // If this read finished immediately then our overlapped event will
502        // remain signaled (it was signaled coming in here) and we'll progress
503        // down to the method below.
504        //
505        // Otherwise the I/O operation is scheduled and the system set our event
506        // to not signaled, so we flag ourselves into the reading state and move
507        // on.
508        self.state = match amt {
509            Some(0) => return Ok(false),
510            Some(amt) => State::Read(amt),
511            None => State::Reading,
512        };
513        Ok(true)
514    }
515
516    /// Wait for the result of the overlapped operation previously executed.
517    ///
518    /// Takes a parameter `wait` which indicates if this pipe is currently being
519    /// read whether the function should block waiting for the read to complete.
520    ///
521    /// Returns values:
522    ///
523    /// * `true` - finished any pending read and the pipe is not at EOF (keep
524    ///            going)
525    /// * `false` - finished any pending read and pipe is at EOF (stop issuing
526    ///             reads)
527    fn result(&mut self) -> io::Result<bool> {
528        let amt = match self.state {
529            State::NotReading => return Ok(true),
530            State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?,
531            State::Read(amt) => amt,
532        };
533        self.state = State::NotReading;
534        unsafe {
535            let len = self.dst.len();
536            self.dst.set_len(len + amt);
537        }
538        Ok(amt != 0)
539    }
540
541    /// Finishes out reading this pipe entirely.
542    ///
543    /// Waits for any pending and schedule read, and then calls `read_to_end`
544    /// if necessary to read all the remaining information.
545    fn finish(&mut self) -> io::Result<()> {
546        while self.result()? && self.schedule_read()? {
547            // ...
548        }
549        Ok(())
550    }
551}
552
553impl<'a> Drop for AsyncPipe<'a> {
554    fn drop(&mut self) {
555        let State::Reading = self.state else { return };
556
557        // If we have a pending read operation, then we have to make sure that
558        // it's *done* before we actually drop this type. The kernel requires
559        // that the `OVERLAPPED` and buffer pointers are valid for the entire
560        // I/O operation.
561        //
562        // To do that, we call `CancelIo` to cancel any pending operation, and
563        // if that succeeds we wait for the overlapped result.
564        //
565        // If anything here fails, there's not really much we can do, so we leak
566        // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
567        if self.pipe.cancel_io().is_err() || self.result().is_err() {
568            let buf = mem::take(self.dst);
569            let overlapped = Box::new(unsafe { mem::zeroed() });
570            let overlapped = mem::replace(&mut self.overlapped, overlapped);
571            mem::forget((buf, overlapped));
572        }
573    }
574}