Line data Source code
1 : import 'dart:async';
2 :
3 : import '../constraints.dart';
4 : import '../enqueue_request.dart';
5 : import '../events.dart';
6 : import '../task_chain.dart';
7 : import '../task_graph.dart';
8 : import '../task_handler.dart';
9 : import '../task_trigger.dart';
10 : import '../worker.dart';
11 : import 'i_work_manager.dart';
12 :
/// Captured description of one chain scheduled through
/// [FakeWorkManager.beginWith].
class FakeChainRecord {
  /// Creates a record from the chain's [firstTask] and its ordered [steps].
  const FakeChainRecord({
    required this.firstTask,
    required this.steps,
  });

  /// The task that was handed to [FakeWorkManager.beginWith].
  final TaskRequest firstTask;

  /// Every step of the chain, in execution order.
  ///
  /// Each inner list holds the tasks of one step (tasks within a step run in
  /// parallel). Mirrors [TaskChainBuilder.steps].
  final List<List<TaskRequest>> steps;

  /// All tasks of all [steps], flattened into a single list in step order.
  ///
  /// A new list is built on every access.
  List<TaskRequest> get allTasks => [
        for (final step in steps) ...step,
      ];

  /// Renders the chain as `FakeChainRecord(a → [b, c] → d)`: single-task
  /// steps show the bare id, multi-task steps show a bracketed id list.
  @override
  String toString() {
    final rendered = <String>[];
    for (final step in steps) {
      if (step.length == 1) {
        rendered.add(step.first.id);
      } else {
        final ids = step.map((task) => task.id).join(', ');
        rendered.add('[$ids]');
      }
    }
    final desc = rendered.join(' → ');
    return 'FakeChainRecord($desc)';
  }
}
39 :
/// In-memory [IWorkManager] test double.
///
/// Records every call and lets you inject [TaskEvent]s / [TaskProgress] updates
/// via [emitEvent] and [emitProgress].
///
/// **Important lifecycle rules:**
/// - Always call [dispose] in `tearDown` to close the stream controllers and
///   to restore [TaskChainBuilder.enqueueCallback] to the value it had when
///   this fake was constructed.
/// - [TaskChainBuilder.enqueueCallback] is a single static slot. If several
///   fakes are alive at once, dispose them in reverse creation order;
///   otherwise a later [dispose] can re-install a callback that belongs to an
///   already disposed fake.
/// - Calling [reset] closes and recreates the stream controllers; existing
///   listeners receive a done event. Re-subscribe after [reset] if needed.
///
/// ## Example
///
/// ```dart
/// group('SyncService', () {
///   late FakeWorkManager wm;
///   late SyncService service;
///
///   setUp(() {
///     wm = FakeWorkManager();
///     service = SyncService(wm);
///   });
///
///   // Always required. Use a closure: `tearDown(wm.dispose)` would read the
///   // `late` variable while the group is being declared, before `setUp`.
///   tearDown(() => wm.dispose());
///
///   test('schedules one task on start', () async {
///     await service.start();
///     expect(wm.enqueued, hasLength(1));
///     expect(wm.enqueued.first.taskId, 'periodic-sync');
///   });
///
///   test('reacts to task failure', () async {
///     await service.start();
///     wm.emitEvent(TaskEvent(
///       taskId: 'periodic-sync',
///       success: false,
///       message: 'Network error',
///       timestamp: DateTime.now(),
///     ));
///     expect(service.lastError, 'Network error');
///   });
///
///   test('cancels on stop', () async {
///     await service.start();
///     await service.stop();
///     expect(wm.cancelAllCalled, isTrue);
///   });
///
///   test('full chain structure is visible', () async {
///     await service.startWithChain();
///     expect(wm.chains, hasLength(1));
///     expect(wm.chains.first.allTasks.map((t) => t.id), ['step1', 'step2', 'step3']);
///   });
/// });
/// ```
class FakeWorkManager implements IWorkManager {
  /// Creates a new [FakeWorkManager].
  ///
  /// Saves the current [TaskChainBuilder.enqueueCallback] so it can be restored
  /// on [dispose]. This prevents test contamination when multiple test cases run
  /// in the same process.
  FakeWorkManager() : _savedEnqueueCallback = TaskChainBuilder.enqueueCallback;

  // ── Streams ────────────────────────────────────────────────────────────────

