std/io/buffered/linewritershim.rs
1use core::slice::memchr;
2
3use crate::io::{self, BufWriter, IoSlice, Write};
4
/// Internal adapter that layers line-buffered writing on top of a `BufWriter`.
///
/// The shim holds a temporary mutable borrow of a `BufWriter` and drives it
/// through its internal methods (such as `write_to_buf` and `flush_buf`)
/// rather than only through `write` and `flush`. That makes the line-buffered
/// writer more efficient than a purely external wrapper could be, and avoids
/// duplicating `BufWriter`'s implementation details. Because the borrow is
/// temporary, an existing `BufWriter` can be given line-buffering behavior on
/// demand; this is what lets Stdout switch between line-buffered and
/// block-buffered mode.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    /// The borrowed `BufWriter` that all writes made through the shim go to.
    buffer: &'a mut BufWriter<W>,
}
19
20impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> {
21 pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
22 Self { buffer }
23 }
24
25 /// Gets a reference to the inner writer (that is, the writer
26 /// wrapped by the BufWriter).
27 fn inner(&self) -> &W {
28 self.buffer.get_ref()
29 }
30
31 /// Gets a mutable reference to the inner writer (that is, the writer
32 /// wrapped by the BufWriter). Be careful with this writer, as writes to
33 /// it will bypass the buffer.
34 fn inner_mut(&mut self) -> &mut W {
35 self.buffer.get_mut()
36 }
37
38 /// Gets the content currently buffered in self.buffer
39 fn buffered(&self) -> &[u8] {
40 self.buffer.buffer()
41 }
42
43 /// Flushes the buffer iff the last byte is a newline (indicating that an
44 /// earlier write only succeeded partially, and we want to retry flushing
45 /// the buffered line before continuing with a subsequent write).
46 fn flush_if_completed_line(&mut self) -> io::Result<()> {
47 match self.buffered().last().copied() {
48 Some(b'\n') => self.buffer.flush_buf(),
49 _ => Ok(()),
50 }
51 }
52
53 /// Vectored line-buffered write over an already-capped list of buffers.
54 ///
55 /// The caller is responsible for trimming `bufs` to the prefix it is
56 /// willing to scan (see `MAX_BUFS_TO_SCAN`). This method only ever writes
57 /// or buffers bytes from `bufs`, so any newline it might bury in the
58 /// `BufWriter` is one it has itself scanned for -- buffers the caller
59 /// dropped past the cap can never end up stuck in the buffer. Bytes not
60 /// accounted for in the return value are left for the next call.
61 fn write_vectored_scanned(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
62 // Find the buffer containing the last newline.
63 let last_newline_buf_idx = bufs
64 .iter()
65 .enumerate()
66 .rev()
67 .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
68
69 // If there are no new newlines (that is, if this write is less than
70 // one line), just do a regular buffered write.
71 let last_newline_buf_idx = match last_newline_buf_idx {
72 None => {
73 self.flush_if_completed_line()?;
74 return self.buffer.write_vectored(bufs);
75 }
76 Some(i) => i,
77 };
78
79 // Flush existing content to prepare for our write.
80 self.buffer.flush_buf()?;
81
82 // This is what we're going to try to write directly to the inner
83 // writer. The rest will be buffered, if nothing goes wrong.
84 let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
85
86 // Write `lines` directly to the inner writer. In keeping with the
87 // `write` convention, make at most one attempt to add new (unbuffered)
88 // data. Because this write doesn't touch the BufWriter state directly,
89 // and the buffer is known to be empty, we don't need to worry about
90 // self.panicked here.
91 let flushed = self.inner_mut().write_vectored(lines)?;
92
93 // If inner returns Ok(0), propagate that to the caller without
94 // doing additional buffering; otherwise we're just guaranteeing
95 // an "ErrorKind::WriteZero" later.
96 if flushed == 0 {
97 return Ok(0);
98 }
99
100 // Don't try to reconstruct the exact amount written; just bail
101 // in the event of a partial write.
102 let mut lines_len: usize = 0;
103 for buf in lines {
104 // With overlapping/duplicate slices the total length may in theory
105 // exceed usize::MAX
106 lines_len = lines_len.saturating_add(buf.len());
107 if flushed < lines_len {
108 return Ok(flushed);
109 }
110 }
111
112 // Now that the write has succeeded, buffer the rest (or as much of the
113 // rest as possible). `tail` is the part of the scanned prefix after the
114 // last newline, so it cannot contain a newline of its own.
115 let buffered: usize = tail
116 .iter()
117 .filter(|buf| !buf.is_empty())
118 .map(|buf| self.buffer.write_to_buf(buf))
119 .take_while(|&n| n > 0)
120 .sum();
121
122 Ok(flushed + buffered)
123 }
124}
125
126impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> {
127 /// Writes some data into this BufWriter with line buffering.
128 ///
129 /// This means that, if any newlines are present in the data, the data up to
130 /// the last newline is sent directly to the underlying writer, and data
131 /// after it is buffered. Returns the number of bytes written.
132 ///
133 /// This function operates on a "best effort basis"; in keeping with the
134 /// convention of `Write::write`, it makes at most one attempt to write
135 /// new data to the underlying writer. If that write only reports a partial
136 /// success, the remaining data will be buffered.
137 ///
138 /// Because this function attempts to send completed lines to the underlying
139 /// writer, it will also flush the existing buffer if it ends with a
140 /// newline, even if the incoming data does not contain any newlines.
141 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
142 let newline_idx = match memchr::memrchr(b'\n', buf) {
143 // If there are no new newlines (that is, if this write is less than
144 // one line), just do a regular buffered write (which may flush if
145 // we exceed the inner buffer's size)
146 None => {
147 self.flush_if_completed_line()?;
148 return self.buffer.write(buf);
149 }
150 // Otherwise, arrange for the lines to be written directly to the
151 // inner writer.
152 Some(newline_idx) => newline_idx + 1,
153 };
154
155 // Flush existing content to prepare for our write. We have to do this
156 // before attempting to write `buf` in order to maintain consistency;
157 // if we add `buf` to the buffer then try to flush it all at once,
158 // we're obligated to return Ok(), which would mean suppressing any
159 // errors that occur during flush.
160 self.buffer.flush_buf()?;
161
162 // This is what we're going to try to write directly to the inner
163 // writer. The rest will be buffered, if nothing goes wrong.
164 let lines = &buf[..newline_idx];
165
166 // Write `lines` directly to the inner writer. In keeping with the
167 // `write` convention, make at most one attempt to add new (unbuffered)
168 // data. Because this write doesn't touch the BufWriter state directly,
169 // and the buffer is known to be empty, we don't need to worry about
170 // self.buffer.panicked here.
171 let flushed = self.inner_mut().write(lines)?;
172
173 // If buffer returns Ok(0), propagate that to the caller without
174 // doing additional buffering; otherwise we're just guaranteeing
175 // an "ErrorKind::WriteZero" later.
176 if flushed == 0 {
177 return Ok(0);
178 }
179
180 // Now that the write has succeeded, buffer the rest (or as much of
181 // the rest as possible). If there were any unwritten newlines, we
182 // only buffer out to the last unwritten newline that fits in the
183 // buffer; this helps prevent flushing partial lines on subsequent
184 // calls to LineWriterShim::write.
185
186 // Handle the cases in order of most-common to least-common, under
187 // the presumption that most writes succeed in totality, and that most
188 // writes are smaller than the buffer.
189 // - Is this a partial line (ie, no newlines left in the unwritten tail)
190 // - If not, does the data out to the last unwritten newline fit in
191 // the buffer?
192 // - If not, scan for the last newline that *does* fit in the buffer
193 let tail = if flushed >= newline_idx {
194 let tail = &buf[flushed..];
195 // Avoid unnecessary short writes by not splitting the remaining
196 // bytes if they're larger than the buffer.
197 // They can be written in full by the next call to write.
198 if tail.len() >= self.buffer.capacity() {
199 return Ok(flushed);
200 }
201 tail
202 } else if newline_idx - flushed <= self.buffer.capacity() {
203 &buf[flushed..newline_idx]
204 } else {
205 let scan_area = &buf[flushed..];
206 let scan_area = &scan_area[..self.buffer.capacity()];
207 match memchr::memrchr(b'\n', scan_area) {
208 Some(newline_idx) => &scan_area[..newline_idx + 1],
209 None => scan_area,
210 }
211 };
212
213 let buffered = self.buffer.write_to_buf(tail);
214 Ok(flushed + buffered)
215 }
216
217 fn flush(&mut self) -> io::Result<()> {
218 self.buffer.flush()
219 }
220
221 /// Writes some vectored data into this BufWriter with line buffering.
222 ///
223 /// This means that, if any newlines are present in the data, the data up to
224 /// and including the buffer containing the last newline is sent directly to
225 /// the inner writer, and the data after it is buffered. Returns the number
226 /// of bytes written.
227 ///
228 /// This function operates on a "best effort basis"; in keeping with the
229 /// convention of `Write::write`, it makes at most one attempt to write
230 /// new data to the underlying writer.
231 ///
232 /// Because this function attempts to send completed lines to the underlying
233 /// writer, it will also flush the existing buffer if it contains any
234 /// newlines.
235 ///
236 /// Because sorting through an array of `IoSlice` can be a bit convoluted,
237 /// This method differs from write in the following ways:
238 ///
239 /// - It attempts to write the full content of all the buffers up to and
240 /// including the one containing the last newline. This means that it
241 /// may attempt to write a partial line, that buffer has data past the
242 /// newline.
243 /// - If the write only reports partial success, it does not attempt to
244 /// find the precise location of the written bytes and buffer the rest.
245 ///
246 /// If the underlying vector doesn't support vectored writing, we instead
247 /// simply write the first non-empty buffer with `write`. This way, we
248 /// get the benefits of more granular partial-line handling without losing
249 /// anything in efficiency
250 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
251 // If there's no specialized behavior for write_vectored, just use
252 // write. This has the benefit of more granular partial-line handling.
253 if !self.is_write_vectored() {
254 return match bufs.iter().find(|buf| !buf.is_empty()) {
255 Some(buf) => self.write(buf),
256 None => Ok(0),
257 };
258 }
259
260 // Only scan (and operate on) the first MAX_BUFS_TO_SCAN slices. The cap
261 // is what keeps write_all_vectored() from going quadratic when callers
262 // pass many newline-free slices -- without it, every iteration of the
263 // outer loop rescans every remaining buffer. 1024 is a portable,
264 // generous upper bound: it is the value of UIO_MAXIOV / IOV_MAX on
265 // Linux and the BSDs (and the hardcoded cap in
266 // sys::net::connection::socket::solid), so on those platforms it also
267 // lines up with the most a single writev() can retire. On platforms
268 // whose syscall cap is smaller (POSIX requires only 16) or that have no
269 // cap at all (Windows), the constant still serves its primary purpose
270 // of bounding scan work.
271 //
272 // Everything past the cap is left untouched for the next call; the
273 // outer loop in write_all_vectored() makes forward progress via the
274 // short return value, and correctness is preserved everywhere. We hand
275 // the capped prefix to a helper so the rest of the logic can only ever
276 // see -- and therefore only ever write or buffer -- buffers we have
277 // actually scanned for newlines.
278 const MAX_BUFS_TO_SCAN: usize = 1024;
279 self.write_vectored_scanned(&bufs[..bufs.len().min(MAX_BUFS_TO_SCAN)])
280 }
281
282 fn is_write_vectored(&self) -> bool {
283 self.inner().is_write_vectored()
284 }
285
286 /// Writes some data into this BufWriter with line buffering.
287 ///
288 /// This means that, if any newlines are present in the data, the data up to
289 /// the last newline is sent directly to the underlying writer, and data
290 /// after it is buffered.
291 ///
292 /// Because this function attempts to send completed lines to the underlying
293 /// writer, it will also flush the existing buffer if it contains any
294 /// newlines, even if the incoming data does not contain any newlines.
295 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
296 match memchr::memrchr(b'\n', buf) {
297 // If there are no new newlines (that is, if this write is less than
298 // one line), just do a regular buffered write (which may flush if
299 // we exceed the inner buffer's size)
300 None => {
301 self.flush_if_completed_line()?;
302 self.buffer.write_all(buf)
303 }
304 Some(newline_idx) => {
305 let (lines, tail) = buf.split_at(newline_idx + 1);
306
307 if self.buffered().is_empty() {
308 self.inner_mut().write_all(lines)?;
309 } else {
310 // If there is any buffered data, we add the incoming lines
311 // to that buffer before flushing, which saves us at least
312 // one write call. We can't really do this with `write`,
313 // since we can't do this *and* not suppress errors *and*
314 // report a consistent state to the caller in a return
315 // value, but here in write_all it's fine.
316 self.buffer.write_all(lines)?;
317 self.buffer.flush_buf()?;
318 }
319
320 self.buffer.write_all(tail)
321 }
322 }
323 }
324}