Grok 20.3.2 — CurlFetcher.h (source listing).
See the generated documentation for this file.
1/*
2 * Copyright (C) 2016-2026 Grok Image Compression Inc.
3 *
4 * This source code is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Affero General Public License, version 3,
6 * as published by the Free Software Foundation.
7 *
8 * This source code is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Affero General Public License for more details.
12 *
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 */
17
18#pragma once
19
20#include <memory>
21#include <string>
22#include <cstring>
23#include <thread>
24#include <mutex>
25#include <condition_variable>
26#include <future>
27#include <set>
28#include <vector>
29#include <map>
30#include <sstream>
31#include <functional>
32
33#ifdef GRK_ENABLE_LIBCURL
34#include <curl/curl.h>
35#endif
36
37#include "grk_config_private.h"
38#include "FetchCommon.h"
39#include "EnvVarManager.h"
40#include "SimpleXmlParser.h"
41
42namespace grk
43{
44
45struct TileFetchContext;
46using TileFetchCallback = std::function<void(size_t requestIndex, TileFetchContext* context)>;
47
49{
50public:
51 virtual ~IFetcher() = default;
52
53 // Initialize the fetcher with a path and authentication details
54 virtual void init(const std::string& path, const FetchAuth& auth) = 0;
55
56 // Read data into a buffer
57 virtual size_t read(uint8_t* buffer, size_t numBytes) = 0;
58
59 // Seek to a specific offset
60 virtual bool seek(uint64_t offset) = 0;
61
62 // Get the total size of the resource
63 virtual uint64_t size() const = 0;
64
65 // Get the current offset
66 virtual uint64_t offset() const = 0;
67
68 // Fetch tiles asynchronously
69 virtual std::future<bool> fetchTiles(const TPSEQ_VEC& allTileParts, std::set<uint16_t>& slated,
70 void* user_data, TileFetchCallback callback) = 0;
71
72 virtual void onFetchTilesComplete(std::shared_ptr<TileFetchContext> context, bool success) = 0;
73
74 // Fetch chunks asynchronously
75 virtual void fetchChunks(std::shared_ptr<ChunkBuffer<>> chunkBuffer) = 0;
76
77 virtual void fetchChunks(std::shared_ptr<ChunkBuffer<>> chunkBuffer,
78 std::shared_ptr<std::vector<ChunkRequest>> requests) = 0;
79
80 // List directory contents
81 virtual std::vector<std::string> listDirectory(const std::string& path) = 0;
82
83 // Retrieve metadata
84 virtual bool getMetadata(const std::string& path,
85 std::map<std::string, std::string>& metadata) = 0;
86
87 // Fetch throttle: when set, the fetcher pauses scheduling new HTTP requests
88 // until the callback returns true. notifyThrottleRelease() wakes the
89 // fetcher so it can re-check the condition.
90 virtual void setFetchThrottle(std::function<bool()> throttle) = 0;
91 virtual void notifyThrottleRelease() = 0;
92};
93
94struct TileFetchContext : public std::enable_shared_from_this<TileFetchContext>
95{
96 std::shared_ptr<TPFetchSeq> requests_;
97 void* user_data_ = nullptr;
98 std::shared_ptr<std::unordered_map<uint16_t, std::shared_ptr<TPFetchSeq>>> tilePartFetchByTile_;
100 IFetcher* fetcher_ = nullptr;
101
102 TileFetchContext(std::shared_ptr<TPFetchSeq>& requests, void* user_data,
103 std::shared_ptr<std::unordered_map<uint16_t, std::shared_ptr<TPFetchSeq>>>&
104 tilePartFetchByTile,
105 TileFetchCallback callback, IFetcher* fetcher)
106 : requests_(requests), user_data_(user_data), tilePartFetchByTile_(tilePartFetchByTile),
107 callback_(callback), fetcher_(fetcher)
108 {}
109
111
112private:
113 mutable size_t completeCount_ = 0;
114};
115
// Signature shared by every libcurl write callback used by the fetchers.
using CURL_FETCHER_WRITE_CALLBACK = size_t (*)(void* contents, size_t size, size_t nmemb,
                                               void* userp);
118
119#ifdef GRK_ENABLE_LIBCURL
120
121// Tile write callback — copies data into TPFetch::data_ (zero-copy to tile buffer)
122static size_t tileWriteCallback(void* contents, size_t size, size_t nmemb, void* userp)
123{
124 size_t total_size = size * nmemb;
125 auto result = static_cast<FetchResult*>(userp);
126 auto ctx = std::static_pointer_cast<TileFetchContext>(result->ctx_);
127 if(ctx)
128 {
129 auto& tpseq = (*ctx->requests_)[result->requestIndex_];
130 tpseq->copy(static_cast<uint8_t*>(contents), total_size);
131 if(tpseq->fetchOffset_ == tpseq->length_)
132 {
133 ctx->callback_(result->requestIndex_, ctx.get());
134 ctx->incrementCompleteCount();
135 }
136 }
137 else
138 {
139 result->data_.insert(result->data_.end(), static_cast<const uint8_t*>(contents),
140 static_cast<const uint8_t*>(contents) + total_size);
141 }
142 return total_size;
143}
144
145// Chunk write callback — accumulates into result data, delivers to ChunkBuffer when complete
146static size_t chunkWriteCallback(void* contents, size_t size, size_t nmemb, void* userp)
147{
148 size_t total_size = size * nmemb;
149 auto* res = static_cast<FetchResult*>(userp);
150 res->data_.insert(res->data_.end(), static_cast<uint8_t*>(contents),
151 static_cast<uint8_t*>(contents) + total_size);
152 auto ctx = std::static_pointer_cast<ChunkContext>(res->ctx_);
153 auto& req = (*ctx->requests_)[res->requestIndex_];
154 if(res->data_.size() == req.length_)
155 {
156 ctx->chunkBuffer_->add(static_cast<ChunkBuffer<>::index_type>(res->requestIndex_),
157 res->data_.data(), req.length_);
158 res->data_.clear();
159 res->data_.shrink_to_fit();
160 }
161
162 return total_size;
163}
164
165class CurlFetcher : public IFetcher
166{
167public:
168 CurlFetcher(void) : tileWriteCallback_(tileWriteCallback)
169 {
170 curl_global_init(CURL_GLOBAL_ALL);
171 multi_handle_ = curl_multi_init();
172 if(!multi_handle_)
173 throw std::runtime_error("Failed to initialize CURL multi handle");
174 curl_multi_setopt(multi_handle_, CURLMOPT_MAX_TOTAL_CONNECTIONS, 100L);
175 fetchThread_ = std::thread(&CurlFetcher::fetchWorker, this);
176 }
177
  /**
   * Shut down the worker thread, then release all libcurl state.
   * Order matters: raise stop_ under the queue lock, wake the worker from
   * both condition variables it may be waiting on, join it, and only then
   * tear down the multi handle and global curl state.
   */
  virtual ~CurlFetcher() override
  {
    {
      std::lock_guard<std::mutex> lock(queue_mutex_);
      stop_ = true;
    }
    queue_cv_.notify_all(); // Wake worker from wait on queue_cv_
    throttleCV_.notify_all(); // Wake worker from throttle wait
    if(fetchThread_.joinable())
      fetchThread_.join();
    if(multi_handle_)
      curl_multi_cleanup(multi_handle_);
    curl_global_cleanup();
  }
192
  /// Install (or replace) the back-pressure predicate. Taken under
  /// throttleMutex_ so the worker never observes a half-assigned std::function.
  void setFetchThrottle(std::function<bool()> throttle) override
  {
    std::lock_guard<std::mutex> lock(throttleMutex_);
    fetchThrottle_ = std::move(throttle);
  }
198
  /// Wake the worker so it can re-evaluate the throttle predicate.
  void notifyThrottleRelease() override
  {
    throttleCV_.notify_one();
  }
203
  /**
   * Apply authentication/retry settings, resolve @p path into url_ (via the
   * subclass parse()) and issue a HEAD request to learn the resource size.
   * @throws std::runtime_error via fetch_total_size() on failure.
   */
  void init(const std::string& path, const FetchAuth& auth) override
  {
    auth_ = auth;
    if(auth_.max_retry_ > 0)
      maxRetries_ = auth_.max_retry_;
    if(auth_.retry_delay_ > 0)
      retryDelayMs_ = auth_.retry_delay_ * 1000; // seconds -> milliseconds
    parse(path);
    fetch_total_size();
  }
214
  /**
   * Synchronous ranged read: fetch numBytes at current_offset_ into @p buffer
   * with one blocking HTTP Range request on the calling thread.
   * @return bytes copied into @p buffer, or 0 on any failure.
   */
  size_t read(uint8_t* buffer, size_t numBytes) override
  {
    if(current_offset_ + numBytes > total_size_)
    {
      grklog.error("Read %zu bytes at offset %llu exceeds total size %llu", numBytes,
                   current_offset_, total_size_);
      return 0;
    }

    // result.ctx_ stays empty here, so tileWriteCallback falls through to its
    // plain append-into-result.data_ branch.
    FetchResult result;
    auto curl = configureHandle(current_offset_, current_offset_ + numBytes - 1, result,
                                tileWriteCallback_);
    // NOTE(review): the curl_slist built inside configureHandle() is never
    // freed on this path — confirm ownership and free it after cleanup.
    auto res = curl_easy_perform(curl);
    if(res != CURLE_OK)
    {
      grklog.error("curl_easy_perform failed: %s", curl_easy_strerror(res));
      curl_easy_cleanup(curl);
      return 0;
    }

    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &result.responseCode_);
    // Expect 206 Partial Content for a Range request
    if(result.responseCode_ != 206)
    {
      grklog.error("Read failed with HTTP code: %ld", result.responseCode_);
      curl_easy_cleanup(curl);
      return 0;
    }

    size_t bytes_read = result.data_.size();
    if(bytes_read > numBytes)
    {
      grklog.error("Received %zu bytes, but buffer only fits %zu", bytes_read, numBytes);
      bytes_read = numBytes; // truncate rather than overflow the caller's buffer
    }

    std::memcpy(buffer, result.data_.data(), bytes_read);
    grklog.debug("Read %zu bytes from %llu, new offset: %llu", bytes_read, current_offset_,
                 current_offset_ + bytes_read);
    current_offset_ += bytes_read;

    curl_easy_cleanup(curl);
    return bytes_read;
  }
258
259 bool seek(uint64_t offset) override
260 {
261 if(offset >= total_size_)
262 {
263 grklog.error("Seek offset %llu exceeds total size %llu", offset, total_size_);
264 return false;
265 }
266 current_offset_ = offset;
267 grklog.debug("Seeked to offset: %llu", current_offset_);
268 return true;
269 }
270
  /// Total resource size in bytes, as learned by the initial HEAD request.
  uint64_t size() const override
  {
    return total_size_;
  }
275
  /// Current logical read position (advanced by read(), set by seek()).
  uint64_t offset() const override
  {
    return current_offset_;
  }
280
291 std::future<bool> fetchTiles(const TPSEQ_VEC& allTileParts, std::set<uint16_t>& slated,
292 void* user_data, TileFetchCallback callback) override
293 {
294 // 1. cache fetch data
295 {
296 std::lock_guard<std::mutex> lock(fetch_mutex_);
297 if(!allTileParts_)
298 allTileParts_ = &allTileParts;
299 if(!user_data_)
300 user_data_ = user_data;
301 if(!tileFetchCallback_)
302 tileFetchCallback_ = callback;
303 }
304
305 // 2. queue fetch
306 FetchJob job(std::move(slated));
307 std::future<bool> future = job.promise_.get_future();
308 {
309 std::lock_guard<std::mutex> lock(queue_mutex_);
310 tile_fetch_queue_.push(std::move(job));
311 }
312 queue_cv_.notify_one();
313 grklog.debug("Queued tile fetch job, queue size: %zu", tile_fetch_queue_.size());
314
315 return future;
316 }
317
324 void onFetchTilesComplete(std::shared_ptr<TileFetchContext> context, bool success) override
325 {
326 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
327 auto it = active_jobs_.find(context);
328 if(it != active_jobs_.end())
329 {
330 it->second.set_value(success);
331 active_jobs_.erase(it);
332 grklog.debug("Fetch job completed");
333 }
334 else
335 {
336 grklog.error("TileFetchContext not found in active_jobs_ during completion");
337 }
338 }
339
340 void fetchChunks(std::shared_ptr<ChunkBuffer<>> chunkBuffer) override
341 {
342 auto requests = std::make_shared<std::vector<ChunkRequest>>();
343 auto length = chunkBuffer->size();
344 auto offset = chunkBuffer->offset();
345 auto workingLength = length - offset;
346 auto chunkSize = chunkBuffer->chunkSize();
347 auto numChunks = (workingLength + chunkSize - 1) / chunkSize;
348
349 for(uint16_t i = 0; i < numChunks; ++i)
350 {
351 auto end = offset + chunkSize - 1;
352 if(end > length - 1)
353 end = length - 1;
354 requests->push_back(ChunkRequest(i, offset, end));
355 offset += chunkSize;
356 }
357 fetchChunks(chunkBuffer, requests);
358 }
359
360 void fetchChunks(std::shared_ptr<ChunkBuffer<>> chunkBuffer,
361 std::shared_ptr<std::vector<ChunkRequest>> requests) override
362 {
363 ChunkTask task(chunkBuffer, requests);
364
365 for(size_t i = 0; i < task.requests_->size(); ++i)
366 {
367 auto& req = (*task.requests_)[i];
368 if(req.end_ < req.offset_ || req.end_ >= total_size_)
369 {
370 grklog.error("Invalid range %llu-%llu for ID %u (total size: %llu)", req.offset_, req.end_,
371 req.requestIndex_, total_size_);
372 }
373 }
374
375 {
376 std::lock_guard<std::mutex> lock(queue_mutex_);
377 chunk_fetch_queue_.push(std::move(task));
378 }
379 queue_cv_.notify_one();
380 grklog.debug("Queued chunk fetch task with %zu requests", requests->size());
381 }
382
383 // Directory listing
384 std::vector<std::string> listDirectory(const std::string& path) override
385 {
386 std::vector<std::string> files;
387 CURL* curl = curl_easy_init();
388 if(!curl)
389 {
390 grklog.error("Failed to initialize curl for directory listing");
391 return files;
392 }
393
394 parse(path);
395 std::string list_url = url_ + (url_.back() == '/' ? "" : "/") + "?list-type=2";
396
397 struct curl_slist* headers = nullptr;
398 headers = prepareAuthHeaders(headers);
399
400 std::string response;
401
402 struct TempResult
403 {
404 long responseCode_ = 0;
405 uint32_t retryCount_ = 0;
406 } temp_result;
407
408 do
409 {
410 temp_result.responseCode_ = 0;
411 response.clear();
412 curl_easy_setopt(curl, CURLOPT_URL, list_url.c_str());
413 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
414 auth(curl);
415 curl_initiate_retry(curl);
416 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);
417 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
418
419 CURLcode res = curl_easy_perform(curl);
420 if(res == CURLE_OK)
421 {
422 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &temp_result.responseCode_);
423 }
424
425 if(temp_result.retryCount_ < maxRetries_ &&
426 (res != CURLE_OK || temp_result.responseCode_ != 200))
427 {
428 temp_result.retryCount_++;
429 grklog.warn("Retrying directory listing for %s (retry %u/%u), HTTP %ld, CURL %d",
430 path.c_str(), temp_result.retryCount_, maxRetries_, temp_result.responseCode_,
431 res);
432 std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs_));
433 }
434 else
435 {
436 break;
437 }
438 } while(true);
439
440 if(temp_result.responseCode_ == 200)
441 {
442 SimpleXmlParser parser;
443 if(parser.parse(response))
444 {
445 files = parser.keys;
446 grklog.debug("Listed %zu objects in %s", files.size(), path.c_str());
447 }
448 else
449 {
450 grklog.warn("Failed to parse ListObjectsV2 response for %s", path.c_str());
451 }
452 }
453 else
454 {
455 grklog.error("Directory listing failed for %s: HTTP %ld, CURL %d after %u retries",
456 path.c_str(), temp_result.responseCode_, curl_easy_perform(curl),
457 temp_result.retryCount_);
458 }
459
460 curl_slist_free_all(headers);
461 curl_easy_cleanup(curl);
462 return files;
463 }
464
465 // Metadata retrieval (HEAD request)
466 bool getMetadata(const std::string& path, std::map<std::string, std::string>& metadata) override
467 {
468 CURL* curl = curl_easy_init();
469 if(!curl)
470 {
471 grklog.error("Failed to initialize curl for metadata retrieval");
472 return false;
473 }
474
475 parse(path);
476 struct curl_slist* headers = nullptr;
477 headers = prepareAuthHeaders(headers);
478
479 std::string header_data;
480 bool success = false;
481
482 struct TempResult
483 {
484 long responseCode_ = 0;
485 uint32_t retryCount_ = 0;
486 } temp_result;
487
488 do
489 {
490 temp_result.responseCode_ = 0;
491 header_data.clear();
492 curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
493 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
494 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
495 auth(curl);
496 curl_initiate_retry(curl);
497 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writeCallback);
498 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &header_data);
499
500 CURLcode res = curl_easy_perform(curl);
501 if(res == CURLE_OK)
502 {
503 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &temp_result.responseCode_);
504 }
505
506 if(temp_result.retryCount_ < maxRetries_ &&
507 (res != CURLE_OK || temp_result.responseCode_ != 200))
508 {
509 temp_result.retryCount_++;
510 grklog.warn("Retrying metadata retrieval for %s (retry %u/%u), HTTP %ld, CURL %d",
511 path.c_str(), temp_result.retryCount_, maxRetries_, temp_result.responseCode_,
512 res);
513 std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs_));
514 }
515 else
516 {
517 success = (res == CURLE_OK && temp_result.responseCode_ == 200);
518 break;
519 }
520 } while(true);
521
522 if(success)
523 {
524 std::istringstream header_stream(header_data);
525 std::string line;
526 while(std::getline(header_stream, line))
527 {
528 size_t colon_pos = line.find(':');
529 if(colon_pos != std::string::npos)
530 {
531 std::string key = line.substr(0, colon_pos);
532 std::string value = line.substr(colon_pos + 1);
533 key.erase(key.find_last_not_of(" \t") + 1);
534 value.erase(0, value.find_first_not_of(" \t"));
535 metadata[key] = value;
536 grklog.debug("Metadata: %s=%s", key.c_str(), value.c_str());
537 }
538 }
539 }
540 else
541 {
542 grklog.error("Metadata retrieval failed for %s: HTTP %ld, CURL %d after %u retries",
543 path.c_str(), temp_result.responseCode_, curl_easy_perform(curl),
544 temp_result.retryCount_);
545 }
546
547 curl_slist_free_all(headers);
548 curl_easy_cleanup(curl);
549 return success;
550 }
551
protected:
  /// Append subclass-specific auth headers to @p headers; returns the
  /// (possibly newly-allocated) list head.
  virtual curl_slist* prepareAuthHeaders(curl_slist* headers) = 0;
  /// Resolve @p path into url_ and any subclass-specific state.
  virtual void parse(const std::string& path) = 0;
