Line data Source code
1 : import 'package:flutter/foundation.dart';
2 :
3 : import 'dart:async';
4 : import 'dart:math' as math;
5 :
6 : import 'constraints.dart';
7 : import 'events.dart';
8 : import 'native_work_manager.dart';
9 : import 'platform_interface.dart';
10 : import 'task_trigger.dart';
11 : import 'worker.dart';
12 :
13 : /// Retry policy for [OfflineQueue] tasks.
14 : ///
15 : /// Determines how many times a task is retried and what constraints
16 : /// must be met before each retry attempt.
17 : @immutable
class OfflineRetryPolicy {
  /// Creates a retry policy.
  ///
  /// In debug builds this asserts that [maxRetries] lies in 0–100 and that
  /// [backoffMultiplier] is at least 1.0. Asserts are stripped in release
  /// builds, so [delayFor] is written to stay well-defined for any values.
  const OfflineRetryPolicy({
    this.maxRetries = 5,
    this.requiresNetwork = true,
    this.requiresCharging = false,
    this.backoffMultiplier = 2.0,
    this.initialDelay = const Duration(seconds: 30),
    this.maxDelay = const Duration(hours: 6),
  })  : assert(maxRetries >= 0 && maxRetries <= 100,
            'maxRetries must be between 0 and 100'),
        // NaN fails >= 1.0 already; Infinity is capped at maxDelay in delayFor().
        assert(backoffMultiplier >= 1.0,
            'backoffMultiplier must be >= 1.0 (NaN and negative values not allowed)');

  /// Maximum retry attempts (0–100). Set to 0 for no retries.
  final int maxRetries;

  /// Require any network connection before retrying.
  final bool requiresNetwork;

  /// Require charging before retrying (useful for heavy tasks).
  final bool requiresCharging;

  /// Exponential backoff multiplier (≥ 1.0; 1.0 = constant interval).
  ///
  /// `double.infinity` passes the constructor assert; [delayFor] then yields
  /// [maxDelay] for every attempt after the first.
  final double backoffMultiplier;

  /// Delay before the first retry.
  final Duration initialDelay;

  /// Maximum delay cap for exponential backoff.
  final Duration maxDelay;

  /// Convenience: retry up to 10 times on any network.
  static const networkAvailable = OfflineRetryPolicy(
    maxRetries: 10,
    requiresNetwork: true,
  );

  /// Convenience: retry up to 5 times, network required.
  static const networkRequired = OfflineRetryPolicy(
    maxRetries: 5,
    requiresNetwork: true,
  );

  /// Convenience: up to 3 quick retries starting after 5 seconds, with no
  /// network requirement.
  static const aggressive = OfflineRetryPolicy(
    maxRetries: 3,
    requiresNetwork: false,
    initialDelay: Duration(seconds: 5),
  );

  /// Computes the delay before retry attempt [attempt] (0-indexed).
  ///
  /// The result is `initialDelay * backoffMultiplier^attempt`, rounded to the
  /// nearest millisecond and limited to the range zero..[maxDelay].
  ///
  /// Edge cases:
  /// - A negative [attempt] is treated as attempt 0, so the delay is never
  ///   shorter than [initialDelay].
  /// - A zero or negative [initialDelay] always yields [Duration.zero].
  /// - A negative [maxDelay] is treated as a cap of zero instead of throwing.
  /// - A non-finite product (overflow, infinite multiplier) yields the cap.
  Duration delayFor(int attempt) {
    final initialMs = initialDelay.inMilliseconds;
    // Nothing to scale; also avoids 0 * infinity == NaN below.
    if (initialMs <= 0) return Duration.zero;

    // `clamp` throws when the lower bound exceeds the upper bound, so the
    // cap is normalised to be non-negative up front.
    final capMs = math.max(0, maxDelay.inMilliseconds);
    final exponent = math.max(0, attempt).toDouble();
    final scaledMs = initialMs * math.pow(backoffMultiplier, exponent);

    // Compare as doubles first so huge values never reach round().
    if (!scaledMs.isFinite || scaledMs >= capMs) {
      return Duration(milliseconds: capMs);
    }
    if (scaledMs <= 0) return Duration.zero;
    return Duration(milliseconds: scaledMs.round());
  }

