Grok 20.3.2
BufferedStream.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2016-2026 Grok Image Compression Inc.
3 *
4 * This source code is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Affero General Public License, version 3,
6 * as published by the Free Software Foundation.
7 *
8 * This source code is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Affero General Public License for more details.
12 *
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 */
17
18#pragma once
19
20#include "MemAdvisor.h"
21#include "buffer.h"
22
23namespace grk
24{
25
26#define GROK_STREAM_STATUS_OUTPUT 0x1U
27#define GROK_STREAM_STATUS_INPUT 0x2U
28#define GROK_STREAM_STATUS_END 0x4U
29#define GROK_STREAM_STATUS_ERROR 0x8U
30
41struct BufferedStream : public IStream
42{
52 BufferedStream(uint8_t* buffer, size_t initial_buffer_size, size_t buffer_size, bool is_input)
54 originalBufferLength_(buffer_size)
55 {
56 if(!buffer_size)
57 throw std::runtime_error("BufferedStream: buffer size cannot be zero");
58
59 initial_buffer_size = std::max(initial_buffer_size, buffer_size);
60 if(buffer)
61 buf_ = std::make_unique<BufferAligned8>(buffer, initial_buffer_size, false);
62 else
63 buf_ = std::make_unique<BufferAligned8>(
64 (uint8_t*)grk_aligned_malloc(4096, initial_buffer_size), initial_buffer_size, true);
65 }
66
76
77 IStream* bifurcate(void) override
78 {
79 if(!isMemStream())
80 return nullptr;
81
82 auto memStream = static_cast<MemStream*>(userData_);
83 auto stream = new BufferedStream(memStream->buf_, 0, memStream->len_, true);
84 stream->setFormat(format_);
85 stream->setUserData(memStream, nullptr, memStream->len_);
86 memStreamSetup(stream, true);
87 stream->streamOffset_ = streamOffset_;
88 stream->bufferedBytes_ = memStream->len_ - streamOffset_;
89 stream->readBytesSeekable_ = bufferedBytes_;
90 stream->buf_->set_offset(buf_->offset());
91
92 return stream;
93 }
94
100 void setFetcher(IFetcher* fetcher)
101 {
102 fetcher_ = fetcher;
103 }
104
110 IFetcher* getFetcher(void) override
111 {
112 return fetcher_;
113 }
114
116 {
117 memAdvisor_ = advisor;
118 }
119
120 void memAdvise(size_t virtual_offset, size_t length, GrkAccessPattern pattern) override
121 {
122 if(memAdvisor_)
123 memAdvisor_->advise(virtual_offset, length, pattern);
124 }
125
126 void setChunkBuffer(std::shared_ptr<ChunkBuffer<>> chunkBuffer) override
127 {
128 chunk_buf_ = chunkBuffer;
129 streamOffset_ = chunkBuffer->offset();
132 }
133
134 void setUserData(void* data, grk_stream_free_user_data_fn freeUserDataFun, uint64_t len) override
135 {
136 userData_ = data;
137 freeUserDataCallback_ = freeUserDataFun;
138 userDataLength_ = len;
139 }
140
141 void setCallbacks(StreamCallbacks& callbacks) override
142 {
143 callbacks_ = callbacks;
144 }
145
154 size_t read(uint8_t* buffer, uint8_t** zero_copy_buffer, size_t len) override
155 {
156 if(buffer)
157 return read(buffer, len);
158 if(!zero_copy_buffer)
159 throw std::runtime_error("Missing zero copy buffer.");
160 *zero_copy_buffer = chunk_buf_ ? (uint8_t*)chunk_buf_->currPtr(len) : buf_->currPtr(len);
161
162 return read(nullptr, len);
163 }
164 bool write24u(uint32_t value) override
165 {
166 return write_non_template((const uint8_t*)&value, sizeof(uint32_t), 3);
167 }
168
169 bool write8u(uint8_t value) override
170 {
171 return writeBytes(&value, 1) == 1;
172 }
173
174 size_t writeBytes(const uint8_t* buffer, size_t len) override
175 {
176 assert(len);
177 if(!len || !buffer)
178 return 0;
179
181 return 0;
182
183 // handle case where there is no internal buffer (memory stream)
184 if(isMemStream())
185 {
186 /* we should do an actual write on the media */
187 auto current_write_nb_bytes = callbacks_.writeCallback_(buffer, len, userData_);
188 writeIncrement(current_write_nb_bytes);
189
190 return current_write_nb_bytes;
191 }
192 size_t write_nb_bytes = 0;
193 while(true)
194 {
195 size_t available_bytes = buf_->num_elts() - bufferedBytes_;
196
197 // we can copy all write bytes to double buffer
198 if(available_bytes >= len)
199 {
200 memcpy(buf_->currPtr(len), buffer, len);
201 write_nb_bytes += len;
202 writeIncrement(len);
203 return write_nb_bytes;
204 }
205
206 // we fill the double buffer with write bytes
207 if(available_bytes)
208 {
209 write_nb_bytes += available_bytes;
210 memcpy(buf_->currPtr(available_bytes), buffer, available_bytes);
211 buf_->set_offset(0);
212 bufferedBytes_ += available_bytes;
213 streamOffset_ += available_bytes;
214 buffer += available_bytes;
215 len -= available_bytes;
216 }
217 // now we can flush the double buffer, and try to write
218 // more bytes
219 if(!flush())
220 return 0;
221 }
222
223 return write_nb_bytes;
224 }
225
226 bool flush() override
227 {
228 if(isMemStream())
229 return true;
230 /* the number of bytes written on the media. */
231 buf_->set_offset(0);
233 {
234 /* we should do an actual write on the media */
235 size_t current_write_nb_bytes =
236 callbacks_.writeCallback_(buf_->currPtr(bufferedBytes_), bufferedBytes_, userData_);
237
238 if(current_write_nb_bytes != bufferedBytes_)
239 {
241 grklog.error("Error on writing stream.");
242 return false;
243 }
244 buf_->increment_offset((ptrdiff_t)current_write_nb_bytes);
245 bufferedBytes_ = 0;
246 }
247 buf_->set_offset(0);
248
249 return true;
250 }
251
252 bool skip(int64_t len) override
253 {
254 return (status_ & GROK_STREAM_STATUS_INPUT) ? readSkip(len) : writeSkip(len);
255 }
256
257 uint64_t tell(void) override
258 {
259 return streamOffset_;
260 }
261
262 uint64_t numBytesLeft(void) override
263 {
265 return userDataLength_ ? (uint64_t)(userDataLength_ - streamOffset_) : 0;
266 }
267
268 bool seek(uint64_t offset) override
269 {
270 return (status_ & GROK_STREAM_STATUS_INPUT) ? readSeek(offset) : writeSeek(offset);
271 }
272
277 bool hasSeek() override
278 {
279 return callbacks_.seekCallback_ != nullptr;
280 }
281
282 bool supportsZeroCopy() override
283 {
285 }
286
287 void setFormat(GRK_CODEC_FORMAT format) override
288 {
289 format_ = format;
290 }
291
293 {
294 return format_;
295 }
296
297 uint8_t* currPtr(void) override
298 {
299 return buf_->currPtr(0);
300 }
301
302 bool isMemStream() override
303 {
304 return !buf_->owns_data();
305 }
306
307private:
315 size_t read(uint8_t* buffer, size_t len)
316 {
317 if(!buffer && !supportsZeroCopy())
318 throw std::exception();
319 assert(len);
320 if(!len)
321 return 0;
322 size_t read_nb_bytes = 0;
323
324 // 1. if stream is at end, then return immediately
326 return 0;
327 // 2. if we have enough bytes in buffer, then read from buffer and return
328 if(len <= bufferedBytes_)
329 {
330 if(buffer)
331 memcpy(buffer, chunk_buf_ ? chunk_buf_->currPtr(len) : buf_->currPtr(len), len);
332 bool rc = chunk_buf_ ? chunk_buf_->increment_offset((ptrdiff_t)len)
333 : buf_->increment_offset((ptrdiff_t)len);
334 if(!rc)
335 {
337 return 0;
338 }
339 bufferedBytes_ -= len;
341 read_nb_bytes += len;
342 streamOffset_ += len;
344 return read_nb_bytes;
345 }
346 // 2b. ChunkBuffer exhausted: read remaining and signal end
347 if(chunk_buf_)
348 {
350 {
351 read_nb_bytes += bufferedBytes_;
352 if(buffer)
353 memcpy(buffer, chunk_buf_->currPtr(bufferedBytes_), bufferedBytes_);
354 chunk_buf_->increment_offset((ptrdiff_t)bufferedBytes_);
356 bufferedBytes_ = 0;
357 }
359 return read_nb_bytes;
360 }
361 // 3. if stream is at end, then read remaining bytes in buffer and return
363 {
364 read_nb_bytes += bufferedBytes_;
365 if(buffer && bufferedBytes_)
366 memcpy(buffer, buf_->currPtr(bufferedBytes_), bufferedBytes_);
370 return read_nb_bytes;
371 }
372 // 4. read remaining bytes in buffer
374 {
375 read_nb_bytes += bufferedBytes_;
376 if(buffer)
377 {
378 memcpy(buffer, buf_->currPtr(bufferedBytes_), bufferedBytes_);
379 buffer += bufferedBytes_;
380 }
381 len -= bufferedBytes_;
384 bufferedBytes_ = 0;
385 }
386
387 // 5. read from "media"
389 // direct read into buffer
390 if(len > buf_->num_elts())
391 {
392 auto b_read = readDirect(buffer, len);
393 return read_nb_bytes + b_read;
394 }
395 if(!firstCache_)
396 buf_->set_num_elts(originalBufferLength_);
397 while(true)
398 {
400 callbacks_.readCallback_(buf_->currPtr(buf_->num_elts()), buf_->num_elts(), userData_);
401 // sanity check on external read function
402 if(bufferedBytes_ > buf_->num_elts())
403 {
404 grklog.error("Buffered stream: read length greater than buffer length");
405 break;
406 }
408 // i) end of stream
409 if(bufferedBytes_ == 0)
410 {
413 break;
414 }
415 // ii) or not enough data
416 else if(bufferedBytes_ < len)
417 {
418 read_nb_bytes += bufferedBytes_;
419 if(buffer)
420 {
421 memcpy(buffer, buf_->currPtr(bufferedBytes_), bufferedBytes_);
422 buffer += bufferedBytes_;
423 }
424 len -= bufferedBytes_;
428 }
429 // iii) or we have read the exact amount requested
430 else
431 {
432 read_nb_bytes += len;
433 if(buffer && len)
434 memcpy(buffer, buf_->currPtr(len), len);
435 buf_->increment_offset((ptrdiff_t)len);
436 bufferedBytes_ -= len;
438 streamOffset_ += len;
440 break;
441 }
442 }
443 firstCache_ = false;
444 return read_nb_bytes;
445 }
446
454 size_t readDirect(uint8_t* buffer, size_t len)
455 {
456 size_t read_nb_bytes = 0;
457 size_t remaining = len;
458 while(true)
459 {
460 auto buffered_bytes = callbacks_.readCallback_(buffer, remaining, userData_);
461 // sanity check on external read function
462 if(buffered_bytes > remaining)
463 {
464 grklog.error("Buffered stream: read length greater than buffer length");
465 return 0;
466 }
467 // i) end of stream
468 if(buffered_bytes == 0)
469 {
471 return read_nb_bytes;
472 }
473 else
474 {
475 read_nb_bytes += buffered_bytes;
476 buffer += buffered_bytes;
477 remaining -= buffered_bytes;
478 streamOffset_ += buffered_bytes;
480 if(read_nb_bytes == len)
481 return read_nb_bytes;
482 }
483 }
484 return 0;
485 }
486
492 bool writeSkip(int64_t len)
493 {
494 auto offset = (int64_t)streamOffset_ + len;
495 if(offset < 0)
496 return false;
497 return writeSeek((uint64_t)offset);
498 }
499
504 bool readSkip(int64_t len)
505 {
506 auto offset = (int64_t)streamOffset_ + len;
507 if(offset < 0)
508 return false;
509
510 return readSeek((uint64_t)offset);
511 }
512
518 bool readSeek(uint64_t offset)
519 {
521 return false;
522
523 if(chunk_buf_)
524 {
525 bool rc = chunk_buf_->set_offset(offset);
526 if(rc)
527 {
528 streamOffset_ = chunk_buf_->offset();
531 }
532 return rc;
533 }
534
535 // 1. try to seek in buffer
537 {
538 if((offset >= streamOffset_ && offset < streamOffset_ + bufferedBytes_) ||
539 (offset < streamOffset_ &&
541 {
542 auto increment = (int64_t)offset - (int64_t)streamOffset_;
543 streamOffset_ = offset;
545 buf_->increment_offset((ptrdiff_t)increment);
546 bufferedBytes_ = (size_t)((int64_t)bufferedBytes_ - increment);
548
549 return true;
550 }
551 }
552
553 // 2. Since we can't seek in buffer, we must invalidate
554 // buffer contents and seek in media
556 if(!(callbacks_.seekCallback_(offset, userData_)))
557 {
559 return false;
560 }
561 else
562 {
564 streamOffset_ = offset;
566 {
568 return false;
569 }
570 }
571 return true;
572 }
573
579 bool writeSeek(uint64_t offset)
580 {
582 return false;
583
584 if(!flush())
585 {
587 return false;
588 }
590 if(!callbacks_.seekCallback_(offset, userData_))
591 {
593 return false;
594 }
595 else
596 {
597 streamOffset_ = offset;
598 }
599 if(isMemStream())
600 buf_->set_offset(offset);
601 return true;
602 }
603
604 void writeIncrement(size_t len)
605 {
606 buf_->increment_offset((ptrdiff_t)len);
607 if(!isMemStream())
608 bufferedBytes_ += len;
609 else
610 assert(bufferedBytes_ == 0);
611 streamOffset_ += len;
612 }
613
614 bool write_non_template(const uint8_t* value, uint8_t sizeOfType, uint8_t numBytes) override
615 {
617 return false;
618 if(numBytes > sizeOfType)
619 return false;
620
621 // handle case where there is no internal buffer (buffer stream)
622 if(isMemStream())
623 {
624 // skip first to make sure that we are not at the end of the stream
625 if(!callbacks_.seekCallback_(streamOffset_ + numBytes, userData_))
626 return false;
627 grk_write(buf_->currPtr(), value, sizeOfType, numBytes);
628 writeIncrement(numBytes);
629 return true;
630 }
631 size_t remaining_bytes = buf_->num_elts() - bufferedBytes_;
632 if(remaining_bytes < numBytes)
633 {
634 if(!flush())
635 return false;
636 }
637 grk_write(buf_->currPtr(), value, sizeOfType, numBytes);
638 writeIncrement(numBytes);
639 return true;
640 }
642 {
643 buf_->set_offset(0);
644 bufferedBytes_ = 0;
647 }
648
652 void* userData_ = nullptr;
664 uint64_t userDataLength_ = 0;
665
670 uint32_t status_ = 0;
671
675 std::unique_ptr<BufferAligned8> buf_;
676 std::shared_ptr<ChunkBuffer<>> chunk_buf_;
677
678 // number of bytes read in, or slated for write
679 size_t bufferedBytes_ = 0;
680
681 // number of seekable bytes in buffer. This will equal
682 // the number of bytes
683 // read in the last media read.
684 // We always have buffered_bytes_ <= read_bytes_seekable_
686
687 // number of bytes read/written from the beginning of the stream
688 uint64_t streamOffset_ = 0;
689
691
692 bool firstCache_ = true;
694
695 IFetcher* fetcher_ = nullptr;
697};
698
699} // namespace grk
#define GROK_STREAM_STATUS_INPUT
Definition BufferedStream.h:27
#define GROK_STREAM_STATUS_OUTPUT
Definition BufferedStream.h:26
#define GROK_STREAM_STATUS_ERROR
Definition BufferedStream.h:29
#define GROK_STREAM_STATUS_END
Definition BufferedStream.h:28
Manages a partially ordered deque of buffer chunks that are added asynchronously out of order.
Definition ChunkBuffer.h:47
Definition CurlFetcher.h:49
Definition MemAdvisor.h:35
enum _GRK_CODEC_FORMAT GRK_CODEC_FORMAT
Grok Supported JPEG 2000 formats.
@ GRK_CODEC_UNK
Definition grok.h:321
void(* grk_stream_free_user_data_fn)(void *user_data)
Free user data callback.
Definition grok.h:543
ResWindow.
Definition CompressedChunkCache.h:36
GrkAccessPattern
Definition IMemAdvisor.h:24
void * grk_aligned_malloc(size_t bytes)
Definition MemManager.h:308
void memStreamSetup(IStream *stream, bool isReadStream)
Definition MemStream.cpp:137
ILogger & grklog
Definition Logger.cpp:24
void grk_write(uint8_t *dest, const uint8_t *value, uint8_t sizeOfType, uint32_t numBytes)
Definition StreamIO.cpp:25
bool supportsZeroCopy() override
Checks is stream supports zero copy.
Definition BufferedStream.h:282
void setFetcher(IFetcher *fetcher)
Sets the IFetcher.
Definition BufferedStream.h:100
uint32_t status_
Stream status flags.
Definition BufferedStream.h:670
IStream * bifurcate(void) override
Definition BufferedStream.h:77
std::shared_ptr< ChunkBuffer<> > chunk_buf_
Definition BufferedStream.h:676
void setCallbacks(StreamCallbacks &callbacks) override
Sets callbacks.
Definition BufferedStream.h:141
StreamCallbacks callbacks_
Definition BufferedStream.h:666
size_t read(uint8_t *buffer, size_t len)
Reads bytes from stream.
Definition BufferedStream.h:315
uint64_t streamOffset_
Definition BufferedStream.h:688
std::unique_ptr< BufferAligned8 > buf_
Backing buffer.
Definition BufferedStream.h:675
size_t read(uint8_t *buffer, uint8_t **zero_copy_buffer, size_t len) override
Read bytes.
Definition BufferedStream.h:154
bool readSeek(uint64_t offset)
Performs absolute seek in read stream.
Definition BufferedStream.h:518
size_t readBytesSeekable_
Definition BufferedStream.h:685
~BufferedStream() override
Destroys a BufferedStream.
Definition BufferedStream.h:70
IFetcher * fetcher_
Definition BufferedStream.h:695
bool skip(int64_t len) override
Skips bytes in stream.
Definition BufferedStream.h:252
bool hasSeek() override
Checks if stream is seekable.
Definition BufferedStream.h:277
bool readSkip(int64_t len)
Skip bytes in read stream.
Definition BufferedStream.h:504
void * userData_
user data
Definition BufferedStream.h:652
uint64_t tell(void) override
query byte offset of stream (similar to ftell).
Definition BufferedStream.h:257
void setUserData(void *data, grk_stream_free_user_data_fn freeUserDataFun, uint64_t len) override
Sets user data.
Definition BufferedStream.h:134
void invalidateBuffer()
Definition BufferedStream.h:641
void setChunkBuffer(std::shared_ptr< ChunkBuffer<> > chunkBuffer) override
Definition BufferedStream.h:126
size_t bufferedBytes_
Definition BufferedStream.h:679
size_t readDirect(uint8_t *buffer, size_t len)
Reads directly from callback.
Definition BufferedStream.h:454
size_t originalBufferLength_
Definition BufferedStream.h:693
GRK_CODEC_FORMAT getFormat(void) override
Gets codec format (J2K/JP2/MJ2).
Definition BufferedStream.h:292
bool write24u(uint32_t value) override
Writes 3 bytes from uint32_t as big endian.
Definition BufferedStream.h:164
void writeIncrement(size_t len)
Definition BufferedStream.h:604
bool write_non_template(const uint8_t *value, uint8_t sizeOfType, uint8_t numBytes) override
Definition BufferedStream.h:614
uint8_t * currPtr(void) override
Gets current pointer (used for zero copy).
Definition BufferedStream.h:297
void setMemAdvisor(MemAdvisor *advisor)
Definition BufferedStream.h:115
uint64_t numBytesLeft(void) override
Gets the number of bytes left before end of stream.
Definition BufferedStream.h:262
void setFormat(GRK_CODEC_FORMAT format) override
Stores codec format J2K/JP2/MJ2.
Definition BufferedStream.h:287
IFetcher * getFetcher(void) override
Gets the IFetcher.
Definition BufferedStream.h:110
bool flush() override
Flushes stream to disk.
Definition BufferedStream.h:226
bool seek(uint64_t offset) override
Seek bytes from the stream (absolute).
Definition BufferedStream.h:268
grk_stream_free_user_data_fn freeUserDataCallback_
Pointer to function to free user_data_ (nullptr at initialization) when destroying the stream.
Definition BufferedStream.h:658
void memAdvise(size_t virtual_offset, size_t length, GrkAccessPattern pattern) override
Definition BufferedStream.h:120
size_t writeBytes(const uint8_t *buffer, size_t len) override
Writes bytes to stream (no correction for endian!).
Definition BufferedStream.h:174
bool write8u(uint8_t value) override
Writes byte.
Definition BufferedStream.h:169
bool isMemStream() override
Checks if stream is memory stream i.e.
Definition BufferedStream.h:302
bool firstCache_
Definition BufferedStream.h:692
bool writeSkip(int64_t len)
Skips bytes in write stream.
Definition BufferedStream.h:492
BufferedStream(uint8_t *buffer, size_t initial_buffer_size, size_t buffer_size, bool is_input)
Constructs a BufferedStream.
Definition BufferedStream.h:52
bool writeSeek(uint64_t offset)
Performs absolute seek in write stream.
Definition BufferedStream.h:579
GRK_CODEC_FORMAT format_
Definition BufferedStream.h:690
MemAdvisor * memAdvisor_
Definition BufferedStream.h:696
uint64_t userDataLength_
User data length.
Definition BufferedStream.h:664
Definition IStream.h:60
Definition MemStream.h:32
Stores callbacks.
Definition IStream.h:40