555
  /// Signal failure of a tile fetch: resolves the owning job's promise with
  /// false via onFetchTilesComplete(). No-op when the result carries no tile
  /// context or no fetcher back-pointer.
  void fetchError(FetchResult* result)
  {
    auto ctx = std::static_pointer_cast<TileFetchContext>(result->ctx_);
    if(ctx && ctx->fetcher_)
      ctx->fetcher_->onFetchTilesComplete(ctx, false);
  }
562
  /**
   * Apply the shared authentication/transport options to an easy handle:
   * TLS verification toggles, basic credentials, cookies, .netrc, proxy
   * (explicit or via GRK_CURL_* environment variables), user agent and the
   * connect timeout. Subclasses may extend.
   */
  virtual void auth(CURL* curl)
  {
    // TLS verification can be disabled via env var or explicit auth flag
    if(EnvVarManager::test_bool("GRK_HTTP_UNSAFESSL") || auth_.s3_allow_insecure_)
    {
      curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
      curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
    }

    // Basic username/password credentials
    if(!auth_.username_.empty())
      curl_easy_setopt(curl, CURLOPT_USERNAME, auth_.username_.c_str());
    if(!auth_.password_.empty())
      curl_easy_setopt(curl, CURLOPT_PASSWORD, auth_.password_.c_str());

    // Cookies (CURLOPT_COOKIE / COOKIEFILE / COOKIEJAR)
    if(!auth_.cookie_.empty())
      curl_easy_setopt(curl, CURLOPT_COOKIE, auth_.cookie_.c_str());
    if(!auth_.cookie_file_.empty())
      curl_easy_setopt(curl, CURLOPT_COOKIEFILE, auth_.cookie_file_.c_str());
    if(!auth_.cookie_jar_.empty())
      curl_easy_setopt(curl, CURLOPT_COOKIEJAR, auth_.cookie_jar_.c_str());

    // .netrc (CURLOPT_NETRC / CURLOPT_NETRC_FILE)
    if(auth_.netrc_)
      curl_easy_setopt(curl, CURLOPT_NETRC, (long)CURL_NETRC_REQUIRED);
    if(!auth_.netrc_file_.empty())
      curl_easy_setopt(curl, CURLOPT_NETRC_FILE, auth_.netrc_file_.c_str());

    // Proxy (shared by both HTTP and S3 fetchers); explicit settings take
    // precedence over the GRK_CURL_* environment variables
    if(!auth_.proxy_.empty())
    {
      curl_easy_setopt(curl, CURLOPT_PROXY, auth_.proxy_.c_str());
      if(!auth_.proxy_userpwd_.empty())
        curl_easy_setopt(curl, CURLOPT_PROXYUSERPWD, auth_.proxy_userpwd_.c_str());
      curl_easy_setopt(curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
    }
    else if(auto proxy = EnvVarManager::get("GRK_CURL_PROXY"))
    {
      curl_easy_setopt(curl, CURLOPT_PROXY, proxy->c_str());
      if(auto proxyUserPwd = EnvVarManager::get("GRK_CURL_PROXYUSERPWD"))
        curl_easy_setopt(curl, CURLOPT_PROXYUSERPWD, proxyUserPwd->c_str());
      if(EnvVarManager::get("GRK_CURL_PROXYAUTH"))
        curl_easy_setopt(curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
    }

    // User agent
    if(!auth_.user_agent_.empty())
      curl_easy_setopt(curl, CURLOPT_USERAGENT, auth_.user_agent_.c_str());

    // Timeouts (default 10s connect timeout when none configured)
    long connect_timeout = auth_.connect_timeout_ > 0 ? auth_.connect_timeout_ : 10L;
    curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, connect_timeout);
  }
615
616 curl_slist* configureHeaders(const std::string& range)
617 {
618 struct curl_slist* headers = nullptr;
619 headers = prepareAuthHeaders(headers);
620 if(!range.empty())
621 {
622 headers = curl_slist_append(headers, range.c_str());
623 }
624 return headers;
625 }
626
  /// Last-Modified time captured by fetch_total_size(), as a Unix timestamp.
  time_t getLastModifiedTime() const
  {
    return last_modified_time_;
  }
631
632 void fetch_total_size()
633 {
634 CURL* curl = curl_easy_init();
635 if(!curl)
636 throw std::runtime_error("Failed to initialize CURL easy handle for HEAD");
637
638 curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
639 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
640 curl_easy_setopt(curl, CURLOPT_FILETIME, 1L);
641 auth(curl);
642
643 auto headers = configureHeaders("");
644 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
645
646 CURLcode res = curl_easy_perform(curl);
647 if(res != CURLE_OK)
648 {
649 grklog.error("HEAD request failed: %s", curl_easy_strerror(res));
650 curl_easy_cleanup(curl);
651 throw std::runtime_error("Failed to fetch file size");
652 }
653
654 long response_code;
655 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
656 if(response_code != 200)
657 {
658 grklog.error("HEAD request returned HTTP %ld", response_code);
659 curl_easy_cleanup(curl);
660 throw std::runtime_error("Invalid HEAD response");
661 }
662
663 curl_off_t content_length;
664 curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &content_length);
665 total_size_ = static_cast<uint64_t>(content_length);
666 grklog.debug("Fetched total size: %llu bytes", total_size_);
667
668 long filetime;
669 res = curl_easy_getinfo(curl, CURLINFO_FILETIME, &filetime);
670 if(res == CURLE_OK && filetime != -1)
671 {
672 last_modified_time_ = static_cast<time_t>(filetime);
673 grklog.debug("Fetched last modified time: %ld (Unix timestamp)", last_modified_time_);
674 }
675 else
676 {
677 grklog.warn("Last modified time not available from server");
678 }
679
680 curl_easy_cleanup(curl);
681 curl_slist_free_all(headers);
682 }
683
  /**
   * Create and configure an easy handle for a ranged GET of bytes
   * [offset, end] (inclusive). @p result receives payload bytes via
   * @p callback and is also stored as CURLOPT_PRIVATE so the completion loop
   * can recover it from the finished handle. Caller owns the returned handle.
   *
   * NOTE(review): the curl_slist built by configureHeaders() is attached to
   * the handle but its pointer is not returned — callers currently never free
   * it. Confirm and plumb ownership back to callers.
   *
   * @throws std::runtime_error if the easy handle cannot be created.
   */
  CURL* configureHandle(uint64_t offset, uint64_t end, FetchResult& result,
                        CURL_FETCHER_WRITE_CALLBACK callback)
  {
    CURL* curl = curl_easy_init();
    if(!curl)
      throw std::runtime_error("Failed to initialize CURL easy handle");

    curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
    curl_initiate_retry(curl);
    curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &result);
    // PRIVATE lets the multi-loop map a completed handle back to its result
    curl_easy_setopt(curl, CURLOPT_PRIVATE, &result);

    auth(curl);

    std::string range = "Range: bytes=" + std::to_string(offset) + "-" + std::to_string(end);
    auto headers = configureHeaders(range);
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

    return curl;
  }
