25#include <condition_variable>
33#ifdef GRK_ENABLE_LIBCURL
37#include "grk_config_private.h"
57 virtual size_t read(uint8_t* buffer,
size_t numBytes) = 0;
63 virtual uint64_t
size()
const = 0;
78 std::shared_ptr<std::vector<ChunkRequest>> requests) = 0;
81 virtual std::vector<std::string>
listDirectory(
const std::string& path) = 0;
85 std::map<std::string, std::string>& metadata) = 0;
103 std::shared_ptr<std::unordered_map<uint16_t, std::shared_ptr<TPFetchSeq>>>&
119#ifdef GRK_ENABLE_LIBCURL
122static size_t tileWriteCallback(
void* contents,
size_t size,
size_t nmemb,
void* userp)
124 size_t total_size = size * nmemb;
126 auto ctx = std::static_pointer_cast<TileFetchContext>(result->ctx_);
129 auto& tpseq = (*ctx->requests_)[result->requestIndex_];
130 tpseq->copy(
static_cast<uint8_t*
>(contents), total_size);
131 if(tpseq->fetchOffset_ == tpseq->length_)
133 ctx->callback_(result->requestIndex_, ctx.get());
134 ctx->incrementCompleteCount();
139 result->data_.insert(result->data_.end(),
static_cast<const uint8_t*
>(contents),
140 static_cast<const uint8_t*
>(contents) + total_size);
146static size_t chunkWriteCallback(
void* contents,
size_t size,
size_t nmemb,
void* userp)
148 size_t total_size = size * nmemb;
149 auto* res =
static_cast<FetchResult*
>(userp);
150 res->data_.insert(res->data_.end(),
static_cast<uint8_t*
>(contents),
151 static_cast<uint8_t*
>(contents) + total_size);
152 auto ctx = std::static_pointer_cast<ChunkContext>(res->ctx_);
153 auto& req = (*ctx->requests_)[res->requestIndex_];
154 if(res->data_.size() == req.length_)
157 res->data_.data(), req.length_);
159 res->data_.shrink_to_fit();
168 CurlFetcher(
void) : tileWriteCallback_(tileWriteCallback)
170 curl_global_init(CURL_GLOBAL_ALL);
171 multi_handle_ = curl_multi_init();
173 throw std::runtime_error(
"Failed to initialize CURL multi handle");
174 curl_multi_setopt(multi_handle_, CURLMOPT_MAX_TOTAL_CONNECTIONS, 100L);
175 fetchThread_ = std::thread(&CurlFetcher::fetchWorker,
this);
178 virtual ~CurlFetcher()
override
181 std::lock_guard<std::mutex> lock(queue_mutex_);
184 queue_cv_.notify_all();
185 throttleCV_.notify_all();
186 if(fetchThread_.joinable())
189 curl_multi_cleanup(multi_handle_);
190 curl_global_cleanup();
193 void setFetchThrottle(std::function<
bool()> throttle)
override
195 std::lock_guard<std::mutex> lock(throttleMutex_);
196 fetchThrottle_ = std::move(throttle);
199 void notifyThrottleRelease()
override
201 throttleCV_.notify_one();
204 void init(
const std::string& path,
const FetchAuth& auth)
override
207 if(auth_.max_retry_ > 0)
208 maxRetries_ = auth_.max_retry_;
209 if(auth_.retry_delay_ > 0)
210 retryDelayMs_ = auth_.retry_delay_ * 1000;
215 size_t read(uint8_t* buffer,
size_t numBytes)
override
217 if(current_offset_ + numBytes > total_size_)
219 grklog.
error(
"Read %zu bytes at offset %llu exceeds total size %llu", numBytes,
220 current_offset_, total_size_);
225 auto curl = configureHandle(current_offset_, current_offset_ + numBytes - 1, result,
227 auto res = curl_easy_perform(curl);
230 grklog.
error(
"curl_easy_perform failed: %s", curl_easy_strerror(res));
231 curl_easy_cleanup(curl);
235 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &result.responseCode_);
236 if(result.responseCode_ != 206)
238 grklog.
error(
"Read failed with HTTP code: %ld", result.responseCode_);
239 curl_easy_cleanup(curl);
243 size_t bytes_read = result.data_.size();
244 if(bytes_read > numBytes)
246 grklog.
error(
"Received %zu bytes, but buffer only fits %zu", bytes_read, numBytes);
247 bytes_read = numBytes;
250 std::memcpy(buffer, result.data_.data(), bytes_read);
251 grklog.
debug(
"Read %zu bytes from %llu, new offset: %llu", bytes_read, current_offset_,
252 current_offset_ + bytes_read);
253 current_offset_ += bytes_read;
255 curl_easy_cleanup(curl);
259 bool seek(uint64_t offset)
override
261 if(offset >= total_size_)
263 grklog.
error(
"Seek offset %llu exceeds total size %llu", offset, total_size_);
266 current_offset_ = offset;
267 grklog.
debug(
"Seeked to offset: %llu", current_offset_);
271 uint64_t size()
const override
276 uint64_t offset()
const override
278 return current_offset_;
291 std::future<bool> fetchTiles(
const TPSEQ_VEC& allTileParts, std::set<uint16_t>& slated,
292 void* user_data, TileFetchCallback callback)
override
296 std::lock_guard<std::mutex> lock(fetch_mutex_);
298 allTileParts_ = &allTileParts;
300 user_data_ = user_data;
301 if(!tileFetchCallback_)
302 tileFetchCallback_ = callback;
306 FetchJob job(std::move(slated));
307 std::future<bool> future = job.promise_.get_future();
309 std::lock_guard<std::mutex> lock(queue_mutex_);
310 tile_fetch_queue_.push(std::move(job));
312 queue_cv_.notify_one();
313 grklog.
debug(
"Queued tile fetch job, queue size: %zu", tile_fetch_queue_.size());
324 void onFetchTilesComplete(std::shared_ptr<TileFetchContext> context,
bool success)
override
326 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
327 auto it = active_jobs_.find(context);
328 if(it != active_jobs_.end())
330 it->second.set_value(success);
331 active_jobs_.erase(it);
336 grklog.
error(
"TileFetchContext not found in active_jobs_ during completion");
340 void fetchChunks(std::shared_ptr<ChunkBuffer<>> chunkBuffer)
override
342 auto requests = std::make_shared<std::vector<ChunkRequest>>();
343 auto length = chunkBuffer->size();
344 auto offset = chunkBuffer->offset();
345 auto workingLength = length - offset;
346 auto chunkSize = chunkBuffer->chunkSize();
347 auto numChunks = (workingLength + chunkSize - 1) / chunkSize;
349 for(uint16_t i = 0; i < numChunks; ++i)
351 auto end = offset + chunkSize - 1;
354 requests->push_back(ChunkRequest(i, offset, end));
357 fetchChunks(chunkBuffer, requests);
360 void fetchChunks(std::shared_ptr<ChunkBuffer<>> chunkBuffer,
361 std::shared_ptr<std::vector<ChunkRequest>> requests)
override
363 ChunkTask task(chunkBuffer, requests);
365 for(
size_t i = 0; i < task.requests_->size(); ++i)
367 auto& req = (*task.requests_)[i];
368 if(req.end_ < req.offset_ || req.end_ >= total_size_)
370 grklog.
error(
"Invalid range %llu-%llu for ID %u (total size: %llu)", req.offset_, req.end_,
371 req.requestIndex_, total_size_);
376 std::lock_guard<std::mutex> lock(queue_mutex_);
377 chunk_fetch_queue_.push(std::move(task));
379 queue_cv_.notify_one();
380 grklog.
debug(
"Queued chunk fetch task with %zu requests", requests->size());
384 std::vector<std::string> listDirectory(
const std::string& path)
override
386 std::vector<std::string> files;
387 CURL* curl = curl_easy_init();
390 grklog.
error(
"Failed to initialize curl for directory listing");
395 std::string list_url = url_ + (url_.back() ==
'/' ?
"" :
"/") +
"?list-type=2";
397 struct curl_slist* headers =
nullptr;
398 headers = prepareAuthHeaders(headers);
400 std::string response;
404 long responseCode_ = 0;
405 uint32_t retryCount_ = 0;
410 temp_result.responseCode_ = 0;
412 curl_easy_setopt(curl, CURLOPT_URL, list_url.c_str());
413 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
415 curl_initiate_retry(curl);
416 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);
417 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
419 CURLcode res = curl_easy_perform(curl);
422 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &temp_result.responseCode_);
425 if(temp_result.retryCount_ < maxRetries_ &&
426 (res != CURLE_OK || temp_result.responseCode_ != 200))
428 temp_result.retryCount_++;
429 grklog.
warn(
"Retrying directory listing for %s (retry %u/%u), HTTP %ld, CURL %d",
430 path.c_str(), temp_result.retryCount_, maxRetries_, temp_result.responseCode_,
432 std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs_));
440 if(temp_result.responseCode_ == 200)
442 SimpleXmlParser parser;
443 if(parser.parse(response))
446 grklog.
debug(
"Listed %zu objects in %s", files.size(), path.c_str());
450 grklog.
warn(
"Failed to parse ListObjectsV2 response for %s", path.c_str());
455 grklog.
error(
"Directory listing failed for %s: HTTP %ld, CURL %d after %u retries",
456 path.c_str(), temp_result.responseCode_, curl_easy_perform(curl),
457 temp_result.retryCount_);
460 curl_slist_free_all(headers);
461 curl_easy_cleanup(curl);
466 bool getMetadata(
const std::string& path, std::map<std::string, std::string>& metadata)
override
468 CURL* curl = curl_easy_init();
471 grklog.
error(
"Failed to initialize curl for metadata retrieval");
476 struct curl_slist* headers =
nullptr;
477 headers = prepareAuthHeaders(headers);
479 std::string header_data;
480 bool success =
false;
484 long responseCode_ = 0;
485 uint32_t retryCount_ = 0;
490 temp_result.responseCode_ = 0;
492 curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
493 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
494 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
496 curl_initiate_retry(curl);
497 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, writeCallback);
498 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &header_data);
500 CURLcode res = curl_easy_perform(curl);
503 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &temp_result.responseCode_);
506 if(temp_result.retryCount_ < maxRetries_ &&
507 (res != CURLE_OK || temp_result.responseCode_ != 200))
509 temp_result.retryCount_++;
510 grklog.
warn(
"Retrying metadata retrieval for %s (retry %u/%u), HTTP %ld, CURL %d",
511 path.c_str(), temp_result.retryCount_, maxRetries_, temp_result.responseCode_,
513 std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs_));
517 success = (res == CURLE_OK && temp_result.responseCode_ == 200);
524 std::istringstream header_stream(header_data);
526 while(std::getline(header_stream, line))
528 size_t colon_pos = line.find(
':');
529 if(colon_pos != std::string::npos)
531 std::string key = line.substr(0, colon_pos);
532 std::string value = line.substr(colon_pos + 1);
533 key.erase(key.find_last_not_of(
" \t") + 1);
534 value.erase(0, value.find_first_not_of(
" \t"));
535 metadata[key] = value;
536 grklog.
debug(
"Metadata: %s=%s", key.c_str(), value.c_str());
542 grklog.
error(
"Metadata retrieval failed for %s: HTTP %ld, CURL %d after %u retries",
543 path.c_str(), temp_result.responseCode_, curl_easy_perform(curl),
544 temp_result.retryCount_);
547 curl_slist_free_all(headers);
548 curl_easy_cleanup(curl);
553 virtual curl_slist* prepareAuthHeaders(curl_slist* headers) = 0;
554 virtual void parse(
const std::string& path) = 0;
556 void fetchError(FetchResult* result)
558 auto ctx = std::static_pointer_cast<TileFetchContext>(result->ctx_);
559 if(ctx && ctx->fetcher_)
560 ctx->fetcher_->onFetchTilesComplete(ctx,
false);
563 virtual void auth(CURL* curl)
565 if(EnvVarManager::test_bool(
"GRK_HTTP_UNSAFESSL") || auth_.s3_allow_insecure_)
567 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
568 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
571 if(!auth_.username_.empty())
572 curl_easy_setopt(curl, CURLOPT_USERNAME, auth_.username_.c_str());
573 if(!auth_.password_.empty())
574 curl_easy_setopt(curl, CURLOPT_PASSWORD, auth_.password_.c_str());
577 if(!auth_.cookie_.empty())
578 curl_easy_setopt(curl, CURLOPT_COOKIE, auth_.cookie_.c_str());
579 if(!auth_.cookie_file_.empty())
580 curl_easy_setopt(curl, CURLOPT_COOKIEFILE, auth_.cookie_file_.c_str());
581 if(!auth_.cookie_jar_.empty())
582 curl_easy_setopt(curl, CURLOPT_COOKIEJAR, auth_.cookie_jar_.c_str());
586 curl_easy_setopt(curl, CURLOPT_NETRC, (
long)CURL_NETRC_REQUIRED);
587 if(!auth_.netrc_file_.empty())
588 curl_easy_setopt(curl, CURLOPT_NETRC_FILE, auth_.netrc_file_.c_str());
591 if(!auth_.proxy_.empty())
593 curl_easy_setopt(curl, CURLOPT_PROXY, auth_.proxy_.c_str());
594 if(!auth_.proxy_userpwd_.empty())
595 curl_easy_setopt(curl, CURLOPT_PROXYUSERPWD, auth_.proxy_userpwd_.c_str());
596 curl_easy_setopt(curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
598 else if(
auto proxy = EnvVarManager::get(
"GRK_CURL_PROXY"))
600 curl_easy_setopt(curl, CURLOPT_PROXY, proxy->c_str());
601 if(
auto proxyUserPwd = EnvVarManager::get(
"GRK_CURL_PROXYUSERPWD"))
602 curl_easy_setopt(curl, CURLOPT_PROXYUSERPWD, proxyUserPwd->c_str());
603 if(EnvVarManager::get(
"GRK_CURL_PROXYAUTH"))
604 curl_easy_setopt(curl, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
608 if(!auth_.user_agent_.empty())
609 curl_easy_setopt(curl, CURLOPT_USERAGENT, auth_.user_agent_.c_str());
612 long connect_timeout = auth_.connect_timeout_ > 0 ? auth_.connect_timeout_ : 10L;
613 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, connect_timeout);
616 curl_slist* configureHeaders(
const std::string& range)
618 struct curl_slist* headers =
nullptr;
619 headers = prepareAuthHeaders(headers);
622 headers = curl_slist_append(headers, range.c_str());
627 time_t getLastModifiedTime()
const
629 return last_modified_time_;
632 void fetch_total_size()
634 CURL* curl = curl_easy_init();
636 throw std::runtime_error(
"Failed to initialize CURL easy handle for HEAD");
638 curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
639 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
640 curl_easy_setopt(curl, CURLOPT_FILETIME, 1L);
643 auto headers = configureHeaders(
"");
644 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
646 CURLcode res = curl_easy_perform(curl);
649 grklog.
error(
"HEAD request failed: %s", curl_easy_strerror(res));
650 curl_easy_cleanup(curl);
651 throw std::runtime_error(
"Failed to fetch file size");
655 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
656 if(response_code != 200)
658 grklog.
error(
"HEAD request returned HTTP %ld", response_code);
659 curl_easy_cleanup(curl);
660 throw std::runtime_error(
"Invalid HEAD response");
663 curl_off_t content_length;
664 curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &content_length);
665 total_size_ =
static_cast<uint64_t
>(content_length);
666 grklog.
debug(
"Fetched total size: %llu bytes", total_size_);
669 res = curl_easy_getinfo(curl, CURLINFO_FILETIME, &filetime);
670 if(res == CURLE_OK && filetime != -1)
672 last_modified_time_ =
static_cast<time_t
>(filetime);
673 grklog.
debug(
"Fetched last modified time: %ld (Unix timestamp)", last_modified_time_);
677 grklog.
warn(
"Last modified time not available from server");
680 curl_easy_cleanup(curl);
681 curl_slist_free_all(headers);
693 CURL* configureHandle(uint64_t offset, uint64_t end, FetchResult& result,
694 CURL_FETCHER_WRITE_CALLBACK callback)
696 CURL* curl = curl_easy_init();
698 throw std::runtime_error(
"Failed to initialize CURL easy handle");
700 curl_easy_setopt(curl, CURLOPT_URL, url_.c_str());
701 curl_initiate_retry(curl);
702 curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
703 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
704 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &result);
705 curl_easy_setopt(curl, CURLOPT_PRIVATE, &result);
709 std::string range =
"Range: bytes=" + std::to_string(offset) +
"-" + std::to_string(end);
710 auto headers = configureHeaders(range);
711 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
723 std::shared_ptr<TileFetchContext> scheduleTileFetch(std::set<uint16_t>& slated)
725 auto requests = std::make_shared<TPFetchSeq>();
726 auto tilePartFetchByTile =
727 std::make_shared<std::unordered_map<uint16_t, std::shared_ptr<TPFetchSeq>>>();
728 TPFetchSeq::genCollections(allTileParts_, slated, requests, tilePartFetchByTile);
730 auto results = std::make_shared<std::vector<FetchResult>>(requests->size());
732 auto ctx = std::make_shared<TileFetchContext>(requests, user_data_, tilePartFetchByTile,
733 tileFetchCallback_,
this);
735 auto batch = std::make_unique<TileRequestBatch>(requests);
736 currentFetch_ = ScheduledFetch(ctx, std::move(batch), results);
739 for(
auto& r : *results)
742 return scheduleNextBatch(tileWriteCallback_) ? ctx :
nullptr;
755 bool scheduleChunkFetch(std::shared_ptr<ChunkContext> ctx,
756 std::shared_ptr<std::vector<ChunkRequest>> requests,
757 std::shared_ptr<std::vector<FetchResult>> results,
758 std::shared_ptr<std::vector<std::promise<FetchResult>>> promises)
760 auto batch = std::make_unique<ChunkRequestBatch>(requests);
761 currentFetch_ = ScheduledFetch(ctx, std::move(batch), results, promises);
762 return scheduleNextBatch(chunkWriteCallback);
772 bool scheduleNextBatch(CURL_FETCHER_WRITE_CALLBACK callback)
774 if(!currentFetch_.requests_ || !currentFetch_.requests_->hasMore())
780 std::lock_guard<std::mutex> lock(throttleMutex_);
781 if(fetchThrottle_ && !fetchThrottle_())
785 size_t activeRequests = currentFetch_.scheduled_ - currentFetch_.completed_;
786 size_t remainingBatch = batchSize_ > activeRequests ? batchSize_ - activeRequests : 0;
787 size_t remainingRequests = currentFetch_.requests_->remaining();
788 size_t requestsToSchedule = std::min(remainingBatch, remainingRequests);
790 for(
size_t i = 0; i < requestsToSchedule && currentFetch_.requests_->hasMore(); ++i)
792 auto [offset, end] = currentFetch_.requests_->next();
793 if(end >= this->total_size_)
795 grklog.
warn(
"Range %llu-%llu exceeds total size %llu", offset, end, total_size_);
796 end = this->total_size_ - 1;
798 auto& res = (*currentFetch_.results_)[currentFetch_.scheduled_];
799 res.requestIndex_ = currentFetch_.scheduled_;
801 res.ctx_ = currentFetch_.ctx_;
802 CURL* handle = configureHandle(offset, end, res, callback);
803 CURLMcode ret = curl_multi_add_handle(multi_handle_, handle);
806 grklog.
error(
"curl_multi_add_handle failed: %s", curl_multi_strerror(ret));
807 curl_easy_cleanup(handle);
811 std::lock_guard<std::mutex> lock(active_handles_mutex_);
812 active_handles_[handle] = currentFetch_.scheduled_;
814 grklog.
debug(
"Added fetch range request: %llu-%llu (index %zu)", offset, end,
815 currentFetch_.scheduled_);
816 currentFetch_.scheduled_++;
831 void retryRequest(FetchResult* result, uint64_t offset, uint64_t end,
832 CURL_FETCHER_WRITE_CALLBACK callback, std::function<
void()> onFatalError)
834 result->retryCount_++;
835 grklog.
warn(
"Retrying request %zu (retry %u/%u)", result->requestIndex_, result->retryCount_,
838 result->data_.clear();
839 result->responseCode_ = 0;
840 result->success_ =
false;
843 if(!currentFetch_.promises_)
845 auto ctx = std::static_pointer_cast<TileFetchContext>(result->ctx_);
848 auto& tpseq = (*ctx->requests_)[result->requestIndex_];
849 tpseq->fetchOffset_ = 0;
853 if(end >= total_size_)
854 end = total_size_ - 1;
856 CURL* handle = configureHandle(offset, end, *result, callback);
857 CURLMcode ret = curl_multi_add_handle(multi_handle_, handle);
860 grklog.
error(
"Retry curl_multi_add_handle failed: %s", curl_multi_strerror(ret));
861 curl_easy_cleanup(handle);
866 std::lock_guard<std::mutex> lock(active_handles_mutex_);
867 active_handles_[handle] = result->requestIndex_;
868 grklog.
debug(
"Rescheduled retry %u: %llu-%llu (index %zu)", result->retryCount_, offset, end,
869 result->requestIndex_);
873 void curl_initiate_retry(CURL* curl)
875 long timeout = auth_.timeout_ > 0 ? auth_.timeout_ : 30L;
876 curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
886 bool shouldRetry(
const FetchResult& result, CURLcode curl_code)
const
888 if(result.retryCount_ >= maxRetries_)
891 bool isCurlError = curl_code != CURLE_OK;
892 bool isHttpError = result.responseCode_ != 206 && result.responseCode_ != 0;
894 return isCurlError || isHttpError;
904 std::pair<uint64_t, uint64_t> getRequestRange(
const FetchResult& result)
const
906 if(!currentFetch_.promises_)
909 auto ctx = std::static_pointer_cast<TileFetchContext>(result.ctx_);
910 auto& req = (*ctx->requests_)[result.requestIndex_];
911 return {req->offset_, req->offset_ + req->length_ - 1};
916 auto ctx = std::static_pointer_cast<ChunkContext>(result.ctx_);
917 auto& req = (*ctx->requests_)[result.requestIndex_];
918 return {req.offset_, req.end_};
928 std::vector<FetchJob> tile_jobs_to_process;
929 std::vector<ChunkTask> chunk_tasks_to_process;
931 std::unique_lock<std::mutex> lock(queue_mutex_);
932 while(!tile_fetch_queue_.empty())
934 tile_jobs_to_process.emplace_back(std::move(tile_fetch_queue_.front()));
935 tile_fetch_queue_.pop();
936 grklog.
debug(
"Dequeued tile fetch job, queue size now: %zu", tile_fetch_queue_.size());
938 while(!chunk_fetch_queue_.empty())
940 chunk_tasks_to_process.emplace_back(std::move(chunk_fetch_queue_.front()));
941 chunk_fetch_queue_.pop();
942 grklog.
debug(
"Dequeued chunk fetch task, queue size now: %zu", chunk_fetch_queue_.size());
946 if(!tile_jobs_to_process.empty())
948 activeCallback_ = tileWriteCallback_;
949 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
950 for(
auto& job : tile_jobs_to_process)
952 auto ctx = scheduleTileFetch(job.slated);
954 job.promise_.set_value(
false);
956 active_jobs_.emplace(ctx, std::move(job.promise_));
960 if(!chunk_tasks_to_process.empty())
962 activeCallback_ = chunkWriteCallback;
963 for(
auto& task : chunk_tasks_to_process)
965 auto requests = task.requests_;
966 auto results = std::make_shared<std::vector<FetchResult>>(requests->size());
968 std::make_shared<std::vector<std::promise<FetchResult>>>(std::move(task.promises_));
969 auto ctx = std::make_shared<ChunkContext>(task.chunkBuffer_, requests);
970 for(
size_t i = 0; i < results->size(); ++i)
972 (*results)[i] = FetchResult((*requests)[i].requestIndex_);
973 (*results)[i].ctx_ = ctx;
975 if(!scheduleChunkFetch(ctx, requests, results, promises))
977 for(
size_t i = 0; i < promises->size(); ++i)
979 (*promises)[i].set_value((*results)[i]);
985 int still_running = 0;
986 auto ret = curl_multi_perform(multi_handle_, &still_running);
989 grklog.
error(
"curl_multi_perform failed: %s", curl_multi_strerror(ret));
992 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
993 for(
auto& job : active_jobs_)
994 job.second.set_value(
false);
995 active_jobs_.clear();
999 std::lock_guard<std::mutex> lock(active_handles_mutex_);
1000 if(currentFetch_.promises_)
1002 for(
size_t i = 0; i < currentFetch_.promises_->size(); ++i)
1005 res.success_ =
false;
1006 (*currentFetch_.promises_)[i].set_value(res);
1009 for(
auto& [handle, idx] : active_handles_)
1011 curl_multi_remove_handle(multi_handle_, handle);
1012 curl_easy_cleanup(handle);
1014 active_handles_.clear();
1021 while((msg = curl_multi_info_read(multi_handle_, &msgs_left)))
1023 if(msg->msg == CURLMSG_DONE)
1025 CURL* curl = msg->easy_handle;
1027 curl_easy_getinfo(curl, CURLINFO_PRIVATE, &userp);
1028 auto* result =
static_cast<FetchResult*
>(userp);
1030 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &result->responseCode_);
1031 result->success_ = (msg->data.result == CURLE_OK && result->responseCode_ == 206);
1035 std::lock_guard<std::mutex> lock(active_handles_mutex_);
1036 auto it = active_handles_.find(curl);
1037 if(it != active_handles_.end())
1040 active_handles_.erase(it);
1044 curl_multi_remove_handle(multi_handle_, curl);
1045 curl_easy_cleanup(curl);
1047 if(!result->success_)
1049 if(shouldRetry(*result, msg->data.result))
1051 auto [offset, end] = getRequestRange(*result);
1052 grklog.
warn(
"Fetch request %zu failed (HTTP %ld, CURL %s), retrying...",
1053 result->requestIndex_, result->responseCode_,
1054 curl_easy_strerror(msg->data.result));
1055 auto callback = activeCallback_;
1056 retryRequest(result, offset, end, callback, [
this, result, idx]() {
1058 if(!currentFetch_.promises_)
1060 else if(idx < currentFetch_.promises_->size())
1061 (*currentFetch_.promises_)[idx].set_value(*result);
1066 grklog.
error(
"Fetch request %zu failed: %s, HTTP %ld (no more retries)",
1067 result->requestIndex_, curl_easy_strerror(msg->data.result),
1068 result->responseCode_);
1070 if(!currentFetch_.promises_)
1075 grklog.
debug(
"Fetch request %zu completed", result->requestIndex_);
1079 if(currentFetch_.promises_ && idx < currentFetch_.promises_->size())
1081 (*currentFetch_.promises_)[idx].set_value(*result);
1084 currentFetch_.completed_++;
1087 if(currentFetch_.scheduled_ > currentFetch_.completed_ &&
1088 currentFetch_.completed_ >= batchSize_ / 2 && currentFetch_.requests_ &&
1089 currentFetch_.requests_->hasMore())
1091 grklog.
debug(
"Half of batch (%zu) completed, scheduling next batch", batchSize_ / 2);
1093 scheduleNextBatch(activeCallback_);
1098 if(still_running > 0)
1100 grklog.
trace(
"Still running: %d requests", still_running);
1105 bool throttled =
false;
1106 if(currentFetch_.requests_ && currentFetch_.requests_->hasMore() && activeCallback_)
1109 std::lock_guard<std::mutex> tlock(throttleMutex_);
1110 throttled = fetchThrottle_ && !fetchThrottle_();
1113 scheduleNextBatch(activeCallback_);
1120 std::unique_lock<std::mutex> tlock(throttleMutex_);
1121 throttleCV_.wait_for(tlock, std::chrono::milliseconds(100),
1122 [
this] {
return stop_ || !fetchThrottle_ || fetchThrottle_(); });
1126 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
1127 std::lock_guard<std::mutex> lock2(active_handles_mutex_);
1128 if(active_jobs_.empty() && active_handles_.empty())
1131 std::unique_lock<std::mutex> qlock(queue_mutex_);
1132 queue_cv_.wait(qlock, [
this] {
1133 return stop_ || !tile_fetch_queue_.empty() || !chunk_fetch_queue_.empty();
1140 std::lock_guard<std::mutex> lock(active_jobs_mutex_);
1141 for(
auto& job : active_jobs_)
1142 job.second.set_value(
false);
1143 active_jobs_.clear();
1145 std::lock_guard<std::mutex> lock2(active_handles_mutex_);
1146 if(currentFetch_.promises_)
1148 for(
auto& [handle, idx] : active_handles_)
1150 FetchResult resp(idx);
1151 resp.success_ =
false;
1152 if(idx < currentFetch_.promises_->size())
1153 (*currentFetch_.promises_)[idx].set_value(resp);
1154 curl_multi_remove_handle(multi_handle_, handle);
1155 curl_easy_cleanup(handle);
1158 active_handles_.clear();
1163 static size_t writeCallback(
void* contents,
size_t size,
size_t nmemb, std::string* s)
1165 s->append((
char*)contents, size * nmemb);
1166 return size * nmemb;
1171 std::mutex queue_mutex_;
1172 std::condition_variable queue_cv_;
1173 std::unordered_map<std::shared_ptr<TileFetchContext>, std::promise<bool>> active_jobs_;
1174 std::mutex active_jobs_mutex_;
1175 std::mutex fetch_mutex_;
1176 std::queue<FetchJob> tile_fetch_queue_;
1177 std::queue<ChunkTask> chunk_fetch_queue_;
1178 std::mutex active_handles_mutex_;
1179 std::unordered_map<CURL*, size_t> active_handles_;
1180 void* user_data_ =
nullptr;
1181 uint64_t total_size_ = 0;
1182 uint64_t current_offset_ = 0;
1183 CURLM* multi_handle_ =
nullptr;
1186 size_t batchSize_ = 30;
1188 std::thread fetchThread_;
1189 uint32_t maxRetries_ = 3;
1190 uint32_t retryDelayMs_ = 1000;
1191 std::function<bool()> fetchThrottle_;
1192 std::mutex throttleMutex_;
1193 std::condition_variable throttleCV_;
1197 const TPSEQ_VEC* allTileParts_ =
nullptr;
1198 time_t last_modified_time_ = -1;
1202 ScheduledFetch currentFetch_;
1210 if((atomicCount.fetch_add(1, std::memory_order_seq_cst) + 1) ==
requests_->size())
1212 fetcher_->onFetchTilesComplete(shared_from_this(),
true);
Manages a partially ordered deque of buffer chunks that are added asynchronously out of order.
Definition ChunkBuffer.h:47
T index_type
Definition ChunkBuffer.h:49
Definition CurlFetcher.h:49
virtual void fetchChunks(std::shared_ptr< ChunkBuffer<> > chunkBuffer)=0
virtual uint64_t size() const =0
virtual std::vector< std::string > listDirectory(const std::string &path)=0
virtual size_t read(uint8_t *buffer, size_t numBytes)=0
virtual void fetchChunks(std::shared_ptr< ChunkBuffer<> > chunkBuffer, std::shared_ptr< std::vector< ChunkRequest > > requests)=0
virtual bool getMetadata(const std::string &path, std::map< std::string, std::string > &metadata)=0
virtual ~IFetcher()=default
virtual void onFetchTilesComplete(std::shared_ptr< TileFetchContext > context, bool success)=0
virtual void notifyThrottleRelease()=0
virtual bool seek(uint64_t offset)=0
virtual uint64_t offset() const =0
virtual std::future< bool > fetchTiles(const TPSEQ_VEC &allTileParts, std::set< uint16_t > &slated, void *user_data, TileFetchCallback callback)=0
virtual void init(const std::string &path, const FetchAuth &auth)=0
virtual void setFetchThrottle(std::function< bool()> throttle)=0
ResWindow.
Definition CompressedChunkCache.h:36
std::function< void(size_t requestIndex, TileFetchContext *context)> TileFetchCallback
Definition CurlFetcher.h:46
ILogger & grklog
Definition Logger.cpp:24
std::vector< std::unique_ptr< TPSeq > > TPSEQ_VEC
Definition TPFetchSeq.h:181
size_t(* CURL_FETCHER_WRITE_CALLBACK)(void *contents, size_t size, size_t nmemb, void *userp)
Definition CurlFetcher.h:116
Definition FetchCommon.h:42
Definition FetchCommon.h:93
virtual void warn(const char *fmt,...)=0
virtual void error(const char *fmt,...)=0
virtual void debug(const char *fmt,...)=0
virtual void trace(const char *fmt,...)=0
Definition CurlFetcher.h:95
size_t completeCount_
Definition CurlFetcher.h:113
IFetcher * fetcher_
Definition CurlFetcher.h:100
std::shared_ptr< TPFetchSeq > requests_
Definition CurlFetcher.h:96
void * user_data_
Definition CurlFetcher.h:97
void incrementCompleteCount()
Definition CurlFetcher.h:1207
TileFetchCallback callback_
Definition CurlFetcher.h:99
TileFetchContext(std::shared_ptr< TPFetchSeq > &requests, void *user_data, std::shared_ptr< std::unordered_map< uint16_t, std::shared_ptr< TPFetchSeq > > > &tilePartFetchByTile, TileFetchCallback callback, IFetcher *fetcher)
Definition CurlFetcher.h:102
std::shared_ptr< std::unordered_map< uint16_t, std::shared_ptr< TPFetchSeq > > > tilePartFetchByTile_
Definition CurlFetcher.h:98