LCOV - code coverage report
Current view: top level - src - task_graph.dart Coverage Total Hit
Test: lcov.info Lines: 23.4 % 141 33
Test Date: 2026-04-30 18:23:23 Functions: - 0 0

            Line data    Source code
       1              : import 'dart:async';
       2              : 
       3              : import 'package:flutter/foundation.dart';
       4              : 
       5              : import 'constraints.dart';
       6              : import 'events.dart';
       7              : import 'native_work_manager.dart';
       8              : import 'platform_interface.dart';
       9              : import 'task_trigger.dart';
      10              : import 'worker.dart';
      11              : 
      12              : /// A node in a [TaskGraph].
      13              : ///
      14              : /// Each node represents one background task. Nodes can have zero or more
      15              : /// dependencies — a node does not execute until all its dependencies complete
      16              : /// successfully.
      17              : @immutable
      18              : class TaskNode {
      19            1 :   const TaskNode({
      20              :     required this.id,
      21              :     required this.worker,
      22              :     this.dependsOn = const [],
      23              :     this.constraints = const Constraints(),
      24              :   });
      25              : 
      26              :   /// Unique ID for this task within the graph.
      27              :   final String id;
      28              : 
      29              :   /// The worker to execute.
      30              :   final Worker worker;
      31              : 
      32              :   /// IDs of nodes that must complete before this node runs.
      33              :   ///
      34              :   /// Empty means the node has no dependencies and runs immediately.
      35              :   final List<String> dependsOn;
      36              : 
      37              :   /// Optional scheduling constraints for this node.
      38              :   final Constraints constraints;
      39              : 
      40              :   /// Convert to map for platform channel.
      41            1 :   Map<String, dynamic> toMap() {
      42            1 :     return {
      43            1 :       'id': id,
      44            2 :       'workerClassName': worker.workerClassName,
      45            2 :       'workerConfig': worker.toMap(),
      46            1 :       'dependsOn': dependsOn,
      47            2 :       'constraints': constraints.toMap(),
      48              :     };
      49              :   }
      50              : }
      51              : 
      52              : /// A directed acyclic graph (DAG) of background tasks.
      53              : ///
      54              : /// Use [TaskGraph] when you need complex parallel → sequential workflows
      55              : /// that go beyond a simple linear [TaskChainBuilder].
      56              : ///
      57              : /// ## Example: Parallel download → merge → upload
      58              : ///
      59              : /// ```dart
      60              : /// // A and B download in parallel.
      61              : /// // When both finish, C merges the results.
      62              : /// // When merge is done, D uploads.
      63              : ///
      64              : /// final graph = TaskGraph(id: 'export-workflow')
      65              : ///   ..add(TaskNode(id: 'download-A', worker: HttpDownloadWorker(
      66              : ///       url: 'https://cdn.example.com/part-a.zip',
      67              : ///       savePath: '/tmp/part-a.zip')))
      68              : ///   ..add(TaskNode(id: 'download-B', worker: HttpDownloadWorker(
      69              : ///       url: 'https://cdn.example.com/part-b.zip',
      70              : ///       savePath: '/tmp/part-b.zip')))
      71              : ///   ..add(TaskNode(id: 'merge', worker: DartWorker(callbackId: 'mergeFiles'),
      72              : ///       dependsOn: ['download-A', 'download-B']))
      73              : ///   ..add(TaskNode(id: 'upload', worker: HttpUploadWorker(
      74              : ///       url: 'https://api.example.com/submit',
      75              : ///       filePath: '/tmp/merged.zip'),
      76              : ///       dependsOn: ['merge']));
      77              : ///
      78              : /// final execution = await NativeWorkManager.enqueueGraph(graph);
      79              : ///
      80              : /// // Monitor completion
      81              : /// execution.result.then((r) {
      82              : ///   if (r.success) print('All ${r.completedCount} tasks done!');
      83              : ///   else print('Failed: ${r.failedNodes}');
      84              : /// });
      85              : /// ```
      86              : ///
      87              : /// ## Constraints Per Node
      88              : ///
      89              : /// ```dart
      90              : /// final graph = TaskGraph(id: 'nightly-sync')
      91              : ///   ..add(TaskNode(
      92              : ///       id: 'heavy-compute',
      93              : ///       worker: DartWorker(callbackId: 'computeStats'),
      94              : ///       constraints: Constraints(requiresCharging: true)))
      95              : ///   ..add(TaskNode(
      96              : ///       id: 'upload-stats',
      97              : ///       worker: HttpUploadWorker(url: uploadUrl, filePath: statsFile),
      98              : ///       dependsOn: ['heavy-compute'],
      99              : ///       constraints: Constraints(requiresWifi: true)));
     100              : ///
     101              : /// await NativeWorkManager.enqueueGraph(graph);
     102              : /// ```
     103              : ///
     104              : /// ## Failure Behavior
     105              : ///
     106              : /// If a node fails, all downstream nodes that depend on it (directly or
     107              : /// transitively) are **cancelled**. Nodes that do not depend on the failed
     108              : /// node continue to execute.
     109              : ///
     110              : /// ## Limitations
     111              : ///
     112              : /// - The graph must be a **DAG** — cycles are detected during [enqueueTaskGraph] and
     113              : ///   throw an [ArgumentError].
     114              : /// - All node IDs must be **unique within the graph**.
     115              : /// - The implementation uses the existing [NativeWorkManager.events] stream
     116              : ///   for fan-in synchronization, so the app must be running while the graph
     117              : ///   executes. For graphs that must survive app termination, use individual
     118              : ///   [NativeWorkManager.enqueue] calls with [TaskTrigger] instead.
     119              : class TaskGraph {
     120            1 :   TaskGraph({required this.id});
     121              : 
     122              :   /// Unique ID for this graph execution.  Used to namespace node task IDs.
     123              :   final String id;
     124              : 
     125              :   final List<TaskNode> _nodes = [];
     126              : 
     127              :   /// All nodes currently in the graph.
     128            3 :   List<TaskNode> get nodes => List.unmodifiable(_nodes);
     129              : 
     130            5 :   Map<String, TaskNode> get _nodeMap => {for (final n in _nodes) n.id: n};
     131              : 
     132              :   /// Add a [TaskNode] to the graph.
     133              :   ///
     134              :   /// Returns `this` for fluent chaining:
     135              :   /// ```dart
     136              :   /// final graph = TaskGraph(id: 'my-flow')
     137              :   ///   ..add(TaskNode(id: 'a', worker: workerA))
     138              :   ///   ..add(TaskNode(id: 'b', worker: workerB, dependsOn: ['a']));
     139              :   /// ```
     140            1 :   TaskGraph add(TaskNode node) {
     141            2 :     _nodes.add(node);
     142              :     return this;
     143              :   }
     144              : 
     145              :   /// Validate the graph: check for duplicate IDs, missing dependencies,
     146              :   /// and cycles.
     147              :   ///
     148              :   /// Throws [ArgumentError] if the graph is invalid.
     149            1 :   void validate() {
     150              :     final ids = <String>{};
     151            2 :     for (final n in _nodes) {
     152            2 :       if (!ids.add(n.id)) {
     153            3 :         throw ArgumentError('Duplicate node ID in TaskGraph: "${n.id}"');
     154              :       }
     155              :     }
     156            2 :     for (final n in _nodes) {
     157            2 :       for (final dep in n.dependsOn) {
     158            1 :         if (!ids.contains(dep)) {
     159            1 :           throw ArgumentError(
     160            2 :             'Node "${n.id}" depends on unknown node "$dep"',
     161              :           );
     162              :         }
     163              :       }
     164              :     }
     165              :     // Cycle detection via DFS
     166              :     final visited = <String>{};
     167              :     final inStack = <String>{};
     168              : 
     169            1 :     void dfs(String nodeId) {
     170            1 :       if (inStack.contains(nodeId)) {
     171            2 :         throw ArgumentError('Cycle detected in TaskGraph at node "$nodeId"');
     172              :       }
     173            1 :       if (visited.contains(nodeId)) return;
     174            1 :       inStack.add(nodeId);
     175            4 :       for (final dep in _nodeMap[nodeId]!.dependsOn) {
     176            1 :         dfs(dep);
     177              :       }
     178            1 :       inStack.remove(nodeId);
     179            1 :       visited.add(nodeId);
     180              :     }
     181              : 
     182            2 :     for (final n in _nodes) {
     183            2 :       dfs(n.id);
     184              :     }
     185              :   }
     186              : 
     187              :   /// Topologically sorted nodes (roots first).
     188            0 :   List<TaskNode> get _topoSorted {
     189            0 :     final result = <TaskNode>[];
     190              :     final visited = <String>{};
     191              : 
     192            0 :     void visit(String id) {
     193            0 :       if (visited.contains(id)) return;
     194            0 :       final node = _nodeMap[id]!;
     195            0 :       for (final dep in node.dependsOn) {
     196            0 :         visit(dep);
     197              :       }
     198            0 :       visited.add(id);
     199            0 :       result.add(node);
     200              :     }
     201              : 
     202            0 :     for (final n in _nodes) {
     203            0 :       visit(n.id);
     204              :     }
     205              :     return result;
     206              :   }
     207              : 
     208              :   /// Returns the set of node IDs that are roots (no dependencies).
     209            0 :   Set<String> get _roots =>
     210            0 :       _nodes.where((n) => n.dependsOn.isEmpty).map((n) => n.id).toSet();
     211              : 
     212              :   /// Convert to map for platform channel.
     213            0 :   Map<String, dynamic> toMap() {
     214            0 :     return {
     215            0 :       'id': id,
     216            0 :       'nodes': _nodes.map((n) => n.toMap()).toList(),
     217              :     };
     218              :   }
     219              : }
     220              : 
     221              : /// Result of a [TaskGraph] execution.
     222              : @immutable
     223              : class GraphResult {
     224            0 :   const GraphResult({
     225              :     required this.graphId,
     226              :     required this.success,
     227              :     required this.completedCount,
     228              :     required this.failedNodes,
     229              :     required this.cancelledNodes,
     230              :   });
     231              : 
     232              :   final String graphId;
     233              : 
     234              :   /// `true` when all nodes completed successfully.
     235              :   final bool success;
     236              : 
     237              :   /// Number of nodes that completed successfully.
     238              :   final int completedCount;
     239              : 
     240              :   /// IDs of nodes that failed.
     241              :   final List<String> failedNodes;
     242              : 
     243              :   /// IDs of nodes that were cancelled due to upstream failure.
     244              :   final List<String> cancelledNodes;
     245              : 
     246            0 :   @override
     247            0 :   String toString() => 'GraphResult($graphId: success=$success, '
     248            0 :       'completed=$completedCount, '
     249            0 :       'failed=${failedNodes.length}, '
     250            0 :       'cancelled=${cancelledNodes.length})';
     251              : }
     252              : 
     253              : /// Handle returned by [NativeWorkManager.enqueueGraph].
     254              : ///
     255              : /// Exposes a [result] future that resolves when the entire graph finishes
     256              : /// (all nodes complete or any node fails).
     257              : class GraphExecution {
     258            0 :   GraphExecution._(this.graphId, this._result);
     259              : 
     260              :   /// Internal constructor for testing.
     261            0 :   @visibleForTesting
     262              :   factory GraphExecution.internal(String graphId, Future<GraphResult> result) =>
     263            0 :       GraphExecution._(graphId, result);
     264              : 
     265              :   final String graphId;
     266              :   final Future<GraphResult> _result;
     267              : 
     268              :   /// Resolves when all graph nodes have finished executing.
     269            0 :   Future<GraphResult> get result => _result;
     270              : }
     271              : 
     272              : // ── Internal executor ─────────────────────────────────────────────────────────
     273              : 
     274              : /// Executes a [TaskGraph] by scheduling nodes in dependency order.
     275              : ///
     276              : /// Internal implementation — callers use [NativeWorkManager.enqueueGraph].
     277              : class _GraphExecutor {
     278            0 :   _GraphExecutor(this._graph);
     279              : 
     280              :   final TaskGraph _graph;
     281              : 
     282              :   final _completed = <String>{};
     283              :   final _failed = <String>{};
     284              :   final _cancelled = <String>{};
     285              :   final _inFlight = <String>{};
     286              : 
     287              :   late Completer<GraphResult> _completer;
     288              :   StreamSubscription<TaskEvent>? _eventSub;
     289              : 
     290            0 :   Future<GraphResult> execute({bool isAlreadyEnqueued = false}) async {
     291            0 :     _graph.validate();
     292              : 
     293            0 :     _completer = Completer<GraphResult>();
     294            0 :     final nodes = _graph._topoSorted;
     295              : 
     296            0 :     if (nodes.isEmpty) {
     297            0 :       return GraphResult(
     298            0 :         graphId: _graph.id,
     299              :         success: true,
     300              :         completedCount: 0,
     301              :         failedNodes: const [],
     302              :         cancelledNodes: const [],
     303              :       );
     304              :     }
     305              : 
     306              :     // Listen to events before scheduling to avoid race conditions.
     307            0 :     _eventSub = NativeWorkManagerPlatform.instance.events.listen(_onEvent);
     308              : 
     309              :     if (!isAlreadyEnqueued) {
     310              :       // Schedule all root nodes immediately (legacy behavior).
     311            0 :       for (final nodeId in _graph._roots) {
     312            0 :         await _scheduleNode(_graph._nodeMap[nodeId]!);
     313              :       }
     314              :     } else {
     315              :       // Mark all nodes as in-flight since they are being handled by native.
     316            0 :       for (final node in _graph._nodes) {
     317            0 :         _inFlight.add(node.id);
     318              :       }
     319              :     }
     320              : 
     321            0 :     return _completer.future;
     322              :   }
     323              : 
     324            0 :   void _onEvent(TaskEvent event) {
     325            0 :     final taskId = event.taskId;
     326              :     // Strip the graph namespace prefix to get the node ID.
     327            0 :     final prefix = '${_graph.id}__';
     328            0 :     if (!taskId.startsWith(prefix)) return;
     329            0 :     final nodeId = taskId.substring(prefix.length);
     330              : 
     331            0 :     if (!_inFlight.contains(nodeId)) return;
     332              : 
     333              :     // FIX G1: Lifecycle 'started' events must NOT remove nodes from _inFlight.
     334              :     // Proceed only if this is a completion event (success or failure).
     335              :     // Removing the node here for a 'started' event would cause the actual
     336              :     // completion event to be ignored later, hanging the graph.
     337            0 :     if (event.isStarted) return;
     338              : 
     339            0 :     _inFlight.remove(nodeId);
     340              : 
     341            0 :     if (event.success) {
     342            0 :       _completed.add(nodeId);
     343            0 :       _tryScheduleDownstream(nodeId);
     344              :     } else {
     345            0 :       _failed.add(nodeId);
     346            0 :       _cancelDownstream(nodeId);
     347              :     }
     348              : 
     349            0 :     _checkDone();
     350              :   }
     351              : 
     352            0 :   void _tryScheduleDownstream(String completedNodeId) {
     353            0 :     for (final node in _graph._nodes) {
     354            0 :       if (_completed.contains(node.id) ||
     355            0 :           _failed.contains(node.id) ||
     356            0 :           _cancelled.contains(node.id) ||
     357            0 :           _inFlight.contains(node.id)) {
     358              :         continue;
     359              :       }
     360              : 
     361              :       // All dependencies satisfied?
     362            0 :       final ready = node.dependsOn.every(_completed.contains);
     363              :       if (ready) {
     364            0 :         _scheduleNode(node);
     365              :       }
     366              :     }
     367              :   }
     368              : 
     369            0 :   Future<void> _scheduleNode(TaskNode node) async {
     370            0 :     _inFlight.add(node.id);
     371              :     try {
     372            0 :       await NativeWorkManager.enqueue(
     373            0 :         taskId: '${_graph.id}__${node.id}',
     374            0 :         trigger: TaskTrigger.oneTime(),
     375            0 :         worker: node.worker,
     376            0 :         constraints: node.constraints,
     377              :       );
     378              :     } catch (e) {
     379            0 :       _inFlight.remove(node.id);
     380            0 :       _failed.add(node.id);
     381            0 :       _cancelDownstream(node.id);
     382            0 :       _checkDone();
     383              :     }
     384              :   }
     385              : 
     386            0 :   void _cancelDownstream(String failedNodeId) {
     387            0 :     final allNodes = _graph._nodeMap;
     388              : 
     389              :     // BFS to find all transitive dependents of failedNodeId.
     390            0 :     final queue = <String>[failedNodeId];
     391              :     final toCancel = <String>{};
     392              : 
     393            0 :     while (queue.isNotEmpty) {
     394            0 :       final current = queue.removeLast();
     395            0 :       for (final node in allNodes.values) {
     396            0 :         if (node.dependsOn.contains(current) &&
     397            0 :             !_completed.contains(node.id) &&
     398            0 :             !_failed.contains(node.id) &&
     399            0 :             !toCancel.contains(node.id)) {
     400            0 :           toCancel.add(node.id);
     401            0 :           queue.add(node.id);
     402              :         }
     403              :       }
     404              :     }
     405              : 
     406            0 :     for (final nodeId in toCancel) {
     407            0 :       _inFlight.remove(nodeId);
     408            0 :       _cancelled.add(nodeId);
     409              :     }
     410              :   }
     411              : 
     412            0 :   void _checkDone() {
     413            0 :     final total = _graph._nodes.length;
     414            0 :     final resolved = _completed.length + _failed.length + _cancelled.length;
     415            0 :     if (resolved < total) return;
     416            0 :     if (_inFlight.isNotEmpty) return;
     417              : 
     418            0 :     _eventSub?.cancel();
     419            0 :     _eventSub = null;
     420              : 
     421            0 :     _completer.complete(GraphResult(
     422            0 :       graphId: _graph.id,
     423            0 :       success: _failed.isEmpty && _cancelled.isEmpty,
     424            0 :       completedCount: _completed.length,
     425            0 :       failedNodes: _failed.toList(),
     426            0 :       cancelledNodes: _cancelled.toList(),
     427              :     ));
     428              :   }
     429              : }
     430              : 
     431              : /// Enqueue a [TaskGraph] for execution.
     432              : ///
     433              : /// This is the internal implementation called by
     434              : /// [NativeWorkManager.enqueueGraph]. Not part of the public API.
     435            0 : Future<GraphExecution> enqueueTaskGraph(TaskGraph graph) async {
     436            0 :   graph.validate();
     437              : 
     438              :   // 1. Send graph to native for persistent orchestration.
     439              :   // This ensures the graph continues even if the app is killed.
     440            0 :   await NativeWorkManagerPlatform.instance.enqueueGraph(graph.toMap());
     441              : 
     442              :   // 2. Start the Dart-side listener so we can resolve the result future
     443              :   // if the app stays alive.
     444            0 :   final executor = _GraphExecutor(graph);
     445            0 :   final resultFuture = executor.execute(isAlreadyEnqueued: true);
     446              : 
     447            0 :   return GraphExecution._(graph.id, resultFuture);
     448              : }
        

Generated by: LCOV version 2.4-0