715
  /**
   * Build the request collections for the slated tiles, create the shared
   * TileFetchContext, install it as currentFetch_ and schedule the first
   * batch of range requests.
   * @return the context on success, nullptr if scheduling failed.
   */
  std::shared_ptr<TileFetchContext> scheduleTileFetch(std::set<uint16_t>& slated)
  {
    auto requests = std::make_shared<TPFetchSeq>();
    auto tilePartFetchByTile =
        std::make_shared<std::unordered_map<uint16_t, std::shared_ptr<TPFetchSeq>>>();
    TPFetchSeq::genCollections(allTileParts_, slated, requests, tilePartFetchByTile);

    // One FetchResult slot per request, kept alive by currentFetch_
    auto results = std::make_shared<std::vector<FetchResult>>(requests->size());

    auto ctx = std::make_shared<TileFetchContext>(requests, user_data_, tilePartFetchByTile,
                                                  tileFetchCallback_, this);

    auto batch = std::make_unique<TileRequestBatch>(requests);
    currentFetch_ = ScheduledFetch(ctx, std::move(batch), results);

    // Set context on all results — must happen before the first batch goes out
    for(auto& r : *results)
      r.ctx_ = ctx;

    return scheduleNextBatch(tileWriteCallback_) ? ctx : nullptr;
  }