  /// Converts this policy to a map for the platform channel.
  ///
  /// Durations are sent as whole milliseconds under `initialDelayMs` and
  /// `maxDelayMs`.
  Map<String, dynamic> toMap() {
    return {
      'maxRetries': maxRetries,
      'requiresNetwork': requiresNetwork,
      'requiresCharging': requiresCharging,
      'backoffMultiplier': backoffMultiplier,
      'initialDelayMs': initialDelay.inMilliseconds,
      'maxDelayMs': maxDelay.inMilliseconds,
    };
  }
}
97 :
98 : /// An entry in an [OfflineQueue].
99 : @immutable
class QueueEntry {
  /// Identifier of this queued task.
  final String taskId;

  /// The worker that runs when this entry is processed.
  final Worker worker;

  /// Retry behaviour applied to this entry; defaults to
  /// `const OfflineRetryPolicy()`.
  final OfflineRetryPolicy retryPolicy;

  /// Optional label used to group entries, e.g. for cancellation by tag.
  final String? tag;

  /// Creates an entry that runs [worker] under [taskId].
  const QueueEntry({
    required this.taskId,
    required this.worker,
    this.retryPolicy = const OfflineRetryPolicy(),
    this.tag,
  });

  /// Serialises this entry for the platform channel.
  ///
  /// The worker contributes both its class name and its own map form; `tag`
  /// is always present in the result and is `null` when unset.
  Map<String, dynamic> toMap() => <String, dynamic>{
        'taskId': taskId,
        'workerClassName': worker.workerClassName,
        'workerConfig': worker.toMap(),
        'retryPolicy': retryPolicy.toMap(),
        'tag': tag,
      };
}
131 :
132 : /// A persistent, ordered queue of background tasks that are retried
133 : /// automatically when network is available.
134 : ///
135 : /// The offline queue is ideal for "best-effort delivery" scenarios:
136 : /// - Uploading analytics events when connectivity is restored
137 : /// - Syncing user data in the background
138 : /// - Sending logs to a remote server
139 : ///
140 : /// ## Usage
141 : ///
142 : /// ```dart
143 : /// // Create the queue (one instance per queue in your app)
144 : /// final uploadQueue = OfflineQueue(
145 : /// id: 'upload-queue',
146 : /// maxSize: 100,
147 : /// defaultRetryPolicy: OfflineRetryPolicy.networkAvailable,
148 : /// );
149 : ///
150 : /// // Enqueue tasks (safe to call when offline)
151 : /// await uploadQueue.enqueue(QueueEntry(
152 : /// taskId: 'upload-${timestamp}',
153 : /// worker: HttpUploadWorker(
154 : /// url: 'https://api.example.com/events',
155 : /// filePath: '/tmp/events.json',
156 : /// ),
157 : /// ));
158 : ///
159 : /// // Start processing (call once, e.g. in main())
160 : /// uploadQueue.start();
161 : /// ```
162 : ///
163 : /// ## Behavior
164 : ///
165 : /// - Tasks run **one at a time** in FIFO order.
166 : /// - Failed tasks are retried with exponential backoff up to
167 : /// [OfflineRetryPolicy.maxRetries] times.
168 : /// - After all retries are exhausted the task is moved to a **dead-letter**
169 : /// state (accessible via [deadLetterCount]).
170 : /// - Calling [enqueue] when the queue already holds [maxSize] pending entries
171 : /// silently drops the new entry; no error is thrown.
172 : ///
173 : /// ## Limitations
174 : ///
175 : /// The current implementation stores the queue in memory only. If the app is
176 : /// killed, entries that were still pending are lost: [start] does not reload
177 : /// them from the [NativeWorkManager] task store. Only a task that was already
178 : /// accepted by the OS continues to completion natively.
179 : ///
180 : /// For a fully persistent queue that survives process death, schedule
181 : /// each task directly with [NativeWorkManager.enqueue] using
182 : /// [Constraints.networkRequired] and the WorkManager retry mechanism
183 : /// (`shouldRetry = true` from the worker result).
class OfflineQueue {
  /// Creates an in-memory queue identified by [id].
  ///
  /// The queue is idle until [start] is called; entries may be added with
  /// [enqueue] beforehand.
  OfflineQueue({
    required this.id,
    this.maxSize = 100,
    this.defaultRetryPolicy = const OfflineRetryPolicy(),
  });

