Line data Source code
1 : import 'dart:async';
2 : import 'dart:convert';
3 : import 'dart:developer' as developer;
4 :
5 : import 'package:flutter/foundation.dart';
6 : import 'package:flutter/services.dart';
7 :
8 : import 'constraints.dart';
9 : import 'events.dart';
10 : import 'platform_interface.dart';
11 : import 'remote_trigger.dart';
12 : import 'task_trigger.dart';
13 : import 'worker.dart';
14 :
/// Method channel implementation of [NativeWorkManagerPlatform].
///
/// Forwards every platform call over [methodChannel] and re-publishes the
/// three native event channels ([eventChannel], [progressChannel],
/// [systemErrorChannel]) as typed broadcast streams ([events], [progress],
/// [systemErrors]).
class MethodChannelNativeWorkManager extends NativeWorkManagerPlatform {
  /// The method channel used to interact with the native platform.
  @visibleForTesting
  final methodChannel = const MethodChannel('dev.brewkits/native_workmanager');

  /// Event channel for task completion events.
  @visibleForTesting
  final eventChannel =
      const EventChannel('dev.brewkits/native_workmanager/events');

  /// Event channel for task progress updates.
  @visibleForTesting
  final progressChannel =
      const EventChannel('dev.brewkits/native_workmanager/progress');

  /// Event channel for system-level errors.
  @visibleForTesting
  final systemErrorChannel =
      const EventChannel('dev.brewkits/native_workmanager/system_errors');

  StreamController<TaskEvent>? _eventController;
  StreamController<TaskProgress>? _progressController;
  StreamController<SystemError>? _systemErrorController;
  StreamSubscription? _eventSubscription;
  StreamSubscription? _progressSubscription;
  StreamSubscription? _systemErrorSubscription;

  /// Task IDs that have reached a terminal state (completed / failed / cancelled).
  ///
  /// Progress events can arrive *after* the completion event due to async
  /// queueing in the native bridge (time-travel progress). Any progress event
  /// for a task already in this set is dropped.
  ///
  /// An ID leaves the set again when a new "started" event is received for
  /// it, so a task that runs more than once under the same ID keeps
  /// delivering progress. The set is also cleared on each call to
  /// [_initEventStreams] so that re-initialisation (e.g. hot restart) starts
  /// clean.
  final _completedTaskIds = <String>{};

  /// Executor invoked when the native side calls `executeDartCallback`.
  Future<bool> Function(String, Map<String, dynamic>?)? _callbackExecutor;

  /// Session start time (ms). Used to drop stale events from previous app runs.
  int _sessionStartTime = 0;

  /// Installs the method-call handler, (re)creates the event streams and
  /// sends the configuration to the native side via the `initialize` method.
  ///
  /// [callbackHandle] is only included in the payload when non-null, and
  /// `debugMode` only when it is `true`.
  @override
  Future<void> initialize({
    int? callbackHandle,
    bool debugMode = false,
    int maxConcurrentTasks = 4,
    int diskSpaceBufferMB = 20,
    int cleanupAfterDays = 30,
    bool enforceHttps = false,
    bool blockPrivateIPs = false,
    bool registerPlugins = false,
  }) async {
    // Setup method call handler for Dart callbacks.
    methodChannel.setMethodCallHandler(_handleMethodCall);

    // Record session start time to filter out "zombie" events from previous
    // runs. Must be set before the streams start listening.
    _sessionStartTime = DateTime.now().millisecondsSinceEpoch;

    _initEventStreams();

    // Pass config to native side.
    final args = <String, dynamic>{
      'maxConcurrentTasks': maxConcurrentTasks,
      'diskSpaceBufferMB': diskSpaceBufferMB,
      'cleanupAfterDays': cleanupAfterDays,
      'enforceHttps': enforceHttps,
      'blockPrivateIPs': blockPrivateIPs,
      'registerPlugins': registerPlugins,
      if (callbackHandle != null) 'callbackHandle': callbackHandle,
      if (debugMode) 'debugMode': debugMode,
    };
    await methodChannel.invokeMethod<void>('initialize', args);
  }