744
  /**
   * Install @p ctx / @p results / @p promises as the current fetch and
   * schedule the first batch of chunk range requests.
   * @return false if scheduling failed (caller must then resolve the promises).
   */
  bool scheduleChunkFetch(std::shared_ptr<ChunkContext> ctx,
                          std::shared_ptr<std::vector<ChunkRequest>> requests,
                          std::shared_ptr<std::vector<FetchResult>> results,
                          std::shared_ptr<std::vector<std::promise<FetchResult>>> promises)
  {
    auto batch = std::make_unique<ChunkRequestBatch>(requests);
    currentFetch_ = ScheduledFetch(ctx, std::move(batch), results, promises);
    return scheduleNextBatch(chunkWriteCallback);
  }
764
  /**
   * Add up to batchSize_ in-flight range requests from currentFetch_ to the
   * multi handle. Honors the optional fetch throttle (returns early, leaving
   * remaining requests for a later call).
   * @return false only when curl_multi_add_handle fails; true otherwise
   *         (including the throttled / nothing-to-do cases).
   */
  bool scheduleNextBatch(CURL_FETCHER_WRITE_CALLBACK callback)
  {
    if(!currentFetch_.requests_ || !currentFetch_.requests_->hasMore())
      return true;

    // Back pressure: if a throttle is set, skip scheduling when the
    // downstream pipeline is backlogged.
    {
      std::lock_guard<std::mutex> lock(throttleMutex_);
      if(fetchThrottle_ && !fetchThrottle_())
        return true;
    }

    // Top up to batchSize_ concurrently-active requests
    size_t activeRequests = currentFetch_.scheduled_ - currentFetch_.completed_;
    size_t remainingBatch = batchSize_ > activeRequests ? batchSize_ - activeRequests : 0;
    size_t remainingRequests = currentFetch_.requests_->remaining();
    size_t requestsToSchedule = std::min(remainingBatch, remainingRequests);

    for(size_t i = 0; i < requestsToSchedule && currentFetch_.requests_->hasMore(); ++i)
    {
      auto [offset, end] = currentFetch_.requests_->next();
      if(end >= this->total_size_)
      {
        grklog.warn("Range %llu-%llu exceeds total size %llu", offset, end, total_size_);
        end = this->total_size_ - 1; // clamp instead of failing the request
      }
      // results_ is indexed by scheduling order; requestIndex_ mirrors it
      auto& res = (*currentFetch_.results_)[currentFetch_.scheduled_];
      res.requestIndex_ = currentFetch_.scheduled_;
      if(!res.ctx_)
        res.ctx_ = currentFetch_.ctx_;
      CURL* handle = configureHandle(offset, end, res, callback);
      CURLMcode ret = curl_multi_add_handle(multi_handle_, handle);
      if(ret != CURLM_OK)
      {
        grklog.error("curl_multi_add_handle failed: %s", curl_multi_strerror(ret));
        curl_easy_cleanup(handle);
        return false;
      }
      {
        std::lock_guard<std::mutex> lock(active_handles_mutex_);
        active_handles_[handle] = currentFetch_.scheduled_;
      }
      grklog.debug("Added fetch range request: %llu-%llu (index %zu)", offset, end,
                   currentFetch_.scheduled_);
      currentFetch_.scheduled_++;
    }
    return true;
  }