  /// How long a single native attempt may take before it counts as failed.
  static const Duration _attemptTimeout = Duration(hours: 1);

  /// Unique queue identifier. Used as the prefix of every native task id.
  final String id;

  /// Maximum number of pending entries.
  ///
  /// [enqueue] silently drops new entries once this many are pending.
  final int maxSize;

  /// Default retry policy for the queue.
  ///
  /// NOTE(review): [enqueue] always uses [QueueEntry.retryPolicy], which is
  /// non-nullable and carries its own default, so this value is currently
  /// never consulted. Honouring it needs a way for an entry to express
  /// "no policy given".
  final OfflineRetryPolicy defaultRetryPolicy;

  final List<_QueueSlot> _pending = [];
  final List<_QueueSlot> _deadLetter = [];

  /// Whether the queue has been started and not stopped since.
  bool _running = false;

  /// Whether a head-of-queue attempt is currently in progress.
  bool _processing = false;

  /// Number of entries currently waiting in the queue (including the one
  /// being attempted, if any).
  int get pendingCount => _pending.length;

  /// Number of entries that exhausted all retries and were dropped.
  int get deadLetterCount => _deadLetter.length;

  /// Whether this queue is currently started (processing tasks as they come).
  bool get isRunning => _running;

  /// Adds [entry] to the back of the queue.
  ///
  /// If [maxSize] entries are already pending, the call returns without
  /// enqueuing and [entry] is dropped. Safe to call before [start].
  Future<void> enqueue(QueueEntry entry) async {
    if (_pending.length >= maxSize) return;
    _pending.add(_QueueSlot(
      entry: entry,
      policy: entry.retryPolicy,
      attempt: 0,
    ));
    if (_running) _scheduleNext();
  }

  /// Starts processing queued tasks.
  ///
  /// Safe to call multiple times — subsequent calls are no-ops.
  void start() {
    if (_running) return;
    _running = true;
    _scheduleNext();
  }

  /// Stops processing. The current in-flight task (if any) completes normally;
  /// no further tasks are dequeued until [start] is called again.
  void stop() {
    _running = false;
  }

  /// Cancels all queued entries that match [taskId] or [tag].
  ///
  /// Matching entries are removed from the pending list first, then a native
  /// cancel is issued for each one (fire-and-forget). With both arguments
  /// `null` nothing matches and the call does nothing.
  void cancel({String? taskId, String? tag}) {
    bool matches(_QueueSlot slot) =>
        (taskId != null && slot.entry.taskId == taskId) ||
        (tag != null && slot.entry.tag == tag);

    // Snapshot before removal so the native ids can still be derived.
    final toCancel = _pending.where(matches).toList();
    if (toCancel.isEmpty) return;
    _pending.removeWhere(matches);

    // Fire native cancels after list removal so no further scheduling occurs.
    for (final slot in toCancel) {
      NativeWorkManager.cancel(taskId: _nativeId(slot)).ignore();
    }
  }

  /// Removes all dead-letter entries.
  void clearDeadLetter() => _deadLetter.clear();

  // ── Internal ──────────────────────────────────────────────────────────────

  /// Native task id for one attempt of [slot]: `<queue>__<task>__<attempt>`.
  String _nativeId(_QueueSlot slot) =>
      '${id}__${slot.entry.taskId}__${slot.attempt}';

  /// Kicks off processing of the head entry unless the queue is stopped,
  /// already busy, or empty.
  void _scheduleNext() {
    if (!_running || _processing || _pending.isEmpty) return;
    _processing = true;
    unawaited(_processHead());
  }