  /// Tears down any existing subscriptions/controllers and subscribes afresh
  /// to the three native event channels.
  void _initEventStreams() {
    // Cancel existing subscriptions and close old controllers before
    // re-initializing. This prevents memory leaks and duplicate event
    // emissions during hot restarts.
    _eventSubscription?.cancel();
    _progressSubscription?.cancel();
    _systemErrorSubscription?.cancel();
    _eventController?.close();
    _progressController?.close();
    _systemErrorController?.close();

    // Clear stale terminal-state entries from any previous session so that
    // re-initialisation (hot restart, engine re-attach) starts clean.
    _completedTaskIds.clear();

    _eventController = StreamController<TaskEvent>.broadcast();
    _progressController = StreamController<TaskProgress>.broadcast();
    _systemErrorController = StreamController<SystemError>.broadcast();

    _eventSubscription = eventChannel.receiveBroadcastStream().listen(
      _onTaskEvent,
      onError: (Object error) {
        developer.log('Event channel error: $error', error: error);
      },
    );

    _progressSubscription = progressChannel.receiveBroadcastStream().listen(
      _onTaskProgress,
      onError: (Object error) {
        developer.log('Progress channel error: $error', error: error);
      },
    );

    _systemErrorSubscription =
        systemErrorChannel.receiveBroadcastStream().listen(
      _onSystemError,
      onError: (Object error) {
        developer.log('System error channel error: $error', error: error);
      },
    );
  }

  /// Whether [map] carries a `timestamp` older than the current session.
  ///
  /// Any numeric timestamp is accepted (the platform codec may deliver an
  /// `int` or a `double`). A missing or non-numeric timestamp is treated as
  /// `0`, i.e. as stale.
  bool _isStale(Map<String, dynamic> map) {
    final raw = map['timestamp'];
    final num timestamp = raw is num ? raw : 0;
    return timestamp < _sessionStartTime;
  }

  /// Handles one raw event from [eventChannel]; non-map payloads are ignored.
  void _onTaskEvent(dynamic event) {
    if (event is! Map) return;
    final map = Map<String, dynamic>.from(event);

    // Drop stale zombie events from previous sessions (pre-hot-restart).
    if (_isStale(map)) return;

    final taskEvent = TaskEvent.fromMap(map);
    if (taskEvent.isStarted) {
      // A fresh run of this ID has begun: let its progress through again.
      _completedTaskIds.remove(taskEvent.taskId);
    } else {
      // Non-started events are treated as terminal and block later progress.
      _completedTaskIds.add(taskEvent.taskId);
    }
    _eventController?.add(taskEvent);
  }

  /// Handles one raw event from [progressChannel]; non-map payloads are
  /// ignored.
  void _onTaskProgress(dynamic event) {
    if (event is! Map) return;
    final map = Map<String, dynamic>.from(event);

    // Drop stale progress events (stale session or already completed).
    if (_isStale(map)) return;

    final taskProgress = TaskProgress.fromMap(map);
    if (_completedTaskIds.contains(taskProgress.taskId)) return;
    _progressController?.add(taskProgress);
  }

  /// Handles one raw event from [systemErrorChannel]; non-map payloads are
  /// ignored. No staleness filtering is applied here.
  void _onSystemError(dynamic event) {
    if (event is! Map) return;
    final map = Map<String, dynamic>.from(event);
    _systemErrorController?.add(SystemError.fromMap(map));
  }

  /// Dispatches method calls coming from the native side.
  ///
  /// Throws a [MissingPluginException] for any method other than
  /// `executeDartCallback`.
  Future<dynamic> _handleMethodCall(MethodCall call) async {
    switch (call.method) {
      case 'executeDartCallback':
        return _executeDartCallback(call.arguments as Map<dynamic, dynamic>);
      default:
        throw MissingPluginException('Unknown method: ${call.method}');
    }
  }