820
  /**
   * Re-issue a failed range request: resets the result's accumulated state,
   * rewinds the tile-part write offset (tile path only), and adds a fresh
   * easy handle for the same byte range.
   * @param onFatalError invoked when the retry itself cannot be scheduled.
   */
  void retryRequest(FetchResult* result, uint64_t offset, uint64_t end,
                    CURL_FETCHER_WRITE_CALLBACK callback, std::function<void()> onFatalError)
  {
    result->retryCount_++;
    grklog.warn("Retrying request %zu (retry %u/%u)", result->requestIndex_, result->retryCount_,
                maxRetries_);

    // Discard any partial state from the failed attempt
    result->data_.clear();
    result->responseCode_ = 0;
    result->success_ = false;

    // For tile retries, reset the TPFetch write offset so data is re-received from scratch
    if(!currentFetch_.promises_)
    {
      auto ctx = std::static_pointer_cast<TileFetchContext>(result->ctx_);
      if(ctx)
      {
        auto& tpseq = (*ctx->requests_)[result->requestIndex_];
        tpseq->fetchOffset_ = 0;
      }
    }

    if(end >= total_size_)
      end = total_size_ - 1; // clamp, mirroring scheduleNextBatch()

    CURL* handle = configureHandle(offset, end, *result, callback);
    CURLMcode ret = curl_multi_add_handle(multi_handle_, handle);
    if(ret != CURLM_OK)
    {
      grklog.error("Retry curl_multi_add_handle failed: %s", curl_multi_strerror(ret));
      curl_easy_cleanup(handle);
      onFatalError();
    }
    else
    {
      std::lock_guard<std::mutex> lock(active_handles_mutex_);
      active_handles_[handle] = result->requestIndex_;
      grklog.debug("Rescheduled retry %u: %llu-%llu (index %zu)", result->retryCount_, offset, end,
                   result->requestIndex_);
    }
  }