  // Not final: reset() swaps both controllers for fresh ones.
  StreamController<TaskEvent> _eventsController =
      StreamController<TaskEvent>.broadcast();
  StreamController<TaskProgress> _progressController =
      StreamController<TaskProgress>.broadcast();

  /// Broadcast stream of everything pushed through [emitEvent].
  ///
  /// The returned stream belongs to the current controller; after [reset] a
  /// new stream must be obtained.
  @override
  Stream<TaskEvent> get events => _eventsController.stream;

  /// Broadcast stream of everything pushed through [emitProgress].
  ///
  /// The returned stream belongs to the current controller; after [reset] a
  /// new stream must be obtained.
  @override
  Stream<TaskProgress> get progress => _progressController.stream;

  /// Always completes with an empty map; the fake tracks no running tasks.
  @override
  Future<Map<String, TaskProgress>> getRunningProgress() async => {};

  // ── Scheduling ─────────────────────────────────────────────────────────────

  /// All [enqueue] / [enqueueAll] calls in order.
  ///
  /// Tasks of chains enqueued via [beginWith] and root nodes passed to
  /// [enqueueGraph] are appended here as well.
  final List<EnqueueCall> enqueued = [];

  /// All recorded chains from [beginWith], with their full step structures.
  final List<FakeChainRecord> chains = [];

  /// All [cancel] task IDs in order.
  final List<String> cancelled = [];

  /// All [cancelByTag] values in order.
  final List<String> cancelledTags = [];

  /// Whether [cancelAll] was called.
  bool cancelAllCalled = false;

  /// All [pause] task IDs in order.
  final List<String> paused = [];

  /// All [resume] task IDs in order.
  final List<String> resumed = [];

  // ── Configurable responses ─────────────────────────────────────────────────

  /// Default return value for [enqueue] / [enqueueAll].
  ScheduleResult enqueueResult = ScheduleResult.accepted;

  /// Per-task result overrides. Falls back to [enqueueResult] if not set.
  ///
  /// Use this to simulate mixed results in [enqueueAll]:
  /// ```dart
  /// wm.enqueueResultByTaskId['task-2'] = ScheduleResult.rejectedOsPolicy;
  /// final results = await wm.enqueueAll([req1, req2, req3]);
  /// // [accepted, rejectedOsPolicy, accepted]
  /// ```
  final Map<String, ScheduleResult> enqueueResultByTaskId = {};

  /// Stub task statuses. [getTaskStatus] returns `null` for unknown IDs.
  final Map<String, TaskStatus> taskStatuses = {};

  /// Stub tasks-by-tag. [getTasksByTag] returns `[]` for unknown tags.
  final Map<String, List<String>> tasksByTag = {};

  /// Return value for [getAllTags].
  List<String> allTagsResult = [];

  /// Return value for [allTasks]. Also searched by [getTaskRecord].
  List<TaskRecord> allTasksResult = [];

  // ── Internal ───────────────────────────────────────────────────────────────

  // Value of the static callback at construction time; dispose() puts it back
  // so one test cannot contaminate the next (C-02).
  final Future<ScheduleResult> Function(TaskChainBuilder)? _savedEnqueueCallback;

  // Remembers which task started which builder. The static callback slot is
  // shared by all chains, so the closure installed by the latest beginWith()
  // may be invoked for a builder created by an earlier call.
  final Expando<TaskRequest> _chainHeads = Expando<TaskRequest>();

  // ── IWorkManager ───────────────────────────────────────────────────────────

  /// Records the call in [enqueued] and returns a [TaskHandler] for [taskId].
  ///
  /// The handler's schedule result is the override from
  /// [enqueueResultByTaskId] when one exists, otherwise [enqueueResult].
  @override
  Future<TaskHandler> enqueue({
    required String taskId,
    required TaskTrigger trigger,
    required Worker worker,
    Constraints constraints = const Constraints(),
    ExistingTaskPolicy existingPolicy = ExistingTaskPolicy.replace,
    String? tag,
  }) async {
    enqueued.add(
      EnqueueCall(
        taskId: taskId,
        trigger: trigger,
        worker: worker,
        constraints: constraints,
        existingPolicy: existingPolicy,
        tag: tag,
      ),
    );
    return TaskHandler(
      taskId: taskId,
      scheduleResult: enqueueResultByTaskId[taskId] ?? enqueueResult,
    );
  }

