fix: trakt sync rate limiting
Some checks are pending
Build and Deploy / build_windows (push) Waiting to run
Build and Deploy / build_android (push) Waiting to run
Build and Deploy / build_android_tv (push) Waiting to run
Build and Deploy / build_ipa (push) Waiting to run
Build and Deploy / build_linux (push) Waiting to run
Build and Deploy / build_macos (push) Waiting to run

This commit is contained in:
omkar 2025-01-10 06:47:57 +05:30
parent 6eeaf5b682
commit 4e58663523
6 changed files with 192 additions and 64 deletions

View file

@ -484,9 +484,6 @@ class StremioConnectionService extends BaseConnectionService {
String title = meta.name ?? item.title ?? "No title";
if (season != null) title += " S$season";
if (episode != null) title += " E$episode";
DocSource? source;
if (item.url != null) {

View file

@ -303,18 +303,33 @@ class _RenderStreamListState extends State<RenderStreamList> {
}
}
int? season;
int? episode;
if (widget.season != null) {
season = int.parse(widget.season!);
} else if ((widget.id as Meta).nextSeason != null) {
season = (widget.id as Meta).nextSeason!;
}
if (widget.episode != null) {
episode = int.parse(widget.episode!);
} else if ((widget.id as Meta).nextEpisode != null) {
episode = (widget.id as Meta).nextEpisode!;
}
final meta = (widget.id as Meta).copyWith(
nextSeason: season,
nextEpisode: episode,
);
Navigator.of(context).push(
MaterialPageRoute(
builder: (ctx) => DocViewer(
source: item.source,
service: widget.service,
meta: widget.season != null && widget.episode != null
? (widget.id as Meta).copyWith(
nextSeason: int.parse(widget.season!),
nextEpisode: int.parse(widget.episode!),
)
: widget.id,
season: widget.season,
meta: meta,
season: meta.nextSeason?.toString(),
progress: widget.progress,
),
),

View file

@ -1,10 +1,12 @@
import 'dart:async';
import 'dart:convert';
import 'package:cached_query/cached_query.dart';
import 'package:cached_storage/cached_storage.dart';
import 'package:flutter_dotenv/flutter_dotenv.dart';
import 'package:http/http.dart' as http;
import 'package:intl/intl.dart';
import 'package:logging/logging.dart';
import 'package:pocketbase/pocketbase.dart';
import '../../../engine/connection_type.dart';
@ -14,6 +16,8 @@ import '../../connections/types/stremio/stremio_base.types.dart';
import '../../settings/types/connection.dart';
class TraktService {
static final Logger _logger = Logger('TraktService');
static const String _baseUrl = 'https://api.trakt.tv';
static const String _apiVersion = '2';
@ -21,7 +25,9 @@ class TraktService {
static const int _authedGetLimit = 1000;
static const Duration _rateLimitWindow = Duration(minutes: 5);
static const Duration _cacheRevalidationInterval = Duration(minutes: 5);
static const Duration _cacheRevalidationInterval = Duration(
hours: 1,
);
static TraktService? _instance;
static TraktService? get instance => _instance;
@ -31,20 +37,41 @@ class TraktService {
int _getRequestCount = 0;
DateTime _lastRateLimitReset = DateTime.now();
final Map<String, dynamic> _cache = {};
Map<String, dynamic> _cache = {};
saveCacheToDisk() {
_logger.fine('Saving cache to disk');
CachedQuery.instance.storage?.put(
StoredQuery(
key: "trakt_integration_cache",
data: _cache,
createdAt: DateTime.now(),
),
);
}
Timer? _cacheRevalidationTimer;
clearCache() {
_logger.info('Clearing cache');
_cache.clear();
}
static ensureInitialized() async {
if (_instance != null) {
_logger.fine('Instance already initialized');
return _instance;
}
_logger.info('Initializing TraktService');
final result = await CachedQuery.instance.storage?.get(
"trakt_integration_cache",
);
AppEngine.engine.pb.authStore.onChange.listen((item) {
if (!AppEngine.engine.pb.authStore.isValid) {
_logger.info('Auth store is invalid, clearing cache');
_instance?._cache.clear();
}
});
@ -53,15 +80,20 @@ class TraktService {
await traktService.initStremioService();
_instance = traktService;
_instance?._cache = result?.data ?? {};
// Start cache revalidation timer
_instance!._startCacheRevalidation();
}
Future<BaseConnectionService> initStremioService() async {
if (stremioService != null) {
_logger.fine('StremioService already initialized');
return stremioService!;
}
_logger.info('Initializing StremioService');
final model_ =
await AppEngine.engine.pb.collection("connection").getFirstListItem(
"type.type = 'stremio_addons'",
@ -84,6 +116,7 @@ class TraktService {
final client = "" ?? DotEnv().get("trakt_client_id");
if (client == "") {
_logger.warning('Using default Trakt client ID');
return "b47864365ac88ecc253c3b0bdf1c82a619c1833e8806f702895a7e8cb06b536a";
}
@ -94,6 +127,8 @@ class TraktService {
return AppEngine.engine.pb.authStore.record!.getStringValue("trakt_token");
}
final Map<String, Completer<void>> _activeScrobbleRequests = {};
List<String> debugLogs = [];
Map<String, String> get headers => {
@ -114,13 +149,13 @@ class TraktService {
if (method == 'GET') {
if (_getRequestCount >= _authedGetLimit) {
debugLogs.add("GET rate limit exceeded");
_logger.severe('GET rate limit exceeded');
throw Exception('GET rate limit exceeded');
}
_getRequestCount++;
} else if (method == 'POST' || method == 'PUT' || method == 'DELETE') {
if (_postRequestCount >= _authedPostLimit) {
debugLogs.add("GET rate limit exceeded");
_logger.severe('POST/PUT/DELETE rate limit exceeded');
throw Exception('POST/PUT/DELETE rate limit exceeded');
}
_postRequestCount++;
@ -128,6 +163,7 @@ class TraktService {
}
void _startCacheRevalidation() {
_logger.info('Starting cache revalidation timer');
_cacheRevalidationTimer = Timer.periodic(
_cacheRevalidationInterval,
(_) async {
@ -137,6 +173,7 @@ class TraktService {
}
Future<void> _revalidateCache() async {
_logger.info('Revalidating cache');
for (final key in _cache.keys) {
final cachedData = _cache[key];
if (cachedData != null) {
@ -144,30 +181,32 @@ class TraktService {
_cache[key] = updatedData;
}
}
saveCacheToDisk();
}
Future<dynamic> _makeRequest(String url, {bool bypassCache = false}) async {
if (!bypassCache && _cache.containsKey(url)) {
_logger.fine('Returning cached data for $url');
return _cache[url];
}
await _checkRateLimit('GET');
_logger.info('Making GET request to $url');
final response = await http.get(Uri.parse(url), headers: headers);
if (response.statusCode != 200) {
debugLogs.add(
"Failed to fetch data from ${url.replaceFirst("https://api.trakt.tv/", "")} ${response.statusCode}");
_logger.severe('Failed to fetch data from $url: ${response.statusCode}');
throw Exception('Failed to fetch data from $url ${response.statusCode}');
}
debugLogs.add(
"Success calling api ${url.replaceFirst("https://api.trakt.tv/", "")}",
);
_logger.info('Successfully fetched data from $url');
final data = json.decode(response.body);
_cache[url] = data;
saveCacheToDisk();
return data;
}
@ -179,6 +218,7 @@ class TraktService {
'year': meta.year,
'ids': {
'imdb': meta.imdbId ?? meta.id,
...(meta.externalIds ?? {}),
},
},
};
@ -189,6 +229,7 @@ class TraktService {
"year": meta.year,
"ids": {
"imdb": meta.imdbId ?? meta.id,
...(meta.externalIds ?? {}),
}
},
"episode": {
@ -203,10 +244,12 @@ class TraktService {
await initStremioService();
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
try {
_logger.info('Fetching up next series');
final List<dynamic> watchedShows = await _makeRequest(
'$_baseUrl/sync/watched/shows',
);
@ -240,7 +283,7 @@ class TraktService {
);
}
} catch (e) {
print('Error fetching progress for show $showId: $e');
_logger.severe('Error fetching progress for show $showId: $e');
return null;
}
@ -251,8 +294,7 @@ class TraktService {
return results.whereType<Meta>().toList();
} catch (e, stack) {
print('Error fetching up next episodes: $e');
print(stack);
_logger.severe('Error fetching up next episodes: $e', stack);
return [];
}
}
@ -261,10 +303,12 @@ class TraktService {
await initStremioService();
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
try {
_logger.info('Fetching continue watching');
final continueWatching = await _makeRequest('$_baseUrl/sync/playback');
final Map<String, double> progress = {};
@ -295,6 +339,7 @@ class TraktService {
progress: movie['progress'],
);
} catch (e) {
_logger.warning('Error mapping movie: $e');
return null;
}
})
@ -318,22 +363,64 @@ class TraktService {
return returnValue;
}).toList();
} catch (e, stack) {
print('Error fetching up next movies: $e');
debugLogs.add('Error fetching up next movies: $e');
debugLogs.add(stack.toString());
print(stack);
_logger.severe('Error fetching continue watching: $e', stack);
return [];
}
}
Future<void> _retryPostRequest(
String cacheKey,
String url,
Map<String, dynamic> body, {
int retryCount = 2,
}) async {
for (int i = 0; i < retryCount; i++) {
try {
await _checkRateLimit('POST');
_logger.info('Making POST request to $url');
final response = await http.post(
Uri.parse(url),
headers: headers,
body: json.encode(body),
);
if (response.statusCode == 201) {
_logger.info('POST request successful');
return;
} else if (response.statusCode == 429) {
_logger.warning('Rate limit hit, retrying...');
await Future.delayed(Duration(seconds: 10));
continue;
} else {
_logger.severe('Failed to make POST request: ${response.statusCode}');
throw Exception(
'Failed to make POST request: ${response.statusCode}');
}
} catch (e) {
if (i == retryCount - 1) {
_logger
.severe('Failed to make POST request after $retryCount attempts');
if (_cache.containsKey(cacheKey)) {
_logger.info('Returning cached data');
return _cache[cacheKey];
}
rethrow;
}
}
}
}
Future<List<LibraryItem>> getUpcomingSchedule() async {
await initStremioService();
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
try {
_logger.info('Fetching upcoming schedule');
final List<dynamic> scheduleShows = await _makeRequest(
'$_baseUrl/calendars/my/shows/${DateFormat('yyyy-MM-dd').format(DateTime.now())}/7',
);
@ -348,6 +435,7 @@ class TraktService {
id: imdb,
);
} catch (e) {
_logger.warning('Error mapping show: $e');
return null;
}
})
@ -357,8 +445,7 @@ class TraktService {
return result;
} catch (e, stack) {
print('Error fetching upcoming schedule: $e');
print(stack);
_logger.severe('Error fetching upcoming schedule: $e', stack);
return [];
}
}
@ -367,10 +454,12 @@ class TraktService {
await initStremioService();
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
try {
_logger.info('Fetching watchlist');
final watchlistItems = await _makeRequest('$_baseUrl/sync/watchlist');
final result = await stremioService!.getBulkItem(
@ -392,6 +481,7 @@ class TraktService {
id: imdb,
);
} catch (e) {
_logger.warning('Error mapping watchlist item: $e');
return null;
}
})
@ -401,8 +491,7 @@ class TraktService {
return result;
} catch (e, stack) {
print('Error fetching watchlist: $e');
print(stack);
_logger.severe('Error fetching watchlist: $e', stack);
return [];
}
}
@ -411,10 +500,12 @@ class TraktService {
await initStremioService();
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
try {
_logger.info('Fetching show recommendations');
final recommendedShows =
await _makeRequest('$_baseUrl/recommendations/shows');
@ -438,8 +529,7 @@ class TraktService {
return result;
} catch (e, stack) {
print('Error fetching show recommendations: $e');
print(stack);
_logger.severe('Error fetching show recommendations: $e', stack);
return [];
}
}
@ -448,10 +538,12 @@ class TraktService {
await initStremioService();
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
try {
_logger.info('Fetching movie recommendations');
final recommendedMovies =
await _makeRequest('$_baseUrl/recommendations/movies');
@ -465,6 +557,7 @@ class TraktService {
id: imdb,
);
} catch (e) {
_logger.warning('Error mapping movie: $e');
return null;
}
})
@ -474,8 +567,7 @@ class TraktService {
return result;
} catch (e, stack) {
print('Error fetching movie recommendations: $e');
print(stack);
_logger.severe('Error fetching movie recommendations: $e', stack);
return [];
}
}
@ -488,6 +580,7 @@ class TraktService {
.toList();
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
@ -501,9 +594,11 @@ class TraktService {
}
Future<int?> getTraktIdForMovie(String imdb) async {
_logger.info('Fetching Trakt ID for movie with IMDb ID: $imdb');
final body = await _makeRequest("$_baseUrl/search/imdb/$imdb");
if (body.isEmpty) {
_logger.warning('No Trakt ID found for IMDb ID: $imdb');
return null;
}
@ -525,12 +620,14 @@ class TraktService {
required double progress,
}) async {
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return;
}
await _checkRateLimit('POST');
try {
_logger.info('Starting scrobbling for ${meta.type} with ID: ${meta.id}');
final response = await http.post(
Uri.parse('$_baseUrl/scrobble/start'),
headers: headers,
@ -541,16 +638,15 @@ class TraktService {
);
if (response.statusCode != 201) {
print(response.statusCode);
print(response.body);
_logger.severe('Failed to start scrobbling: ${response.statusCode}');
throw Exception('Failed to start scrobbling');
}
_logger.info('Scrobbling started successfully');
_cache.remove('$_baseUrl/sync/watched/shows');
_cache.remove('$_baseUrl/sync/playback');
} catch (e, stack) {
print('Error starting scrobbling: $e');
print(stack);
_logger.severe('Error starting scrobbling: $e', stack);
rethrow;
}
}
@ -560,28 +656,30 @@ class TraktService {
required double progress,
}) async {
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return;
}
await _checkRateLimit('POST');
final cacheKey = '${meta.id}_pauseScrobbling';
_activeScrobbleRequests[cacheKey]?.completeError('Cancelled');
_activeScrobbleRequests[cacheKey] = Completer<void>();
try {
final response = await http.post(
Uri.parse('$_baseUrl/scrobble/pause'),
headers: headers,
body: json.encode({
_logger.info('Pausing scrobbling for ${meta.type} with ID: ${meta.id}');
await _retryPostRequest(
cacheKey,
'$_baseUrl/scrobble/pause',
{
'progress': progress,
..._buildObjectForMeta(meta),
}),
},
);
if (response.statusCode != 201) {
throw Exception('Failed to pause scrobbling');
}
} catch (e, stack) {
print('Error pausing scrobbling: $e');
print(stack);
_logger.severe('Error pausing scrobbling: $e', stack);
rethrow;
} finally {
_activeScrobbleRequests.remove(cacheKey);
}
}
@ -590,36 +688,39 @@ class TraktService {
required double progress,
}) async {
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return;
}
await _checkRateLimit('POST');
final cacheKey = '${meta.id}_stopScrobbling';
_activeScrobbleRequests[cacheKey]?.completeError('Cancelled');
_activeScrobbleRequests[cacheKey] = Completer<void>();
try {
final response = await http.post(
Uri.parse('$_baseUrl/scrobble/stop'),
headers: headers,
body: json.encode({
_logger.info('Stopping scrobbling for ${meta.type} with ID: ${meta.id}');
await _retryPostRequest(
cacheKey,
'$_baseUrl/scrobble/stop',
{
'progress': progress,
..._buildObjectForMeta(meta),
}),
},
);
if (response.statusCode != 201) {
throw Exception('Failed to stop scrobbling');
}
_cache.remove('$_baseUrl/sync/watched/shows');
_cache.remove('$_baseUrl/sync/playback');
} catch (e, stack) {
print('Error stopping scrobbling: $e');
print(stack);
_logger.severe('Error stopping scrobbling: $e', stack);
rethrow;
} finally {
_activeScrobbleRequests.remove(cacheKey);
}
}
Future<List<TraktProgress>> getProgress(Meta meta) async {
if (!isEnabled()) {
_logger.info('Trakt integration is not enabled');
return [];
}
@ -684,7 +785,7 @@ class TraktService {
}
}
} catch (e) {
print(e);
_logger.severe('Error fetching progress: $e');
return [];
}

View file

@ -8,6 +8,7 @@ import 'package:flutter_dotenv/flutter_dotenv.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:go_router/go_router.dart';
import 'package:google_fonts/google_fonts.dart';
import 'package:logging/logging.dart';
import 'package:madari_client/engine/engine.dart';
import 'package:madari_client/features/doc_viewer/container/doc_viewer.dart';
import 'package:madari_client/features/doc_viewer/types/doc_source.dart';
@ -28,6 +29,18 @@ void main() async {
print("Unable");
}
Logger.root.level = Level.ALL;
Logger.root.onRecord.listen((record) {
print('${record.level.name}: ${record.time}: ${record.message}');
if (record.error != null) {
print('Error: ${record.error}');
}
if (record.stackTrace != null) {
print('StackTrace: ${record.stackTrace}');
}
});
try {
CachedQuery.instance.configFlutter(
storage: await CachedStorage.ensureInitialized(),

View file

@ -57,6 +57,7 @@ class _HomeTabPageState extends State<HomeTabPage> {
}
Future<void> _onRefresh() async {
TraktService.instance?.clearCache();
final List<Future> promises = [];
for (final item in traktLibraries) {
final state = _getKey(traktLibraries.indexOf(item)).currentState;

View file

@ -31,6 +31,7 @@ dependencies:
media_kit_libs_video: ^1.0.5
cached_query: ^2.2.1
cached_query_flutter: ^2.5.1
logging: ^1.3.0
cached_storage: ^2.0.14
provider: ^6.1.2
flutter_riverpod: ^2.6.1