872
  /// Apply the overall transfer timeout to a handle (default 30s). Despite the
  /// name, no retry logic lives here — retries are driven by shouldRetry() and
  /// retryRequest().
  void curl_initiate_retry(CURL* curl)
  {
    long timeout = auth_.timeout_ > 0 ? auth_.timeout_ : 30L;
    curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
  }
878
886 bool shouldRetry(const FetchResult& result, CURLcode curl_code) const
887 {
888 if(result.retryCount_ >= maxRetries_)
889 return false;
890
891 bool isCurlError = curl_code != CURLE_OK;
892 bool isHttpError = result.responseCode_ != 206 && result.responseCode_ != 0;
893
894 return isCurlError || isHttpError;
895 }
896
904 std::pair<uint64_t, uint64_t> getRequestRange(const FetchResult& result) const
905 {
906 if(!currentFetch_.promises_)
907 {
908 // Tile path — look up from TileFetchContext
909 auto ctx = std::static_pointer_cast<TileFetchContext>(result.ctx_);
910 auto& req = (*ctx->requests_)[result.requestIndex_];
911 return {req->offset_, req->offset_ + req->length_ - 1};
912 }
913 else
914 {
915 // Chunk path — look up from ChunkContext
916 auto ctx = std::static_pointer_cast<ChunkContext>(result.ctx_);
917 auto& req = (*ctx->requests_)[result.requestIndex_];
918 return {req.offset_, req.end_};
919 }
920 }
921
922 void fetchWorker()
923 {
924 CURL_FETCHER_WRITE_CALLBACK activeCallback_ = nullptr;
925
926 while(!stop_)
927 {
928 std::vector<FetchJob> tile_jobs_to_process;
929 std::vector<ChunkTask> chunk_tasks_to_process;
930 {
931 std::unique_lock<std::mutex> lock(queue_mutex_);
932 while(!tile_fetch_queue_.empty())
933 {
934 tile_jobs_to_process.emplace_back(std::move(tile_fetch_queue_.front()));
935 tile_fetch_queue_.pop();
936 grklog.debug("Dequeued tile fetch job, queue size now: %zu", tile_fetch_queue_.size());
937 }
938 while(!chunk_fetch_queue_.empty())
939 {
940 chunk_tasks_to_process.emplace_back(std::move(chunk_fetch_queue_.front()));
941 chunk_fetch_queue_.pop();
942 grklog.debug("Dequeued chunk fetch task, queue size now: %zu", chunk_fetch_queue_.size());
943 }
944 }
945
946 if(!tile_jobs_to_process.empty())
947 {
948 activeCallback_ = tileWriteCallback_;
949 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
950 for(auto& job : tile_jobs_to_process)
951 {
952 auto ctx = scheduleTileFetch(job.slated);
953 if(!ctx)
954 job.promise_.set_value(false);
955 else
956 active_jobs_.emplace(ctx, std::move(job.promise_));
957 }
958 }
959
960 if(!chunk_tasks_to_process.empty())
961 {
962 activeCallback_ = chunkWriteCallback;
963 for(auto& task : chunk_tasks_to_process)
964 {
965 auto requests = task.requests_;
966 auto results = std::make_shared<std::vector<FetchResult>>(requests->size());
967 auto promises =
968 std::make_shared<std::vector<std::promise<FetchResult>>>(std::move(task.promises_));
969 auto ctx = std::make_shared<ChunkContext>(task.chunkBuffer_, requests);
970 for(size_t i = 0; i < results->size(); ++i)
971 {
972 (*results)[i] = FetchResult((*requests)[i].requestIndex_);
973 (*results)[i].ctx_ = ctx;
974 }
975 if(!scheduleChunkFetch(ctx, requests, results, promises))
976 {
977 for(size_t i = 0; i < promises->size(); ++i)
978 {
979 (*promises)[i].set_value((*results)[i]);
980 }
981 }
982 }
983 }
984
985 int still_running = 0;
986 auto ret = curl_multi_perform(multi_handle_, &still_running);
987 if(ret != CURLM_OK)
988 {
989 grklog.error("curl_multi_perform failed: %s", curl_multi_strerror(ret));
990 // Fail all active tile jobs
991 {
992 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
993 for(auto& job : active_jobs_)
994 job.second.set_value(false);
995 active_jobs_.clear();
996 }
997 // Fail all active chunk promises
998 {
999 std::lock_guard<std::mutex> lock(active_handles_mutex_);
1000 if(currentFetch_.promises_)
1001 {
1002 for(size_t i = 0; i < currentFetch_.promises_->size(); ++i)
1003 {
1004 FetchResult res(i);
1005 res.success_ = false;
1006 (*currentFetch_.promises_)[i].set_value(res);
1007 }
1008 }
1009 for(auto& [handle, idx] : active_handles_)
1010 {
1011 curl_multi_remove_handle(multi_handle_, handle);
1012 curl_easy_cleanup(handle);
1013 }
1014 active_handles_.clear();
1015 }
1016 continue;
1017 }
1018
1019 CURLMsg* msg;
1020 int msgs_left;
1021 while((msg = curl_multi_info_read(multi_handle_, &msgs_left)))
1022 {
1023 if(msg->msg == CURLMSG_DONE)
1024 {
1025 CURL* curl = msg->easy_handle;
1026 void* userp;
1027 curl_easy_getinfo(curl, CURLINFO_PRIVATE, &userp);
1028 auto* result = static_cast<FetchResult*>(userp);
1029
1030 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &result->responseCode_);
1031 result->success_ = (msg->data.result == CURLE_OK && result->responseCode_ == 206);
1032
1033 size_t idx = 0;
1034 {
1035 std::lock_guard<std::mutex> lock(active_handles_mutex_);
1036 auto it = active_handles_.find(curl);
1037 if(it != active_handles_.end())
1038 {
1039 idx = it->second;
1040 active_handles_.erase(it);
1041 }
1042 }
1043
1044 curl_multi_remove_handle(multi_handle_, curl);
1045 curl_easy_cleanup(curl);
1046
1047 if(!result->success_)
1048 {
1049 if(shouldRetry(*result, msg->data.result))
1050 {
1051 auto [offset, end] = getRequestRange(*result);
1052 grklog.warn("Fetch request %zu failed (HTTP %ld, CURL %s), retrying...",
1053 result->requestIndex_, result->responseCode_,
1054 curl_easy_strerror(msg->data.result));
1055 auto callback = activeCallback_;
1056 retryRequest(result, offset, end, callback, [this, result, idx]() {
1057 // Fatal retry failure
1058 if(!currentFetch_.promises_)
1059 fetchError(result);
1060 else if(idx < currentFetch_.promises_->size())
1061 (*currentFetch_.promises_)[idx].set_value(*result);
1062 });
1063 continue; // Don't count as completed — retry is in progress
1064 }
1065
1066 grklog.error("Fetch request %zu failed: %s, HTTP %ld (no more retries)",
1067 result->requestIndex_, curl_easy_strerror(msg->data.result),
1068 result->responseCode_);
1069 // For tile fetches, signal error
1070 if(!currentFetch_.promises_)
1071 fetchError(result);
1072 }
1073 else
1074 {
1075 grklog.debug("Fetch request %zu completed", result->requestIndex_);
1076 }
1077
1078 // Set promise for chunk fetches
1079 if(currentFetch_.promises_ && idx < currentFetch_.promises_->size())
1080 {
1081 (*currentFetch_.promises_)[idx].set_value(*result);
1082 }
1083
1084 currentFetch_.completed_++;
1085
1086 // Schedule next batch when half the current batch is complete
1087 if(currentFetch_.scheduled_ > currentFetch_.completed_ &&
1088 currentFetch_.completed_ >= batchSize_ / 2 && currentFetch_.requests_ &&
1089 currentFetch_.requests_->hasMore())
1090 {
1091 grklog.debug("Half of batch (%zu) completed, scheduling next batch", batchSize_ / 2);
1092 if(activeCallback_)
1093 scheduleNextBatch(activeCallback_);
1094 }
1095 }
1096 }
1097
1098 if(still_running > 0)
1099 {
1100 grklog.trace("Still running: %d requests", still_running);
1101 }
1102 else
1103 {
1104 // Try to resume scheduling if we were previously throttled
1105 bool throttled = false;
1106 if(currentFetch_.requests_ && currentFetch_.requests_->hasMore() && activeCallback_)
1107 {
1108 {
1109 std::lock_guard<std::mutex> tlock(throttleMutex_);
1110 throttled = fetchThrottle_ && !fetchThrottle_();
1111 }
1112 if(!throttled)
1113 scheduleNextBatch(activeCallback_);
1114 }
1115
1116 // If throttled with no in-flight requests, wait for the consumer
1117 // to release back pressure before continuing
1118 if(throttled)
1119 {
1120 std::unique_lock<std::mutex> tlock(throttleMutex_);
1121 throttleCV_.wait_for(tlock, std::chrono::milliseconds(100),
1122 [this] { return stop_ || !fetchThrottle_ || fetchThrottle_(); });
1123 }
1124 else
1125 {
1126 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
1127 std::lock_guard<std::mutex> lock2(active_handles_mutex_);
1128 if(active_jobs_.empty() && active_handles_.empty())
1129 {
1130 grklog.debug("No active requests, waiting");
1131 std::unique_lock<std::mutex> qlock(queue_mutex_);
1132 queue_cv_.wait(qlock, [this] {
1133 return stop_ || !tile_fetch_queue_.empty() || !chunk_fetch_queue_.empty();
1134 });
1135 }
1136 }
1137 }
1138 }
1139
1140 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
1141 for(auto& job : active_jobs_)
1142 job.second.set_value(false);
1143 active_jobs_.clear();
1144
1145 std::lock_guard<std::mutex> lock2(active_handles_mutex_);
1146 if(currentFetch_.promises_)
1147 {
1148 for(auto& [handle, idx] : active_handles_)
1149 {
1150 FetchResult resp(idx);
1151 resp.success_ = false;
1152 if(idx < currentFetch_.promises_->size())
1153 (*currentFetch_.promises_)[idx].set_value(resp);
1154 curl_multi_remove_handle(multi_handle_, handle);
1155 curl_easy_cleanup(handle);
1156 }
1157 }
1158 active_handles_.clear();
1159 grklog.debug("Worker thread exiting");
1160 }
1161
1162protected:
/**
 * libcurl write callback (CURLOPT_WRITEFUNCTION signature).
 *
 * Appends the received bytes to the std::string supplied via
 * CURLOPT_WRITEDATA.
 *
 * @param contents pointer to the data delivered by libcurl (not null-terminated)
 * @param size     size of one element, in bytes
 * @param nmemb    number of elements delivered
 * @param s        destination string to append to (must be non-null)
 * @return number of bytes consumed; returning anything other than
 *         size * nmemb signals an error to libcurl and aborts the transfer
 */
