Refactor download management with shared isolate pool and optimize concurrent downloads

This commit is contained in:
Moustapha Kodjo Amadou 2026-01-08 13:01:15 +01:00
parent 9458ae120b
commit b9f9a8398f
7 changed files with 784 additions and 348 deletions

View file

@ -34,6 +34,7 @@ import 'package:mangayomi/l10n/generated/app_localizations.dart';
import 'package:mangayomi/services/http/m_client.dart';
import 'package:mangayomi/services/isolate_service.dart';
import 'package:mangayomi/services/m_extension_server.dart';
import 'package:mangayomi/services/download_manager/m_downloader.dart';
import 'package:mangayomi/src/rust/frb_generated.dart';
import 'package:mangayomi/utils/discord_rpc.dart';
import 'package:mangayomi/utils/log/logger.dart';
@ -83,6 +84,9 @@ void main(List<String> args) async {
Future<void> _postLaunchInit(StorageProvider storage) async {
await AppLogger.init();
// Initialiser le pool d'Isolates partagé (3 workers)
// Optimise la mémoire pour 50+ téléchargements simultanés
unawaited(MDownloader.initializeIsolatePool(poolSize: 3));
final hivePath = (Platform.isIOS || Platform.isMacOS)
? "databases"
: p.join("Mangayomi", "databases");

View file

@ -406,6 +406,7 @@ Future<void> processDownloads(Ref ref, {bool? useWifi}) async {
final downloadItem = ongoingDownloads[index++];
final chapter = downloadItem.chapter.value!;
chapter.cancelDownloads(downloadItem.id);
await Future.delayed(const Duration(milliseconds: 500));
ref.read(
downloadChapterProvider(
chapter: chapter,

View file

@ -82,7 +82,7 @@ class DownloadLocationState extends _$DownloadLocationState {
class ConcurrentDownloadsState extends _$ConcurrentDownloadsState {
@override
int build() {
return isar.settings.getSync(227)!.concurrentDownloads ?? 2;
return isar.settings.getSync(227)!.concurrentDownloads ?? 1;
}
void set(int value) {

View file

@ -0,0 +1,689 @@
import 'dart:collection';
import 'dart:isolate';
import 'dart:async';
import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:http/http.dart';
import 'package:mangayomi/models/manga.dart';
import 'package:mangayomi/models/page.dart';
import 'package:mangayomi/services/http/m_client.dart';
import 'package:mangayomi/services/http/rhttp/src/model/settings.dart';
import 'package:mangayomi/services/download_manager/m3u8/models/download.dart';
import 'package:mangayomi/services/download_manager/m3u8/models/ts_info.dart';
import 'package:mangayomi/src/rust/frb_generated.dart';
import 'package:mangayomi/utils/extensions/string_extensions.dart';
import 'package:path/path.dart' as path;
import 'package:encrypt/encrypt.dart' as encrypt;
final downloadTaskCancellation = <String, bool>{};
/// Shared Isolate pool to optimize performance
/// Instead of creating a new Isolate for each download,
/// we use a limited pool of workers that process tasks in queue.
class DownloadIsolatePool {
static DownloadIsolatePool? _instance;
final List<_PoolWorker> _workers = [];
final Queue<_DownloadTask> _taskQueue = Queue();
final int poolSize;
bool _initialized = false;
int _activeWorkers = 0;
DownloadIsolatePool._({this.poolSize = 3});
/// Get the singleton instance of the pool
static DownloadIsolatePool get instance {
_instance ??= DownloadIsolatePool._();
return _instance!;
}
/// Configure the pool size (call before initialize)
static void configure({int poolSize = 3}) {
if (_instance != null && _instance!._initialized) {
if (kDebugMode) {
print('[DownloadPool] Cannot reconfigure after initialization');
}
return;
}
_instance = DownloadIsolatePool._(poolSize: poolSize);
}
/// Initialize the Isolate pool
Future<void> initialize() async {
if (_initialized) return;
if (kDebugMode) {
print('[DownloadPool] Initializing with $poolSize workers...');
}
for (int i = 0; i < poolSize; i++) {
final worker = await _PoolWorker.create(i);
_workers.add(worker);
}
_initialized = true;
if (kDebugMode) {
print('[DownloadPool] Pool initialized with $poolSize workers');
}
}
/// Submit a file download task (manga/anime)
Future<void> submitFileDownload({
required String taskId,
required List<PageUrl> pageUrls,
required int concurrentDownloads,
required ItemType itemType,
required void Function(DownloadProgress) onProgress,
required void Function() onComplete,
required void Function(Exception) onError,
}) async {
if (!_initialized) await initialize();
// Mark the task as active (not cancelled)
downloadTaskCancellation[taskId] = false;
final receivePort = ReceivePort();
final task = _DownloadTask(
taskId: taskId,
type: _TaskType.fileDownload,
params: FileDownloadParams(
pageUrls: pageUrls,
concurrentDownloads: concurrentDownloads,
itemType: itemType,
),
sendPort: receivePort.sendPort,
);
// Listen for progress messages
receivePort.listen((message) {
if (downloadTaskCancellation[taskId] == true) {
receivePort.close();
return;
}
if (message is DownloadProgress) {
onProgress(message);
} else if (message is DownloadComplete) {
downloadTaskCancellation.remove(taskId);
receivePort.close();
onComplete();
} else if (message is Exception) {
downloadTaskCancellation.remove(taskId);
receivePort.close();
onError(message);
}
});
_enqueueTask(task);
}
/// Submit an M3U8 segment download task
Future<void> submitM3u8Download({
required String taskId,
required List<TsInfo> segments,
required String tempDir,
required Uint8List? key,
required Uint8List? iv,
required int? mediaSequence,
required int concurrentDownloads,
required Map<String, String>? headers,
required ItemType itemType,
required void Function(DownloadProgress) onProgress,
required void Function() onComplete,
required void Function(Exception) onError,
}) async {
if (!_initialized) await initialize();
downloadTaskCancellation[taskId] = false;
final receivePort = ReceivePort();
final task = _DownloadTask(
taskId: taskId,
type: _TaskType.m3u8Download,
params: M3u8DownloadParams(
segments: segments,
tempDir: tempDir,
key: key,
iv: iv,
mediaSequence: mediaSequence,
concurrentDownloads: concurrentDownloads,
headers: headers,
itemType: itemType,
),
sendPort: receivePort.sendPort,
);
receivePort.listen((message) {
if (downloadTaskCancellation[taskId] == true) {
receivePort.close();
return;
}
if (message is DownloadProgress) {
onProgress(message);
} else if (message is DownloadComplete) {
downloadTaskCancellation.remove(taskId);
receivePort.close();
onComplete();
} else if (message is Exception) {
downloadTaskCancellation.remove(taskId);
receivePort.close();
onError(message);
}
});
_enqueueTask(task);
}
/// Cancel a download task
void cancelTask(String taskId) {
downloadTaskCancellation[taskId] = true;
}
/// Add a task to the queue and try to process it
void _enqueueTask(_DownloadTask task) {
_taskQueue.add(task);
_processQueue();
}
/// Process the task queue
void _processQueue() {
while (_taskQueue.isNotEmpty && _activeWorkers < _workers.length) {
final task = _taskQueue.removeFirst();
final worker = _workers[_activeWorkers];
_activeWorkers++;
worker.executeTask(task).then((_) {
_activeWorkers--;
_processQueue(); // Process the next task
});
}
}
/// Number of pending tasks
int get pendingTasks => _taskQueue.length;
/// Number of active workers
int get activeWorkers => _activeWorkers;
/// Close the pool
void dispose() {
for (final worker in _workers) {
worker.dispose();
}
_workers.clear();
_taskQueue.clear();
downloadTaskCancellation.clear();
_initialized = false;
_activeWorkers = 0;
}
}
/// Supported task types
enum _TaskType { fileDownload, m3u8Download }
/// Download task
class _DownloadTask {
final String taskId;
final _TaskType type;
final dynamic params;
final SendPort sendPort;
_DownloadTask({
required this.taskId,
required this.type,
required this.params,
required this.sendPort,
});
}
/// Parameters for file download
class FileDownloadParams {
final List<PageUrl> pageUrls;
final int concurrentDownloads;
final ItemType itemType;
FileDownloadParams({
required this.pageUrls,
required this.concurrentDownloads,
required this.itemType,
});
}
/// Parameters for M3U8 download
class M3u8DownloadParams {
final List<TsInfo> segments;
final String tempDir;
final Uint8List? key;
final Uint8List? iv;
final int? mediaSequence;
final int concurrentDownloads;
final Map<String, String>? headers;
final ItemType itemType;
M3u8DownloadParams({
required this.segments,
required this.tempDir,
required this.key,
required this.iv,
required this.mediaSequence,
required this.concurrentDownloads,
required this.headers,
required this.itemType,
});
}
/// Pool worker that executes tasks in a persistent Isolate
class _PoolWorker {
final int id;
late Isolate _isolate;
late SendPort _sendPort;
late ReceivePort _receivePort;
final Completer<void> _ready = Completer();
_PoolWorker._(this.id);
static Future<_PoolWorker> create(int id) async {
final worker = _PoolWorker._(id);
await worker._spawn();
return worker;
}
Future<void> _spawn() async {
_receivePort = ReceivePort();
_isolate = await Isolate.spawn(
_workerEntryPoint,
_WorkerInit(id, _receivePort.sendPort),
);
// Wait for the worker to be ready and get its SendPort
final completer = Completer<SendPort>();
_receivePort.listen((message) {
if (message is SendPort) {
completer.complete(message);
}
});
_sendPort = await completer.future;
_ready.complete();
}
/// Execute a task in this worker
Future<void> executeTask(_DownloadTask task) async {
await _ready.future;
final completer = Completer<void>();
// Create a port to receive messages from this worker
final taskPort = ReceivePort();
taskPort.listen((message) {
// Forward the message to the original task port
task.sendPort.send(message);
if (message is DownloadComplete || message is Exception) {
taskPort.close();
completer.complete();
}
});
// Send the task to the worker
_sendPort.send(
_WorkerTask(
taskId: task.taskId,
type: task.type,
params: task.params,
replyPort: taskPort.sendPort,
),
);
return completer.future;
}
void dispose() {
_isolate.kill();
_receivePort.close();
}
}
/// Worker initialization message
class _WorkerInit {
final int workerId;
final SendPort mainPort;
_WorkerInit(this.workerId, this.mainPort);
}
/// Task sent to the worker
class _WorkerTask {
final String taskId;
final _TaskType type;
final dynamic params;
final SendPort replyPort;
_WorkerTask({
required this.taskId,
required this.type,
required this.params,
required this.replyPort,
});
}
/// Isolate worker entry point
void _workerEntryPoint(_WorkerInit init) async {
// Initialize dependencies in the Isolate
await RustLib.init();
final httpClient = MClient.httpClient(
settings: const ClientSettings(
throwOnStatusCode: false,
tlsSettings: TlsSettings(verifyCertificates: false),
),
);
// Create the receive port for this worker
final receivePort = ReceivePort();
// Send the SendPort to the main isolate
init.mainPort.send(receivePort.sendPort);
if (kDebugMode) {
print('[Worker ${init.workerId}] Ready');
}
// Listen for tasks
await for (final message in receivePort) {
if (message is _WorkerTask) {
try {
if (message.type == _TaskType.fileDownload) {
await _processFileDownload(
message.params as FileDownloadParams,
message.replyPort,
httpClient,
);
} else if (message.type == _TaskType.m3u8Download) {
await _processM3u8Download(
message.params as M3u8DownloadParams,
message.replyPort,
httpClient,
);
}
} catch (e) {
message.replyPort.send(DownloadPoolException('Task failed', e));
}
}
}
}
/// Process a file download
Future<void> _processFileDownload(
FileDownloadParams params,
SendPort replyPort,
Client client,
) async {
int completed = 0;
final total = params.pageUrls.length;
final queue = Queue<PageUrl>.from(params.pageUrls);
final List<Future<void>> activeTasks = [];
try {
while (queue.isNotEmpty || activeTasks.isNotEmpty) {
while (queue.isNotEmpty &&
activeTasks.length < params.concurrentDownloads) {
final pageUrl = queue.removeFirst();
final task = _downloadFile(pageUrl, client, params.itemType, replyPort)
.then((_) {
if (params.itemType != ItemType.anime) {
completed++;
replyPort.send(
DownloadProgress(
pageUrl: pageUrl,
completed,
total,
params.itemType,
),
);
}
})
.catchError((error) {
replyPort.send(
DownloadPoolException(
'Error downloading ${pageUrl.fileName}',
error,
),
);
throw error;
});
activeTasks.add(task);
}
if (activeTasks.isNotEmpty) {
await Future.wait(activeTasks.toList(), eagerError: true);
activeTasks.clear();
}
}
replyPort.send(DownloadComplete());
} catch (e) {
replyPort.send(DownloadPoolException('Download failed', e));
}
}
/// Download an individual file
Future<void> _downloadFile(
PageUrl pageUrl,
Client client,
ItemType itemType,
SendPort replyPort,
) async {
try {
if (itemType != ItemType.anime) {
final response = await _withRetry(
() => client.get(Uri.parse(pageUrl.url), headers: pageUrl.headers),
3,
);
if (response.statusCode != 200) {
throw DownloadPoolException(
'Failed to download file: ${pageUrl.fileName!}',
);
}
final file = File(pageUrl.fileName!);
await file.writeAsBytes(response.bodyBytes);
} else {
// Streaming for videos (saves RAM)
await _withRetry(() async {
var request = Request('GET', Uri.parse(pageUrl.url));
request.headers.addAll(pageUrl.headers ?? {});
StreamedResponse response = await client.send(request);
if (response.statusCode != 200) {
throw DownloadPoolException(
'Failed to download file: ${pageUrl.fileName!}',
);
}
int total = response.contentLength ?? 0;
int received = 0;
final file = File(pageUrl.fileName!);
final sink = file.openWrite();
try {
await for (var value in response.stream) {
sink.add(value);
received += value.length;
try {
replyPort.send(
DownloadProgress(
(received / total * 100).toInt(),
100,
pageUrl: pageUrl,
itemType,
),
);
} catch (_) {}
}
} finally {
await sink.flush();
await sink.close();
}
}, 3);
}
} catch (e) {
throw DownloadPoolException(
'Failed to process file: ${pageUrl.fileName!}',
e,
);
}
}
/// Process an M3U8 download
Future<void> _processM3u8Download(
M3u8DownloadParams params,
SendPort replyPort,
Client client,
) async {
int completed = 0;
final total = params.segments.length;
final queue = Queue<TsInfo>.from(params.segments);
final List<Future<void>> activeTasks = [];
try {
while (queue.isNotEmpty || activeTasks.isNotEmpty) {
while (queue.isNotEmpty &&
activeTasks.length < params.concurrentDownloads) {
final segment = queue.removeFirst();
final task = _downloadSegment(segment, params, client)
.then((_) {
completed++;
replyPort.send(
DownloadProgress(
segment: segment,
completed,
total,
params.itemType,
),
);
})
.catchError((error) {
replyPort.send(
DownloadPoolException(
'Error downloading segment ${segment.name}',
error,
),
);
throw error;
});
activeTasks.add(task);
}
if (activeTasks.isNotEmpty) {
await Future.wait(activeTasks.toList(), eagerError: true);
activeTasks.clear();
}
}
replyPort.send(DownloadComplete());
} catch (e) {
replyPort.send(DownloadPoolException('M3U8 download failed', e));
}
}
/// Download a TS segment
Future<void> _downloadSegment(
TsInfo ts,
M3u8DownloadParams params,
Client client,
) async {
try {
final file = File(path.join(params.tempDir, '${ts.name}.ts'));
// Streaming to save memory
var request = Request('GET', Uri.parse(ts.url));
if (params.headers != null) {
request.headers.addAll(params.headers!);
}
StreamedResponse response = await _withRetry(() => client.send(request), 3);
if (response.statusCode != 200) {
throw DownloadPoolException('Failed to download segment: ${ts.name}');
}
final sink = file.openWrite();
try {
await for (var chunk in response.stream) {
sink.add(chunk);
}
} finally {
await sink.flush();
await sink.close();
}
// Decrypt if necessary
if (params.key != null) {
final bytes = await file.readAsBytes();
final index = int.parse(ts.name.substringAfter("TS_"));
final decrypted = _aesDecrypt(
(params.mediaSequence ?? 1) + (index - 1),
bytes,
params.key!,
iv: params.iv,
);
await file.writeAsBytes(decrypted);
}
} catch (e) {
throw DownloadPoolException('Failed to process segment: ${ts.name}', e);
}
}
/// AES decryption
Uint8List _aesDecrypt(
int sequence,
Uint8List encrypted,
Uint8List key, {
Uint8List? iv,
}) {
try {
if (iv == null) {
iv = Uint8List(16);
ByteData.view(iv.buffer).setUint64(8, sequence);
}
final encrypter = encrypt.Encrypter(
encrypt.AES(encrypt.Key(key), mode: encrypt.AESMode.cbc),
);
return Uint8List.fromList(
encrypter.decryptBytes(encrypt.Encrypted(encrypted), iv: encrypt.IV(iv)),
);
} catch (e) {
throw DownloadPoolException('Decryption failed', e);
}
}
/// Helper for retry
Future<T> _withRetry<T>(Future<T> Function() operation, int maxRetries) async {
int attempts = 0;
while (true) {
try {
attempts++;
return await operation();
} catch (e) {
if (attempts >= maxRetries) {
throw DownloadPoolException(
'Operation failed after $maxRetries attempts',
e,
);
}
}
}
}
/// Pool exception
class DownloadPoolException implements Exception {
final String message;
final dynamic originalError;
DownloadPoolException(this.message, [this.originalError]);
@override
String toString() =>
'DownloadPoolException: $message${originalError != null ? ' ($originalError)' : ''}';
}

View file

@ -1,10 +1,7 @@
import 'dart:collection';
import 'dart:developer';
import 'dart:io';
import 'dart:async';
import 'dart:isolate';
import 'package:flutter/foundation.dart';
import 'package:http/http.dart';
import 'package:mangayomi/models/chapter.dart';
import 'package:mangayomi/models/video.dart';
import 'package:mangayomi/providers/storage_provider.dart';
@ -12,15 +9,13 @@ import 'package:mangayomi/services/http/m_client.dart';
import 'package:mangayomi/services/http/rhttp/src/model/settings.dart';
import 'package:mangayomi/services/download_manager/m3u8/models/download.dart';
import 'package:mangayomi/services/download_manager/m3u8/models/ts_info.dart';
import 'package:mangayomi/src/rust/frb_generated.dart';
import 'package:mangayomi/services/download_manager/download_isolate_pool.dart';
import 'package:mangayomi/services/download_manager/m_downloader.dart';
import 'package:mangayomi/utils/extensions/string_extensions.dart';
import 'package:mangayomi/utils/log/logger.dart';
import 'package:path/path.dart' as path;
import 'package:encrypt/encrypt.dart' as encrypt;
import 'package:convert/convert.dart';
final isolateChapsSendPorts = <String?, (ReceivePort?, Isolate?)>{};
class M3u8Downloader {
final String m3u8Url;
final String downloadDir;
@ -29,21 +24,21 @@ class M3u8Downloader {
final int concurrentDownloads;
final Chapter chapter;
final List<Track>? subtitles;
Isolate? _isolate;
ReceivePort? _receivePort;
static var httpClient = MClient.httpClient(
settings: const ClientSettings(
throwOnStatusCode: false,
tlsSettings: TlsSettings(verifyCertificates: false),
),
);
M3u8Downloader({
required this.m3u8Url,
required this.downloadDir,
required this.fileName,
this.headers,
required this.chapter,
this.concurrentDownloads = 2,
this.concurrentDownloads = 1,
required this.subtitles,
});
@ -55,38 +50,8 @@ class M3u8Downloader {
}
void close() {
_isolate?.kill();
_receivePort?.close();
}
static Future<void> _recreateClient() async {
await RustLib.init();
httpClient = MClient.httpClient(
settings: const ClientSettings(
throwOnStatusCode: false,
tlsSettings: TlsSettings(verifyCertificates: false),
),
);
}
static Future<T> _withRetryStatic<T>(
Future<T> Function() operation,
int maxRetries,
) async {
int attempts = 0;
while (true) {
try {
attempts++;
return await operation();
} catch (e) {
if (attempts >= maxRetries) {
throw M3u8DownloaderException(
'Operation failed after $maxRetries attempts',
e,
);
}
}
}
DownloadIsolatePool.instance.cancelTask('m3u8_${chapter.id}');
isolateChapsSendPorts.remove('${chapter.id}');
}
Future<T> _withRetry<T>(Future<T> Function() operation) async {
@ -198,35 +163,30 @@ class M3u8Downloader {
int? mediaSequence,
void Function(DownloadProgress) onProgress,
) async {
_receivePort = ReceivePort();
final completer = Completer<void>();
final taskId = 'm3u8_${chapter.id}';
final errorPort = ReceivePort();
_isolate = await Isolate.spawn(
_downloadWorker,
DownloadParams(
segments: segments,
tempDir: tempDir,
key: key,
iv: iv,
mediaSequence: mediaSequence,
concurrentDownloads: concurrentDownloads,
headers: headers,
sendPort: _receivePort!.sendPort,
itemType: chapter.manga.value!.itemType,
),
onError: errorPort.sendPort,
);
isolateChapsSendPorts['${chapter.id}'] = (_receivePort, _isolate);
errorPort.listen((message) {
final stackTrace = message.last;
_log('Stack trace: $stackTrace');
_receivePort!.close();
});
await for (final message in _receivePort!) {
if (message is DownloadProgress) {
onProgress.call(message);
} else if (message is DownloadComplete) {
// Mark as active for compatibility with cancelDownloads()
isolateChapsSendPorts['${chapter.id}'] = true;
await DownloadIsolatePool.instance.submitM3u8Download(
taskId: taskId,
segments: segments,
tempDir: tempDir,
key: key,
iv: iv,
mediaSequence: mediaSequence,
concurrentDownloads: concurrentDownloads,
headers: headers,
itemType: chapter.manga.value!.itemType,
onProgress: (progress) {
onProgress(progress);
},
onComplete: () async {
// Merge the segments after downloading
await _mergeSegments(fileName, tempDir, onProgress);
// Clean up the temporary directory
if (await Directory(tempDir).exists()) {
try {
await Directory(tempDir).delete(recursive: true);
@ -234,122 +194,19 @@ class M3u8Downloader {
_log('Warning: Failed to clean up temporary directory: $e');
}
}
errorPort.close();
break;
} else if (message is Exception) {
errorPort.close();
throw message;
}
}
}
static void _downloadWorker(DownloadParams params) async {
await _recreateClient();
int completed = 0;
final total = params.segments!.length;
final queue = Queue<TsInfo>.from(params.segments!);
final List<Future<void>> activeTasks = [];
try {
while (queue.isNotEmpty || activeTasks.isNotEmpty) {
while (queue.isNotEmpty &&
activeTasks.length < params.concurrentDownloads!) {
final segment = queue.removeFirst();
final task = _processSegment(segment, params, httpClient)
.then((_) {
completed++;
params.sendPort!.send(
DownloadProgress(
segment: segment,
completed,
total,
params.itemType!,
),
);
})
.catchError((error) {
params.sendPort!.send(
M3u8DownloaderException(
'Error downloading segment ${segment.name}',
error,
),
);
throw error;
});
activeTasks.add(task);
if (!completer.isCompleted) {
completer.complete();
}
if (activeTasks.isNotEmpty) {
await Future.wait(activeTasks.toList(), eagerError: true);
activeTasks.clear();
},
onError: (error) {
if (!completer.isCompleted) {
completer.completeError(error);
}
}
},
);
params.sendPort!.send(DownloadComplete());
} catch (e) {
params.sendPort!.send(M3u8DownloaderException('Download failed', e));
} finally {
httpClient.close();
}
}
static Future<void> _processSegment(
TsInfo ts,
DownloadParams params,
Client client,
) async {
try {
final response = await _withRetryStatic(
() => client.get(Uri.parse(ts.url), headers: params.headers),
3,
);
if (response.statusCode != 200) {
throw M3u8DownloaderException('Failed to download segment: ${ts.name}');
}
final file = File(path.join('${params.tempDir}', '${ts.name}.ts'));
await file.writeAsBytes(response.bodyBytes);
if (params.key != null) {
final bytes = await file.readAsBytes();
final index = int.parse(ts.name.substringAfter("TS_"));
final decrypted = _aesDecryptStatic(
(params.mediaSequence ?? 1) + (index - 1),
bytes,
params.key!,
iv: params.iv,
);
await file.writeAsBytes(decrypted);
}
} catch (e) {
throw M3u8DownloaderException('Failed to process segment: ${ts.name}', e);
}
}
static Uint8List _aesDecryptStatic(
int sequence,
Uint8List encrypted,
Uint8List key, {
Uint8List? iv,
}) {
try {
if (iv == null) {
iv = Uint8List(16);
ByteData.view(iv.buffer).setUint64(8, sequence);
}
final encrypter = encrypt.Encrypter(
encrypt.AES(encrypt.Key(key), mode: encrypt.AESMode.cbc),
);
return Uint8List.fromList(
encrypter.decryptBytes(
encrypt.Encrypted(encrypted),
iv: encrypt.IV(iv),
),
);
} catch (e) {
throw M3u8DownloaderException('Decryption failed', e);
}
return completer.future;
}
Future<void> _mergeSegments(

View file

@ -1,41 +1,39 @@
import 'dart:collection';
import 'dart:developer';
import 'dart:io';
import 'dart:async';
import 'dart:isolate';
import 'package:flutter/foundation.dart';
import 'package:http/http.dart';
import 'package:mangayomi/models/chapter.dart';
import 'package:mangayomi/models/manga.dart';
import 'package:mangayomi/models/page.dart';
import 'package:mangayomi/models/video.dart';
import 'package:mangayomi/services/http/m_client.dart';
import 'package:mangayomi/services/http/rhttp/src/model/settings.dart';
import 'package:mangayomi/services/download_manager/m3u8/m3u8_downloader.dart';
import 'package:mangayomi/services/download_manager/download_isolate_pool.dart';
import 'package:mangayomi/services/download_manager/m3u8/models/download.dart';
import 'package:mangayomi/src/rust/frb_generated.dart';
import 'package:path/path.dart' as path;
/// Map to allow cancellation of downloads
final isolateChapsSendPorts = <String?, dynamic>{};
class MDownloader {
List<PageUrl> pageUrls;
final int concurrentDownloads;
final Chapter chapter;
final List<Track>? subtitles;
final String? subDownloadDir;
Isolate? _isolate;
ReceivePort? _receivePort;
static var httpClient = MClient.httpClient(
settings: const ClientSettings(
throwOnStatusCode: false,
tlsSettings: TlsSettings(verifyCertificates: false),
),
);
MDownloader({
required this.chapter,
required this.pageUrls,
required this.subtitles,
required this.subDownloadDir,
this.concurrentDownloads = 2,
this.concurrentDownloads = 1,
});
void _log(String message) {
@ -44,19 +42,16 @@ class MDownloader {
}
}
void close() {
_isolate?.kill();
_receivePort?.close();
/// Initialize the Isolate pool (call once at app startup)
static Future<void> initializeIsolatePool({int poolSize = 3}) async {
DownloadIsolatePool.configure(poolSize: poolSize);
await DownloadIsolatePool.instance.initialize();
}
static Future<void> _recreateClient() async {
await RustLib.init();
httpClient = MClient.httpClient(
settings: const ClientSettings(
throwOnStatusCode: false,
tlsSettings: TlsSettings(verifyCertificates: false),
),
);
void close() {
// Cancel the task in the pool
DownloadIsolatePool.instance.cancelTask('${chapter.id}');
isolateChapsSendPorts.remove('${chapter.id}');
}
static Future<T> _withRetryStatic<T>(
@ -70,7 +65,7 @@ class MDownloader {
return await operation();
} catch (e) {
if (attempts >= maxRetries) {
throw M3u8DownloaderException(
throw MDownloaderException(
'Operation failed after $maxRetries attempts',
e,
);
@ -82,6 +77,8 @@ class MDownloader {
Future<void> download(void Function(DownloadProgress) onProgress) async {
try {
await _downloadFilesWithProgress(pageUrls, onProgress);
// Download subtitles (on the main isolate, no need for pool)
for (var element in subtitles ?? <Track>[]) {
final subtitleFile = File(
path.join('${subDownloadDir}_subtitles', '${element.label}.srt'),
@ -114,30 +111,22 @@ class MDownloader {
List<PageUrl> pageUrls,
void Function(DownloadProgress) onProgress,
) async {
_receivePort = ReceivePort();
final completer = Completer<void>();
final taskId = '${chapter.id}';
final errorPort = ReceivePort();
_isolate = await Isolate.spawn(
_downloadWorker,
DownloadParams(
pageUrls: pageUrls,
sendPort: _receivePort!.sendPort,
concurrentDownloads: concurrentDownloads,
itemType: chapter.manga.value!.itemType,
),
onError: errorPort.sendPort,
);
isolateChapsSendPorts['${chapter.id}'] = (_receivePort, _isolate);
errorPort.listen((message) {
final stackTrace = message.last;
_log('Stack trace: $stackTrace');
_receivePort!.close();
});
await for (final message in _receivePort!) {
if (message is DownloadProgress) {
onProgress.call(message);
} else if (message is DownloadComplete) {
onProgress.call(
// Mark as active for compatibility with cancelDownloads()
isolateChapsSendPorts[taskId] = true;
await DownloadIsolatePool.instance.submitFileDownload(
taskId: taskId,
pageUrls: pageUrls,
concurrentDownloads: concurrentDownloads,
itemType: chapter.manga.value!.itemType,
onProgress: (progress) {
onProgress(progress);
},
onComplete: () {
onProgress(
DownloadProgress(
1,
1,
@ -145,127 +134,18 @@ class MDownloader {
isCompleted: true,
),
);
errorPort.close();
break;
} else if (message is Exception) {
errorPort.close();
throw message;
}
}
}
static void _downloadWorker(DownloadParams params) async {
await _recreateClient();
int completed = 0;
final total = params.pageUrls!.length;
final queue = Queue<PageUrl>.from(params.pageUrls!);
final List<Future<void>> activeTasks = [];
try {
while (queue.isNotEmpty || activeTasks.isNotEmpty) {
while (queue.isNotEmpty &&
activeTasks.length < params.concurrentDownloads!) {
final pageUrl = queue.removeFirst();
final task = _processFile(pageUrl, httpClient, params)
.then((_) {
if (params.itemType! != ItemType.anime) {
completed++;
params.sendPort!.send(
DownloadProgress(
pageUrl: pageUrl,
completed,
total,
params.itemType!,
),
);
}
})
.catchError((error) {
params.sendPort!.send(
MDownloaderException(
'Error downloading ${pageUrl.fileName}',
error,
),
);
throw error;
});
activeTasks.add(task);
if (!completer.isCompleted) {
completer.complete();
}
if (activeTasks.isNotEmpty) {
await Future.wait(activeTasks.toList(), eagerError: true);
activeTasks.clear();
},
onError: (error) {
if (!completer.isCompleted) {
completer.completeError(error);
}
}
},
);
params.sendPort!.send(DownloadComplete());
} catch (e) {
params.sendPort!.send(MDownloaderException('Download failed', e));
} finally {
httpClient.close();
}
}
static Future<void> _processFile(
PageUrl pageUrl,
Client client,
DownloadParams params,
) async {
try {
if (params.itemType! != ItemType.anime) {
final response = await _withRetryStatic(
() => client.get(Uri.parse(pageUrl.url), headers: pageUrl.headers),
3,
);
if (response.statusCode != 200) {
throw MDownloaderException(
'Failed to download file: ${pageUrl.fileName!}',
);
}
final file = File(pageUrl.fileName!);
await file.writeAsBytes(response.bodyBytes);
} else {
final bytes = await _withRetryStatic(() async {
List<int> bytes = [];
var request = Request('GET', Uri.parse(pageUrl.url));
request.headers.addAll(pageUrl.headers ?? {});
StreamedResponse response = await client.send(request);
if (response.statusCode != 200) {
throw MDownloaderException(
'Failed to download file: ${pageUrl.fileName!}',
);
}
int total = response.contentLength ?? 0;
int recieved = 0;
await for (var value in response.stream) {
bytes.addAll(value);
try {
recieved += value.length;
params.sendPort!.send(
DownloadProgress(
(recieved / total * 100).toInt(),
100,
pageUrl: pageUrl,
params.itemType!,
),
);
} catch (_) {}
}
return bytes;
}, 3);
final file = File(pageUrl.fileName!);
await file.writeAsBytes(bytes);
}
} catch (e) {
throw MDownloaderException(
'Failed to process file: ${pageUrl.fileName!}',
e,
);
}
return completer.future;
}
}

View file

@ -4,7 +4,8 @@ import 'package:mangayomi/models/chapter.dart';
import 'package:mangayomi/models/download.dart';
import 'package:mangayomi/modules/manga/reader/providers/push_router.dart';
import 'package:mangayomi/modules/manga/reader/providers/reader_controller_provider.dart';
import 'package:mangayomi/services/download_manager/m3u8/m3u8_downloader.dart';
import 'package:mangayomi/services/download_manager/download_isolate_pool.dart';
import 'package:mangayomi/services/download_manager/m_downloader.dart';
extension ChapterExtension on Chapter {
Future<void> pushToReaderView(
@ -29,9 +30,13 @@ extension ChapterExtension on Chapter {
}
void cancelDownloads(int? downloadId) {
final (receivePort, isolate) = isolateChapsSendPorts['$id'] ?? (null, null);
isolate?.kill();
receivePort?.close();
// Cancel via the Isolate pool (new system)
DownloadIsolatePool.instance.cancelTask('$id');
DownloadIsolatePool.instance.cancelTask('m3u8_$id');
// Clean the map for compatibility
isolateChapsSendPorts.remove('$id');
isar.writeTxnSync(() {
isar.downloads.deleteSync(id!);
if (downloadId != null) {