  /// Runs the registered callback executor for `args['callbackId']`.
  ///
  /// `args['input']` is an optional string:
  /// * a JSON object is passed through as a map;
  /// * any other non-null JSON value (number, string, bool, list) is wrapped
  ///   as `{'value': decoded}`;
  /// * text that is not valid JSON is wrapped as `{'value': rawText}`;
  /// * `null`, an empty string or JSON `null` yields a `null` input.
  ///
  /// Throws a [StateError] when no executor has been registered.
  Future<bool> _executeDartCallback(Map<dynamic, dynamic> args) async {
    final callbackId = args['callbackId'] as String;
    final inputJson = args['input'] as String?;

    final executor = _callbackExecutor;
    if (executor == null) {
      throw StateError('No callback executor registered for: $callbackId');
    }

    Map<String, dynamic>? input;
    if (inputJson != null && inputJson.isNotEmpty) {
      try {
        final decoded = jsonDecode(inputJson);
        if (decoded is Map) {
          input = Map<String, dynamic>.from(decoded);
        } else if (decoded != null) {
          // Valid JSON scalar/list — wrap so it is not silently discarded.
          input = {'value': decoded};
        }
      } on FormatException {
        // Non-JSON scalar — wrap so callbacks always receive a Map.
        input = {'value': inputJson};
      }
    }

    return executor(callbackId, input);
  }

  /// Registers the function used to run Dart callbacks requested by native.
  @override
  void setCallbackExecutor(
      Future<bool> Function(String callbackId, Map<String, dynamic>? input)
          executor) {
    _callbackExecutor = executor;
  }

  /// Sends an `enqueue` request and maps the reply to a [ScheduleResult].
  ///
  /// [tag] is only included in the payload when non-null.
  @override
  Future<ScheduleResult> enqueue({
    required String taskId,
    required TaskTrigger trigger,
    required Worker worker,
    required Constraints constraints,
    required ExistingTaskPolicy existingPolicy,
    String? tag,
  }) async {
    final result = await methodChannel.invokeMethod<String>('enqueue', {
      'taskId': taskId,
      'trigger': trigger.toMap(),
      'workerClassName': worker.workerClassName,
      'workerConfig': worker.toMap(),
      'constraints': constraints.toMap(),
      'existingPolicy': existingPolicy.name,
      if (tag != null) 'tag': tag,
    });

    return _parseScheduleResult(result);
  }

  /// Invokes `cancelByTag` on the native side for [tag].
  @override
  Future<void> cancelByTag({required String tag}) async {
    await methodChannel.invokeMethod<void>('cancelByTag', {'tag': tag});
  }

  /// Returns the list reported by `getTasksByTag` for [tag], or an empty
  /// list when the platform returns `null`.
  @override
  Future<List<String>> getTasksByTag({required String tag}) async {
    final result = await methodChannel
        .invokeMethod<List<dynamic>>('getTasksByTag', {'tag': tag});
    return result?.cast<String>() ?? [];
  }

  /// Returns the list reported by `getAllTags`, or an empty list when the
  /// platform returns `null`.
  @override
  Future<List<String>> getAllTags() async {
    final result =
        await methodChannel.invokeMethod<List<dynamic>>('getAllTags');
    return result?.cast<String>() ?? [];
  }

  /// Invokes `cancel` on the native side for [taskId].
  @override
  Future<void> cancel({required String taskId}) async {
    await methodChannel.invokeMethod<void>('cancel', {'taskId': taskId});
  }

  /// Invokes `cancelAll` on the native side.
  @override
  Future<void> cancelAll() async {
    await methodChannel.invokeMethod<void>('cancelAll');
  }

  /// Returns the [TaskStatus] whose `name` equals the string reported by
  /// `getTaskStatus`, or `null` when the platform returns `null` or an
  /// unrecognised name.
  @override
  Future<TaskStatus?> getTaskStatus({required String taskId}) async {
    final result = await methodChannel.invokeMethod<String?>(
      'getTaskStatus',
      {'taskId': taskId},
    );

    if (result == null) return null;
    return TaskStatus.values.where((e) => e.name == result).firstOrNull;
  }