static size_t writeCallback(void* contents, size_t size, size_t nmemb, std::string* s)
{
	// Compute the byte count once; named casts over C-style casts.
	const size_t numBytes = size * nmemb;
	s->append(static_cast<const char*>(contents), numBytes);
	return numBytes;
}
1168
	// Target resource and credentials for range requests.
1169 FetchAuth auth_;
1170 std::string url_;
	// Guards tile_fetch_queue_ / chunk_fetch_queue_; the worker waits on
	// queue_cv_ when both queues are empty and nothing is in flight.
1171 std::mutex queue_mutex_;
1172 std::condition_variable queue_cv_;
	// In-progress tile-fetch jobs; each promise is fulfilled (false) on
	// shutdown if the job never completed. Guarded by active_jobs_mutex_.
1173 std::unordered_map<std::shared_ptr<TileFetchContext>, std::promise<bool>> active_jobs_;
1174 std::mutex active_jobs_mutex_;
	// NOTE(review): exact scope of fetch_mutex_ is not visible in this
	// section — presumably serializes public fetch entry points; confirm.
1175 std::mutex fetch_mutex_;
1176 std::queue<FetchJob> tile_fetch_queue_;
1177 std::queue<ChunkTask> chunk_fetch_queue_;
	// Maps each in-flight CURL easy handle to its request index; guarded by
	// active_handles_mutex_. Entries are erased on completion and any
	// survivors are cleaned up (promise set to failure) on worker exit.