  /// Forwards every request to [enqueue], one after another, and returns the
  /// handlers in request order.
  @override
  Future<List<TaskHandler>> enqueueAll(List<EnqueueRequest> requests) async {
    return [
      for (final r in requests)
        await enqueue(
          taskId: r.taskId,
          trigger: r.trigger,
          worker: r.worker,
          constraints: r.constraints,
          existingPolicy: r.existingPolicy,
          tag: r.tag,
        ),
    ];
  }

  /// Records the chain start and intercepts [TaskChainBuilder.enqueue] so the
  /// full chain structure (all steps from all `.then()` / `.thenAll()` calls)
  /// is captured in [chains].
  ///
  /// When the chain is enqueued, a copy of its steps is stored in [chains] and
  /// every task is also appended to [enqueued] (as a one-time trigger with
  /// [ExistingTaskPolicy.replace] and no tag). The result reported to the
  /// builder is looked up by the first task's id in [enqueueResultByTaskId],
  /// falling back to [enqueueResult].
  ///
  /// **Note:** This overwrites [TaskChainBuilder.enqueueCallback]. The original
  /// value is restored when [dispose] is called.
  @override
  TaskChainBuilder beginWith(TaskRequest task) {
    final builder = TaskChainBuilder.internal([task]);
    _chainHeads[builder] = task;

    // Intercept enqueue() on the builder to capture the complete chain.
    // The previous callback was saved in the constructor and is restored in
    // dispose() so tests can't contaminate each other (C-02).
    TaskChainBuilder.enqueueCallback = (b) async {
      // Prefer the task registered for this exact builder; fall back to the
      // captured one if the callback receives a builder we did not create.
      final head = _chainHeads[b] ?? task;

      // Copy the steps so later changes to the builder cannot alter what was
      // recorded at enqueue time.
      final snapshot = [
        for (final step in b.steps) [...step],
      ];
      chains.add(FakeChainRecord(firstTask: head, steps: snapshot));

      // Also record each task in enqueued for easy assertion.
      for (final step in snapshot) {
        for (final t in step) {
          enqueued.add(EnqueueCall(
            taskId: t.id,
            trigger: const TaskTrigger.oneTime(),
            worker: t.worker,
            constraints: t.constraints,
            existingPolicy: ExistingTaskPolicy.replace,
            tag: null,
          ));
        }
      }

      return enqueueResultByTaskId[head.id] ?? enqueueResult;
    };

    return builder;
  }

  /// Validates [graph] and records its root nodes (nodes without
  /// dependencies) in [enqueued].
  ///
  /// The fake does not execute the graph: the returned [GraphExecution] wraps
  /// a future that is never completed.
  @override
  Future<GraphExecution> enqueueGraph(TaskGraph graph) async {
    graph.validate();

    // Record root nodes as enqueued (since they start immediately).
    final rootNodes = graph.nodes.where((n) => n.dependsOn.isEmpty);
    for (final node in rootNodes) {
      enqueued.add(EnqueueCall(
        taskId: node.id,
        trigger: const TaskTrigger.oneTime(),
        worker: node.worker,
        constraints: node.constraints,
        existingPolicy: ExistingTaskPolicy.replace,
        tag: null,
      ));
    }

    // In a fake, we don't actually run the DAG logic unless requested.
    // For now just return a handle that never completes automatically.
    return GraphExecution.internal(graph.id, Completer<GraphResult>().future);
  }

  /// Appends [taskId] to [cancelled].
  @override
  Future<void> cancel({required String taskId}) async => cancelled.add(taskId);

  /// Appends [tag] to [cancelledTags].
  @override
  Future<void> cancelByTag({required String tag}) async =>
      cancelledTags.add(tag);

  /// Sets [cancelAllCalled] to `true`.
  @override
  Future<void> cancelAll() async => cancelAllCalled = true;

  /// Appends [taskId] to [paused].
  @override
  Future<void> pause({required String taskId}) async => paused.add(taskId);

  /// Appends [taskId] to [resumed].
  @override
  Future<void> resume({required String taskId}) async => resumed.add(taskId);

  /// Returns the stubbed status from [taskStatuses], or `null` if none is set.
  @override
  Future<TaskStatus?> getTaskStatus({required String taskId}) async =>
      taskStatuses[taskId];

  /// Returns the first entry of [allTasksResult] whose id equals [taskId], or
  /// `null` when there is none.
  @override
  Future<TaskRecord?> getTaskRecord({required String taskId}) async {
    return allTasksResult.where((t) => t.taskId == taskId).firstOrNull;
  }