  /// Returns the [TaskRecord] reported by `getTaskRecord` for [taskId].
  ///
  /// Best-effort: returns `null` when the platform returns `null` and also
  /// when the call or the parsing throws (the error is logged, not rethrown).
  @override
  Future<TaskRecord?> getTaskRecord({required String taskId}) async {
    developer.log(
        'MethodChannel[${methodChannel.name}]: invoking getTaskRecord for $taskId');
    try {
      final result = await methodChannel.invokeMapMethod<String, dynamic>(
        'getTaskRecord',
        {'taskId': taskId},
      );
      developer.log(
          'MethodChannel[${methodChannel.name}]: getTaskRecord result: ${result != null}');

      if (result == null) return null;
      return TaskRecord.fromMap(result);
    } catch (e, s) {
      developer.log(
          'MethodChannel[${methodChannel.name}]: error in getTaskRecord: $e\n$s');
      return null;
    }
  }

  /// Sends [chainData] unchanged as the `enqueueChain` arguments and maps the
  /// reply to a [ScheduleResult].
  @override
  Future<ScheduleResult> enqueueChain(Map<String, dynamic> chainData) async {
    final result = await methodChannel.invokeMethod<String>(
      'enqueueChain',
      chainData,
    );

    return _parseScheduleResult(result);
  }

  /// Invokes `pause` on the native side for [taskId].
  @override
  Future<void> pauseTask({required String taskId}) async {
    await methodChannel.invokeMethod<void>('pause', {'taskId': taskId});
  }

  /// Invokes `resume` on the native side for [taskId].
  @override
  Future<void> resumeTask({required String taskId}) async {
    await methodChannel.invokeMethod<void>('resume', {'taskId': taskId});
  }

  /// Returns the string reported by `getServerFilename` for [url].
  ///
  /// [headers] is only included in the payload when non-null.
  @override
  Future<String?> getServerFilename({
    required String url,
    Map<String, String>? headers,
    int timeoutMs = 30000,
  }) async {
    return methodChannel.invokeMethod<String>('getServerFilename', {
      'url': url,
      if (headers != null) 'headers': headers,
      'timeoutMs': timeoutMs,
    });
  }

  /// Returns every record reported by `allTasks`, or an empty list when the
  /// platform returns `null`.
  @override
  Future<List<TaskRecord>> allTasks() async {
    final result = await methodChannel.invokeMethod<List<dynamic>>('allTasks');
    if (result == null) return [];
    return result
        .map((e) => TaskRecord.fromMap(Map<String, dynamic>.from(e as Map)))
        .toList();
  }

  /// Task lifecycle events; an empty stream when no controller is active.
  @override
  Stream<TaskEvent> get events =>
      _eventController?.stream ?? const Stream.empty();

  /// Task progress updates; an empty stream when no controller is active.
  @override
  Stream<TaskProgress> get progress =>
      _progressController?.stream ?? const Stream.empty();

  /// System-level errors; an empty stream when no controller is active.
  @override
  Stream<SystemError> get systemErrors =>
      _systemErrorController?.stream ?? const Stream.empty();

  /// Maps the platform's schedule-result string to a [ScheduleResult].
  ///
  /// Matching is case-insensitive. `null` and unrecognised values both map
  /// to [ScheduleResult.accepted]; unrecognised values are logged first.
  ScheduleResult _parseScheduleResult(String? result) {
    if (result == null) return ScheduleResult.accepted;

    switch (result.toLowerCase()) {
      case 'accepted':
        return ScheduleResult.accepted;
      case 'rejected_os_policy':
      case 'rejectedospolicy':
        return ScheduleResult.rejectedOsPolicy;
      case 'throttled':
        return ScheduleResult.throttled;
    }

    // Log unknown values instead of silently treating them as accepted.
    // This surfaces native-side bugs (e.g. typos, new values) during
    // development.
    developer.log(
      'NativeWorkManager: Unrecognised schedule result "$result" — defaulting to accepted. '
      'This may indicate a platform bug or version mismatch.',
      name: 'NativeWorkManager',
      level: 900, // WARNING
    );
    return ScheduleResult.accepted;
  }

