Grok 20.3.2
CompressedChunkCache.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2016-2026 Grok Image Compression Inc.
3 *
4 * This source code is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Affero General Public License, version 3,
6 * as published by the Free Software Foundation.
7 *
8 * This source code is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Affero General Public License for more details.
12 *
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 */
17
18#pragma once
19
20#include <cstdint>
21#include <cstdlib>
22#include <cstring>
23#include <list>
24#include <memory>
25#include <mutex>
26#include <string>
27#include <unordered_map>
28#include <vector>
29
30#include "DiskCache.h"
31#include "TPFetchSeq.h"
32#include "MemStream.h"
33#include "IStream.h"
34
35namespace grk
36{
37
53{
54public:
61 explicit CompressedChunkCache(size_t maxBytes = 0, std::shared_ptr<DiskCache> diskCache = nullptr,
62 GRK_CODEC_FORMAT codecFormat = GRK_CODEC_J2K)
63 : maxBytes_(resolveMaxBytes(maxBytes)), currentBytes_(0), diskCache_(std::move(diskCache)),
64 codecFormat_(codecFormat)
65 {}
66
73 void put(uint16_t tileIndex, std::shared_ptr<TPFetchSeq> seq)
74 {
75 if(!seq || seq->empty())
76 return;
77
78 std::lock_guard<std::mutex> lock(mutex_);
79
80 // Remove existing entry if any
81 auto it = entries_.find(tileIndex);
82 if(it != entries_.end())
83 {
84 if(it->second.inMemory)
85 currentBytes_ -= it->second.dataSize;
86 removeLRU(tileIndex);
87 entries_.erase(it);
88 }
89
90 Entry entry;
91 entry.seq = std::move(seq);
92 entry.dataSize = calcSeqDataSize(*entry.seq);
93 entry.inMemory = true;
94 currentBytes_ += entry.dataSize;
95
96 entries_[tileIndex] = std::move(entry);
97 pushFrontLRU(tileIndex);
98
99 evictToFit();
100 }
101
110 std::shared_ptr<TPFetchSeq> get(uint16_t tileIndex)
111 {
112 std::lock_guard<std::mutex> lock(mutex_);
113
114 auto it = entries_.find(tileIndex);
115 if(it == entries_.end())
116 return nullptr;
117
118 auto& entry = it->second;
119
120 // Reload from disk if spilled
121 if(!entry.inMemory)
122 {
123 if(!reloadFromDisk(tileIndex, entry))
124 return nullptr;
125 }
126
127 promoteLRU(tileIndex);
128 return entry.seq;
129 }
130
134 bool contains(uint16_t tileIndex) const
135 {
136 std::lock_guard<std::mutex> lock(mutex_);
137 return entries_.find(tileIndex) != entries_.end();
138 }
139
140 void clear()
141 {
142 std::lock_guard<std::mutex> lock(mutex_);
143 entries_.clear();
144 lruList_.clear();
145 lruMap_.clear();
146 currentBytes_ = 0;
147 if(diskCache_)
148 diskCache_->clear();
149 }
150
151 size_t currentBytes() const
152 {
153 std::lock_guard<std::mutex> lock(mutex_);
154 return currentBytes_;
155 }
156 size_t maxBytes() const
157 {
158 return maxBytes_;
159 }
160 size_t size() const
161 {
162 std::lock_guard<std::mutex> lock(mutex_);
163 return entries_.size();
164 }
165
166private:
167 struct Entry
168 {
169 std::shared_ptr<TPFetchSeq> seq;
170 bool inMemory = true;
171 size_t dataSize = 0;
172 };
173
174 static size_t calcSeqDataSize(const TPFetchSeq& seq)
175 {
176 size_t total = 0;
177 for(size_t i = 0; i < seq.size(); ++i)
178 {
179 const auto& tp = seq[i];
180 if(tp && tp->data_)
181 total += tp->length_;
182 }
183 return total;
184 }
185
186 // LRU helpers
187 void pushFrontLRU(uint16_t tileIndex)
188 {
189 lruList_.push_front(tileIndex);
190 lruMap_[tileIndex] = lruList_.begin();
191 }
192
193 void removeLRU(uint16_t tileIndex)
194 {
195 auto it = lruMap_.find(tileIndex);
196 if(it != lruMap_.end())
197 {
198 lruList_.erase(it->second);
199 lruMap_.erase(it);
200 }
201 }
202
203 void promoteLRU(uint16_t tileIndex)
204 {
205 auto it = lruMap_.find(tileIndex);
206 if(it != lruMap_.end())
207 {
208 lruList_.splice(lruList_.begin(), lruList_, it->second);
209 }
210 }
211
212 // Evict LRU entries until under memory budget
214 {
215 while(maxBytes_ > 0 && currentBytes_ > maxBytes_ && !lruList_.empty())
216 {
217 auto lruTile = lruList_.back();
218 auto it = entries_.find(lruTile);
219 if(it == entries_.end())
220 {
221 // Orphaned LRU entry — just remove
222 lruList_.pop_back();
223 lruMap_.erase(lruTile);
224 continue;
225 }
226
227 auto& entry = it->second;
228 if(entry.inMemory)
229 spillToDisk(lruTile, entry);
230 else
231 {
232 // Already spilled; move to front to avoid infinite loop
233 lruList_.splice(lruList_.begin(), lruList_, lruMap_[lruTile]);
234 break;
235 }
236 }
237 }
238
247 void spillToDisk(uint16_t tileIndex, Entry& entry)
248 {
249 if(!diskCache_ || !entry.seq)
250 return;
251
252 auto& seq = *entry.seq;
253 uint16_t numParts = static_cast<uint16_t>(seq.size());
254
255 // Calculate total serialized size
256 size_t headerSize = sizeof(uint16_t) + numParts * (sizeof(uint64_t) + sizeof(uint64_t));
257 size_t dataSize = 0;
258 for(size_t i = 0; i < numParts; ++i)
259 {
260 if(seq[i] && seq[i]->data_)
261 dataSize += seq[i]->length_;
262 }
263
264 std::vector<uint8_t> buf(headerSize + dataSize);
265 uint8_t* ptr = buf.data();
266
267 // Write header
268 std::memcpy(ptr, &numParts, sizeof(numParts));
269 ptr += sizeof(numParts);
270
271 for(size_t i = 0; i < numParts; ++i)
272 {
273 uint64_t offset = seq[i] ? seq[i]->offset_ : 0;
274 uint64_t length = seq[i] ? seq[i]->length_ : 0;
275 std::memcpy(ptr, &offset, sizeof(offset));
276 ptr += sizeof(offset);
277 std::memcpy(ptr, &length, sizeof(length));
278 ptr += sizeof(length);
279 }
280
281 // Write data
282 for(size_t i = 0; i < numParts; ++i)
283 {
284 if(seq[i] && seq[i]->data_)
285 {
286 std::memcpy(ptr, seq[i]->data_.get(), seq[i]->length_);
287 ptr += seq[i]->length_;
288 }
289 }
290
291 diskCache_->store(tileIndex, buf.data(), buf.size());
292
293 // Release in-memory buffers
294 for(size_t i = 0; i < numParts; ++i)
295 {
296 if(seq[i])
297 {
298 seq[i]->data_.reset();
299 seq[i]->stream_.reset();
300 seq[i]->fetchOffset_ = 0;
301 }
302 }
303
304 currentBytes_ -= entry.dataSize;
305 entry.inMemory = false;
306 }
307
311 bool reloadFromDisk(uint16_t tileIndex, Entry& entry)
312 {
313 if(!diskCache_ || !entry.seq)
314 return false;
315
316 auto blob = diskCache_->load(tileIndex);
317 if(!blob.has_value())
318 return false;
319
320 auto& data = blob.value();
321 const uint8_t* ptr = data.data();
322 const uint8_t* end = ptr + data.size();
323
324 if(static_cast<size_t>(end - ptr) < sizeof(uint16_t))
325 return false;
326
327 uint16_t numParts;
328 std::memcpy(&numParts, ptr, sizeof(numParts));
329 ptr += sizeof(numParts);
330
331 auto& seq = *entry.seq;
332 if(numParts != static_cast<uint16_t>(seq.size()))
333 return false;
334
335 size_t headerRemaining = numParts * (sizeof(uint64_t) + sizeof(uint64_t));
336 if(static_cast<size_t>(end - ptr) < headerRemaining)
337 return false;
338
339 // Read part headers (offset, length) — skip since TPFetch already has them
340 ptr += headerRemaining;
341
342 // Read data into TPFetch objects
343 for(size_t i = 0; i < numParts; ++i)
344 {
345 uint64_t length = seq[i] ? seq[i]->length_ : 0;
346 if(length > 0)
347 {
348 if(static_cast<size_t>(end - ptr) < length)
349 return false;
350
351 seq[i]->data_ = std::make_unique<uint8_t[]>(length);
352 std::memcpy(seq[i]->data_.get(), ptr, length);
353 seq[i]->fetchOffset_ = length;
354 seq[i]->stream_ = std::unique_ptr<IStream>(
355 memStreamCreate(seq[i]->data_.get(), length, false, nullptr, codecFormat_, true));
356 ptr += length;
357 }
358 }
359
360 entry.inMemory = true;
361 currentBytes_ += entry.dataSize;
362 return true;
363 }
364
/// Budget used when neither the caller nor the environment gives a usable value.
static constexpr size_t kDefaultMaxBytes = 256 * 1024 * 1024; // 256 MB

/**
 * @brief Resolve max bytes from environment or use provided value.
 *
 * A non-zero @p provided wins. Otherwise GRK_CACHEMAX is consulted, and
 * GDAL_CACHEMAX only if GRK_CACHEMAX is not set at all. The accepted form
 * is a run of decimal digits, optionally followed by a single M/m (MiB) or
 * G/g (GiB) suffix; leading whitespace is tolerated.
 *
 * Anything else falls back to kDefaultMaxBytes rather than throwing out of
 * the constructor: an empty value, non-digit characters, a bare suffix, or
 * a value that does not fit in size_t.
 *
 * An explicit "0" in the environment is returned as 0.
 *
 * @param provided caller-supplied budget in bytes, 0 to use the environment
 * @return resolved budget in bytes
 */
static size_t resolveMaxBytes(size_t provided)
{
  if(provided > 0)
    return provided;

  const char* env = std::getenv("GRK_CACHEMAX");
  if(!env)
    env = std::getenv("GDAL_CACHEMAX");
  if(!env)
    return kDefaultMaxBytes;

  std::string text(env);
  const auto firstChar = text.find_first_not_of(" \t\n\v\f\r");
  if(firstChar == std::string::npos)
    return kDefaultMaxBytes; // empty or whitespace only
  text.erase(0, firstChar);

  size_t multiplier = 1;
  switch(text.back())
  {
    case 'M':
    case 'm':
      multiplier = size_t{1} << 20;
      text.pop_back();
      break;
    case 'G':
    case 'g':
      multiplier = size_t{1} << 30;
      text.pop_back();
      break;
    default:
      break;
  }
  if(text.empty())
    return kDefaultMaxBytes; // suffix without a number

  size_t value = 0;
  for(const char c : text)
  {
    if(c < '0' || c > '9')
      return kDefaultMaxBytes;
    const size_t digit = static_cast<size_t>(c - '0');
    if(value > (SIZE_MAX - digit) / 10)
      return kDefaultMaxBytes; // would overflow size_t
    value = value * 10 + digit;
  }

  if(value > SIZE_MAX / multiplier)
    return kDefaultMaxBytes; // scaled value would overflow size_t
  return value * multiplier;
}
398
399 size_t maxBytes_;
401 std::shared_ptr<DiskCache> diskCache_;
403
404 mutable std::mutex mutex_;
405 std::list<uint16_t> lruList_; // front = MRU, back = LRU
406 std::unordered_map<uint16_t, std::list<uint16_t>::iterator> lruMap_;
407 std::unordered_map<uint16_t, Entry> entries_;
408};
409
410} // namespace grk
size_t size() const
Definition CompressedChunkCache.h:160
static size_t resolveMaxBytes(size_t provided)
Resolve max bytes from environment or use provided value.
Definition CompressedChunkCache.h:370
size_t maxBytes_
Definition CompressedChunkCache.h:399
void put(uint16_t tileIndex, std::shared_ptr< TPFetchSeq > seq)
Register a tile's fetched data with the cache.
Definition CompressedChunkCache.h:73
GRK_CODEC_FORMAT codecFormat_
Definition CompressedChunkCache.h:402
bool reloadFromDisk(uint16_t tileIndex, Entry &entry)
Reload tile part data from disk into the existing TPFetch objects.
Definition CompressedChunkCache.h:311
std::list< uint16_t > lruList_
Definition CompressedChunkCache.h:405
std::shared_ptr< DiskCache > diskCache_
Definition CompressedChunkCache.h:401
void evictToFit()
Definition CompressedChunkCache.h:213
void promoteLRU(uint16_t tileIndex)
Definition CompressedChunkCache.h:203
static constexpr size_t kDefaultMaxBytes
Definition CompressedChunkCache.h:397
size_t maxBytes() const
Definition CompressedChunkCache.h:156
bool contains(uint16_t tileIndex) const
Check if a tile is tracked (memory or disk).
Definition CompressedChunkCache.h:134
void clear()
Definition CompressedChunkCache.h:140
void pushFrontLRU(uint16_t tileIndex)
Definition CompressedChunkCache.h:187
std::unordered_map< uint16_t, std::list< uint16_t >::iterator > lruMap_
Definition CompressedChunkCache.h:406
std::unordered_map< uint16_t, Entry > entries_
Definition CompressedChunkCache.h:407
void spillToDisk(uint16_t tileIndex, Entry &entry)
Serialize tile part data to disk and release in-memory buffers.
Definition CompressedChunkCache.h:247
size_t currentBytes() const
Definition CompressedChunkCache.h:151
std::shared_ptr< TPFetchSeq > get(uint16_t tileIndex)
Ensure a tile's compressed data is in memory.
Definition CompressedChunkCache.h:110
static size_t calcSeqDataSize(const TPFetchSeq &seq)
Definition CompressedChunkCache.h:174
void removeLRU(uint16_t tileIndex)
Definition CompressedChunkCache.h:193
size_t currentBytes_
Definition CompressedChunkCache.h:400
std::mutex mutex_
Definition CompressedChunkCache.h:404
CompressedChunkCache(size_t maxBytes=0, std::shared_ptr< DiskCache > diskCache=nullptr, GRK_CODEC_FORMAT codecFormat=GRK_CODEC_J2K)
Construct the cache.
Definition CompressedChunkCache.h:61
enum _GRK_CODEC_FORMAT GRK_CODEC_FORMAT
Grok Supported JPEG 2000 formats.
@ GRK_CODEC_J2K
unknown format
Definition grok.h:322
ResWindow.
Definition CompressedChunkCache.h:36
IStream * memStreamCreate(uint8_t *buf, size_t len, bool ownsBuffer, grk_stream_free_user_data_fn freeCallback, GRK_CODEC_FORMAT format, bool isReadStream)
Create stream from buffer.
Definition MemStream.cpp:153
Definition CompressedChunkCache.h:168
std::shared_ptr< TPFetchSeq > seq
Definition CompressedChunkCache.h:169
bool inMemory
Definition CompressedChunkCache.h:170
size_t dataSize
Definition CompressedChunkCache.h:171
size_t size() const
Returns number of objects in store.
Definition TPFetchSeq.h:81
Definition TPFetchSeq.h:225