Skip to main content

std/io/buffered/
linewritershim.rs

1use core::slice::memchr;
2
3use crate::io::{self, BufWriter, IoSlice, Write};
4
/// Private helper that layers line-buffering on top of a borrowed `BufWriter`.
///
/// The shim lives only as long as its borrow of the wrapped `BufWriter`.
/// Instead of restricting itself to the public `write`/`flush` surface, it
/// drives the `BufWriter` through its internal methods (such as
/// `write_to_buf` and `flush_buf`). That gives a more efficient line-buffered
/// writer than a pure `write`/`flush` wrapper could, without duplicating the
/// `BufWriter` implementation. Because the wrapping is temporary, an existing
/// `BufWriter` can be given line-buffering behavior on demand; this is what
/// lets Stdout alternate between line-buffered and block-buffered modes.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    buffer: &'a mut BufWriter<W>,
}
19
20impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> {
21    pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
22        Self { buffer }
23    }
24
25    /// Gets a reference to the inner writer (that is, the writer
26    /// wrapped by the BufWriter).
27    fn inner(&self) -> &W {
28        self.buffer.get_ref()
29    }
30
31    /// Gets a mutable reference to the inner writer (that is, the writer
32    /// wrapped by the BufWriter). Be careful with this writer, as writes to
33    /// it will bypass the buffer.
34    fn inner_mut(&mut self) -> &mut W {
35        self.buffer.get_mut()
36    }
37
38    /// Gets the content currently buffered in self.buffer
39    fn buffered(&self) -> &[u8] {
40        self.buffer.buffer()
41    }
42
43    /// Flushes the buffer if and only if the last byte is a newline
44    /// (indicating that an earlier write only succeeded partially, and we
45    /// want to retry flushing the buffered line before continuing with a
46    /// subsequent write).
47    fn flush_if_completed_line(&mut self) -> io::Result<()> {
48        match self.buffered().last().copied() {
49            Some(b'\n') => self.buffer.flush_buf(),
50            _ => Ok(()),
51        }
52    }
53
54    /// Vectored line-buffered write over an already-capped list of buffers.
55    ///
56    /// The caller is responsible for trimming `bufs` to the prefix it is
57    /// willing to scan (see `MAX_BUFS_TO_SCAN`). This method only ever writes
58    /// or buffers bytes from `bufs`, so any newline it might bury in the
59    /// `BufWriter` is one it has itself scanned for -- buffers the caller
60    /// dropped past the cap can never end up stuck in the buffer. Bytes not
61    /// accounted for in the return value are left for the next call.
62    fn write_vectored_scanned(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
63        // Find the buffer containing the last newline.
64        let last_newline_buf_idx = bufs
65            .iter()
66            .enumerate()
67            .rev()
68            .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
69
70        // If there are no new newlines (that is, if this write is less than
71        // one line), just do a regular buffered write.
72        let last_newline_buf_idx = match last_newline_buf_idx {
73            None => {
74                self.flush_if_completed_line()?;
75                return self.buffer.write_vectored(bufs);
76            }
77            Some(i) => i,
78        };
79
80        // Flush existing content to prepare for our write.
81        self.buffer.flush_buf()?;
82
83        // This is what we're going to try to write directly to the inner
84        // writer. The rest will be buffered, if nothing goes wrong.
85        let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
86
87        // Write `lines` directly to the inner writer. In keeping with the
88        // `write` convention, make at most one attempt to add new (unbuffered)
89        // data. Because this write doesn't touch the BufWriter state directly,
90        // and the buffer is known to be empty, we don't need to worry about
91        // self.panicked here.
92        let flushed = self.inner_mut().write_vectored(lines)?;
93
94        // If inner returns Ok(0), propagate that to the caller without
95        // doing additional buffering; otherwise we're just guaranteeing
96        // an "ErrorKind::WriteZero" later.
97        if flushed == 0 {
98            return Ok(0);
99        }
100
101        // Don't try to reconstruct the exact amount written; just bail
102        // in the event of a partial write.
103        let mut lines_len: usize = 0;
104        for buf in lines {
105            // With overlapping/duplicate slices the total length may in theory
106            // exceed usize::MAX
107            lines_len = lines_len.saturating_add(buf.len());
108            if flushed < lines_len {
109                return Ok(flushed);
110            }
111        }
112
113        // Now that the write has succeeded, buffer the rest (or as much of the
114        // rest as possible). `tail` is the part of the scanned prefix after the
115        // last newline, so it cannot contain a newline of its own.
116        let buffered: usize = tail
117            .iter()
118            .filter(|buf| !buf.is_empty())
119            .map(|buf| self.buffer.write_to_buf(buf))
120            .take_while(|&n| n > 0)
121            .sum();
122
123        Ok(flushed + buffered)
124    }
125}
126
127impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> {
128    /// Writes some data into this BufWriter with line buffering.
129    ///
130    /// This means that, if any newlines are present in the data, the data up to
131    /// the last newline is sent directly to the underlying writer, and data
132    /// after it is buffered. Returns the number of bytes written.
133    ///
134    /// This function operates on a "best effort basis"; in keeping with the
135    /// convention of `Write::write`, it makes at most one attempt to write
136    /// new data to the underlying writer. If that write only reports a partial
137    /// success, the remaining data will be buffered.
138    ///
139    /// Because this function attempts to send completed lines to the underlying
140    /// writer, it will also flush the existing buffer if it ends with a
141    /// newline, even if the incoming data does not contain any newlines.
142    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
143        let newline_idx = match memchr::memrchr(b'\n', buf) {
144            // If there are no new newlines (that is, if this write is less than
145            // one line), just do a regular buffered write (which may flush if
146            // we exceed the inner buffer's size)
147            None => {
148                self.flush_if_completed_line()?;
149                return self.buffer.write(buf);
150            }
151            // Otherwise, arrange for the lines to be written directly to the
152            // inner writer.
153            Some(newline_idx) => newline_idx + 1,
154        };
155
156        // Flush existing content to prepare for our write. We have to do this
157        // before attempting to write `buf` in order to maintain consistency;
158        // if we add `buf` to the buffer then try to flush it all at once,
159        // we're obligated to return Ok(), which would mean suppressing any
160        // errors that occur during flush.
161        self.buffer.flush_buf()?;
162
163        // This is what we're going to try to write directly to the inner
164        // writer. The rest will be buffered, if nothing goes wrong.
165        let lines = &buf[..newline_idx];
166
167        // Write `lines` directly to the inner writer. In keeping with the
168        // `write` convention, make at most one attempt to add new (unbuffered)
169        // data. Because this write doesn't touch the BufWriter state directly,
170        // and the buffer is known to be empty, we don't need to worry about
171        // self.buffer.panicked here.
172        let flushed = self.inner_mut().write(lines)?;
173
174        // If buffer returns Ok(0), propagate that to the caller without
175        // doing additional buffering; otherwise we're just guaranteeing
176        // an "ErrorKind::WriteZero" later.
177        if flushed == 0 {
178            return Ok(0);
179        }
180
181        // Now that the write has succeeded, buffer the rest (or as much of
182        // the rest as possible). If there were any unwritten newlines, we
183        // only buffer out to the last unwritten newline that fits in the
184        // buffer; this helps prevent flushing partial lines on subsequent
185        // calls to LineWriterShim::write.
186
187        // Handle the cases in order of most-common to least-common, under
188        // the presumption that most writes succeed in totality, and that most
189        // writes are smaller than the buffer.
190        // - Is this a partial line (ie, no newlines left in the unwritten tail)
191        // - If not, does the data out to the last unwritten newline fit in
192        //   the buffer?
193        // - If not, scan for the last newline that *does* fit in the buffer
194        let tail = if flushed >= newline_idx {
195            let tail = &buf[flushed..];
196            // Avoid unnecessary short writes by not splitting the remaining
197            // bytes if they're larger than the buffer.
198            // They can be written in full by the next call to write.
199            if tail.len() >= self.buffer.capacity() {
200                return Ok(flushed);
201            }
202            tail
203        } else if newline_idx - flushed <= self.buffer.capacity() {
204            &buf[flushed..newline_idx]
205        } else {
206            let scan_area = &buf[flushed..];
207            let scan_area = &scan_area[..self.buffer.capacity()];
208            match memchr::memrchr(b'\n', scan_area) {
209                Some(newline_idx) => &scan_area[..newline_idx + 1],
210                None => scan_area,
211            }
212        };
213
214        let buffered = self.buffer.write_to_buf(tail);
215        Ok(flushed + buffered)
216    }
217
218    fn flush(&mut self) -> io::Result<()> {
219        self.buffer.flush()
220    }
221
222    /// Writes some vectored data into this BufWriter with line buffering.
223    ///
224    /// This means that, if any newlines are present in the data, the data up to
225    /// and including the buffer containing the last newline is sent directly to
226    /// the inner writer, and the data after it is buffered. Returns the number
227    /// of bytes written.
228    ///
229    /// This function operates on a "best effort basis"; in keeping with the
230    /// convention of `Write::write`, it makes at most one attempt to write
231    /// new data to the underlying writer.
232    ///
233    /// Because this function attempts to send completed lines to the underlying
234    /// writer, it will also flush the existing buffer if it contains any
235    /// newlines.
236    ///
237    /// Because sorting through an array of `IoSlice` can be a bit convoluted,
238    /// This method differs from write in the following ways:
239    ///
240    /// - It attempts to write the full content of all the buffers up to and
241    ///   including the one containing the last newline. This means that it
242    ///   may attempt to write a partial line, that buffer has data past the
243    ///   newline.
244    /// - If the write only reports partial success, it does not attempt to
245    ///   find the precise location of the written bytes and buffer the rest.
246    ///
247    /// If the underlying vector doesn't support vectored writing, we instead
248    /// simply write the first non-empty buffer with `write`. This way, we
249    /// get the benefits of more granular partial-line handling without losing
250    /// anything in efficiency
251    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
252        // If there's no specialized behavior for write_vectored, just use
253        // write. This has the benefit of more granular partial-line handling.
254        if !self.is_write_vectored() {
255            return match bufs.iter().find(|buf| !buf.is_empty()) {
256                Some(buf) => self.write(buf),
257                None => Ok(0),
258            };
259        }
260
261        // Only scan (and operate on) the first MAX_BUFS_TO_SCAN slices. The cap
262        // is what keeps write_all_vectored() from going quadratic when callers
263        // pass many newline-free slices -- without it, every iteration of the
264        // outer loop rescans every remaining buffer. 1024 is a portable,
265        // generous upper bound: it is the value of UIO_MAXIOV / IOV_MAX on
266        // Linux and the BSDs (and the hardcoded cap in
267        // sys::net::connection::socket::solid), so on those platforms it also
268        // lines up with the most a single writev() can retire. On platforms
269        // whose syscall cap is smaller (POSIX requires only 16) or that have no
270        // cap at all (Windows), the constant still serves its primary purpose
271        // of bounding scan work.
272        //
273        // Everything past the cap is left untouched for the next call; the
274        // outer loop in write_all_vectored() makes forward progress via the
275        // short return value, and correctness is preserved everywhere. We hand
276        // the capped prefix to a helper so the rest of the logic can only ever
277        // see -- and therefore only ever write or buffer -- buffers we have
278        // actually scanned for newlines.
279        const MAX_BUFS_TO_SCAN: usize = 1024;
280        self.write_vectored_scanned(&bufs[..bufs.len().min(MAX_BUFS_TO_SCAN)])
281    }
282
283    fn is_write_vectored(&self) -> bool {
284        self.inner().is_write_vectored()
285    }
286
287    /// Writes some data into this BufWriter with line buffering.
288    ///
289    /// This means that, if any newlines are present in the data, the data up to
290    /// the last newline is sent directly to the underlying writer, and data
291    /// after it is buffered.
292    ///
293    /// Because this function attempts to send completed lines to the underlying
294    /// writer, it will also flush the existing buffer if it contains any
295    /// newlines, even if the incoming data does not contain any newlines.
296    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
297        match memchr::memrchr(b'\n', buf) {
298            // If there are no new newlines (that is, if this write is less than
299            // one line), just do a regular buffered write (which may flush if
300            // we exceed the inner buffer's size)
301            None => {
302                self.flush_if_completed_line()?;
303                self.buffer.write_all(buf)
304            }
305            Some(newline_idx) => {
306                let (lines, tail) = buf.split_at(newline_idx + 1);
307
308                if self.buffered().is_empty() {
309                    self.inner_mut().write_all(lines)?;
310                } else {
311                    // If there is any buffered data, we add the incoming lines
312                    // to that buffer before flushing, which saves us at least
313                    // one write call. We can't really do this with `write`,
314                    // since we can't do this *and* not suppress errors *and*
315                    // report a consistent state to the caller in a return
316                    // value, but here in write_all it's fine.
317                    self.buffer.write_all(lines)?;
318                    self.buffer.flush_buf()?;
319                }
320
321                self.buffer.write_all(tail)
322            }
323        }
324    }
325}