  /// Returns the map reported by `getRunningProgress` with every key
  /// stringified, or an empty map when the platform returns `null`.
  @override
  Future<Map<String, dynamic>> getRunningProgress() async {
    final result = await methodChannel
        .invokeMethod<Map<Object?, Object?>>('getRunningProgress');
    if (result == null) return {};
    return result.map((key, value) => MapEntry(key.toString(), value));
  }

  /// Invokes `openFile` on the native side for [path].
  ///
  /// [mimeType] is only included in the payload when non-null.
  @override
  Future<void> openFile(String path, {String? mimeType}) async {
    await methodChannel.invokeMethod<void>('openFile', {
      'filePath': path,
      if (mimeType != null) 'mimeType': mimeType,
    });
  }

  /// Invokes `setMaxConcurrentPerHost` on the native side with [max].
  @override
  Future<void> setMaxConcurrentPerHost(int max) async {
    await methodChannel
        .invokeMethod<void>('setMaxConcurrentPerHost', {'max': max});
  }

  /// Invokes `registerRemoteTrigger` with the source's name and the rule map.
  @override
  Future<void> registerRemoteTrigger({
    required RemoteTriggerSource source,
    required RemoteTriggerRule rule,
  }) async {
    await methodChannel.invokeMethod<void>('registerRemoteTrigger', {
      'source': source.name,
      'rule': rule.toMap(),
    });
  }

  /// Invokes `enqueueGraph` and returns the platform's reply, or
  /// `'ACCEPTED'` when the platform returns `null`.
  @override
  Future<String> enqueueGraph(Map<String, dynamic> graphMap) async {
    final reply = await methodChannel.invokeMethod<String>('enqueueGraph', {
      'graph': graphMap,
    });
    return reply ?? 'ACCEPTED';
  }

  /// Invokes `offlineQueueEnqueue` with [queueId] and [entryMap].
  @override
  Future<void> offlineQueueEnqueue(
      String queueId, Map<String, dynamic> entryMap) async {
    await methodChannel.invokeMethod<void>('offlineQueueEnqueue', {
      'queueId': queueId,
      'entry': entryMap,
    });
  }

  /// Sends [middlewareMap] unchanged as the `registerMiddleware` arguments.
  @override
  Future<void> registerMiddleware(Map<String, dynamic> middlewareMap) async {
    await methodChannel.invokeMethod<void>('registerMiddleware', middlewareMap);
  }

  /// Returns the map reported by `getMetrics` with every key stringified, or
  /// an empty map when the platform returns `null`.
  @override
  Future<Map<String, dynamic>> getMetrics() async {
    final result =
        await methodChannel.invokeMethod<Map<dynamic, dynamic>>('getMetrics');
    if (result == null) return {};
    return result.map((key, value) => MapEntry(key.toString(), value));
  }

  /// Returns the boolean reported by `syncOfflineQueue`, or `false` when the
  /// platform returns `null`.
  @override
  Future<bool> syncOfflineQueue() async {
    final result = await methodChannel.invokeMethod<bool>('syncOfflineQueue');
    return result ?? false;
  }

  /// Dispose resources.
  ///
  /// Cancels the channel subscriptions, closes the stream controllers and
  /// forgets them, so the stream getters fall back to empty streams until
  /// [initialize] is called again.
  void dispose() {
    _eventSubscription?.cancel();
    _progressSubscription?.cancel();
    _systemErrorSubscription?.cancel();
    _eventController?.close();
    _progressController?.close();
    _systemErrorController?.close();

    _eventSubscription = null;
    _progressSubscription = null;
    _systemErrorSubscription = null;
    _eventController = null;
    _progressController = null;
    _systemErrorController = null;

    _completedTaskIds.clear();
  }
}
|