LCOV - code coverage report
Current view: top level - src - offline_queue.dart Coverage Total Hit
Test: lcov.info Lines: 58.6 % 99 58
Test Date: 2026-04-30 18:23:23 Functions: - 0 0

            Line data    Source code
       1              : import 'package:flutter/foundation.dart';
       2              : 
       3              : import 'dart:async';
       4              : import 'dart:math' as math;
       5              : 
       6              : import 'constraints.dart';
       7              : import 'events.dart';
       8              : import 'native_work_manager.dart';
       9              : import 'platform_interface.dart';
      10              : import 'task_trigger.dart';
      11              : import 'worker.dart';
      12              : 
      13              : /// Retry policy for [OfflineQueue] tasks.
      14              : ///
      15              : /// Determines how many times a task is retried and what constraints
      16              : /// must be met before each retry attempt.
      17              : @immutable
      18              : class OfflineRetryPolicy {
      19            6 :   const OfflineRetryPolicy({
      20              :     this.maxRetries = 5,
      21              :     this.requiresNetwork = true,
      22              :     this.requiresCharging = false,
      23              :     this.backoffMultiplier = 2.0,
      24              :     this.initialDelay = const Duration(seconds: 30),
      25              :     this.maxDelay = const Duration(hours: 6),
      26           18 :   })  : assert(maxRetries >= 0 && maxRetries <= 100,
      27              :             'maxRetries must be between 0 and 100'),
      28              :         // NaN fails >= 1.0 already; Infinity is clamped to maxDelay in delayFor().
      29           12 :         assert(backoffMultiplier >= 1.0,
      30              :             'backoffMultiplier must be >= 1.0 (NaN and negative values not allowed)');
      31              : 
      32              :   /// Maximum retry attempts (0–100). Set to 0 for no retries.
      33              :   final int maxRetries;
      34              : 
      35              :   /// Require any network connection before retrying.
      36              :   final bool requiresNetwork;
      37              : 
      38              :   /// Require charging before retrying (useful for heavy tasks).
      39              :   final bool requiresCharging;
      40              : 
      41              :   /// Exponential backoff multiplier (finite number ≥ 1.0; 1.0 = constant interval).
      42              :   final double backoffMultiplier;
      43              : 
      44              :   /// Delay before the first retry.
      45              :   final Duration initialDelay;
      46              : 
      47              :   /// Maximum delay cap for exponential backoff.
      48              :   final Duration maxDelay;
      49              : 
      50              :   /// Convenience: retry up to 10 times on any network.
      51              :   static const networkAvailable = OfflineRetryPolicy(
      52              :     maxRetries: 10,
      53              :     requiresNetwork: true,
      54              :   );
      55              : 
      56              :   /// Convenience: retry up to 5 times, network required.
      57              :   static const networkRequired = OfflineRetryPolicy(
      58              :     maxRetries: 5,
      59              :     requiresNetwork: true,
      60              :   );
      61              : 
      62              :   /// Convenience: immediate retry, no constraints.
      63              :   static const aggressive = OfflineRetryPolicy(
      64              :     maxRetries: 3,
      65              :     requiresNetwork: false,
      66              :     initialDelay: Duration(seconds: 5),
      67              :   );
      68              : 
      69              :   /// Compute the delay before retry attempt [attempt] (0-indexed).
      70            1 :   Duration delayFor(int attempt) {
      71            3 :     final ms = initialDelay.inMilliseconds *
      72            2 :         (backoffMultiplier == 1.0
      73              :             ? 1.0
      74            3 :             : _pow(backoffMultiplier, attempt.toDouble()));
      75            1 :     final clamped = ms.isFinite
      76            4 :         ? ms.round().clamp(0, maxDelay.inMilliseconds)
      77            0 :         : maxDelay.inMilliseconds;
      78            1 :     return Duration(milliseconds: clamped);
      79              :   }
      80              : 
      81            1 :   static double _pow(double base, double exp) {
      82            2 :     return math.pow(base, exp).toDouble();
      83              :   }
      84              : 
      85              :   /// Convert to map for platform channel.
      86            1 :   Map<String, dynamic> toMap() {
      87            1 :     return {
      88            1 :       'maxRetries': maxRetries,
      89            1 :       'requiresNetwork': requiresNetwork,
      90            1 :       'requiresCharging': requiresCharging,
      91            1 :       'backoffMultiplier': backoffMultiplier,
      92            2 :       'initialDelayMs': initialDelay.inMilliseconds,
      93            2 :       'maxDelayMs': maxDelay.inMilliseconds,
      94              :     };
      95              :   }
      96              : }
      97              : 
      98              : /// An entry in an [OfflineQueue].
      99              : @immutable
     100              : class QueueEntry {
     101            1 :   const QueueEntry({
     102              :     required this.taskId,
     103              :     required this.worker,
     104              :     this.retryPolicy = const OfflineRetryPolicy(),
     105              :     this.tag,
     106              :   });
     107              : 
     108              :   /// Unique ID for this queued task.
     109              :   final String taskId;
     110              : 
     111              :   /// Worker to execute.
     112              :   final Worker worker;
     113              : 
     114              :   /// Retry policy for this entry.
     115              :   final OfflineRetryPolicy retryPolicy;
     116              : 
     117              :   /// Optional tag for grouping / cancellation.
     118              :   final String? tag;
     119              : 
     120              :   /// Convert to map for platform channel.
     121            1 :   Map<String, dynamic> toMap() {
     122            1 :     return {
     123            1 :       'taskId': taskId,
     124            2 :       'workerClassName': worker.workerClassName,
     125            2 :       'workerConfig': worker.toMap(),
     126            2 :       'retryPolicy': retryPolicy.toMap(),
     127            1 :       'tag': tag,
     128              :     };
     129              :   }
     130              : }
     131              : 
     132              : /// A persistent, ordered queue of background tasks that are retried
     133              : /// automatically when network is available.
     134              : ///
     135              : /// The offline queue is ideal for "best-effort delivery" scenarios:
     136              : /// - Uploading analytics events when connectivity is restored
     137              : /// - Syncing user data in the background
     138              : /// - Sending logs to a remote server
     139              : ///
     140              : /// ## Usage
     141              : ///
     142              : /// ```dart
     143              : /// // Create the queue (one instance per queue in your app)
     144              : /// final uploadQueue = OfflineQueue(
     145              : ///   id: 'upload-queue',
     146              : ///   maxSize: 100,
     147              : ///   defaultRetryPolicy: OfflineRetryPolicy.networkAvailable,
     148              : /// );
     149              : ///
     150              : /// // Enqueue tasks (safe to call when offline)
     151              : /// await uploadQueue.enqueue(QueueEntry(
     152              : ///   taskId: 'upload-${timestamp}',
     153              : ///   worker: HttpUploadWorker(
     154              : ///     url: 'https://api.example.com/events',
     155              : ///     filePath: '/tmp/events.json',
     156              : ///   ),
     157              : /// ));
     158              : ///
     159              : /// // Start processing (call once, e.g. in main())
     160              : /// uploadQueue.start();
     161              : /// ```
     162              : ///
     163              : /// ## Behavior
     164              : ///
     165              : /// - Tasks run **one at a time** in FIFO order.
     166              : /// - Failed tasks are retried with exponential backoff up to
     167              : ///   [OfflineRetryPolicy.maxRetries] times.
     168              : /// - After all retries are exhausted the task is moved to a **dead-letter**
     169              : ///   state (accessible via [deadLetterCount]).
     170              : /// - Calling [enqueue] when the queue is full (> [maxSize]) throws a
     171              : ///   [StateError].
     172              : ///
     173              : /// ## Limitations
     174              : ///
     175              : /// The current implementation stores the queue in memory.  If the app is
     176              : /// killed, in-flight and pending tasks are re-enqueued from the
     177              : /// [NativeWorkManager] task store on the next [start] call (tasks that were
     178              : /// accepted by the OS continue to completion natively).
     179              : ///
     180              : /// For a fully persistent queue that survives process death, schedule
     181              : /// each task directly with [NativeWorkManager.enqueue] using
     182              : /// [Constraints.networkRequired] and the WorkManager retry mechanism
     183              : /// (`shouldRetry = true` from the worker result).
     184              : class OfflineQueue {
     185            1 :   OfflineQueue({
     186              :     required this.id,
     187              :     this.maxSize = 100,
     188              :     this.defaultRetryPolicy = const OfflineRetryPolicy(),
     189              :   });
     190              : 
     191              :   /// Unique queue identifier.
     192              :   final String id;
     193              : 
     194              :   /// Maximum number of pending entries. [enqueue] throws [StateError] if full.
     195              :   final int maxSize;
     196              : 
     197              :   /// Default retry policy for entries that do not specify their own.
     198              :   final OfflineRetryPolicy defaultRetryPolicy;
     199              : 
     200              :   final List<_QueueSlot> _pending = [];
     201              :   final List<_QueueSlot> _deadLetter = [];
     202              :   bool _running = false;
     203              :   bool _processing = false;
     204              : 
     205              :   /// Number of entries currently waiting in the queue.
     206            3 :   int get pendingCount => _pending.length;
     207              : 
     208              :   /// Number of entries that exhausted all retries and were dropped.
     209            3 :   int get deadLetterCount => _deadLetter.length;
     210              : 
     211              :   /// Whether this queue is currently started (processing tasks as they come).
     212            2 :   bool get isRunning => _running;
     213              : 
     214              :   /// Add a [QueueEntry] to the back of the queue.
     215              :   ///
     216              :   /// If the queue has reached [maxSize] capacity the call returns silently
     217              :   /// without enqueuing the entry (the entry is dropped). Safe to call before [start].
     218            1 :   Future<void> enqueue(QueueEntry entry) async {
     219            4 :     if (_pending.length >= maxSize) {
     220              :       return;
     221              :     }
     222            3 :     _pending.add(_QueueSlot(
     223              :       entry: entry,
     224            1 :       policy: entry.retryPolicy,
     225              :       attempt: 0,
     226              :     ));
     227            1 :     if (_running) _scheduleNext();
     228              :   }
     229              : 
     230              :   /// Start processing queued tasks.
     231              :   ///
     232              :   /// Safe to call multiple times — subsequent calls are no-ops.
     233            1 :   void start() {
     234            1 :     if (_running) return;
     235            1 :     _running = true;
     236            1 :     _scheduleNext();
     237              :   }
     238              : 
     239              :   /// Stop processing.  The current in-flight task (if any) completes normally;
     240              :   /// no further tasks are dequeued until [start] is called again.
     241            1 :   void stop() {
     242            1 :     _running = false;
     243              :   }
     244              : 
     245              :   /// Cancel all queued entries that match [taskId] or [tag].
     246            1 :   void cancel({String? taskId, String? tag}) {
     247              :     // Snapshot matching slots and remove from list atomically before issuing
     248              :     // native cancels — prevents a concurrent _scheduleNext from dequeuing
     249              :     // a slot that is in the process of being cancelled.
     250            3 :     final toCancel = _pending.where((slot) {
     251            3 :       return (taskId != null && slot.entry.taskId == taskId) ||
     252            3 :           (tag != null && slot.entry.tag == tag);
     253            1 :     }).toList();
     254            3 :     _pending.removeWhere((slot) {
     255            3 :       return (taskId != null && slot.entry.taskId == taskId) ||
     256            3 :           (tag != null && slot.entry.tag == tag);
     257              :     });
     258              :     // Fire native cancels after list removal so no further scheduling occurs.
     259            2 :     for (final slot in toCancel) {
     260            5 :       final nativeId = '${id}__${slot.entry.taskId}__${slot.attempt}';
     261            2 :       NativeWorkManager.cancel(taskId: nativeId).ignore();
     262              :     }
     263              :   }
     264              : 
     265              :   /// Remove all dead-letter entries.
     266            3 :   void clearDeadLetter() => _deadLetter.clear();
     267              : 
     268              :   // ── Internal ──────────────────────────────────────────────────────────────
     269              : 
     270            1 :   void _scheduleNext() {
     271            4 :     if (!_running || _processing || _pending.isEmpty) return;
     272            0 :     _processing = true;
     273            0 :     _processHead();
     274              :   }
     275              : 
     276            0 :   Future<void> _processHead() async {
     277            0 :     if (_pending.isEmpty) {
     278            0 :       _processing = false;
     279              :       return;
     280              :     }
     281              : 
     282            0 :     final slot = _pending.first;
     283            0 :     final policy = slot.policy;
     284            0 :     final entry = slot.entry;
     285              : 
     286              :     // Apply delay for retry attempts
     287            0 :     if (slot.attempt > 0) {
     288            0 :       final delay = policy.delayFor(slot.attempt - 1);
     289            0 :       if (delay > Duration.zero) {
     290            0 :         await Future<void>.delayed(delay);
     291              :       }
     292              :     }
     293              : 
     294            0 :     if (!_running) {
     295            0 :       _processing = false;
     296              :       return;
     297              :     }
     298              : 
     299              :     // Build constraints from policy
     300            0 :     final constraints = Constraints(
     301            0 :       requiresNetwork: policy.requiresNetwork,
     302            0 :       requiresCharging: policy.requiresCharging,
     303              :     );
     304              : 
     305              :     try {
     306            0 :       await NativeWorkManager.enqueue(
     307            0 :         taskId: '${id}__${entry.taskId}__${slot.attempt}',
     308            0 :         trigger: TaskTrigger.oneTime(),
     309            0 :         worker: entry.worker,
     310              :         constraints: constraints,
     311              :       );
     312              : 
     313              :       // Wait for the task to complete via the events stream.
     314            0 :       final event = await _awaitEvent(
     315            0 :         '${id}__${entry.taskId}__${slot.attempt}',
     316              :         timeout: const Duration(hours: 1),
     317              :       );
     318              : 
     319            0 :       if (event?.success == true) {
     320            0 :         _pending.remove(slot);
     321            0 :         _processing = false;
     322            0 :         _scheduleNext();
     323              :         return;
     324              :       }
     325              :     } catch (_) {
     326              :       // enqueue itself failed — treat as task failure
     327              :     }
     328              : 
     329              :     // Task failed — retry or dead-letter
     330            0 :     if (slot.attempt < policy.maxRetries) {
     331              :       // Update attempt counter in-place
     332            0 :       _pending[0] = _QueueSlot(
     333              :         entry: entry,
     334              :         policy: policy,
     335            0 :         attempt: slot.attempt + 1,
     336              :       );
     337              :     } else {
     338              :       // Exhausted retries → dead-letter
     339            0 :       _pending.removeAt(0);
     340            0 :       _deadLetter.add(slot);
     341              :     }
     342              : 
     343            0 :     _processing = false;
     344            0 :     _scheduleNext();
     345              :   }
     346              : 
     347              :   /// Wait for a specific taskId event on [NativeWorkManager.events].
     348              :   ///
     349              :   /// Returns the event or `null` if [timeout] is exceeded.
     350            0 :   static Future<TaskEvent?> _awaitEvent(String taskId,
     351              :       {required Duration timeout}) async {
     352              :     try {
     353            0 :       return await NativeWorkManagerPlatform.instance.events
     354            0 :           .where((e) => e.taskId == taskId && !e.isStarted)
     355            0 :           .first
     356            0 :           .timeout(timeout);
     357            0 :     } on TimeoutException {
     358              :       return null;
     359              :     } catch (e) {
     360              :       return null;
     361              :     }
     362              :   }
     363              : }
     364              : 
     365              : @immutable
     366              : class _QueueSlot {
     367            1 :   const _QueueSlot({
     368              :     required this.entry,
     369              :     required this.policy,
     370              :     required this.attempt,
     371              :   });
     372              : 
     373              :   final QueueEntry entry;
     374              :   final OfflineRetryPolicy policy;
     375              :   final int attempt;
     376              : }
        

Generated by: LCOV version 2.4-0