Grok 20.3.2
ChunkBuffer.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2016-2026 Grok Image Compression Inc.
3 *
4 * This source code is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Affero General Public License, version 3,
6 * as published by the Free Software Foundation.
7 *
8 * This source code is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Affero General Public License for more details.
12 *
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 */
17
18#pragma once
19
20#include <mutex>
21#include <condition_variable>
22#include <vector>
23#include <deque>
24#include <cstdint>
25#include <stdexcept>
26#include <list>
27#include <iostream>
28
29#include "MinHeap.h"
30
31namespace grk
32{
33
45template<typename T = uint16_t>
47{
48public:
49 using index_type = T;
// Builds an empty buffer for the absolute byte range [offset, length), split into
// chunks of chunkSize bytes. Both chunkSize and offset are clamped to length.
// The starting offset is remembered as initialOffset_; bytes before it are never
// stored. One initially-empty slot is reserved in buffers_ for every chunk of
// the working range length_ - initialOffset_.
// NOTE(review): if length == 0 (or chunkSize == 0), chunkSize_ ends up 0 and the
// numChunks computation below divides by zero — confirm callers never pass these.
50 ChunkBuffer(size_t chunkSize, size_t offset, size_t length)
51 : chunkSize_(chunkSize > length ? length : chunkSize),
52 offset_(offset > length ? length : offset), length_(length), initialOffset_(offset_),
54 {
55 size_t workingLength = length_ - initialOffset_;
// Ceiling division: a trailing partial chunk still gets its own slot.
56 size_t numChunks = (workingLength + chunkSize_ - 1) / chunkSize_;
57 buffers_.resize(numChunks);
58 }
// Defaulted: the visible members are standard containers and synchronization
// primitives that release their own resources.
59 ~ChunkBuffer() = default;
60
61 size_t size() const
62 {
63 return length_;
64 }
65
66 size_t offset() const
67 {
68 return offset_;
69 }
70
71 size_t initialOffset() const
72 {
73 return initialOffset_;
74 }
75
76 size_t chunkSize() const
77 {
78 return chunkSize_;
79 }
80
// Moves the absolute read offset to new_offset.
// Returns false, leaving offset_ unchanged, if new_offset > length_.
// Otherwise blocks (with no timeout) until contiguous_length_ >= new_offset.
// contiguous_length_ is advanced by add(), which notifies cv_. Then stores
// the new offset and returns true.
// NOTE(review): nothing rejects new_offset < initialOffset_. currPtr() computes
// offset_ - initialOffset_ as size_t, which would wrap in that case — confirm callers.
81 bool set_offset(size_t new_offset)
82 {
83 std::unique_lock<std::mutex> lock(mutex_);
84 if(new_offset > length_)
85 {
86#ifdef DEBUG_SEG_BUF
87 grklog.warn("ChunkBuffer: attempt to increment offset out of bounds");
88#endif
90 return false;
91 }
// Wait for add() to extend the contiguous prefix up to new_offset.
92 if(new_offset > contiguous_length_)
93 cv_.wait(lock, [this, new_offset]() { return new_offset <= contiguous_length_; });
94 offset_ = new_offset;
95 return true;
96 }
97
// Advances the absolute read offset by off bytes.
// A negative off is rejected (returns false) and off == 0 is a no-op (returns true).
// Returns false, leaving offset_ unchanged, if the result would exceed length_.
// Otherwise blocks (with no timeout) until contiguous_length_ covers the new
// offset, then stores it and returns true.
// NOTE(review): offset_ + off is not checked for size_t wrap-around before the
// bound check, and uses a C-style cast; consider static_cast plus an overflow guard.
98 bool increment_offset(std::ptrdiff_t off)
99 {
100 if(off < 0)
101 return false;
102 else if(off == 0)
103 return true;
104 std::unique_lock<std::mutex> lock(mutex_);
105
106 size_t new_offset = offset_ + (size_t)off;
107 if(new_offset > length_)
108 {
109#ifdef DEBUG_SEG_BUF
110 grklog.warn("ChunkBuffer: attempt to increment offset out of bounds");
111#endif
113 return false;
114 }
// Wait for add() to extend the contiguous prefix up to new_offset.
115 if(new_offset > contiguous_length_)
116 cv_.wait(lock, [this, new_offset]() { return new_offset <= contiguous_length_; });
117 offset_ = new_offset;
118 return true;
119 }
120 const uint8_t* currPtr(size_t desired_region) const
121 {
122 std::unique_lock<std::mutex> lock(mutex_);
123 size_t relative_offset = offset_ - initialOffset_;
124
125 // Wait until the desired region is contiguous
126 if(relative_offset + desired_region > contiguous_length_ - initialOffset_)
127 {
128 cv_.wait(lock, [this, relative_offset, desired_region]() {
129 return relative_offset + desired_region <= contiguous_length_ - initialOffset_;
130 });
131 }
132
133 size_t start_chunk = relative_offset / chunkSize_;
134 size_t offset_in_chunk = relative_offset % chunkSize_;
135 size_t end_offset = relative_offset + desired_region;
136
137 // 1. Handle out of bounds
138 if(end_offset > length_ - initialOffset_)
139 {
140 std::cout << "Warning: Out of bounds - truncating";
141 end_offset = length_ - initialOffset_;
142 }
143
144 // 2. Try to find region in buffers
145 auto chunk_idx = start_chunk - static_cast<size_t>(base_);
146 if(chunk_idx >= buffers_.size() || buffers_[chunk_idx].empty())
147 {
148 throw std::runtime_error("Missing chunk in contiguous sequence");
149 }
150 const auto& chunk_data = buffers_[chunk_idx];
151 if(offset_in_chunk + desired_region <= chunk_data.size())
152 {
153 return chunk_data.data() + offset_in_chunk;
154 }
155
156 // 3. Create contiguous buffer (store relative offset for correct free_before comparison)
157 auto& result = owned_buffers_.emplace_back(relative_offset, std::vector<uint8_t>());
158 auto& buffer = result.second;
159 buffer.reserve(desired_region);
160 size_t bytes_remaining = desired_region;
161 for(size_t i = start_chunk; bytes_remaining > 0 && static_cast<T>(i) <= last_contiguous_chunk_;
162 ++i)
163 {
164 auto idx = i - static_cast<size_t>(base_);
165 if(idx >= buffers_.size() || buffers_[idx].empty())
166 {
167 throw std::runtime_error("Missing chunk in contiguous sequence");
168 }
169 const auto& cd = buffers_[idx];
170 size_t chunk_start = (i == start_chunk) ? offset_in_chunk : 0;
171 size_t bytes_to_copy = std::min(bytes_remaining, cd.size() - chunk_start);
172 buffer.insert(buffer.end(), cd.begin() + static_cast<ptrdiff_t>(chunk_start),
173 cd.begin() + static_cast<ptrdiff_t>(chunk_start + bytes_to_copy));
174 bytes_remaining -= bytes_to_copy;
175 }
176 return buffer.data();
177 }
// Stores a copy of the `size` bytes at `buffer` as chunk `fetch_index`, then
// tries to grow the contiguous prefix and wakes any waiters on cv_.
// Throws std::runtime_error if size > chunkSize_, or if the heap reports a
// contiguous chunk whose slot is out of range or still empty.
// contiguous_length_ is only ever raised, never lowered, and is capped at length_.
178 void add(T fetch_index, const uint8_t* buffer, size_t size)
179 {
180 if(size > chunkSize_)
181 {
182 throw std::runtime_error("Buffer size exceeds chunk size");
183 }
184
185 {
186 std::unique_lock<std::mutex> lock(mutex_);
187 // Add the new chunk to buffers
// NOTE(review): idx is not bounds-checked before buffers_[idx]. A fetch_index
// below base_ (already freed) or past the last chunk indexes out of range
// (undefined behavior). Consider the same check used for `ci` below.
188 auto idx = static_cast<size_t>(fetch_index) - static_cast<size_t>(base_);
189 buffers_[idx] = std::vector<uint8_t>(buffer, buffer + size);
190 // Update the heap and get the last contiguous chunk
191 auto contiguous_chunk = bufferheap_.push_and_pop(fetch_index);
192
193 // Update contiguous_length_ only if it grows
194 size_t new_contiguous_length = initialOffset_; // Default to initial offset
195 if(contiguous_chunk.has_value())
196 {
197 // Valid contiguous chunk exists
198 last_contiguous_chunk_ = *contiguous_chunk;
199 auto ci = static_cast<size_t>(last_contiguous_chunk_) - static_cast<size_t>(base_);
200 if(ci >= buffers_.size() || buffers_[ci].empty())
201 {
202 throw std::runtime_error("Invalid contiguous chunk index returned by heap");
203 }
204 new_contiguous_length =
206 }
207 new_contiguous_length = std::min(new_contiguous_length, length_);
208 if(new_contiguous_length > contiguous_length_)
209 {
210 contiguous_length_ = new_contiguous_length;
211 }
212 }
// Notify after the lock scope ends, so woken waiters don't immediately block on mutex_.
213 cv_.notify_all();
214 }
215 void free_before(size_t offset)
216 {
217 std::lock_guard<std::mutex> lock(mutex_);
218 // offset is relative (from initialOffset_); clamp to relative contiguous length
219 size_t rel_contiguous = contiguous_length_ - initialOffset_;
220 if(offset > rel_contiguous)
221 offset = rel_contiguous;
222
223 // 1. remove owned buffers whose end offset is less than or equal to offset
224 owned_buffers_.remove_if([offset](const std::pair<size_t, std::vector<uint8_t>>& buf) {
225 size_t buf_end = buf.first + buf.second.size();
226 return buf_end <= offset;
227 });
228
229 // 2. remove chunk buffers from the front
230 while(!buffers_.empty() && !buffers_.front().empty())
231 {
232 size_t chunk_end = static_cast<size_t>(base_) * chunkSize_ + buffers_.front().size();
233 if(chunk_end <= offset)
234 {
235 buffers_.front().clear();
236 buffers_.front().shrink_to_fit();
237 buffers_.pop_front();
238 base_++;
239 }
240 else
241 {
242 break;
243 }
244 }
245 }
246
247private:
// Absolute read position; kept <= length_ by set_offset()/increment_offset().
249 size_t offset_;
// Absolute end of the buffered range (the constructor's `length`).
250 size_t length_;
// One slot per chunk; slot i holds chunk number base_ + i. An empty vector
// means that chunk has not been add()ed yet.
252 std::deque<std::vector<uint8_t>> buffers_;
// Chunk number of buffers_.front(); advanced by free_before() as front chunks
// are released. NOTE(review): stored as T, so the chunk count must fit T — confirm.
253 T base_ = 0;
// Absolute offset up to which data is contiguous. add() capped it at length_;
// set_offset()/increment_offset()/currPtr() wait on cv_ for it to grow.
256 size_t contiguous_length_ = 0; // Peak contiguous length, only grows
// Stitched copies made by currPtr() for reads spanning several chunks, keyed by
// relative start offset; released by free_before().
257 mutable std::list<std::pair<size_t, std::vector<uint8_t>>> owned_buffers_; // Mutable for currPtr
// Held by set_offset/increment_offset/currPtr/add/free_before while they access the state above.
258 mutable std::mutex mutex_;
// Notified by add(); waited on by set_offset/increment_offset/currPtr.
259 mutable std::condition_variable cv_;
260};
261
262} // namespace grk
SimpleHeap< T > bufferheap_
Definition ChunkBuffer.h:254
T last_contiguous_chunk_
Definition ChunkBuffer.h:255
bool increment_offset(std::ptrdiff_t off)
Definition ChunkBuffer.h:98
std::mutex mutex_
Definition ChunkBuffer.h:258
ChunkBuffer(size_t chunkSize, size_t offset, size_t length)
Definition ChunkBuffer.h:50
std::deque< std::vector< uint8_t > > buffers_
Definition ChunkBuffer.h:252
T base_
Definition ChunkBuffer.h:253
std::list< std::pair< size_t, std::vector< uint8_t > > > owned_buffers_
Definition ChunkBuffer.h:257
size_t chunkSize() const
Definition ChunkBuffer.h:76
void free_before(size_t offset)
Definition ChunkBuffer.h:215
~ChunkBuffer()=default
size_t initialOffset() const
Definition ChunkBuffer.h:71
size_t offset() const
Definition ChunkBuffer.h:66
size_t length_
Definition ChunkBuffer.h:250
size_t contiguous_length_
Definition ChunkBuffer.h:256
T index_type
Definition ChunkBuffer.h:49
const uint8_t * currPtr(size_t desired_region) const
Definition ChunkBuffer.h:120
size_t chunkSize_
Definition ChunkBuffer.h:248
bool set_offset(size_t new_offset)
Definition ChunkBuffer.h:81
std::condition_variable cv_
Definition ChunkBuffer.h:259
size_t initialOffset_
Definition ChunkBuffer.h:251
size_t offset_
Definition ChunkBuffer.h:249
size_t size() const
Definition ChunkBuffer.h:61
void add(T fetch_index, const uint8_t *buffer, size_t size)
Definition ChunkBuffer.h:178
A simple non-thread-safe min-heap for tracking contiguous sequences of size_t indices.
Definition MinHeap.h:34
ResWindow.
Definition CompressedChunkCache.h:36
ILogger & grklog
Definition Logger.cpp:24