Skip to main content

std\sys\process\windows/
child_pipe.rs

1use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
2use crate::ops::Neg;
3use crate::os::windows::prelude::*;
4use crate::sys::handle::Handle;
5use crate::sys::{FromInner, IntoInner, api, c};
6use crate::{mem, ptr};
7
8pub struct ChildPipe {
9    inner: Handle,
10}
11
12impl IntoInner<Handle> for ChildPipe {
13    fn into_inner(self) -> Handle {
14        self.inner
15    }
16}
17
18impl FromInner<Handle> for ChildPipe {
19    fn from_inner(inner: Handle) -> ChildPipe {
20        Self { inner }
21    }
22}
23
24pub(super) struct Pipes {
25    pub ours: ChildPipe,
26    pub theirs: ChildPipe,
27}
28
29/// Creates an anonymous pipe suitable for communication with a child process.
30///
31/// Windows unfortunately does not have a way of performing asynchronous operations
32/// on a handle originally created for synchronous operation. As `read_output` can
33/// only be correctly implemented with asynchronous reads but the pipe created by
34/// `CreatePipe` is synchronous, we cannot use it (and thus [`io::pipe`]) to create
35/// a pipe for communicating with a child. Instead, this function uses the NT API
36/// to create a pipe where one pipe handle (`ours`) is asynchronous and one is
37/// synchronous and can be inherited by a child for use as a console handle
38/// (`theirs`).
39///
40/// The ours/theirs pipes are *not* specifically readable or writable. Each
41/// one only supports a read or a write, but which is which depends on the
42/// boolean flag given. If `ours_readable` is `true`, then `ours` is readable and
43/// `theirs` is writable. Conversely, if `ours_readable` is `false`, then `ours`
44/// is writable and `theirs` is readable.
45///
46/// Also note that the `ours` pipe is always a handle opened up in overlapped
47/// mode. This means that technically speaking it should only ever be used
48/// with `OVERLAPPED` instances, but also works out ok if it's only ever used
49/// once at a time (which we do indeed guarantee).
50// FIXME(joboet): No, we don't guarantee that? E.g. `&Stdout` is both `Read`
51//                and `Sync`, so there could be multiple operations at the same
52//                time. All the functions below that forward to the inner handle
53//                methods could abort if used concurrently.
54pub(super) fn child_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
55    // A 64kb pipe capacity is the same as a typical Linux default.
56    const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
57
58    // Note that we specifically do *not* use `CreatePipe` here because
59    // unfortunately the anonymous pipes returned do not support overlapped
60    // operations. Instead, we use `NtCreateNamedPipeFile` to create the
61    // anonymous pipe with overlapped support.
62    //
63    // Once we do this, we connect to it via `NtOpenFile`, and then
64    // we return those reader/writer halves. Note that the `ours` pipe return
65    // value is always the named pipe, whereas `theirs` is just the normal file.
66    // This should hopefully shield us from child processes which assume their
67    // stdout is a named pipe, which would indeed be odd!
68    unsafe {
69        let mut io_status = c::IO_STATUS_BLOCK::default();
70        let mut object_attributes = c::OBJECT_ATTRIBUTES::default();
71        object_attributes.Length = size_of::<c::OBJECT_ATTRIBUTES>() as u32;
72
73        // Open a handle to the pipe filesystem (`\??\PIPE\`).
74        // This will be used when creating a new annon pipe.
75        let pipe_fs = {
76            let path = api::unicode_str!(r"\??\PIPE\");
77            object_attributes.ObjectName = path.as_ptr();
78            let mut pipe_fs = ptr::null_mut();
79            let status = c::NtOpenFile(
80                &mut pipe_fs,
81                c::SYNCHRONIZE | c::GENERIC_READ,
82                &object_attributes,
83                &mut io_status,
84                c::FILE_SHARE_READ | c::FILE_SHARE_WRITE,
85                c::FILE_SYNCHRONOUS_IO_NONALERT, // synchronous access
86            );
87            if c::nt_success(status) {
88                Handle::from_raw_handle(pipe_fs)
89            } else {
90                return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
91            }
92        };
93
94        // From now on we're using handles instead of paths to create and open pipes.
95        // So set the `ObjectName` to a zero length string.
96        // As a (perhaps overzealous) mitigation for #143078, we use the null pointer
97        // for empty.Buffer instead of unicode_str!("").
98        // There's no difference to the OS itself but it's possible that third party
99        // DLLs which hook in to processes could be relying on the exact form of this string.
100        let empty = c::UNICODE_STRING::default();
101        object_attributes.ObjectName = &raw const empty;
102
103        // Create our side of the pipe for async access.
104        let ours = {
105            // Use the pipe filesystem as the root directory.
106            // With no name provided, an anonymous pipe will be created.
107            object_attributes.RootDirectory = pipe_fs.as_raw_handle();
108
109            // A negative timeout value is a relative time (rather than an absolute time).
110            // The time is given in 100's of nanoseconds so this is 50 milliseconds.
111            // This value was chosen to be consistent with the default timeout set by `CreateNamedPipeW`
112            // See: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createnamedpipew
113            let timeout = (50_i64 * 10000).neg() as u64;
114
115            let mut ours = ptr::null_mut();
116            let status = c::NtCreateNamedPipeFile(
117                &mut ours,
118                c::SYNCHRONIZE | if ours_readable { c::GENERIC_READ } else { c::GENERIC_WRITE },
119                &object_attributes,
120                &mut io_status,
121                if ours_readable { c::FILE_SHARE_WRITE } else { c::FILE_SHARE_READ },
122                c::FILE_CREATE,
123                0,
124                c::FILE_PIPE_BYTE_STREAM_TYPE,
125                c::FILE_PIPE_BYTE_STREAM_MODE,
126                c::FILE_PIPE_QUEUE_OPERATION,
127                // only allow one client pipe
128                1,
129                PIPE_BUFFER_CAPACITY,
130                PIPE_BUFFER_CAPACITY,
131                &timeout,
132            );
133            if c::nt_success(status) {
134                Handle::from_raw_handle(ours)
135            } else {
136                return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
137            }
138        };
139
140        // Open their side of the pipe for synchronous access.
141        let theirs = {
142            // We can reopen the anonymous pipe without a name by setting
143            // RootDirectory to the pipe handle and not setting a path name,
144            object_attributes.RootDirectory = ours.as_raw_handle();
145
146            if their_handle_inheritable {
147                object_attributes.Attributes |= c::OBJ_INHERIT;
148            }
149            let mut theirs = ptr::null_mut();
150            let status = c::NtOpenFile(
151                &mut theirs,
152                c::SYNCHRONIZE
153                    | if ours_readable {
154                        c::GENERIC_WRITE | c::FILE_READ_ATTRIBUTES
155                    } else {
156                        c::GENERIC_READ
157                    },
158                &object_attributes,
159                &mut io_status,
160                0,
161                c::FILE_NON_DIRECTORY_FILE | c::FILE_SYNCHRONOUS_IO_NONALERT,
162            );
163            if c::nt_success(status) {
164                Handle::from_raw_handle(theirs)
165            } else {
166                return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
167            }
168        };
169
170        Ok(Pipes { ours: ChildPipe { inner: ours }, theirs: ChildPipe { inner: theirs } })
171    }
172}
173
174/// Takes an asynchronous source pipe and returns a synchronous pipe suitable
175/// for sending to a child process.
176///
177/// This is achieved by creating a new set of pipes and spawning a thread that
178/// relays messages between the source and the synchronous pipe.
179pub(super) fn spawn_pipe_relay(
180    source: &ChildPipe,
181    ours_readable: bool,
182    their_handle_inheritable: bool,
183) -> io::Result<ChildPipe> {
184    // We need this handle to live for the lifetime of the thread spawned below.
185    let source = source.try_clone()?;
186
187    // create a new pair of anon pipes.
188    let Pipes { theirs, ours } = child_pipe(ours_readable, their_handle_inheritable)?;
189
190    // Spawn a thread that passes messages from one pipe to the other.
191    // Any errors will simply cause the thread to exit.
192    let (reader, writer) = if ours_readable { (ours, source) } else { (source, ours) };
193    crate::thread::spawn(move || {
194        let mut buf = [0_u8; 4096];
195        'reader: while let Ok(len) = reader.read(&mut buf) {
196            if len == 0 {
197                break;
198            }
199            let mut start = 0;
200            while let Ok(written) = writer.write(&buf[start..len]) {
201                start += written;
202                if start == len {
203                    continue 'reader;
204                }
205            }
206            break;
207        }
208    });
209
210    // Return the pipe that should be sent to the child process.
211    Ok(theirs)
212}
213
214impl ChildPipe {
215    pub fn handle(&self) -> &Handle {
216        &self.inner
217    }
218    pub fn into_handle(self) -> Handle {
219        self.inner
220    }
221
222    pub fn try_clone(&self) -> io::Result<Self> {
223        self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| ChildPipe { inner })
224    }
225
226    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
227        let result = unsafe {
228            let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
229            let ptr = buf.as_mut_ptr();
230            self.alertable_io_internal(|overlapped, callback| {
231                c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
232            })
233        };
234
235        match result {
236            // The special treatment of BrokenPipe is to deal with Windows
237            // pipe semantics, which yields this error when *reading* from
238            // a pipe after the other end has closed; we interpret that as
239            // EOF on the pipe.
240            Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
241            _ => result,
242        }
243    }
244
245    pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
246        let result = unsafe {
247            let len = crate::cmp::min(buf.capacity(), u32::MAX as usize) as u32;
248            let ptr = buf.as_mut().as_mut_ptr().cast::<u8>();
249            self.alertable_io_internal(|overlapped, callback| {
250                c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
251            })
252        };
253
254        match result {
255            // The special treatment of BrokenPipe is to deal with Windows
256            // pipe semantics, which yields this error when *reading* from
257            // a pipe after the other end has closed; we interpret that as
258            // EOF on the pipe.
259            Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
260            Err(e) => Err(e),
261            Ok(n) => {
262                unsafe {
263                    buf.advance(n);
264                }
265                Ok(())
266            }
267        }
268    }
269
270    pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
271        self.inner.read_vectored(bufs)
272    }
273
274    #[inline]
275    pub fn is_read_vectored(&self) -> bool {
276        self.inner.is_read_vectored()
277    }
278
279    pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
280        self.handle().read_to_end(buf)
281    }
282
283    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
284        unsafe {
285            let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
286            self.alertable_io_internal(|overlapped, callback| {
287                c::WriteFileEx(self.inner.as_raw_handle(), buf.as_ptr(), len, overlapped, callback)
288            })
289        }
290    }
291
292    pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
293        self.inner.write_vectored(bufs)
294    }
295
296    #[inline]
297    pub fn is_write_vectored(&self) -> bool {
298        self.inner.is_write_vectored()
299    }
300
301    /// Synchronizes asynchronous reads or writes using our anonymous pipe.
302    ///
303    /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
304    /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
305    ///
306    /// Note: This should not be used for handles we don't create.
307    ///
308    /// # Safety
309    ///
310    /// `buf` must be a pointer to a buffer that's valid for reads or writes
311    /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
312    ///
313    /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
314    /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
315    /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
316    unsafe fn alertable_io_internal(
317        &self,
318        io: impl FnOnce(&mut c::OVERLAPPED, c::LPOVERLAPPED_COMPLETION_ROUTINE) -> c::BOOL,
319    ) -> io::Result<usize> {
320        // Use "alertable I/O" to synchronize the pipe I/O.
321        // This has four steps.
322        //
323        // STEP 1: Start the asynchronous I/O operation.
324        //         This simply calls either `ReadFileEx` or `WriteFileEx`,
325        //         giving it a pointer to the buffer and callback function.
326        //
327        // STEP 2: Enter an alertable state.
328        //         The callback set in step 1 will not be called until the thread
329        //         enters an "alertable" state. This can be done using `SleepEx`.
330        //
331        // STEP 3: The callback
332        //         Once the I/O is complete and the thread is in an alertable state,
333        //         the callback will be run on the same thread as the call to
334        //         `ReadFileEx` or `WriteFileEx` done in step 1.
335        //         In the callback we simply set the result of the async operation.
336        //
337        // STEP 4: Return the result.
338        //         At this point we'll have a result from the callback function
339        //         and can simply return it. Note that we must not return earlier,
340        //         while the I/O is still in progress.
341
342        // The result that will be set from the asynchronous callback.
343        let mut async_result: Option<AsyncResult> = None;
344        struct AsyncResult {
345            error: u32,
346            transferred: u32,
347        }
348
349        // STEP 3: The callback.
350        unsafe extern "system" fn callback(
351            error: u32,
352            transferred: u32,
353            overlapped: *mut c::OVERLAPPED,
354        ) {
355            // Set `async_result` using a pointer smuggled through `hEvent`.
356            // SAFETY:
357            // At this point, the OVERLAPPED struct will have been written to by the OS,
358            // except for our `hEvent` field which we set to a valid AsyncResult pointer (see below)
359            unsafe {
360                let result = AsyncResult { error, transferred };
361                *(*overlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
362            }
363        }
364
365        // STEP 1: Start the I/O operation.
366        let mut overlapped: c::OVERLAPPED = unsafe { crate::mem::zeroed() };
367        // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
368        // Therefore the documentation suggests using it to smuggle a pointer to the callback.
369        overlapped.hEvent = (&raw mut async_result) as *mut _;
370
371        // Asynchronous read of the pipe.
372        // If successful, `callback` will be called once it completes.
373        let result = io(&mut overlapped, Some(callback));
374        if result == c::FALSE {
375            // We can return here because the call failed.
376            // After this we must not return until the I/O completes.
377            return Err(io::Error::last_os_error());
378        }
379
380        // Wait indefinitely for the result.
381        let result = loop {
382            // STEP 2: Enter an alertable state.
383            // The second parameter of `SleepEx` is used to make this sleep alertable.
384            unsafe { c::SleepEx(c::INFINITE, c::TRUE) };
385            if let Some(result) = async_result {
386                break result;
387            }
388        };
389        // STEP 4: Return the result.
390        // `async_result` is always `Some` at this point
391        match result.error {
392            c::ERROR_SUCCESS => Ok(result.transferred as usize),
393            error => Err(io::Error::from_raw_os_error(error as _)),
394        }
395    }
396}
397
398pub fn read_output(
399    p1: ChildPipe,
400    v1: &mut Vec<u8>,
401    p2: ChildPipe,
402    v2: &mut Vec<u8>,
403) -> io::Result<()> {
404    let p1 = p1.into_handle();
405    let p2 = p2.into_handle();
406
407    let mut p1 = AsyncPipe::new(p1, v1)?;
408    let mut p2 = AsyncPipe::new(p2, v2)?;
409    let objs = [p1.event.as_raw_handle(), p2.event.as_raw_handle()];
410
411    // In a loop we wait for either pipe's scheduled read operation to complete.
412    // If the operation completes with 0 bytes, that means EOF was reached, in
413    // which case we just finish out the other pipe entirely.
414    //
415    // Note that overlapped I/O is in general super unsafe because we have to
416    // be careful to ensure that all pointers in play are valid for the entire
417    // duration of the I/O operation (where tons of operations can also fail).
418    // The destructor for `AsyncPipe` ends up taking care of most of this.
419    loop {
420        let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) };
421        if res == c::WAIT_OBJECT_0 {
422            if !p1.result()? || !p1.schedule_read()? {
423                return p2.finish();
424            }
425        } else if res == c::WAIT_OBJECT_0 + 1 {
426            if !p2.result()? || !p2.schedule_read()? {
427                return p1.finish();
428            }
429        } else {
430            return Err(io::Error::last_os_error());
431        }
432    }
433}
434
435struct AsyncPipe<'a> {
436    pipe: Handle,
437    event: Handle,
438    overlapped: Box<c::OVERLAPPED>, // needs a stable address
439    dst: &'a mut Vec<u8>,
440    state: State,
441}
442
443#[derive(PartialEq, Debug)]
444enum State {
445    NotReading,
446    Reading,
447    Read(usize),
448}
449
450impl<'a> AsyncPipe<'a> {
451    fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
452        // Create an event which we'll use to coordinate our overlapped
453        // operations, this event will be used in WaitForMultipleObjects
454        // and passed as part of the OVERLAPPED handle.
455        //
456        // Note that we do a somewhat clever thing here by flagging the
457        // event as being manually reset and setting it initially to the
458        // signaled state. This means that we'll naturally fall through the
459        // WaitForMultipleObjects call above for pipes created initially,
460        // and the only time an even will go back to "unset" will be once an
461        // I/O operation is successfully scheduled (what we want).
462        let event = Handle::new_event(true, true)?;
463        let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) };
464        overlapped.hEvent = event.as_raw_handle();
465        Ok(AsyncPipe { pipe, overlapped, event, dst, state: State::NotReading })
466    }
467
468    /// Executes an overlapped read operation.
469    ///
470    /// Must not currently be reading, and returns whether the pipe is currently
471    /// at EOF or not. If the pipe is not at EOF then `result()` must be called
472    /// to complete the read later on (may block), but if the pipe is at EOF
473    /// then `result()` should not be called as it will just block forever.
474    fn schedule_read(&mut self) -> io::Result<bool> {
475        assert_eq!(self.state, State::NotReading);
476        let amt = unsafe {
477            if self.dst.capacity() == self.dst.len() {
478                let additional = if self.dst.capacity() == 0 { 16 } else { 1 };
479                self.dst.reserve(additional);
480            }
481            self.pipe.read_overlapped(self.dst.spare_capacity_mut(), &mut *self.overlapped)?
482        };
483
484        // If this read finished immediately then our overlapped event will
485        // remain signaled (it was signaled coming in here) and we'll progress
486        // down to the method below.
487        //
488        // Otherwise the I/O operation is scheduled and the system set our event
489        // to not signaled, so we flag ourselves into the reading state and move
490        // on.
491        self.state = match amt {
492            Some(0) => return Ok(false),
493            Some(amt) => State::Read(amt),
494            None => State::Reading,
495        };
496        Ok(true)
497    }
498
499    /// Wait for the result of the overlapped operation previously executed.
500    ///
501    /// Takes a parameter `wait` which indicates if this pipe is currently being
502    /// read whether the function should block waiting for the read to complete.
503    ///
504    /// Returns values:
505    ///
506    /// * `true` - finished any pending read and the pipe is not at EOF (keep
507    ///            going)
508    /// * `false` - finished any pending read and pipe is at EOF (stop issuing
509    ///             reads)
510    fn result(&mut self) -> io::Result<bool> {
511        let amt = match self.state {
512            State::NotReading => return Ok(true),
513            State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?,
514            State::Read(amt) => amt,
515        };
516        self.state = State::NotReading;
517        unsafe {
518            let len = self.dst.len();
519            self.dst.set_len(len + amt);
520        }
521        Ok(amt != 0)
522    }
523
524    /// Finishes out reading this pipe entirely.
525    ///
526    /// Waits for any pending and schedule read, and then calls `read_to_end`
527    /// if necessary to read all the remaining information.
528    fn finish(&mut self) -> io::Result<()> {
529        while self.result()? && self.schedule_read()? {
530            // ...
531        }
532        Ok(())
533    }
534}
535
536impl<'a> Drop for AsyncPipe<'a> {
537    fn drop(&mut self) {
538        let State::Reading = self.state else { return };
539
540        // If we have a pending read operation, then we have to make sure that
541        // it's *done* before we actually drop this type. The kernel requires
542        // that the `OVERLAPPED` and buffer pointers are valid for the entire
543        // I/O operation.
544        //
545        // To do that, we call `CancelIo` to cancel any pending operation, and
546        // if that succeeds we wait for the overlapped result.
547        //
548        // If anything here fails, there's not really much we can do, so we leak
549        // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
550        if self.pipe.cancel_io().is_err() || self.result().is_err() {
551            let buf = mem::take(self.dst);
552            let overlapped = Box::new(unsafe { mem::zeroed() });
553            let overlapped = mem::replace(&mut self.overlapped, overlapped);
554            mem::forget((buf, overlapped));
555        }
556    }
557}