Line data Source code
1 : import 'dart:async';
2 :
3 : import 'package:flutter/foundation.dart';
4 :
5 : import 'constraints.dart';
6 : import 'events.dart';
7 : import 'native_work_manager.dart';
8 : import 'platform_interface.dart';
9 : import 'task_trigger.dart';
10 : import 'worker.dart';
11 :
12 : /// A node in a [TaskGraph].
13 : ///
14 : /// Each node represents one background task. Nodes can have zero or more
15 : /// dependencies — a node does not execute until all its dependencies complete
16 : /// successfully.
17 : @immutable
18 : class TaskNode {
19 1 : const TaskNode({
20 : required this.id,
21 : required this.worker,
22 : this.dependsOn = const [],
23 : this.constraints = const Constraints(),
24 : });
25 :
26 : /// Unique ID for this task within the graph.
27 : final String id;
28 :
29 : /// The worker to execute.
30 : final Worker worker;
31 :
32 : /// IDs of nodes that must complete before this node runs.
33 : ///
34 : /// Empty means the node has no dependencies and runs immediately.
35 : final List<String> dependsOn;
36 :
37 : /// Optional scheduling constraints for this node.
38 : final Constraints constraints;
39 :
40 : /// Convert to map for platform channel.
41 1 : Map<String, dynamic> toMap() {
42 1 : return {
43 1 : 'id': id,
44 2 : 'workerClassName': worker.workerClassName,
45 2 : 'workerConfig': worker.toMap(),
46 1 : 'dependsOn': dependsOn,
47 2 : 'constraints': constraints.toMap(),
48 : };
49 : }
50 : }
51 :
52 : /// A directed acyclic graph (DAG) of background tasks.
53 : ///
54 : /// Use [TaskGraph] when you need complex parallel → sequential workflows
55 : /// that go beyond a simple linear [TaskChainBuilder].
56 : ///
57 : /// ## Example: Parallel download → merge → upload
58 : ///
59 : /// ```dart
60 : /// // A and B download in parallel.
61 : /// // When both finish, C merges the results.
62 : /// // When merge is done, D uploads.
63 : ///
64 : /// final graph = TaskGraph(id: 'export-workflow')
65 : /// ..add(TaskNode(id: 'download-A', worker: HttpDownloadWorker(
66 : /// url: 'https://cdn.example.com/part-a.zip',
67 : /// savePath: '/tmp/part-a.zip')))
68 : /// ..add(TaskNode(id: 'download-B', worker: HttpDownloadWorker(
69 : /// url: 'https://cdn.example.com/part-b.zip',
70 : /// savePath: '/tmp/part-b.zip')))
71 : /// ..add(TaskNode(id: 'merge', worker: DartWorker(callbackId: 'mergeFiles'),
72 : /// dependsOn: ['download-A', 'download-B']))
73 : /// ..add(TaskNode(id: 'upload', worker: HttpUploadWorker(
74 : /// url: 'https://api.example.com/submit',
75 : /// filePath: '/tmp/merged.zip'),
76 : /// dependsOn: ['merge']));
77 : ///
78 : /// final execution = await NativeWorkManager.enqueueGraph(graph);
79 : ///
80 : /// // Monitor completion
81 : /// execution.result.then((r) {
82 : /// if (r.success) print('All ${r.completedCount} tasks done!');
83 : /// else print('Failed: ${r.failedNodes}');
84 : /// });
85 : /// ```
86 : ///
87 : /// ## Constraints Per Node
88 : ///
89 : /// ```dart
90 : /// final graph = TaskGraph(id: 'nightly-sync')
91 : /// ..add(TaskNode(
92 : /// id: 'heavy-compute',
93 : /// worker: DartWorker(callbackId: 'computeStats'),
94 : /// constraints: Constraints(requiresCharging: true)))
95 : /// ..add(TaskNode(
96 : /// id: 'upload-stats',
97 : /// worker: HttpUploadWorker(url: uploadUrl, filePath: statsFile),
98 : /// dependsOn: ['heavy-compute'],
99 : /// constraints: Constraints(requiresWifi: true)));
100 : ///
101 : /// await NativeWorkManager.enqueueGraph(graph);
102 : /// ```
103 : ///
104 : /// ## Failure Behavior
105 : ///
106 : /// If a node fails, all downstream nodes that depend on it (directly or
107 : /// transitively) are **cancelled**. Nodes that do not depend on the failed
108 : /// node continue to execute.
109 : ///
110 : /// ## Limitations
111 : ///
112 : /// - The graph must be a **DAG** — cycles are detected during [enqueueTaskGraph] and
113 : /// throw an [ArgumentError].
114 : /// - All node IDs must be **unique within the graph**.
115 : /// - The implementation uses the existing [NativeWorkManager.events] stream
116 : /// for fan-in synchronization, so the app must be running while the graph
117 : /// executes. For graphs that must survive app termination, use individual
118 : /// [NativeWorkManager.enqueue] calls with [TaskTrigger] instead.
119 : class TaskGraph {
120 1 : TaskGraph({required this.id});
121 :
122 : /// Unique ID for this graph execution. Used to namespace node task IDs.
123 : final String id;
124 :
125 : final List<TaskNode> _nodes = [];
126 :
127 : /// All nodes currently in the graph.
128 3 : List<TaskNode> get nodes => List.unmodifiable(_nodes);
129 :
130 5 : Map<String, TaskNode> get _nodeMap => {for (final n in _nodes) n.id: n};
131 :
132 : /// Add a [TaskNode] to the graph.
133 : ///
134 : /// Returns `this` for fluent chaining:
135 : /// ```dart
136 : /// final graph = TaskGraph(id: 'my-flow')
137 : /// ..add(TaskNode(id: 'a', worker: workerA))
138 : /// ..add(TaskNode(id: 'b', worker: workerB, dependsOn: ['a']));
139 : /// ```
140 1 : TaskGraph add(TaskNode node) {
141 2 : _nodes.add(node);
142 : return this;
143 : }
144 :
145 : /// Validate the graph: check for duplicate IDs, missing dependencies,
146 : /// and cycles.
147 : ///
148 : /// Throws [ArgumentError] if the graph is invalid.
149 1 : void validate() {
150 : final ids = <String>{};
151 2 : for (final n in _nodes) {
152 2 : if (!ids.add(n.id)) {
153 3 : throw ArgumentError('Duplicate node ID in TaskGraph: "${n.id}"');
154 : }
155 : }
156 2 : for (final n in _nodes) {
157 2 : for (final dep in n.dependsOn) {
158 1 : if (!ids.contains(dep)) {
159 1 : throw ArgumentError(
160 2 : 'Node "${n.id}" depends on unknown node "$dep"',
161 : );
162 : }
163 : }
164 : }
165 : // Cycle detection via DFS
166 : final visited = <String>{};
167 : final inStack = <String>{};
168 :
169 1 : void dfs(String nodeId) {
170 1 : if (inStack.contains(nodeId)) {
171 2 : throw ArgumentError('Cycle detected in TaskGraph at node "$nodeId"');
172 : }
173 1 : if (visited.contains(nodeId)) return;
174 1 : inStack.add(nodeId);
175 4 : for (final dep in _nodeMap[nodeId]!.dependsOn) {
176 1 : dfs(dep);
177 : }
178 1 : inStack.remove(nodeId);
179 1 : visited.add(nodeId);
180 : }
181 :
182 2 : for (final n in _nodes) {
183 2 : dfs(n.id);
184 : }
185 : }
186 :
187 : /// Topologically sorted nodes (roots first).
188 0 : List<TaskNode> get _topoSorted {
189 0 : final result = <TaskNode>[];
190 : final visited = <String>{};
191 :
192 0 : void visit(String id) {
193 0 : if (visited.contains(id)) return;
194 0 : final node = _nodeMap[id]!;
195 0 : for (final dep in node.dependsOn) {
196 0 : visit(dep);
197 : }
198 0 : visited.add(id);
199 0 : result.add(node);
200 : }
201 :
202 0 : for (final n in _nodes) {
203 0 : visit(n.id);
204 : }
205 : return result;
206 : }
207 :
208 : /// Returns the set of node IDs that are roots (no dependencies).
209 0 : Set<String> get _roots =>
210 0 : _nodes.where((n) => n.dependsOn.isEmpty).map((n) => n.id).toSet();
211 :
212 : /// Convert to map for platform channel.
213 0 : Map<String, dynamic> toMap() {
214 0 : return {
215 0 : 'id': id,
216 0 : 'nodes': _nodes.map((n) => n.toMap()).toList(),
217 : };
218 : }
219 : }
220 :
221 : /// Result of a [TaskGraph] execution.
222 : @immutable
223 : class GraphResult {
224 0 : const GraphResult({
225 : required this.graphId,
226 : required this.success,
227 : required this.completedCount,
228 : required this.failedNodes,
229 : required this.cancelledNodes,
230 : });
231 :
232 : final String graphId;
233 :
234 : /// `true` when all nodes completed successfully.
235 : final bool success;
236 :
237 : /// Number of nodes that completed successfully.
238 : final int completedCount;
239 :
240 : /// IDs of nodes that failed.
241 : final List<String> failedNodes;
242 :
243 : /// IDs of nodes that were cancelled due to upstream failure.
244 : final List<String> cancelledNodes;
245 :
246 0 : @override
247 0 : String toString() => 'GraphResult($graphId: success=$success, '
248 0 : 'completed=$completedCount, '
249 0 : 'failed=${failedNodes.length}, '
250 0 : 'cancelled=${cancelledNodes.length})';
251 : }
252 :
253 : /// Handle returned by [NativeWorkManager.enqueueGraph].
254 : ///
255 : /// Exposes a [result] future that resolves when the entire graph finishes
256 : /// (all nodes complete or any node fails).
257 : class GraphExecution {
258 0 : GraphExecution._(this.graphId, this._result);
259 :
260 : /// Internal constructor for testing.
261 0 : @visibleForTesting
262 : factory GraphExecution.internal(String graphId, Future<GraphResult> result) =>
263 0 : GraphExecution._(graphId, result);
264 :
265 : final String graphId;
266 : final Future<GraphResult> _result;
267 :
268 : /// Resolves when all graph nodes have finished executing.
269 0 : Future<GraphResult> get result => _result;
270 : }
271 :
272 : // ── Internal executor ─────────────────────────────────────────────────────────
273 :
274 : /// Executes a [TaskGraph] by scheduling nodes in dependency order.
275 : ///
276 : /// Internal implementation — callers use [NativeWorkManager.enqueueGraph].
277 : class _GraphExecutor {
278 0 : _GraphExecutor(this._graph);
279 :
280 : final TaskGraph _graph;
281 :
282 : final _completed = <String>{};
283 : final _failed = <String>{};
284 : final _cancelled = <String>{};
285 : final _inFlight = <String>{};
286 :
287 : late Completer<GraphResult> _completer;
288 : StreamSubscription<TaskEvent>? _eventSub;
289 :
290 0 : Future<GraphResult> execute({bool isAlreadyEnqueued = false}) async {
291 0 : _graph.validate();
292 :
293 0 : _completer = Completer<GraphResult>();
294 0 : final nodes = _graph._topoSorted;
295 :
296 0 : if (nodes.isEmpty) {
297 0 : return GraphResult(
298 0 : graphId: _graph.id,
299 : success: true,
300 : completedCount: 0,
301 : failedNodes: const [],
302 : cancelledNodes: const [],
303 : );
304 : }
305 :
306 : // Listen to events before scheduling to avoid race conditions.
307 0 : _eventSub = NativeWorkManagerPlatform.instance.events.listen(_onEvent);
308 :
309 : if (!isAlreadyEnqueued) {
310 : // Schedule all root nodes immediately (legacy behavior).
311 0 : for (final nodeId in _graph._roots) {
312 0 : await _scheduleNode(_graph._nodeMap[nodeId]!);
313 : }
314 : } else {
315 : // Mark all nodes as in-flight since they are being handled by native.
316 0 : for (final node in _graph._nodes) {
317 0 : _inFlight.add(node.id);
318 : }
319 : }
320 :
321 0 : return _completer.future;
322 : }
323 :
324 0 : void _onEvent(TaskEvent event) {
325 0 : final taskId = event.taskId;
326 : // Strip the graph namespace prefix to get the node ID.
327 0 : final prefix = '${_graph.id}__';
328 0 : if (!taskId.startsWith(prefix)) return;
329 0 : final nodeId = taskId.substring(prefix.length);
330 :
331 0 : if (!_inFlight.contains(nodeId)) return;
332 :
333 : // FIX G1: Lifecycle 'started' events must NOT remove nodes from _inFlight.
334 : // Proceed only if this is a completion event (success or failure).
335 : // Removing the node here for a 'started' event would cause the actual
336 : // completion event to be ignored later, hanging the graph.
337 0 : if (event.isStarted) return;
338 :
339 0 : _inFlight.remove(nodeId);
340 :
341 0 : if (event.success) {
342 0 : _completed.add(nodeId);
343 0 : _tryScheduleDownstream(nodeId);
344 : } else {
345 0 : _failed.add(nodeId);
346 0 : _cancelDownstream(nodeId);
347 : }
348 :
349 0 : _checkDone();
350 : }
351 :
352 0 : void _tryScheduleDownstream(String completedNodeId) {
353 0 : for (final node in _graph._nodes) {
354 0 : if (_completed.contains(node.id) ||
355 0 : _failed.contains(node.id) ||
356 0 : _cancelled.contains(node.id) ||
357 0 : _inFlight.contains(node.id)) {
358 : continue;
359 : }
360 :
361 : // All dependencies satisfied?
362 0 : final ready = node.dependsOn.every(_completed.contains);
363 : if (ready) {
364 0 : _scheduleNode(node);
365 : }
366 : }
367 : }
368 :
369 0 : Future<void> _scheduleNode(TaskNode node) async {
370 0 : _inFlight.add(node.id);
371 : try {
372 0 : await NativeWorkManager.enqueue(
373 0 : taskId: '${_graph.id}__${node.id}',
374 0 : trigger: TaskTrigger.oneTime(),
375 0 : worker: node.worker,
376 0 : constraints: node.constraints,
377 : );
378 : } catch (e) {
379 0 : _inFlight.remove(node.id);
380 0 : _failed.add(node.id);
381 0 : _cancelDownstream(node.id);
382 0 : _checkDone();
383 : }
384 : }
385 :
386 0 : void _cancelDownstream(String failedNodeId) {
387 0 : final allNodes = _graph._nodeMap;
388 :
389 : // BFS to find all transitive dependents of failedNodeId.
390 0 : final queue = <String>[failedNodeId];
391 : final toCancel = <String>{};
392 :
393 0 : while (queue.isNotEmpty) {
394 0 : final current = queue.removeLast();
395 0 : for (final node in allNodes.values) {
396 0 : if (node.dependsOn.contains(current) &&
397 0 : !_completed.contains(node.id) &&
398 0 : !_failed.contains(node.id) &&
399 0 : !toCancel.contains(node.id)) {
400 0 : toCancel.add(node.id);
401 0 : queue.add(node.id);
402 : }
403 : }
404 : }
405 :
406 0 : for (final nodeId in toCancel) {
407 0 : _inFlight.remove(nodeId);
408 0 : _cancelled.add(nodeId);
409 : }
410 : }
411 :
412 0 : void _checkDone() {
413 0 : final total = _graph._nodes.length;
414 0 : final resolved = _completed.length + _failed.length + _cancelled.length;
415 0 : if (resolved < total) return;
416 0 : if (_inFlight.isNotEmpty) return;
417 :
418 0 : _eventSub?.cancel();
419 0 : _eventSub = null;
420 :
421 0 : _completer.complete(GraphResult(
422 0 : graphId: _graph.id,
423 0 : success: _failed.isEmpty && _cancelled.isEmpty,
424 0 : completedCount: _completed.length,
425 0 : failedNodes: _failed.toList(),
426 0 : cancelledNodes: _cancelled.toList(),
427 : ));
428 : }
429 : }
430 :
431 : /// Enqueue a [TaskGraph] for execution.
432 : ///
433 : /// This is the internal implementation called by
434 : /// [NativeWorkManager.enqueueGraph]. Not part of the public API.
435 0 : Future<GraphExecution> enqueueTaskGraph(TaskGraph graph) async {
436 0 : graph.validate();
437 :
438 : // 1. Send graph to native for persistent orchestration.
439 : // This ensures the graph continues even if the app is killed.
440 0 : await NativeWorkManagerPlatform.instance.enqueueGraph(graph.toMap());
441 :
442 : // 2. Start the Dart-side listener so we can resolve the result future
443 : // if the app stays alive.
444 0 : final executor = _GraphExecutor(graph);
445 0 : final resultFuture = executor.execute(isAlreadyEnqueued: true);
446 :
447 0 : return GraphExecution._(graph.id, resultFuture);
448 : }
|