  /// Returns the stubbed list from [tasksByTag], or an empty list for an
  /// unknown [tag].
  @override
  Future<List<String>> getTasksByTag({required String tag}) async =>
      tasksByTag[tag] ?? [];

  /// Returns [allTagsResult].
  @override
  Future<List<String>> getAllTags() async => allTagsResult;

  /// Returns [allTasksResult].
  @override
  Future<List<TaskRecord>> allTasks() async => allTasksResult;

  // ── Test helpers ───────────────────────────────────────────────────────────

  /// Push a [TaskEvent] into the [events] stream.
  ///
  /// Throws a [StateError] if called after [dispose], because the underlying
  /// controller is closed.
  void emitEvent(TaskEvent event) => _eventsController.add(event);

  /// Push a [TaskProgress] into the [progress] stream.
  ///
  /// Throws a [StateError] if called after [dispose], because the underlying
  /// controller is closed.
  void emitProgress(TaskProgress p) => _progressController.add(p);

  /// Clear all recorded state, restore default responses, and recreate the
  /// stream controllers.
  ///
  /// Listeners on the old [events] / [progress] streams receive a done event
  /// when the old controllers are closed — re-subscribe after calling [reset].
  /// [TaskChainBuilder.enqueueCallback] is left untouched.
  void reset() {
    // Close and recreate the controllers so subscriptions from a previous
    // test phase do not receive events from the new phase (M-04). The close
    // futures are deliberately not awaited: this method is synchronous.
    unawaited(_eventsController.close());
    _eventsController = StreamController<TaskEvent>.broadcast();
    unawaited(_progressController.close());
    _progressController = StreamController<TaskProgress>.broadcast();

    enqueued.clear();
    chains.clear();
    cancelled.clear();
    cancelledTags.clear();
    cancelAllCalled = false;
    paused.clear();
    resumed.clear();
    taskStatuses.clear();
    tasksByTag.clear();
    enqueueResultByTaskId.clear();
    allTagsResult = [];
    allTasksResult = [];
    enqueueResult = ScheduleResult.accepted;
  }

  /// Close stream controllers and restore [TaskChainBuilder.enqueueCallback].
  ///
  /// **Must** be called in `tearDown` to prevent:
  /// - Stream subscription leaks.
  /// - Test contamination via the static [TaskChainBuilder.enqueueCallback].
  @override
  void dispose() {
    // Restore the callback captured at construction so subsequent tests (or
    // production code) are not affected by this fake (C-02).
    TaskChainBuilder.enqueueCallback = _savedEnqueueCallback;
    // dispose() is synchronous by contract, so the close futures are
    // explicitly marked as not awaited.
    unawaited(_eventsController.close());
    unawaited(_progressController.close());
  }
}
360 :
361 : // ── Support types ──────────────────────────────────────────────────────────
362 :
/// Immutable record of one [FakeWorkManager.enqueue] invocation.
///
/// Two records are equal when their [taskId], [tag] and [existingPolicy]
/// match; [trigger], [worker] and [constraints] do not take part in equality
/// or in [hashCode].
class EnqueueCall {
  /// Creates a record holding the arguments of a single enqueue call.
  const EnqueueCall({
    required this.taskId,
    required this.trigger,
    required this.worker,
    required this.constraints,
    required this.existingPolicy,
    this.tag,
  });

  /// Id of the task that was enqueued.
  final String taskId;

  /// Trigger the task was enqueued with.
  final TaskTrigger trigger;

  /// Worker the task was enqueued with.
  final Worker worker;

  /// Constraints the task was enqueued with.
  final Constraints constraints;

  /// Policy that was passed for an already existing task with the same id.
  final ExistingTaskPolicy existingPolicy;

  /// Optional tag the task was enqueued with.
  final String? tag;

  // Value equality (L-02) so that `expect(wm.enqueued, contains(...))` can
  // match a freshly constructed EnqueueCall in tests.
  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      (other is EnqueueCall &&
          other.taskId == taskId &&
          other.tag == tag &&
          other.existingPolicy == existingPolicy);

  // Hashes exactly the fields compared by operator ==.
  @override
  int get hashCode {
    return Object.hash(taskId, tag, existingPolicy);
  }

  /// Short description with the task id, the trigger's runtime type and the
  /// tag.
  @override
  String toString() {
    return 'EnqueueCall(taskId: $taskId, trigger: ${trigger.runtimeType}, tag: $tag)';
  }
}
|