  /// Runs one attempt for the head entry, then schedules the next one.
  ///
  /// `_processing` is reset in a `finally` so an unexpected error cannot
  /// leave the queue permanently stalled. After such an error the next
  /// entry is not scheduled automatically (to avoid a tight failure loop);
  /// the next [enqueue] or [start] resumes processing.
  Future<void> _processHead() async {
    try {
      await _attemptHead();
    } finally {
      _processing = false;
    }
    _scheduleNext();
  }

  /// Performs a single attempt for the current head entry and updates the
  /// pending / dead-letter lists according to the outcome.
  Future<void> _attemptHead() async {
    if (_pending.isEmpty) return;

    final slot = _pending.first;
    final policy = slot.policy;

    // Apply the back-off delay for retry attempts.
    if (slot.attempt > 0) {
      final delay = policy.delayFor(slot.attempt - 1);
      if (delay > Duration.zero) {
        await Future<void>.delayed(delay);
      }
    }

    // stop() or cancel() may have run while we were waiting.
    if (!_running || !_pending.contains(slot)) return;

    final succeeded = await _runNative(slot);

    // Locate the slot by identity: cancel() may have removed it (or entries
    // ahead of it) while the native task was in flight, so index 0 cannot be
    // assumed.
    final index = _pending.indexOf(slot);
    if (index < 0) return;

    if (succeeded) {
      _pending.removeAt(index);
    } else if (slot.attempt < policy.maxRetries) {
      // Replace the slot with one carrying the incremented attempt counter.
      _pending[index] = _QueueSlot(
        entry: slot.entry,
        policy: policy,
        attempt: slot.attempt + 1,
      );
    } else {
      // Exhausted retries → dead-letter.
      _pending.removeAt(index);
      _deadLetter.add(slot);
    }
  }

  /// Enqueues [slot] natively and waits for its completion event.
  ///
  /// Returns `true` only when a non-"started" event for this attempt arrives
  /// with `success == true`. An enqueue failure, stream error, stream close,
  /// or timeout all count as failure.
  Future<bool> _runNative(_QueueSlot slot) async {
    final nativeId = _nativeId(slot);
    final policy = slot.policy;
    final result = Completer<TaskEvent?>();

    void settle(TaskEvent? event) {
      if (!result.isCompleted) result.complete(event);
    }

    StreamSubscription<Object?>? subscription;
    try {
      // Listen before enqueueing so a fast completion event is not missed.
      subscription = NativeWorkManagerPlatform.instance.events
          .where((e) => e.taskId == nativeId && !e.isStarted)
          .listen(
            (e) => settle(e),
            onError: (Object _, StackTrace __) => settle(null),
            onDone: () => settle(null),
          );

      await NativeWorkManager.enqueue(
        taskId: nativeId,
        trigger: TaskTrigger.oneTime(),
        worker: slot.entry.worker,
        constraints: Constraints(
          requiresNetwork: policy.requiresNetwork,
          requiresCharging: policy.requiresCharging,
        ),
      );

      final event = await result.future
          .timeout(_attemptTimeout, onTimeout: () => null);
      if (event == null) {
        // No result arrived. Withdraw the native task so it cannot run
        // alongside the retry that will be scheduled under a new id.
        NativeWorkManager.cancel(taskId: nativeId).ignore();
      }
      return event?.success == true;
    } catch (_) {
      // enqueue itself (or subscribing) failed — treat as task failure.
      return false;
    } finally {
      await subscription?.cancel();
    }
  }
}
364 :
365 : @immutable
class _QueueSlot {
  /// The queued entry this slot tracks.
  final QueueEntry entry;

  /// Retry policy in effect for [entry].
  final OfflineRetryPolicy policy;

  /// Attempt counter for [entry]; slots are created with 0 and replaced by a
  /// new slot with a higher value on retry.
  final int attempt;

  /// Creates an immutable snapshot of an entry's position in the retry cycle.
  const _QueueSlot({
    required this.entry,
    required this.policy,
    required this.attempt,
  });
}
|