1178 std::mutex active_handles_mutex_;
1179 std::unordered_map<CURL*, size_t> active_handles_;
	// Opaque caller context passed through to callbacks.
1180 void* user_data_ = nullptr;
	// Total resource size and current read offset for the IFetcher
	// read/seek/size/offset interface.
1181 uint64_t total_size_ = 0;
1182 uint64_t current_offset_ = 0;
	// The curl multi handle driven by the worker thread.
1183 CURLM* multi_handle_ = nullptr;
1184
1185private:
	// A new batch is scheduled once half of the current batch completes.
1186 size_t batchSize_ = 30;
	// Set to request worker shutdown; checked by the condition-variable
	// predicates in the worker loop.
1187 bool stop_ = false;
1188 std::thread fetchThread_;
	// Retry policy for failed range requests (see shouldRetry/retryRequest).
1189 uint32_t maxRetries_ = 3;
1190 uint32_t retryDelayMs_ = 1000;
	// Back-pressure hook: returning false pauses scheduling; the worker
	// polls it under throttleMutex_ and sleeps on throttleCV_ while throttled.
1191 std::function<bool()> fetchThrottle_;
1192 std::mutex throttleMutex_;
1193 std::condition_variable throttleCV_;
1194
1195protected:
	// Per-request completion callback and the tile-part layout supplied to
	// fetchTiles().
1196 TileFetchCallback tileFetchCallback_;
1197 const TPSEQ_VEC* allTileParts_ = nullptr;
	// Last-Modified time of the remote resource; -1 until known.
1198 time_t last_modified_time_ = -1;
1199
1200private:
	// Write callback used for tile-part transfers (distinct from the
	// string-appending writeCallback above).
1201 CURL_FETCHER_WRITE_CALLBACK tileWriteCallback_;
	// State of the fetch currently being driven: per-request promises,
	// pending request list, scheduled/completed counters.
1202 ScheduledFetch currentFetch_;
1203};
1204
1205#endif
1206
1208{
1209 std::atomic_ref<size_t> atomicCount(completeCount_);
1210 if((atomicCount.fetch_add(1, std::memory_order_seq_cst) + 1) == requests_->size())
1211 {
1212 fetcher_->onFetchTilesComplete(shared_from_this(), true);
1213 }
1214}
1215
1216} // namespace grk
Manages a partially ordered deque of buffer chunks that are added asynchronously out of order.
Definition ChunkBuffer.h:47
T index_type
Definition ChunkBuffer.h:49
Definition CurlFetcher.h:49
virtual void fetchChunks(std::shared_ptr< ChunkBuffer<> > chunkBuffer)=0
virtual uint64_t size() const =0
virtual std::vector< std::string > listDirectory(const std::string &path)=0
virtual size_t read(uint8_t *buffer, size_t numBytes)=0
virtual void fetchChunks(std::shared_ptr< ChunkBuffer<> > chunkBuffer, std::shared_ptr< std::vector< ChunkRequest > > requests)=0
virtual bool getMetadata(const std::string &path, std::map< std::string, std::string > &metadata)=0
virtual ~IFetcher()=default
virtual void onFetchTilesComplete(std::shared_ptr< TileFetchContext > context, bool success)=0
virtual void notifyThrottleRelease()=0
virtual bool seek(uint64_t offset)=0
virtual uint64_t offset() const =0
virtual std::future< bool > fetchTiles(const TPSEQ_VEC &allTileParts, std::set< uint16_t > &slated, void *user_data, TileFetchCallback callback)=0
virtual void init(const std::string &path, const FetchAuth &auth)=0
virtual void setFetchThrottle(std::function< bool()> throttle)=0
ResWindow.
Definition CompressedChunkCache.h:36
std::function< void(size_t requestIndex, TileFetchContext *context)> TileFetchCallback
Definition CurlFetcher.h:46
ILogger & grklog
Definition Logger.cpp:24
std::vector< std::unique_ptr< TPSeq > > TPSEQ_VEC
Definition TPFetchSeq.h:181
size_t(* CURL_FETCHER_WRITE_CALLBACK)(void *contents, size_t size, size_t nmemb, void *userp)
Definition CurlFetcher.h:116
Definition FetchCommon.h:42
Definition FetchCommon.h:93
virtual void warn(const char *fmt,...)=0
virtual void error(const char *fmt,...)=0
virtual void debug(const char *fmt,...)=0
virtual void trace(const char *fmt,...)=0
Definition CurlFetcher.h:95
size_t completeCount_
Definition CurlFetcher.h:113
IFetcher * fetcher_
Definition CurlFetcher.h:100
std::shared_ptr< TPFetchSeq > requests_
Definition CurlFetcher.h:96
void * user_data_
Definition CurlFetcher.h:97
void incrementCompleteCount()
Definition CurlFetcher.h:1207
TileFetchCallback callback_
Definition CurlFetcher.h:99
TileFetchContext(std::shared_ptr< TPFetchSeq > &requests, void *user_data, std::shared_ptr< std::unordered_map< uint16_t, std::shared_ptr< TPFetchSeq > > > &tilePartFetchByTile, TileFetchCallback callback, IFetcher *fetcher)
Definition CurlFetcher.h:102
std::shared_ptr< std::unordered_map< uint16_t, std::shared_ptr< TPFetchSeq > > > tilePartFetchByTile_
Definition CurlFetcher.h:98