diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt index 1b4c486d..efb9d71b 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt @@ -48,6 +48,7 @@ import com.nuvio.app.features.watched.WatchedRepository import com.nuvio.app.features.watchprogress.CachedInProgressItem import com.nuvio.app.features.watchprogress.CachedNextUpItem import com.nuvio.app.features.watchprogress.ContinueWatchingEnrichmentCache +import com.nuvio.app.features.watchprogress.ContinueWatchingLimit import com.nuvio.app.features.watchprogress.CurrentDateProvider import com.nuvio.app.features.watchprogress.ContinueWatchingPreferencesRepository import com.nuvio.app.features.watchprogress.ContinueWatchingItem @@ -81,6 +82,7 @@ import kotlinx.coroutines.sync.withPermit import com.nuvio.app.features.home.components.ContinueWatchingLayout import com.nuvio.app.features.home.components.homeSectionHorizontalPaddingForWidth import com.nuvio.app.features.home.components.rememberContinueWatchingLayout +import kotlinx.coroutines.CancellationException import nuvio.composeapp.generated.resources.* import org.jetbrains.compose.resources.stringResource @@ -371,6 +373,9 @@ fun HomeScreen( val enabledAddons = remember(addonsUiState.addons) { addonsUiState.addons.enabledAddons() } + val isRefreshingEnabledAddons = remember(enabledAddons) { + enabledAddons.any { addon -> addon.isRefreshing } + } val availableManifests = remember(enabledAddons) { enabledAddons.mapNotNull { addon -> addon.manifest } } @@ -413,8 +418,10 @@ fun HomeScreen( metaProviderKey, continueWatchingPreferences.showUnairedNextUp, continueWatchingPreferences.upNextFromFurthestEpisode, + isRefreshingEnabledAddons, watchProgressUiState.entries, watchedUiState.items, + watchedUiState.isLoaded, ) { if (completedSeriesCandidates.isEmpty()) { nextUpItemsBySeries = emptyMap() @@ -422,6 +429,14 @@ fun HomeScreen( return@LaunchedEffect } + if (!isTraktProgressActive && !watchedUiState.isLoaded) { + return@LaunchedEffect + } + + if (isRefreshingEnabledAddons) { + return@LaunchedEffect + } + val cachedResolvedNextUpItems = completedSeriesCandidates.mapNotNull { candidate -> val cached = cachedNextUpItems[candidate.content.id] ?: return@mapNotNull null val item = cached.second @@ -436,6 +451,7 @@ fun HomeScreen( val candidatesToResolve = completedSeriesCandidates.filter { candidate -> candidate.content.id !in cachedResolvedNextUpItems } + val resolutionCandidates = candidatesToResolve.take(NEXT_UP_INITIAL_RESOLUTION_LIMIT) if (candidatesToResolve.isEmpty()) { nextUpItemsBySeries = cachedResolvedNextUpItems processedNextUpContentIds = completedSeriesCandidates.mapTo(mutableSetOf()) { candidate -> @@ -454,46 +470,54 @@ fun HomeScreen( } val todayIsoDate = CurrentDateProvider.todayIsoDate() - val semaphore = Semaphore(4) - val freshResults = candidatesToResolve.map { completedEntry -> - async { - semaphore.withPermit { - val meta = MetaDetailsRepository.fetch( - type = completedEntry.content.type, - id = completedEntry.content.id, - ) - if (meta == null) { - return@withPermit null + val semaphore = Semaphore(NEXT_UP_RESOLUTION_CONCURRENCY) + val freshResults = mutableMapOf>() + val processedFreshContentIds = mutableSetOf() + val candidateBatches = resolutionCandidates.chunked(NEXT_UP_RESOLUTION_BATCH_SIZE) + + for (batch in candidateBatches) { + val batchResults = batch.map { completedEntry -> + async { + semaphore.withPermit { + resolveHomeNextUpCandidate( + completedEntry = completedEntry, + watchProgressEntries = watchProgressUiState.entries, + watchedItems = watchedUiState.items, + todayIsoDate = todayIsoDate, + preferFurthestEpisode = continueWatchingPreferences.upNextFromFurthestEpisode, + showUnairedNextUp = continueWatchingPreferences.showUnairedNextUp, + dismissedNextUpKeys = continueWatchingPreferences.dismissedNextUpKeys, + ) } - val action = meta.seriesPrimaryAction( - content = completedEntry.content, - entries = watchProgressUiState.entries, - watchedItems = watchedUiState.items, - todayIsoDate = todayIsoDate, - preferFurthestEpisode = continueWatchingPreferences.upNextFromFurthestEpisode, - showUnairedNextUp = continueWatchingPreferences.showUnairedNextUp, - ) - if (action?.resumePositionMs != null) { - return@withPermit null - } - val nextEpisode = action?.let { meta.videoForSeriesAction(it) } - if (nextEpisode == null) { - return@withPermit null - } - val item = completedEntry.toContinueWatchingSeed(meta) - .toUpNextContinueWatchingItem(nextEpisode) - if (nextUpDismissKey(item.parentMetaId, item.nextUpSeedSeasonNumber, item.nextUpSeedEpisodeNumber) in continueWatchingPreferences.dismissedNextUpKeys) { - return@withPermit null - } - completedEntry.content.id to (completedEntry.markedAtEpochMs to item) } + }.awaitAll() + batch.forEach { candidate -> processedFreshContentIds += candidate.content.id } + + val resolvedBeforeBatch = freshResults.size + batchResults.filterNotNull().forEach { (contentId, item) -> + freshResults[contentId] = item } - }.awaitAll().filterNotNull().toMap() + val batchResolvedCount = freshResults.size - resolvedBeforeBatch + if (batchResolvedCount > 0) { + val progressiveResults = cachedResolvedNextUpItems + freshResults + nextUpItemsBySeries = progressiveResults + processedNextUpContentIds = ( + cachedResolvedNextUpItems.keys + + processedFreshContentIds + ).toSet() + } + + if (cachedResolvedNextUpItems.size + freshResults.size >= ContinueWatchingLimit) { + break + } + } + val results = cachedResolvedNextUpItems + freshResults nextUpItemsBySeries = results - processedNextUpContentIds = completedSeriesCandidates.mapTo(mutableSetOf()) { candidate -> - candidate.content.id - } + processedNextUpContentIds = ( + cachedResolvedNextUpItems.keys + + processedFreshContentIds + ).toSet() saveContinueWatchingSnapshots( nextUpItemsBySeries = results, @@ -730,6 +754,9 @@ fun HomeScreen( private const val HOME_CATALOG_PREVIEW_LIMIT = 18 private const val MILLIS_PER_DAY = 24L * 60L * 60L * 1000L private const val OPTIMISTIC_NEXT_UP_SEED_WINDOW_MS = 3L * 60L * 1000L +private const val NEXT_UP_INITIAL_RESOLUTION_LIMIT = ContinueWatchingLimit * 2 +private const val NEXT_UP_RESOLUTION_CONCURRENCY = 8 +private const val NEXT_UP_RESOLUTION_BATCH_SIZE = NEXT_UP_RESOLUTION_CONCURRENCY internal fun filterEntriesForTraktContinueWatchingWindow( entries: List, @@ -824,6 +851,49 @@ internal fun filterNextUpItemsByCurrentSeeds( item.nextUpSeedEpisodeNumber == currentSeed.second } +private suspend fun resolveHomeNextUpCandidate( + completedEntry: CompletedSeriesCandidate, + watchProgressEntries: List, + watchedItems: List, + todayIsoDate: String, + preferFurthestEpisode: Boolean, + showUnairedNextUp: Boolean, + dismissedNextUpKeys: Set, +): Pair>? { + val contentId = completedEntry.content.id + val meta = try { + MetaDetailsRepository.fetch( + type = completedEntry.content.type, + id = contentId, + ) + } catch (error: Throwable) { + if (error is CancellationException) throw error + null + } + if (meta == null) return null + + val action = meta.seriesPrimaryAction( + content = completedEntry.content, + entries = watchProgressEntries, + watchedItems = watchedItems, + todayIsoDate = todayIsoDate, + preferFurthestEpisode = preferFurthestEpisode, + showUnairedNextUp = showUnairedNextUp, + ) + if (action == null) return null + if (action.resumePositionMs != null) return null + + val nextEpisode = meta.videoForSeriesAction(action) + if (nextEpisode == null) return null + val item = completedEntry.toContinueWatchingSeed(meta) + .toUpNextContinueWatchingItem(nextEpisode) + if (nextUpDismissKey(item.parentMetaId, item.nextUpSeedSeasonNumber, item.nextUpSeedEpisodeNumber) in dismissedNextUpKeys) { + return null + } + + return contentId to (completedEntry.markedAtEpochMs to item) +} + private fun MetaDetails.videoForSeriesAction(action: SeriesPrimaryAction): MetaVideo? { if (action.seasonNumber != null && action.episodeNumber != null) { videos.firstOrNull { video -> diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/ContinueWatchingEnrichmentCache.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/ContinueWatchingEnrichmentCache.kt index 19d6c046..c247eeda 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/ContinueWatchingEnrichmentCache.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/ContinueWatchingEnrichmentCache.kt @@ -70,7 +70,9 @@ internal object ContinueWatchingEnrichmentCache { fun getSnapshots(): Pair, List> { val payload = loadPayload() - return (payload?.nextUp ?: emptyList()) to (payload?.inProgress ?: emptyList()) + val nextUp = payload?.nextUp ?: emptyList() + val inProgress = payload?.inProgress ?: emptyList() + return nextUp to inProgress } fun saveSnapshots( @@ -80,7 +82,9 @@ internal object ContinueWatchingEnrichmentCache { ) { val payload = CachedEnrichmentPayload(nextUp = nextUp, inProgress = inProgress) val payloadHash = payload.hashCode() - if (!force && lastPayloadHash == payloadHash) return + if (!force && lastPayloadHash == payloadHash) { + return + } val encoded = runCatching { json.encodeToString(payload) diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt index 8f4569b3..96e5bf6d 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt @@ -4,6 +4,7 @@ import co.touchlab.kermit.Logger import com.nuvio.app.core.auth.AuthRepository import com.nuvio.app.core.auth.AuthState import com.nuvio.app.features.addons.AddonRepository +import com.nuvio.app.features.details.MetaDetails import com.nuvio.app.features.details.MetaDetailsRepository import com.nuvio.app.features.player.PlayerPlaybackSnapshot import com.nuvio.app.features.profiles.ProfileRepository @@ -20,15 +21,28 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.withTimeoutOrNull private const val NUVIO_SYNC_PERIODIC_INTERVAL_MS = 5L * 60L * 1000L +private const val WATCH_PROGRESS_METADATA_RESOLUTION_CONCURRENCY = 4 + +private data class RemoteMetadataResolutionResult( + val key: Pair, + val entries: List, + val meta: MetaDetails?, +) object WatchProgressRepository { private val syncScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) @@ -189,6 +203,11 @@ object WatchProgressRepository { sinceLastWatched = sinceLastWatched, ) val isIncrementalPull = sinceLastWatched != null + if (isIncrementalPull && serverEntries.isEmpty()) { + hasLoaded = true + hasCompletedInitialNuvioSyncPull = true + return@runCatching + } val oldLocal = entriesByVideoId.toMap() val newMap = if (isIncrementalPull) { entriesByVideoId.toMutableMap() @@ -245,8 +264,10 @@ object WatchProgressRepository { ) private fun resolveRemoteMetadata() { - val needsResolution = entriesByVideoId.values + val missingMetadataEntries = entriesByVideoId.values .filter { it.poster.isNullOrBlank() || it.background.isNullOrBlank() } + val entriesToResolve = missingMetadataEntries.continueWatchingEntries(limit = ContinueWatchingLimit) + val needsResolution = entriesToResolve .groupBy { it.parentMetaId to it.contentType } if (needsResolution.isEmpty()) { @@ -262,41 +283,79 @@ object WatchProgressRepository { return@launch } - for ((key, entries) in needsResolution) { - val (metaId, metaType) = key - val meta = runCatching { - MetaDetailsRepository.fetch(metaType, metaId) - }.getOrNull() + var resolvedEntries = 0 + val semaphore = Semaphore(WATCH_PROGRESS_METADATA_RESOLUTION_CONCURRENCY) + val resolutionResults = coroutineScope { + needsResolution.map { (key, entries) -> + async { + semaphore.withPermit { + fetchRemoteMetadataGroup(key = key, entries = entries) + } + } + }.awaitAll() + } + + for (result in resolutionResults) { + ensureActive() + val meta = result.meta if (meta == null) { continue } - for (entry in entries) { - val episodeVideo = if (entry.seasonNumber != null && entry.episodeNumber != null) { + var appliedEntries = 0 + for (entry in result.entries) { + val current = entriesByVideoId[entry.videoId] ?: continue + val episodeVideo = if (current.seasonNumber != null && current.episodeNumber != null) { meta.videos.find { v -> - v.season == entry.seasonNumber && v.episode == entry.episodeNumber + v.season == current.seasonNumber && v.episode == current.episodeNumber } } else null - entriesByVideoId[entry.videoId] = entry.copy( + entriesByVideoId[current.videoId] = current.copy( title = meta.name, poster = meta.poster, background = meta.background, logo = meta.logo, - episodeTitle = episodeVideo?.title ?: entry.episodeTitle, - episodeThumbnail = episodeVideo?.thumbnail ?: entry.episodeThumbnail, + episodeTitle = episodeVideo?.title ?: current.episodeTitle, + episodeThumbnail = episodeVideo?.thumbnail ?: current.episodeThumbnail, pauseDescription = episodeVideo?.overview ?: meta.description - ?: entry.pauseDescription, + ?: current.pauseDescription, ) + appliedEntries += 1 + } + if (appliedEntries == 0) { + continue } - publish() + resolvedEntries += appliedEntries + } + if (resolvedEntries > 0) { + publish() + persist() } - persist() } } + private suspend fun fetchRemoteMetadataGroup( + key: Pair, + entries: List, + ): RemoteMetadataResolutionResult { + val (metaId, metaType) = key + val meta = try { + MetaDetailsRepository.fetch(metaType, metaId) + } catch (error: CancellationException) { + throw error + } catch (_: Throwable) { + null + } + return RemoteMetadataResolutionResult( + key = key, + entries = entries, + meta = meta, + ) + } + fun upsertPlaybackProgress( session: WatchProgressPlaybackSession, snapshot: PlayerPlaybackSnapshot, @@ -494,13 +553,14 @@ object WatchProgressRepository { private fun publish() { val entries = currentEntries() val sortedEntries = entries.sortedByDescending { it.lastUpdatedEpochMs } + val hasLoadedRemoteProgress = if (shouldUseTraktProgress()) { + TraktProgressRepository.uiState.value.hasLoadedRemoteProgress + } else { + hasLoaded + } _uiState.value = WatchProgressUiState( entries = sortedEntries, - hasLoadedRemoteProgress = if (shouldUseTraktProgress()) { - TraktProgressRepository.uiState.value.hasLoadedRemoteProgress - } else { - hasLoaded - }, + hasLoadedRemoteProgress = hasLoadedRemoteProgress, ) } diff --git a/scripts/nuvio_debug_logs.sh b/scripts/nuvio_debug_logs.sh index 8ae32969..020e7bec 100755 --- a/scripts/nuvio_debug_logs.sh +++ b/scripts/nuvio_debug_logs.sh @@ -128,7 +128,14 @@ fi # ── Colour-coding function ────────────────────────────────────────────────── colorize_line() { local line="$1" - local level="${line:0:1}" + local level="" + if [[ "$line" =~ ^[[:space:]]*[0-9]{2}-[0-9]{2}[[:space:]]+[0-9:.]+[[:space:]]+([VDIWEF])/ ]]; then + level="${BASH_REMATCH[1]}" + elif [[ "$line" =~ ^([VDIWEF])/ ]]; then + level="${BASH_REMATCH[1]}" + else + level="${line:0:1}" + fi local clr="" case "$level" in V) clr="$CLR_V" ;; @@ -160,7 +167,7 @@ stream_logcat_colored() { local value="$2" local noise_re="$NOISE_TAGS" - "${ADB[@]}" logcat -v brief "$mode" "$value" 2>/dev/null \ + "${ADB[@]}" logcat -v time "$mode" "$value" 2>/dev/null \ | while IFS= read -r raw_line; do if [[ "$raw_line" =~ $noise_re ]]; then continue