std\sys\process\windows/child_pipe.rs
1use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
2use crate::ops::Neg;
3use crate::os::windows::prelude::*;
4use crate::sys::handle::Handle;
5use crate::sys::{FromInner, IntoInner, api, c};
6use crate::{mem, ptr};
7
8pub struct ChildPipe {
9 inner: Handle,
10}
11
12impl IntoInner<Handle> for ChildPipe {
13 fn into_inner(self) -> Handle {
14 self.inner
15 }
16}
17
18impl FromInner<Handle> for ChildPipe {
19 fn from_inner(inner: Handle) -> ChildPipe {
20 Self { inner }
21 }
22}
23
24pub(super) struct Pipes {
25 pub ours: ChildPipe,
26 pub theirs: ChildPipe,
27}
28
29/// Creates an anonymous pipe suitable for communication with a child process.
30///
31/// Windows unfortunately does not have a way of performing asynchronous operations
32/// on a handle originally created for synchronous operation. As `read_output` can
33/// only be correctly implemented with asynchronous reads but the pipe created by
34/// `CreatePipe` is synchronous, we cannot use it (and thus [`io::pipe`]) to create
35/// a pipe for communicating with a child. Instead, this function uses the NT API
36/// to create a pipe where one pipe handle (`ours`) is asynchronous and one is
37/// synchronous and can be inherited by a child for use as a console handle
38/// (`theirs`).
39///
40/// The ours/theirs pipes are *not* specifically readable or writable. Each
41/// one only supports a read or a write, but which is which depends on the
42/// boolean flag given. If `ours_readable` is `true`, then `ours` is readable and
43/// `theirs` is writable. Conversely, if `ours_readable` is `false`, then `ours`
44/// is writable and `theirs` is readable.
45///
46/// Also note that the `ours` pipe is always a handle opened up in overlapped
47/// mode. This means that technically speaking it should only ever be used
48/// with `OVERLAPPED` instances, but also works out ok if it's only ever used
49/// once at a time (which we do indeed guarantee).
50// FIXME(joboet): No, we don't guarantee that? E.g. `&Stdout` is both `Read`
51// and `Sync`, so there could be multiple operations at the same
52// time. All the functions below that forward to the inner handle
53// methods could abort if used concurrently.
54pub(super) fn child_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
55 // A 64kb pipe capacity is the same as a typical Linux default.
56 const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
57
58 // Note that we specifically do *not* use `CreatePipe` here because
59 // unfortunately the anonymous pipes returned do not support overlapped
60 // operations. Instead, we use `NtCreateNamedPipeFile` to create the
61 // anonymous pipe with overlapped support.
62 //
63 // Once we do this, we connect to it via `NtOpenFile`, and then
64 // we return those reader/writer halves. Note that the `ours` pipe return
65 // value is always the named pipe, whereas `theirs` is just the normal file.
66 // This should hopefully shield us from child processes which assume their
67 // stdout is a named pipe, which would indeed be odd!
68 unsafe {
69 let mut io_status = c::IO_STATUS_BLOCK::default();
70 let mut object_attributes = c::OBJECT_ATTRIBUTES::default();
71 object_attributes.Length = size_of::<c::OBJECT_ATTRIBUTES>() as u32;
72
73 // Open a handle to the pipe filesystem (`\??\PIPE\`).
74 // This will be used when creating a new annon pipe.
75 let pipe_fs = {
76 let path = api::unicode_str!(r"\??\PIPE\");
77 object_attributes.ObjectName = path.as_ptr();
78 let mut pipe_fs = ptr::null_mut();
79 let status = c::NtOpenFile(
80 &mut pipe_fs,
81 c::SYNCHRONIZE | c::GENERIC_READ,
82 &object_attributes,
83 &mut io_status,
84 c::FILE_SHARE_READ | c::FILE_SHARE_WRITE,
85 c::FILE_SYNCHRONOUS_IO_NONALERT, // synchronous access
86 );
87 if c::nt_success(status) {
88 Handle::from_raw_handle(pipe_fs)
89 } else {
90 return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
91 }
92 };
93
94 // From now on we're using handles instead of paths to create and open pipes.
95 // So set the `ObjectName` to a zero length string.
96 // As a (perhaps overzealous) mitigation for #143078, we use the null pointer
97 // for empty.Buffer instead of unicode_str!("").
98 // There's no difference to the OS itself but it's possible that third party
99 // DLLs which hook in to processes could be relying on the exact form of this string.
100 let empty = c::UNICODE_STRING::default();
101 object_attributes.ObjectName = &raw const empty;
102
103 // Create our side of the pipe for async access.
104 let ours = {
105 // Use the pipe filesystem as the root directory.
106 // With no name provided, an anonymous pipe will be created.
107 object_attributes.RootDirectory = pipe_fs.as_raw_handle();
108
109 // A negative timeout value is a relative time (rather than an absolute time).
110 // The time is given in 100's of nanoseconds so this is 50 milliseconds.
111 // This value was chosen to be consistent with the default timeout set by `CreateNamedPipeW`
112 // See: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createnamedpipew
113 let timeout = (50_i64 * 10000).neg() as u64;
114
115 let mut ours = ptr::null_mut();
116 let status = c::NtCreateNamedPipeFile(
117 &mut ours,
118 c::SYNCHRONIZE | if ours_readable { c::GENERIC_READ } else { c::GENERIC_WRITE },
119 &object_attributes,
120 &mut io_status,
121 if ours_readable { c::FILE_SHARE_WRITE } else { c::FILE_SHARE_READ },
122 c::FILE_CREATE,
123 0,
124 c::FILE_PIPE_BYTE_STREAM_TYPE,
125 c::FILE_PIPE_BYTE_STREAM_MODE,
126 c::FILE_PIPE_QUEUE_OPERATION,
127 // only allow one client pipe
128 1,
129 PIPE_BUFFER_CAPACITY,
130 PIPE_BUFFER_CAPACITY,
131 &timeout,
132 );
133 if c::nt_success(status) {
134 Handle::from_raw_handle(ours)
135 } else {
136 return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
137 }
138 };
139
140 // Open their side of the pipe for synchronous access.
141 let theirs = {
142 // We can reopen the anonymous pipe without a name by setting
143 // RootDirectory to the pipe handle and not setting a path name,
144 object_attributes.RootDirectory = ours.as_raw_handle();
145
146 if their_handle_inheritable {
147 object_attributes.Attributes |= c::OBJ_INHERIT;
148 }
149 let mut theirs = ptr::null_mut();
150 let status = c::NtOpenFile(
151 &mut theirs,
152 c::SYNCHRONIZE
153 | if ours_readable {
154 c::GENERIC_WRITE | c::FILE_READ_ATTRIBUTES
155 } else {
156 c::GENERIC_READ
157 },
158 &object_attributes,
159 &mut io_status,
160 0,
161 c::FILE_NON_DIRECTORY_FILE | c::FILE_SYNCHRONOUS_IO_NONALERT,
162 );
163 if c::nt_success(status) {
164 Handle::from_raw_handle(theirs)
165 } else {
166 return Err(io::Error::from_raw_os_error(c::RtlNtStatusToDosError(status) as i32));
167 }
168 };
169
170 Ok(Pipes { ours: ChildPipe { inner: ours }, theirs: ChildPipe { inner: theirs } })
171 }
172}
173
174/// Takes an asynchronous source pipe and returns a synchronous pipe suitable
175/// for sending to a child process.
176///
177/// This is achieved by creating a new set of pipes and spawning a thread that
178/// relays messages between the source and the synchronous pipe.
179pub(super) fn spawn_pipe_relay(
180 source: &ChildPipe,
181 ours_readable: bool,
182 their_handle_inheritable: bool,
183) -> io::Result<ChildPipe> {
184 // We need this handle to live for the lifetime of the thread spawned below.
185 let source = source.try_clone()?;
186
187 // create a new pair of anon pipes.
188 let Pipes { theirs, ours } = child_pipe(ours_readable, their_handle_inheritable)?;
189
190 // Spawn a thread that passes messages from one pipe to the other.
191 // Any errors will simply cause the thread to exit.
192 let (reader, writer) = if ours_readable { (ours, source) } else { (source, ours) };
193 crate::thread::spawn(move || {
194 let mut buf = [0_u8; 4096];
195 'reader: while let Ok(len) = reader.read(&mut buf) {
196 if len == 0 {
197 break;
198 }
199 let mut start = 0;
200 while let Ok(written) = writer.write(&buf[start..len]) {
201 start += written;
202 if start == len {
203 continue 'reader;
204 }
205 }
206 break;
207 }
208 });
209
210 // Return the pipe that should be sent to the child process.
211 Ok(theirs)
212}
213
214impl ChildPipe {
215 pub fn handle(&self) -> &Handle {
216 &self.inner
217 }
218 pub fn into_handle(self) -> Handle {
219 self.inner
220 }
221
222 pub fn try_clone(&self) -> io::Result<Self> {
223 self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| ChildPipe { inner })
224 }
225
226 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
227 let result = unsafe {
228 let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
229 let ptr = buf.as_mut_ptr();
230 self.alertable_io_internal(|overlapped, callback| {
231 c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
232 })
233 };
234
235 match result {
236 // The special treatment of BrokenPipe is to deal with Windows
237 // pipe semantics, which yields this error when *reading* from
238 // a pipe after the other end has closed; we interpret that as
239 // EOF on the pipe.
240 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
241 _ => result,
242 }
243 }
244
245 pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
246 let result = unsafe {
247 let len = crate::cmp::min(buf.capacity(), u32::MAX as usize) as u32;
248 let ptr = buf.as_mut().as_mut_ptr().cast::<u8>();
249 self.alertable_io_internal(|overlapped, callback| {
250 c::ReadFileEx(self.inner.as_raw_handle(), ptr, len, overlapped, callback)
251 })
252 };
253
254 match result {
255 // The special treatment of BrokenPipe is to deal with Windows
256 // pipe semantics, which yields this error when *reading* from
257 // a pipe after the other end has closed; we interpret that as
258 // EOF on the pipe.
259 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
260 Err(e) => Err(e),
261 Ok(n) => {
262 unsafe {
263 buf.advance(n);
264 }
265 Ok(())
266 }
267 }
268 }
269
270 pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
271 self.inner.read_vectored(bufs)
272 }
273
274 #[inline]
275 pub fn is_read_vectored(&self) -> bool {
276 self.inner.is_read_vectored()
277 }
278
279 pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
280 self.handle().read_to_end(buf)
281 }
282
283 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
284 unsafe {
285 let len = crate::cmp::min(buf.len(), u32::MAX as usize) as u32;
286 self.alertable_io_internal(|overlapped, callback| {
287 c::WriteFileEx(self.inner.as_raw_handle(), buf.as_ptr(), len, overlapped, callback)
288 })
289 }
290 }
291
292 pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
293 self.inner.write_vectored(bufs)
294 }
295
296 #[inline]
297 pub fn is_write_vectored(&self) -> bool {
298 self.inner.is_write_vectored()
299 }
300
301 /// Synchronizes asynchronous reads or writes using our anonymous pipe.
302 ///
303 /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
304 /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
305 ///
306 /// Note: This should not be used for handles we don't create.
307 ///
308 /// # Safety
309 ///
310 /// `buf` must be a pointer to a buffer that's valid for reads or writes
311 /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
312 ///
313 /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
314 /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
315 /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
316 unsafe fn alertable_io_internal(
317 &self,
318 io: impl FnOnce(&mut c::OVERLAPPED, c::LPOVERLAPPED_COMPLETION_ROUTINE) -> c::BOOL,
319 ) -> io::Result<usize> {
320 // Use "alertable I/O" to synchronize the pipe I/O.
321 // This has four steps.
322 //
323 // STEP 1: Start the asynchronous I/O operation.
324 // This simply calls either `ReadFileEx` or `WriteFileEx`,
325 // giving it a pointer to the buffer and callback function.
326 //
327 // STEP 2: Enter an alertable state.
328 // The callback set in step 1 will not be called until the thread
329 // enters an "alertable" state. This can be done using `SleepEx`.
330 //
331 // STEP 3: The callback
332 // Once the I/O is complete and the thread is in an alertable state,
333 // the callback will be run on the same thread as the call to
334 // `ReadFileEx` or `WriteFileEx` done in step 1.
335 // In the callback we simply set the result of the async operation.
336 //
337 // STEP 4: Return the result.
338 // At this point we'll have a result from the callback function
339 // and can simply return it. Note that we must not return earlier,
340 // while the I/O is still in progress.
341
342 // The result that will be set from the asynchronous callback.
343 let mut async_result: Option<AsyncResult> = None;
344 struct AsyncResult {
345 error: u32,
346 transferred: u32,
347 }
348
349 // STEP 3: The callback.
350 unsafe extern "system" fn callback(
351 error: u32,
352 transferred: u32,
353 overlapped: *mut c::OVERLAPPED,
354 ) {
355 // Set `async_result` using a pointer smuggled through `hEvent`.
356 // SAFETY:
357 // At this point, the OVERLAPPED struct will have been written to by the OS,
358 // except for our `hEvent` field which we set to a valid AsyncResult pointer (see below)
359 unsafe {
360 let result = AsyncResult { error, transferred };
361 *(*overlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
362 }
363 }
364
365 // STEP 1: Start the I/O operation.
366 let mut overlapped: c::OVERLAPPED = unsafe { crate::mem::zeroed() };
367 // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
368 // Therefore the documentation suggests using it to smuggle a pointer to the callback.
369 overlapped.hEvent = (&raw mut async_result) as *mut _;
370
371 // Asynchronous read of the pipe.
372 // If successful, `callback` will be called once it completes.
373 let result = io(&mut overlapped, Some(callback));
374 if result == c::FALSE {
375 // We can return here because the call failed.
376 // After this we must not return until the I/O completes.
377 return Err(io::Error::last_os_error());
378 }
379
380 // Wait indefinitely for the result.
381 let result = loop {
382 // STEP 2: Enter an alertable state.
383 // The second parameter of `SleepEx` is used to make this sleep alertable.
384 unsafe { c::SleepEx(c::INFINITE, c::TRUE) };
385 if let Some(result) = async_result {
386 break result;
387 }
388 };
389 // STEP 4: Return the result.
390 // `async_result` is always `Some` at this point
391 match result.error {
392 c::ERROR_SUCCESS => Ok(result.transferred as usize),
393 error => Err(io::Error::from_raw_os_error(error as _)),
394 }
395 }
396}
397
398pub fn read_output(
399 p1: ChildPipe,
400 v1: &mut Vec<u8>,
401 p2: ChildPipe,
402 v2: &mut Vec<u8>,
403) -> io::Result<()> {
404 let p1 = p1.into_handle();
405 let p2 = p2.into_handle();
406
407 let mut p1 = AsyncPipe::new(p1, v1)?;
408 let mut p2 = AsyncPipe::new(p2, v2)?;
409 let objs = [p1.event.as_raw_handle(), p2.event.as_raw_handle()];
410
411 // In a loop we wait for either pipe's scheduled read operation to complete.
412 // If the operation completes with 0 bytes, that means EOF was reached, in
413 // which case we just finish out the other pipe entirely.
414 //
415 // Note that overlapped I/O is in general super unsafe because we have to
416 // be careful to ensure that all pointers in play are valid for the entire
417 // duration of the I/O operation (where tons of operations can also fail).
418 // The destructor for `AsyncPipe` ends up taking care of most of this.
419 loop {
420 let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) };
421 if res == c::WAIT_OBJECT_0 {
422 if !p1.result()? || !p1.schedule_read()? {
423 return p2.finish();
424 }
425 } else if res == c::WAIT_OBJECT_0 + 1 {
426 if !p2.result()? || !p2.schedule_read()? {
427 return p1.finish();
428 }
429 } else {
430 return Err(io::Error::last_os_error());
431 }
432 }
433}
434
435struct AsyncPipe<'a> {
436 pipe: Handle,
437 event: Handle,
438 overlapped: Box<c::OVERLAPPED>, // needs a stable address
439 dst: &'a mut Vec<u8>,
440 state: State,
441}
442
/// Progress of an [`AsyncPipe`]'s current read.
#[derive(PartialEq, Debug)]
enum State {
    /// No read has been scheduled.
    NotReading,
    /// A read has been scheduled and has not completed yet.
    Reading,
    /// A read completed immediately with this many bytes, which have not yet
    /// been added to the destination's length.
    Read(usize),
}
449
450impl<'a> AsyncPipe<'a> {
451 fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
452 // Create an event which we'll use to coordinate our overlapped
453 // operations, this event will be used in WaitForMultipleObjects
454 // and passed as part of the OVERLAPPED handle.
455 //
456 // Note that we do a somewhat clever thing here by flagging the
457 // event as being manually reset and setting it initially to the
458 // signaled state. This means that we'll naturally fall through the
459 // WaitForMultipleObjects call above for pipes created initially,
460 // and the only time an even will go back to "unset" will be once an
461 // I/O operation is successfully scheduled (what we want).
462 let event = Handle::new_event(true, true)?;
463 let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) };
464 overlapped.hEvent = event.as_raw_handle();
465 Ok(AsyncPipe { pipe, overlapped, event, dst, state: State::NotReading })
466 }
467
468 /// Executes an overlapped read operation.
469 ///
470 /// Must not currently be reading, and returns whether the pipe is currently
471 /// at EOF or not. If the pipe is not at EOF then `result()` must be called
472 /// to complete the read later on (may block), but if the pipe is at EOF
473 /// then `result()` should not be called as it will just block forever.
474 fn schedule_read(&mut self) -> io::Result<bool> {
475 assert_eq!(self.state, State::NotReading);
476 let amt = unsafe {
477 if self.dst.capacity() == self.dst.len() {
478 let additional = if self.dst.capacity() == 0 { 16 } else { 1 };
479 self.dst.reserve(additional);
480 }
481 self.pipe.read_overlapped(self.dst.spare_capacity_mut(), &mut *self.overlapped)?
482 };
483
484 // If this read finished immediately then our overlapped event will
485 // remain signaled (it was signaled coming in here) and we'll progress
486 // down to the method below.
487 //
488 // Otherwise the I/O operation is scheduled and the system set our event
489 // to not signaled, so we flag ourselves into the reading state and move
490 // on.
491 self.state = match amt {
492 Some(0) => return Ok(false),
493 Some(amt) => State::Read(amt),
494 None => State::Reading,
495 };
496 Ok(true)
497 }
498
499 /// Wait for the result of the overlapped operation previously executed.
500 ///
501 /// Takes a parameter `wait` which indicates if this pipe is currently being
502 /// read whether the function should block waiting for the read to complete.
503 ///
504 /// Returns values:
505 ///
506 /// * `true` - finished any pending read and the pipe is not at EOF (keep
507 /// going)
508 /// * `false` - finished any pending read and pipe is at EOF (stop issuing
509 /// reads)
510 fn result(&mut self) -> io::Result<bool> {
511 let amt = match self.state {
512 State::NotReading => return Ok(true),
513 State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?,
514 State::Read(amt) => amt,
515 };
516 self.state = State::NotReading;
517 unsafe {
518 let len = self.dst.len();
519 self.dst.set_len(len + amt);
520 }
521 Ok(amt != 0)
522 }
523
524 /// Finishes out reading this pipe entirely.
525 ///
526 /// Waits for any pending and schedule read, and then calls `read_to_end`
527 /// if necessary to read all the remaining information.
528 fn finish(&mut self) -> io::Result<()> {
529 while self.result()? && self.schedule_read()? {
530 // ...
531 }
532 Ok(())
533 }
534}
535
536impl<'a> Drop for AsyncPipe<'a> {
537 fn drop(&mut self) {
538 let State::Reading = self.state else { return };
539
540 // If we have a pending read operation, then we have to make sure that
541 // it's *done* before we actually drop this type. The kernel requires
542 // that the `OVERLAPPED` and buffer pointers are valid for the entire
543 // I/O operation.
544 //
545 // To do that, we call `CancelIo` to cancel any pending operation, and
546 // if that succeeds we wait for the overlapped result.
547 //
548 // If anything here fails, there's not really much we can do, so we leak
549 // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
550 if self.pipe.cancel_io().is_err() || self.result().is_err() {
551 let buf = mem::take(self.dst);
552 let overlapped = Box::new(unsafe { mem::zeroed() });
553 let overlapped = mem::replace(&mut self.overlapped, overlapped);
554 mem::forget((buf, overlapped));
555 }
556 }
557}