diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt index aa4be057..5df53b1c 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/home/HomeScreen.kt @@ -14,6 +14,7 @@ import androidx.compose.ui.Modifier import androidx.compose.ui.unit.Dp import androidx.compose.ui.unit.dp import androidx.lifecycle.compose.collectAsStateWithLifecycle +import co.touchlab.kermit.Logger import com.nuvio.app.core.network.NetworkCondition import com.nuvio.app.core.network.NetworkStatusRepository import com.nuvio.app.core.ui.LocalNuvioBottomNavigationOverlayPadding @@ -35,6 +36,7 @@ import com.nuvio.app.features.trakt.TRAKT_CONTINUE_WATCHING_DAYS_CAP_ALL import com.nuvio.app.features.trakt.TraktSettingsRepository import com.nuvio.app.features.trakt.normalizeTraktContinueWatchingDaysCap import com.nuvio.app.features.trakt.shouldUseTraktProgress +import com.nuvio.app.features.watched.WatchedItem import com.nuvio.app.features.watched.WatchedRepository import com.nuvio.app.features.watchprogress.CachedInProgressItem import com.nuvio.app.features.watchprogress.CachedNextUpItem @@ -45,13 +47,19 @@ import com.nuvio.app.features.watchprogress.ContinueWatchingItem import com.nuvio.app.features.watchprogress.ContinueWatchingSortMode import com.nuvio.app.features.watchprogress.isSeriesTypeForContinueWatching import com.nuvio.app.features.watchprogress.nextUpDismissKey +import com.nuvio.app.features.watchprogress.shouldTreatAsInProgressForContinueWatching +import com.nuvio.app.features.watchprogress.shouldUseAsCompletedSeedForContinueWatching import com.nuvio.app.features.watchprogress.WatchProgressClock import com.nuvio.app.features.watchprogress.WatchProgressEntry import com.nuvio.app.features.watchprogress.WatchProgressRepository +import com.nuvio.app.features.watchprogress.WatchProgressSourceLocal +import com.nuvio.app.features.watchprogress.WatchProgressSourceTraktHistory +import com.nuvio.app.features.watchprogress.WatchProgressSourceTraktPlayback +import com.nuvio.app.features.watchprogress.WatchProgressSourceTraktShowProgress import com.nuvio.app.features.watchprogress.buildContinueWatchingEpisodeSubtitle +import com.nuvio.app.features.watchprogress.continueWatchingEntries import com.nuvio.app.features.watchprogress.toContinueWatchingItem import com.nuvio.app.features.watchprogress.toUpNextContinueWatchingItem -import com.nuvio.app.features.watching.application.WatchingState import com.nuvio.app.features.watching.domain.WatchingContentRef import com.nuvio.app.features.watching.domain.isReleasedBy import com.nuvio.app.features.collection.CollectionRepository @@ -164,46 +172,100 @@ fun HomeScreen( if (isTraktProgressActive) emptyList() else watchedUiState.items } - val latestCompletedBySeries = remember(effectiveWatchProgressEntries, effectiveWatchedItems, continueWatchingPreferences.upNextFromFurthestEpisode) { - WatchingState.latestCompletedBySeries( - progressEntries = effectiveWatchProgressEntries, + val allNextUpSeedEntries = remember( + watchProgressUiState.entries, + effectiveWatchedItems, + isTraktProgressActive, + continueWatchingPreferences.upNextFromFurthestEpisode, + ) { + buildTvParityNextUpSeedEntries( + progressEntries = watchProgressUiState.entries, watchedItems = effectiveWatchedItems, + isTraktProgressActive = isTraktProgressActive, preferFurthestEpisode = continueWatchingPreferences.upNextFromFurthestEpisode, + nowEpochMs = WatchProgressClock.nowEpochMs(), ) } - val completedSeriesCandidates = remember(latestCompletedBySeries) { - latestCompletedBySeries.map { (content, completed) -> + + val recentNextUpSeedEntries = remember( + allNextUpSeedEntries, + isTraktProgressActive, + traktSettingsUiState.continueWatchingDaysCap, + ) { + filterEntriesForTraktContinueWatchingWindow( + entries = allNextUpSeedEntries, + isTraktProgressActive = isTraktProgressActive, + daysCap = traktSettingsUiState.continueWatchingDaysCap, + nowEpochMs = WatchProgressClock.nowEpochMs(), + ) + } + + val activeNextUpSeedContentIds = remember(allNextUpSeedEntries) { + allNextUpSeedEntries.mapTo(mutableSetOf()) { entry -> entry.parentMetaId } + } + + val currentNextUpSeedByContentId = remember(allNextUpSeedEntries) { + allNextUpSeedEntries.mapNotNull { entry -> + val season = entry.seasonNumber ?: return@mapNotNull null + val episode = entry.episodeNumber ?: return@mapNotNull null + entry.parentMetaId to (season to episode) + }.toMap() + } + + val visibleContinueWatchingEntries = remember(effectiveWatchProgressEntries) { + effectiveWatchProgressEntries.continueWatchingEntries() + } + + val latestCompletedAtBySeries = remember(allNextUpSeedEntries) { + allNextUpSeedEntries + .groupBy { entry -> entry.parentMetaId } + .mapValues { (_, entries) -> entries.maxOfOrNull { entry -> entry.lastUpdatedEpochMs } ?: Long.MIN_VALUE } + } + + val nextUpSuppressedSeriesIds = remember(visibleContinueWatchingEntries, latestCompletedAtBySeries) { + visibleContinueWatchingEntries + .asSequence() + .filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() } + .filter { entry -> + shouldTreatAsActiveInProgressForNextUpSuppression( + progress = entry, + latestCompletedAt = latestCompletedAtBySeries[entry.parentMetaId], + ) + } + .map { entry -> entry.parentMetaId } + .filter(String::isNotBlank) + .toSet() + } + + val completedSeriesCandidates = remember(recentNextUpSeedEntries, nextUpSuppressedSeriesIds) { + recentNextUpSeedEntries.mapNotNull { seed -> + val season = seed.seasonNumber ?: return@mapNotNull null + val episode = seed.episodeNumber ?: return@mapNotNull null + if (season == 0 || seed.parentMetaId in nextUpSuppressedSeriesIds) return@mapNotNull null CompletedSeriesCandidate( - content = content, - seasonNumber = completed.seasonNumber, - episodeNumber = completed.episodeNumber, - markedAtEpochMs = completed.markedAtEpochMs, + content = WatchingContentRef(type = seed.parentMetaType, id = seed.parentMetaId), + seasonNumber = season, + episodeNumber = episode, + markedAtEpochMs = seed.lastUpdatedEpochMs, ) } } - val completedSeriesContentIds = remember(completedSeriesCandidates) { - completedSeriesCandidates.mapTo(mutableSetOf()) { candidate -> candidate.content.id } - } - val visibleContinueWatchingEntries = remember( - effectiveWatchProgressEntries, - latestCompletedBySeries, - ) { - WatchingState.visibleContinueWatchingEntries( - progressEntries = effectiveWatchProgressEntries, - latestCompletedBySeries = latestCompletedBySeries, - ) - } val profileState by ProfileRepository.state.collectAsStateWithLifecycle() val activeProfileId = profileState.activeProfile?.profileIndex ?: 1 var nextUpItemsBySeries by remember(activeProfileId) { mutableStateOf>>(emptyMap()) } + var processedNextUpContentIds by remember(activeProfileId) { mutableStateOf>(emptySet()) } val cachedSnapshots = remember(activeProfileId) { ContinueWatchingEnrichmentCache.getSnapshots() } val cachedNextUpItems = remember( cachedSnapshots.first, continueWatchingPreferences.dismissedNextUpKeys, - completedSeriesContentIds, + activeNextUpSeedContentIds, + currentNextUpSeedByContentId, isTraktProgressActive, + watchProgressUiState.hasLoadedRemoteProgress, + processedNextUpContentIds, + nextUpItemsBySeries, continueWatchingPreferences.showUnairedNextUp, watchedUiState.isLoaded, ) { @@ -211,7 +273,32 @@ fun HomeScreen( if ( !isTraktProgressActive && watchedUiState.isLoaded && - cached.contentId !in completedSeriesContentIds + cached.contentId !in activeNextUpSeedContentIds + ) { + return@mapNotNull null + } + if ( + isTraktProgressActive && + watchProgressUiState.hasLoadedRemoteProgress && + cached.contentId !in activeNextUpSeedContentIds + ) { + return@mapNotNull null + } + val currentSeed = currentNextUpSeedByContentId[cached.contentId] + if ( + currentSeed != null && + cached.seedSeason != null && + cached.seedEpisode != null + ) { + val (currentSeason, currentEpisode) = currentSeed + val seedChanged = currentSeason != cached.seedSeason || currentEpisode != cached.seedEpisode + if (seedChanged) return@mapNotNull null + } + if ( + isTraktProgressActive && + watchProgressUiState.hasLoadedRemoteProgress && + cached.contentId in processedNextUpContentIds && + cached.contentId !in nextUpItemsBySeries.keys ) { return@mapNotNull null } @@ -257,16 +344,55 @@ fun HomeScreen( visibleContinueWatchingEntries, cachedInProgressItems, effectivNextUpItems, + nextUpSuppressedSeriesIds, continueWatchingPreferences.sortMode, ) { buildHomeContinueWatchingItems( visibleEntries = visibleContinueWatchingEntries, cachedInProgressByVideoId = cachedInProgressItems, nextUpItemsBySeries = effectivNextUpItems, + nextUpSuppressedSeriesIds = nextUpSuppressedSeriesIds, sortMode = continueWatchingPreferences.sortMode, todayIsoDate = CurrentDateProvider.todayIsoDate(), ) } + LaunchedEffect( + isTraktProgressActive, + traktSettingsUiState.continueWatchingDaysCap, + watchProgressUiState.hasLoadedRemoteProgress, + continueWatchingPreferences.upNextFromFurthestEpisode, + watchProgressUiState.entries, + effectiveWatchProgressEntries, + allNextUpSeedEntries, + recentNextUpSeedEntries, + nextUpSuppressedSeriesIds, + visibleContinueWatchingEntries, + completedSeriesCandidates, + cachedInProgressItems, + cachedNextUpItems, + nextUpItemsBySeries, + processedNextUpContentIds, + effectivNextUpItems, + continueWatchingItems, + ) { + homeCwLog.d { + "build summary source=${if (isTraktProgressActive) "trakt" else "nuvio_sync"} " + + "remoteLoaded=${watchProgressUiState.hasLoadedRemoteProgress} " + + "daysCap=${traktSettingsUiState.continueWatchingDaysCap} " + + "raw=${watchProgressUiState.entries.size} rawSources=${watchProgressUiState.entries.debugSourceCounts()} " + + "effective=${effectiveWatchProgressEntries.size} seedAll=${allNextUpSeedEntries.size} " + + "seedRecent=${recentNextUpSeedEntries.size} seedSuppressed=${nextUpSuppressedSeriesIds.size} " + + "useFurthest=${continueWatchingPreferences.upNextFromFurthestEpisode} " + + "visibleInProgress=${visibleContinueWatchingEntries.size} " + + "completedCandidates=${completedSeriesCandidates.size} cachedInProgress=${cachedInProgressItems.size} " + + "cachedNextUp=${cachedNextUpItems.size} liveNextUp=${nextUpItemsBySeries.size} " + + "processedNextUp=${processedNextUpContentIds.size} " + + "effectiveNextUp=${effectivNextUpItems.size} final=${continueWatchingItems.size} " + + "rawItems=${watchProgressUiState.entries.debugWatchProgressSummary()} " + + "completed=${completedSeriesCandidates.debugCompletedSeriesSummary()} " + + "finalItems=${continueWatchingItems.debugContinueWatchingSummary()}" + } + } val availableManifests = remember(addonsUiState.addons) { addonsUiState.addons.mapNotNull { addon -> addon.manifest } } @@ -308,37 +434,75 @@ fun HomeScreen( continueWatchingPreferences.showUnairedNextUp, ) { if (completedSeriesCandidates.isEmpty()) { + homeCwLog.d { + "next-up resolve skipped: no completed series candidates " + + "entries=${effectiveWatchProgressEntries.size} sources=${effectiveWatchProgressEntries.debugSourceCounts()}" + } nextUpItemsBySeries = emptyMap() + processedNextUpContentIds = emptySet() return@LaunchedEffect } - if (metaProviderKey.isEmpty()) return@LaunchedEffect + if (metaProviderKey.isEmpty()) { + homeCwLog.d { + "next-up resolve deferred: no meta providers candidates=${completedSeriesCandidates.size} " + + "candidates=${completedSeriesCandidates.debugCompletedSeriesSummary()}" + } + return@LaunchedEffect + } val todayIsoDate = CurrentDateProvider.todayIsoDate() val semaphore = Semaphore(4) + homeCwLog.d { + "next-up resolve start candidates=${completedSeriesCandidates.size} " + + "showUnaired=${continueWatchingPreferences.showUnairedNextUp} " + + "metaProviders=${metaProviderKey.size} candidates=${completedSeriesCandidates.debugCompletedSeriesSummary()}" + } val results = completedSeriesCandidates.map { completedEntry -> async { semaphore.withPermit { val meta = MetaDetailsRepository.fetch( type = completedEntry.content.type, id = completedEntry.content.id, - ) ?: return@withPermit null + ) + if (meta == null) { + homeCwLog.d { + "next-up meta miss content=${completedEntry.debugSummary()} " + + "type=${completedEntry.content.type} id=${completedEntry.content.id}" + } + return@withPermit null + } val nextEpisode = meta.nextReleasedEpisodeAfter( seasonNumber = completedEntry.seasonNumber, episodeNumber = completedEntry.episodeNumber, todayIsoDate = todayIsoDate, showUnairedNextUp = continueWatchingPreferences.showUnairedNextUp, - ) ?: return@withPermit null + ) + if (nextEpisode == null) { + homeCwLog.d { + "next-up no next episode content=${completedEntry.debugSummary()} " + + "videos=${meta.videos.size}" + } + return@withPermit null + } val item = completedEntry.toContinueWatchingSeed(meta) .toUpNextContinueWatchingItem(nextEpisode) if (nextUpDismissKey(item.parentMetaId, item.nextUpSeedSeasonNumber, item.nextUpSeedEpisodeNumber) in continueWatchingPreferences.dismissedNextUpKeys) { + homeCwLog.d { "next-up dismissed item=${item.debugSummary()}" } return@withPermit null } + homeCwLog.d { + "next-up built seed=${completedEntry.debugSummary()} item=${item.debugSummary()} " + + "released=${nextEpisode.released}" + } completedEntry.content.id to (completedEntry.markedAtEpochMs to item) } } }.awaitAll().filterNotNull().toMap() nextUpItemsBySeries = results + processedNextUpContentIds = completedSeriesCandidates.mapTo(mutableSetOf()) { candidate -> + candidate.content.id + } val nextUpCache = results.mapNotNull { (contentId, pair) -> val item = pair.second @@ -389,6 +553,10 @@ fun HomeScreen( nextUp = nextUpCache, inProgress = inProgressCache, ) + homeCwLog.d { + "next-up resolve complete results=${results.size} nextUpCache=${nextUpCache.size} " + + "inProgressCache=${inProgressCache.size} items=${results.values.map { it.second }.debugContinueWatchingSummary()}" + } } val hasActiveAddons = addonsUiState.addons.any { it.manifest != null } @@ -615,6 +783,8 @@ fun HomeScreen( private const val HOME_CATALOG_PREVIEW_LIMIT = 18 private const val MILLIS_PER_DAY = 24L * 60L * 60L * 1000L +private const val OPTIMISTIC_NEXT_UP_SEED_WINDOW_MS = 3L * 60L * 1000L +private val homeCwLog = Logger.withTag("HomeCW") internal fun filterEntriesForTraktContinueWatchingWindow( entries: List, @@ -630,6 +800,169 @@ internal fun filterEntriesForTraktContinueWatchingWindow( return entries.filter { entry -> entry.lastUpdatedEpochMs >= cutoffMs } } +private fun buildTvParityNextUpSeedEntries( + progressEntries: List, + watchedItems: List, + isTraktProgressActive: Boolean, + preferFurthestEpisode: Boolean, + nowEpochMs: Long, +): List { + val rawSeeds = if (isTraktProgressActive) { + progressEntries.asSequence() + .filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() } + .filter { entry -> entry.seasonNumber != null && entry.episodeNumber != null && entry.seasonNumber != 0 } + .filter { entry -> shouldUseAsTraktNextUpSeed(entry, nowEpochMs) } + .toList() + } else { + watchedItems.asSequence() + .filter { item -> item.type.isSeriesTypeForContinueWatching() } + .filter { item -> item.season != null && item.episode != null && item.season != 0 } + .filter { item -> !isMalformedNextUpSeedContentId(item.id) } + .map { item -> item.toNextUpSeedEntry() } + .toList() + } + + return if (isTraktProgressActive) { + mergeTvTraktNextUpSeeds(rawSeeds) + } else { + rawSeeds + .groupBy { entry -> nextUpSeedKey(entry) } + .mapNotNull { (_, entries) -> + choosePreferredNextUpSeed( + entries = entries, + preferFurthestEpisode = preferFurthestEpisode, + ) + } + .sortedByDescending { entry -> entry.lastUpdatedEpochMs } + } +} + +private fun shouldUseAsTraktNextUpSeed( + entry: WatchProgressEntry, + nowEpochMs: Long, +): Boolean { + if (!entry.shouldUseAsCompletedSeedForContinueWatching()) return false + if (entry.source != WatchProgressSourceTraktPlayback) return true + + val ageMs = nowEpochMs - entry.lastUpdatedEpochMs + return ageMs in 0..OPTIMISTIC_NEXT_UP_SEED_WINDOW_MS +} + +private fun WatchedItem.toNextUpSeedEntry(): WatchProgressEntry = + WatchProgressEntry( + contentType = type, + parentMetaId = id, + parentMetaType = type, + videoId = id, + title = name, + poster = poster, + seasonNumber = season, + episodeNumber = episode, + lastPositionMs = 1L, + durationMs = 1L, + lastUpdatedEpochMs = markedAtEpochMs, + isCompleted = true, + progressPercent = 100f, + source = WatchProgressSourceLocal, + ) + +private fun nextUpSeedKey(entry: WatchProgressEntry): String = + entry.parentMetaId.trim() + +private fun mergeTvTraktNextUpSeeds(entries: List): List { + val merged = linkedMapOf() + entries + .filter { entry -> entry.source == WatchProgressSourceTraktShowProgress } + .forEach { seed -> + merged[nextUpSeedKey(seed)] = seed + } + entries + .filter { entry -> entry.source == WatchProgressSourceTraktHistory || entry.source == WatchProgressSourceTraktPlayback } + .forEach { seed -> + val key = nextUpSeedKey(seed) + val existing = merged[key] + if (existing == null || shouldReplaceNextUpSeed(existing, seed)) { + merged[key] = seed + } + } + return merged.values.sortedByDescending { entry -> entry.lastUpdatedEpochMs } +} + +private fun shouldReplaceNextUpSeed( + existing: WatchProgressEntry, + candidate: WatchProgressEntry, +): Boolean { + val candidateSeason = candidate.seasonNumber ?: -1 + val candidateEpisode = candidate.episodeNumber ?: -1 + val existingSeason = existing.seasonNumber ?: -1 + val existingEpisode = existing.episodeNumber ?: -1 + return candidateSeason > existingSeason || + ( + candidateSeason == existingSeason && + ( + candidateEpisode > existingEpisode || + ( + candidateEpisode == existingEpisode && + candidate.lastUpdatedEpochMs >= existing.lastUpdatedEpochMs + ) + ) + ) +} + +private fun choosePreferredNextUpSeed( + entries: List, + preferFurthestEpisode: Boolean, +): WatchProgressEntry? { + if (entries.isEmpty()) return null + val bestRank = entries.minOf(::nextUpSeedSourceRank) + return entries + .asSequence() + .filter { entry -> nextUpSeedSourceRank(entry) == bestRank } + .maxWithOrNull( + if (preferFurthestEpisode) { + compareBy( + { it.seasonNumber ?: -1 }, + { it.episodeNumber ?: -1 }, + { it.lastUpdatedEpochMs }, + ) + } else { + compareBy( + { it.lastUpdatedEpochMs }, + { it.seasonNumber ?: -1 }, + { it.episodeNumber ?: -1 }, + ) + }, + ) +} + +private fun nextUpSeedSourceRank(entry: WatchProgressEntry): Int = + when (entry.source) { + WatchProgressSourceTraktPlayback, + WatchProgressSourceTraktShowProgress, + -> 0 + WatchProgressSourceTraktHistory -> 1 + WatchProgressSourceLocal -> 2 + else -> 4 + } + +private fun shouldTreatAsActiveInProgressForNextUpSuppression( + progress: WatchProgressEntry, + latestCompletedAt: Long?, +): Boolean { + if (!progress.shouldTreatAsInProgressForContinueWatching()) return false + if (latestCompletedAt == null || latestCompletedAt == Long.MIN_VALUE) return true + return progress.lastUpdatedEpochMs >= latestCompletedAt +} + +private fun isMalformedNextUpSeedContentId(contentId: String?): Boolean { + val trimmed = contentId?.trim().orEmpty() + if (trimmed.isEmpty()) return true + return when (trimmed.lowercase()) { + "tmdb", "imdb", "trakt", "tmdb:", "imdb:", "trakt:" -> true + else -> false + } +} + private fun heroMobileBelowSectionHeightHint( maxWidthDp: Float, continueWatchingVisible: Boolean, @@ -652,15 +985,17 @@ internal fun buildHomeContinueWatchingItems( visibleEntries: List, cachedInProgressByVideoId: Map = emptyMap(), nextUpItemsBySeries: Map>, + nextUpSuppressedSeriesIds: Set? = null, sortMode: ContinueWatchingSortMode = ContinueWatchingSortMode.DEFAULT, todayIsoDate: String = "", ): List { - val inProgressSeriesIds = visibleEntries - .asSequence() - .filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() } - .map { entry -> entry.parentMetaId } - .filter(String::isNotBlank) - .toSet() + val suppressedSeriesIds = nextUpSuppressedSeriesIds + ?: visibleEntries + .asSequence() + .filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() } + .map { entry -> entry.parentMetaId } + .filter(String::isNotBlank) + .toSet() val candidates = buildList { addAll( @@ -675,7 +1010,7 @@ internal fun buildHomeContinueWatchingItems( ) addAll( nextUpItemsBySeries.values.mapNotNull { (lastUpdatedEpochMs, item) -> - if (item.parentMetaId in inProgressSeriesIds) return@mapNotNull null + if (item.parentMetaId in suppressedSeriesIds) return@mapNotNull null HomeContinueWatchingCandidate( lastUpdatedEpochMs = lastUpdatedEpochMs, item = item, @@ -866,3 +1201,83 @@ private fun ContinueWatchingItem.withFallbackMetadata( released = released ?: fallback.released, ) } + +private fun WatchProgressEntry.debugSummary(): String = + buildString { + append(parentMetaType) + append(":") + append(parentMetaId) + if (seasonNumber != null || episodeNumber != null) { + append(" s=") + append(seasonNumber) + append(" e=") + append(episodeNumber) + } + append(" video=") + append(videoId) + append(" pct=") + append(progressPercent) + append(" completed=") + append(isCompleted) + append(" effectiveCompleted=") + append(isEffectivelyCompleted) + append(" src=") + append(source) + append(" last=") + append(lastUpdatedEpochMs) + } + +private fun Collection.debugWatchProgressSummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" } + +private fun Collection.debugSourceCounts(): String = + groupingBy { it.source } + .eachCount() + .entries + .sortedBy { it.key } + .joinToString(separator = ",") { "${it.key}=${it.value}" } + .ifBlank { "none" } + +private fun CompletedSeriesCandidate.debugSummary(): String = + buildString { + append(content.type) + append(":") + append(content.id) + append(" s=") + append(seasonNumber) + append(" e=") + append(episodeNumber) + append(" marked=") + append(markedAtEpochMs) + } + +private fun Collection.debugCompletedSeriesSummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" } + +private fun ContinueWatchingItem.debugSummary(): String = + buildString { + append(if (isNextUp) "next_up" else "in_progress") + append(":") + append(parentMetaType) + append(":") + append(parentMetaId) + if (seasonNumber != null || episodeNumber != null) { + append(" s=") + append(seasonNumber) + append(" e=") + append(episodeNumber) + } + append(" video=") + append(videoId) + append(" seed=") + append(nextUpSeedSeasonNumber) + append("x") + append(nextUpSeedEpisodeNumber) + append(" progress=") + append(progressFraction) + append(" resume=") + append(resumePositionMs) + } + +private fun Collection.debugContinueWatchingSummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" } diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/trakt/TraktProgressRepository.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/trakt/TraktProgressRepository.kt index dc43c983..6d9f99ee 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/trakt/TraktProgressRepository.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/trakt/TraktProgressRepository.kt @@ -13,6 +13,7 @@ import com.nuvio.app.features.watchprogress.buildPlaybackVideoId import com.nuvio.app.features.watchprogress.shouldTreatAsInProgressForContinueWatching import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async @@ -22,7 +23,9 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull @@ -44,6 +47,7 @@ data class TraktProgressUiState( val entries: List = emptyList(), val isLoading: Boolean = false, val errorMessage: String? = null, + val hasLoadedRemoteProgress: Boolean = false, ) object TraktProgressRepository { @@ -56,6 +60,8 @@ object TraktProgressRepository { private var hasLoaded = false private var refreshRequestId: Long = 0L + private val refreshJobMutex = Mutex() + private var inFlightRefresh: Deferred? = null fun ensureLoaded() { if (hasLoaded) return @@ -82,14 +88,35 @@ object TraktProgressRepository { } suspend fun refreshNow() { + ensureLoaded() + val refresh = refreshJobMutex.withLock { + inFlightRefresh?.takeIf { it.isActive } ?: scope.async { + refreshNowInternal() + }.also { inFlightRefresh = it } + } + + try { + refresh.await() + } finally { + refreshJobMutex.withLock { + if (inFlightRefresh == refresh && refresh.isCompleted) { + inFlightRefresh = null + } + } + } + } + + private suspend fun refreshNowInternal() { ensureLoaded() val requestId = nextRefreshRequestId() val headers = TraktAuthRepository.authorizedHeaders() if (headers == null) { + log.d { "refreshNow request=$requestId skipped: missing authorized headers" } _uiState.value = TraktProgressUiState() return } + log.d { "refreshNow request=$requestId start currentEntries=${_uiState.value.entries.size}" } _uiState.value = _uiState.value.copy(isLoading = true, errorMessage = null) val playbackEntries = runCatching { @@ -109,34 +136,56 @@ object TraktProgressRepository { _uiState.value = TraktProgressUiState( entries = playbackEntries, - isLoading = false, + isLoading = true, errorMessage = null, + hasLoadedRemoteProgress = false, ) + log.d { + "refreshNow request=$requestId playback applied entries=${playbackEntries.size} " + + "sources=${playbackEntries.debugSourceCounts()} items=${playbackEntries.debugWatchProgressSummary()}" + } if (playbackEntries.isNotEmpty()) { launchHydration(requestId = requestId, entries = playbackEntries) } - scope.launch { - val completedEntries = runCatching { - fetchHistoryEntries(headers) + fetchWatchedShowSeedEntries(headers) - }.onFailure { error -> - if (error is CancellationException) throw error - log.w { "Failed to fetch Trakt history snapshot: ${error.message}" } - }.getOrNull() ?: return@launch + val completedEntries = runCatching { + coroutineScope { + val history = async { fetchHistoryEntries(headers) } + val watchedShowSeeds = async { fetchWatchedShowSeedEntries(headers) } + history.await() + watchedShowSeeds.await() + } + }.onFailure { error -> + if (error is CancellationException) throw error + log.w { "Failed to fetch Trakt history snapshot: ${error.message}" } + }.getOrNull() - if (!isLatestRefreshRequest(requestId)) return@launch - - val merged = mergeNewestByVideoId(playbackEntries + completedEntries) + if (completedEntries == null) { _uiState.value = _uiState.value.copy( - entries = merged.sortedByDescending { it.lastUpdatedEpochMs }, isLoading = false, errorMessage = null, + hasLoadedRemoteProgress = false, ) + return + } - if (merged.isNotEmpty()) { - launchHydration(requestId = requestId, entries = merged) - } + if (!isLatestRefreshRequest(requestId)) return + + val merged = mergeNewestByVideoId(playbackEntries + completedEntries) + _uiState.value = _uiState.value.copy( + entries = merged.sortedByDescending { it.lastUpdatedEpochMs }, + isLoading = false, + errorMessage = null, + hasLoadedRemoteProgress = true, + ) + log.d { + "refreshNow request=$requestId completed snapshot applied " + + "completedEntries=${completedEntries.size} merged=${merged.size} " + + "sources=${merged.debugSourceCounts()} items=${merged.debugWatchProgressSummary()}" + } + + if (merged.isNotEmpty()) { + launchHydration(requestId = requestId, entries = merged) } } @@ -163,6 +212,10 @@ object TraktProgressRepository { isLoading = false, errorMessage = null, ) + log.d { + "hydrate request=$requestId applied hydrated=${hydrated.size} merged=${merged.size} " + + "items=${merged.debugWatchProgressSummary()}" + } } } @@ -175,6 +228,10 @@ object TraktProgressRepository { current[normalizedEntry.videoId] = normalizedEntry } _uiState.value = _uiState.value.copy(entries = current.values.sortedByDescending { it.lastUpdatedEpochMs }) + log.d { + "optimistic progress applied entry=${normalizedEntry.debugSummary()} " + + "entries=${_uiState.value.entries.size}" + } } fun applyOptimisticRemoval(videoId: String) { @@ -182,6 +239,7 @@ object TraktProgressRepository { if (videoId.isBlank()) return val filtered = _uiState.value.entries.filterNot { it.videoId == videoId } _uiState.value = _uiState.value.copy(entries = filtered) + log.d { "optimistic removal videoId=$videoId entries=${filtered.size}" } } fun applyOptimisticRemoval( @@ -202,6 +260,10 @@ object TraktProgressRepository { } } _uiState.value = _uiState.value.copy(entries = filtered) + log.d { + "optimistic removal contentId=$normalizedContentId s=$seasonNumber e=$episodeNumber " + + "entries=${filtered.size}" + } } suspend fun removeProgress( @@ -213,6 +275,7 @@ object TraktProgressRepository { if (normalizedContentId.isBlank()) return val headers = TraktAuthRepository.authorizedHeaders() ?: return + log.d { "removeProgress start contentId=$normalizedContentId s=$seasonNumber e=$episodeNumber" } applyOptimisticRemoval( contentId = normalizedContentId, seasonNumber = seasonNumber, @@ -280,10 +343,12 @@ object TraktProgressRepository { } } + log.d { "removeProgress complete contentId=$normalizedContentId refreshing" } refreshNow() } private suspend fun fetchPlaybackEntries(headers: Map): List = withContext(Dispatchers.Default) { + log.d { "fetchPlaybackEntries start" } val payloads = coroutineScope { val moviesPayload = async { httpGetTextWithHeaders( @@ -306,6 +371,10 @@ object TraktProgressRepository { val moviePlayback = json.decodeFromString>(moviesPayload) val episodePlayback = json.decodeFromString>(episodesPayload) + log.d { + "fetchPlaybackEntries raw movies=${moviePlayback.size} episodes=${episodePlayback.size} " + + "movieItems=${moviePlayback.debugPlaybackSummary()} episodeItems=${episodePlayback.debugPlaybackSummary()}" + } val inProgressMovies = moviePlayback.mapIndexedNotNull { index, item -> mapPlaybackMovie(item = item, fallbackIndex = index) @@ -314,10 +383,16 @@ object TraktProgressRepository { mapPlaybackEpisode(item = item, fallbackIndex = index) } - mergeNewestByVideoId(inProgressMovies + inProgressEpisodes) + val merged = mergeNewestByVideoId(inProgressMovies + inProgressEpisodes) + log.d { + "fetchPlaybackEntries mapped movies=${inProgressMovies.size} episodes=${inProgressEpisodes.size} " + + "merged=${merged.size} items=${merged.debugWatchProgressSummary()}" + } + merged } private suspend fun fetchHistoryEntries(headers: Map): List = withContext(Dispatchers.Default) { + log.d { "fetchHistoryEntries start limit=$HISTORY_LIMIT" } val payloads = coroutineScope { val historyPayload = async { httpGetTextWithHeaders( @@ -339,6 +414,10 @@ object TraktProgressRepository { val movieHistoryPayload = payloads[1] val episodeHistory = json.decodeFromString>(historyPayload) val movieHistory = json.decodeFromString>(movieHistoryPayload) + log.d { + "fetchHistoryEntries raw episodes=${episodeHistory.size} movies=${movieHistory.size} " + + "episodeItems=${episodeHistory.debugHistoryEpisodeSummary()} movieItems=${movieHistory.debugHistoryMovieSummary()}" + } val completedEpisodes = episodeHistory .mapIndexedNotNull { index, item -> mapHistoryEpisode(item = item, fallbackIndex = index) } @@ -347,7 +426,12 @@ object TraktProgressRepository { .mapIndexedNotNull { index, item -> mapHistoryMovie(item = item, fallbackIndex = index) } .distinctBy { entry -> entry.videoId } - mergeNewestByVideoId(completedEpisodes + completedMovies) + val merged = mergeNewestByVideoId(completedEpisodes + completedMovies) + log.d { + "fetchHistoryEntries mapped episodes=${completedEpisodes.size} movies=${completedMovies.size} " + + "merged=${merged.size} items=${merged.debugWatchProgressSummary()}" + } + merged } private suspend fun fetchWatchedShowSeedEntries( @@ -360,7 +444,11 @@ object TraktProgressRepository { headers = headers, ) val watchedShows = json.decodeFromString>(payload) - watchedShows + log.d { + "fetchWatchedShowSeedEntries raw shows=${watchedShows.size} " + + "items=${watchedShows.debugWatchedShowSummary()}" + } + val mapped = watchedShows .mapNotNull { item -> mapWatchedShowSeed( item = item, @@ -368,6 +456,11 @@ object TraktProgressRepository { ) } .sortedByDescending { entry -> entry.lastUpdatedEpochMs } + log.d { + "fetchWatchedShowSeedEntries mapped=${mapped.size} useFurthest=$useFurthestEpisode " + + "items=${mapped.debugWatchProgressSummary()}" + } + mapped } private fun mergeNewestByVideoId(entries: List): List { @@ -436,6 +529,8 @@ object TraktProgressRepository { private fun invalidateInFlightRefreshes() { refreshRequestId += 1L + inFlightRefresh?.cancel() + inFlightRefresh = null } private fun isLatestRefreshRequest(requestId: Long): Boolean = refreshRequestId == requestId @@ -819,3 +914,165 @@ private data class TraktEpisode( @SerialName("number") val number: Int? = null, @SerialName("ids") val ids: TraktExternalIds? = null, ) + +private fun WatchProgressEntry.debugSummary(): String = + buildString { + append(parentMetaType) + append(":") + append(parentMetaId) + if (seasonNumber != null || episodeNumber != null) { + append(" s=") + append(seasonNumber) + append(" e=") + append(episodeNumber) + } + append(" video=") + append(videoId) + append(" pct=") + append(progressPercent) + append(" completed=") + append(isCompleted) + append(" effectiveCompleted=") + append(isEffectivelyCompleted) + append(" src=") + append(source) + append(" last=") + append(lastUpdatedEpochMs) + } + +private fun Collection.debugWatchProgressSummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" } + +private fun Collection.debugSourceCounts(): String = + groupingBy { it.source } + .eachCount() + .entries + .sortedBy { it.key } + .joinToString(separator = ",") { "${it.key}=${it.value}" } + .ifBlank { "none" } + +private fun Collection.debugPlaybackSummary(limit: Int = 8): String = + take(limit).joinToString(separator = " | ") { item -> + val media = item.movie ?: item.show + val episode = item.episode + buildString { + append(media?.title ?: "unknown") + append(" ids=") + append(media?.ids.debugIds()) + if (episode != null) { + append(" ep=") + append(episode.season) + append("x") + append(episode.number) + append(" epIds=") + append(episode.ids.debugIds()) + } + append(" progress=") + append(item.progress) + append(" pausedAt=") + append(item.pausedAt) + append(" playbackId=") + append(item.id) + } + }.ifBlank { "none" } + +private fun Collection.debugHistoryEpisodeSummary(limit: Int = 8): String = + take(limit).joinToString(separator = " | ") { item -> + buildString { + append(item.show?.title ?: "unknown") + append(" ids=") + append(item.show?.ids.debugIds()) + append(" ep=") + append(item.episode?.season) + append("x") + append(item.episode?.number) + append(" epIds=") + append(item.episode?.ids.debugIds()) + append(" watchedAt=") + append(item.watchedAt) + } + }.ifBlank { "none" } + +private fun Collection.debugHistoryMovieSummary(limit: Int = 8): String = + take(limit).joinToString(separator = " | ") { item -> + buildString { + append(item.movie?.title ?: "unknown") + append(" ids=") + append(item.movie?.ids.debugIds()) + append(" watchedAt=") + append(item.watchedAt) + } + }.ifBlank { "none" } + +private fun Collection.debugWatchedShowSummary(limit: Int = 8): String = + take(limit).joinToString(separator = " | ") { item -> + val episodeCount = item.seasons.orEmpty().sumOf { season -> + season.episodes.orEmpty().count { episode -> + (episode.number ?: 0) > 0 && (episode.plays ?: 1) > 0 + } + } + val latest = item.seasons.orEmpty() + .flatMap { season -> + val seasonNumber = season.number + season.episodes.orEmpty().mapNotNull { episode -> + val episodeNumber = episode.number ?: return@mapNotNull null + val watchedAt = episode.lastWatchedAt ?: item.lastWatchedAt + TraktWatchedShowEpisodeSeed( + season = seasonNumber ?: 0, + episode = episodeNumber, + watchedAt = watchedAt + ?.let { value -> + runCatching { TraktPlatformClock.parseIsoDateTimeToEpochMs(value) }.getOrNull() + } + ?: 0L, + ) + } + } + .maxWithOrNull( + compareBy( + { it.watchedAt }, + { it.season }, + { it.episode }, + ), + ) + buildString { + append(item.show?.title ?: "unknown") + append(" ids=") + append(item.show?.ids.debugIds()) + append(" episodes=") + append(episodeCount) + append(" latest=") + append(latest?.season) + append("x") + append(latest?.episode) + append(" lastWatchedAt=") + append(item.lastWatchedAt) + } + }.ifBlank { "none" } + +private fun TraktExternalIds?.debugIds(): String = + if (this == null) { + "none" + } else { + buildString { + imdb?.takeIf { it.isNotBlank() }?.let { + append("imdb:") + append(it) + } + tmdb?.let { + if (isNotEmpty()) append(",") + append("tmdb:") + append(it) + } + trakt?.let { + if (isNotEmpty()) append(",") + append("trakt:") + append(it) + } + slug?.takeIf { it.isNotBlank() }?.let { + if (isNotEmpty()) append(",") + append("slug:") + append(it) + } + }.ifBlank { "none" } + } diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watching/sync/SupabaseProgressSyncAdapter.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watching/sync/SupabaseProgressSyncAdapter.kt index c1099d50..80146409 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watching/sync/SupabaseProgressSyncAdapter.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watching/sync/SupabaseProgressSyncAdapter.kt @@ -1,5 +1,6 @@ package com.nuvio.app.features.watching.sync +import co.touchlab.kermit.Logger import com.nuvio.app.core.network.SupabaseProvider import com.nuvio.app.features.watchprogress.WatchProgressEntry import io.github.jan.supabase.postgrest.postgrest @@ -12,12 +13,14 @@ import kotlinx.serialization.json.encodeToJsonElement import kotlinx.serialization.json.put object SupabaseProgressSyncAdapter : ProgressSyncAdapter { + private val log = Logger.withTag("NuvioSyncProgress") private val json = Json { ignoreUnknownKeys = true encodeDefaults = true } override suspend fun pull(profileId: Int): List { + log.d { "pull start profileId=$profileId" } val params = buildJsonObject { put("p_profile_id", profileId) } val result = SupabaseProvider.client.postgrest.rpc("sync_pull_watch_progress", params) val serverEntries = result.decodeList() @@ -33,6 +36,10 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter { lastWatched = entry.lastWatched, ) } + log.d { + "pull returned raw=${serverEntries.size} records=${records.size} " + + "items=${records.debugProgressRecordSummary()}" + } return records } @@ -40,6 +47,10 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter { profileId: Int, entries: Collection, ) { + log.d { + "push start profileId=$profileId entries=${entries.size} " + + "items=${entries.debugWatchProgressEntrySummary()}" + } val syncEntries = entries.map { entry -> WatchProgressSyncEntry( contentId = entry.parentMetaId, @@ -58,12 +69,17 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter { put("p_entries", json.encodeToJsonElement(syncEntries)) } SupabaseProvider.client.postgrest.rpc("sync_push_watch_progress", params) + log.d { "push complete profileId=$profileId entries=${syncEntries.size}" } } override suspend fun delete( profileId: Int, entries: Collection, ) { + log.d { + "delete start profileId=$profileId entries=${entries.size} " + + "items=${entries.debugWatchProgressEntrySummary()}" + } val progressKeys = entries.map { entry -> if (entry.seasonNumber != null && entry.episodeNumber != null) { "${entry.parentMetaId}_s${entry.seasonNumber}e${entry.episodeNumber}" @@ -76,6 +92,7 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter { put("p_keys", json.encodeToJsonElement(progressKeys)) } SupabaseProvider.client.postgrest.rpc("sync_delete_watch_progress", params) + log.d { "delete complete profileId=$profileId keys=${progressKeys.joinToString(limit = 12)}" } } private fun progressKeyForEntry(entry: WatchProgressEntry): String = @@ -98,3 +115,53 @@ private data class WatchProgressSyncEntry( @SerialName("last_watched") val lastWatched: Long = 0, @SerialName("progress_key") val progressKey: String = "", ) + +private fun Collection.debugProgressRecordSummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { record -> + buildString { + append(record.contentType) + append(":") + append(record.contentId) + if (record.season != null || record.episode != null) { + append(" s=") + append(record.season) + append(" e=") + append(record.episode) + } + append(" video=") + append(record.videoId) + append(" pos=") + append(record.position) + append(" dur=") + append(record.duration) + append(" last=") + append(record.lastWatched) + } + }.ifBlank { "none" } + +private fun Collection.debugWatchProgressEntrySummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { entry -> + buildString { + append(entry.parentMetaType) + append(":") + append(entry.parentMetaId) + if (entry.seasonNumber != null || entry.episodeNumber != null) { + append(" s=") + append(entry.seasonNumber) + append(" e=") + append(entry.episodeNumber) + } + append(" video=") + append(entry.videoId) + append(" pos=") + append(entry.lastPositionMs) + append(" dur=") + append(entry.durationMs) + append(" pct=") + append(entry.progressPercent) + append(" completed=") + append(entry.isCompleted) + append(" last=") + append(entry.lastUpdatedEpochMs) + } + }.ifBlank { "none" } diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressModels.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressModels.kt index 0485986b..9fb84629 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressModels.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressModels.kt @@ -118,6 +118,7 @@ data class WatchProgressEntry( data class WatchProgressUiState( val entries: List = emptyList(), + val hasLoadedRemoteProgress: Boolean = false, ) { val byVideoId: Map get() = entries.associateBy { it.videoId } diff --git a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt index ebdb27d5..741bfd00 100644 --- a/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt +++ b/composeApp/src/commonMain/kotlin/com/nuvio/app/features/watchprogress/WatchProgressRepository.kt @@ -1,6 +1,8 @@ package com.nuvio.app.features.watchprogress import co.touchlab.kermit.Logger +import com.nuvio.app.core.auth.AuthRepository +import com.nuvio.app.core.auth.AuthState import com.nuvio.app.features.addons.AddonRepository import com.nuvio.app.features.details.MetaDetailsRepository import com.nuvio.app.features.player.PlayerPlaybackSnapshot @@ -11,11 +13,14 @@ import com.nuvio.app.features.trakt.TraktSettingsRepository import com.nuvio.app.features.trakt.shouldUseTraktProgress as shouldUseTraktProgressSource import com.nuvio.app.features.watching.application.WatchingActions import com.nuvio.app.features.watching.sync.ProgressSyncAdapter +import com.nuvio.app.features.watching.sync.ProgressSyncRecord import com.nuvio.app.features.watching.sync.SupabaseProgressSyncAdapter +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow @@ -23,6 +28,8 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull +private const val NUVIO_SYNC_PERIODIC_INTERVAL_MS = 5L * 60L * 1000L + object WatchProgressRepository { private val syncScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val log = Logger.withTag("WatchProgressRepository") @@ -34,6 +41,8 @@ object WatchProgressRepository { private var currentProfileId: Int = 1 private var entriesByVideoId: MutableMap = mutableMapOf() private var metadataResolutionJob: Job? = null + private var isPullingNuvioSyncFromServer = false + private var hasCompletedInitialNuvioSyncPull = false internal var syncAdapter: ProgressSyncAdapter = SupabaseProgressSyncAdapter init { @@ -45,7 +54,10 @@ object WatchProgressRepository { ) ) { runCatching { TraktProgressRepository.refreshNow() } - .onFailure { error -> log.w { "Failed to refresh Trakt progress after auth: ${error.message}" } } + .onFailure { error -> + if (error is CancellationException) throw error + log.w { "Failed to refresh Trakt progress after auth: ${error.message}" } + } } publish() } @@ -59,7 +71,10 @@ object WatchProgressRepository { ) ) { runCatching { TraktProgressRepository.refreshNow() } - .onFailure { error -> log.w { "Failed to refresh Trakt progress after source change: ${error.message}" } } + .onFailure { error -> + if (error is CancellationException) throw error + log.w { "Failed to refresh Trakt progress after source change: ${error.message}" } + } } publish() } @@ -72,6 +87,35 @@ object WatchProgressRepository { } } } + + syncScope.launch { + while (true) { + delay(NUVIO_SYNC_PERIODIC_INTERVAL_MS) + TraktAuthRepository.ensureLoaded() + TraktSettingsRepository.ensureLoaded() + if (shouldUseTraktProgress()) continue + + val authState = AuthRepository.state.value + if (authState !is AuthState.Authenticated || authState.isAnonymous) continue + if (!hasCompletedInitialNuvioSyncPull || isPullingNuvioSyncFromServer) continue + + log.d { + "periodic NuvioSync pull start profileId=${ProfileRepository.activeProfileId} " + + "entries=${entriesByVideoId.size}" + } + runCatching { pullFromServer(ProfileRepository.activeProfileId) } + .onSuccess { + log.d { + "periodic NuvioSync pull complete profileId=${ProfileRepository.activeProfileId} " + + "entries=${entriesByVideoId.size}" + } + } + .onFailure { error -> + if (error is CancellationException) throw error + log.w { "Periodic NuvioSync pull failed: ${error.message}" } + } + } + } } fun ensureLoaded() { @@ -127,57 +171,93 @@ object WatchProgressRepository { currentProfileId = profileId val useTraktProgress = shouldUseTraktProgress() - - if (useTraktProgress) { - runCatching { TraktProgressRepository.refreshNow() } - .onFailure { e -> log.e(e) { "Failed to pull Trakt progress" } } - publish() - return + log.d { + "pullFromServer start profileId=$profileId source=${if (useTraktProgress) "trakt" else "nuvio_sync"} " + + "localEntries=${entriesByVideoId.size}" } - runCatching { - val serverEntries = syncAdapter.pull(profileId = profileId) + if (!useTraktProgress && isPullingNuvioSyncFromServer) { + log.d { "pullFromServer NuvioSync skipped: pull already in flight profileId=$profileId" } + return + } + if (!useTraktProgress) { + isPullingNuvioSyncFromServer = true + } - val oldLocal = entriesByVideoId.toMap() - val newMap = mutableMapOf() - - serverEntries.forEach { entry -> - val videoId = entry.videoId - val cached = oldLocal[videoId] - newMap[videoId] = WatchProgressEntry( - contentType = entry.contentType, - parentMetaId = entry.contentId, - parentMetaType = cached?.parentMetaType ?: entry.contentType, - videoId = videoId, - title = cached?.title?.takeIf { it.isNotBlank() } ?: entry.contentId, - logo = cached?.logo, - poster = cached?.poster, - background = cached?.background, - seasonNumber = entry.season, - episodeNumber = entry.episode, - episodeTitle = cached?.episodeTitle, - episodeThumbnail = cached?.episodeThumbnail, - lastPositionMs = entry.position, - durationMs = entry.duration, - lastUpdatedEpochMs = entry.lastWatched, - providerName = cached?.providerName, - providerAddonId = cached?.providerAddonId, - lastStreamTitle = cached?.lastStreamTitle, - lastStreamSubtitle = cached?.lastStreamSubtitle, - pauseDescription = cached?.pauseDescription, - lastSourceUrl = cached?.lastSourceUrl, - isCompleted = isWatchProgressComplete(entry.position, entry.duration, false), - ) + try { + if (useTraktProgress) { + runCatching { TraktProgressRepository.refreshNow() } + .onFailure { e -> + if (e is CancellationException) throw e + log.e(e) { "Failed to pull Trakt progress" } + } + publish() + log.d { + "pullFromServer trakt complete entries=${TraktProgressRepository.uiState.value.entries.size} " + + "sources=${TraktProgressRepository.uiState.value.entries.debugSourceCounts()} " + + "items=${TraktProgressRepository.uiState.value.entries.debugWatchProgressEntrySummary()}" + } + return } - entriesByVideoId = newMap - hasLoaded = true - publish() - persist() + runCatching { + val serverEntries = syncAdapter.pull(profileId = profileId) + log.d { + "pullFromServer NuvioSync returned ${serverEntries.size} records " + + "items=${serverEntries.debugProgressRecordSummary()}" + } - resolveRemoteMetadata() - }.onFailure { e -> - log.e(e) { "Failed to pull watch progress from server" } + val oldLocal = entriesByVideoId.toMap() + val newMap = mutableMapOf() + + serverEntries.forEach { entry -> + val videoId = entry.videoId + val cached = oldLocal[videoId] + newMap[videoId] = WatchProgressEntry( + contentType = entry.contentType, + parentMetaId = entry.contentId, + parentMetaType = cached?.parentMetaType ?: entry.contentType, + videoId = videoId, + title = cached?.title?.takeIf { it.isNotBlank() } ?: entry.contentId, + logo = cached?.logo, + poster = cached?.poster, + background = cached?.background, + seasonNumber = entry.season, + episodeNumber = entry.episode, + episodeTitle = cached?.episodeTitle, + episodeThumbnail = cached?.episodeThumbnail, + lastPositionMs = entry.position, + durationMs = entry.duration, + lastUpdatedEpochMs = entry.lastWatched, + providerName = cached?.providerName, + providerAddonId = cached?.providerAddonId, + lastStreamTitle = cached?.lastStreamTitle, + lastStreamSubtitle = cached?.lastStreamSubtitle, + pauseDescription = cached?.pauseDescription, + lastSourceUrl = cached?.lastSourceUrl, + isCompleted = isWatchProgressComplete(entry.position, entry.duration, false), + ) + } + + entriesByVideoId = newMap + hasLoaded = true + hasCompletedInitialNuvioSyncPull = true + publish() + persist() + log.d { + "pullFromServer NuvioSync applied entries=${entriesByVideoId.size} " + + "items=${entriesByVideoId.values.debugWatchProgressEntrySummary()}" + } + + resolveRemoteMetadata() + }.onFailure { e -> + if (e is CancellationException) throw e + log.e(e) { "Failed to pull watch progress from server" } + } + } finally { + if (!useTraktProgress) { + isPullingNuvioSyncFromServer = false + } } } @@ -186,7 +266,15 @@ object WatchProgressRepository { .filter { it.poster.isNullOrBlank() || it.background.isNullOrBlank() } .groupBy { it.parentMetaId to it.contentType } - if (needsResolution.isEmpty()) return + if (needsResolution.isEmpty()) { + log.d { "resolveRemoteMetadata skipped: all entries have artwork" } + return + } + log.d { + "resolveRemoteMetadata start groups=${needsResolution.size} " + + "entries=${needsResolution.values.sumOf { it.size }} " + + "keys=${needsResolution.keys.joinToString(limit = 12) { (metaId, type) -> "$type:$metaId" }}" + } metadataResolutionJob?.cancel() metadataResolutionJob = syncScope.launch { @@ -201,7 +289,11 @@ object WatchProgressRepository { val (metaId, metaType) = key val meta = runCatching { MetaDetailsRepository.fetch(metaType, metaId) - }.getOrNull() ?: continue + }.getOrNull() + if (meta == null) { + log.d { "resolveRemoteMetadata miss type=$metaType id=$metaId entries=${entries.size}" } + continue + } for (entry in entries) { val episodeVideo = if (entry.seasonNumber != null && entry.episodeNumber != null) { @@ -224,8 +316,13 @@ object WatchProgressRepository { } publish() + log.d { + "resolveRemoteMetadata applied type=$metaType id=$metaId entries=${entries.size} " + + "metaVideos=${meta.videos.size}" + } } persist() + log.d { "resolveRemoteMetadata complete entries=${entriesByVideoId.size}" } } } @@ -350,6 +447,10 @@ object WatchProgressRepository { isEnded = snapshot.isEnded, ) if (!isCompleted && !shouldStoreWatchProgress(positionMs = positionMs, durationMs = durationMs)) { + log.d { + "upsert skipped below threshold video=${session.videoId} content=${session.parentMetaId} " + + "s=${session.seasonNumber} e=${session.episodeNumber} pos=$positionMs dur=$durationMs ended=${snapshot.isEnded}" + } return } @@ -383,6 +484,10 @@ object WatchProgressRepository { } val useTraktProgress = shouldUseTraktProgress() + log.d { + "upsert progress source=${if (useTraktProgress) "trakt" else "nuvio_sync"} " + + "entry=${entry.debugSummary()} snapshotEnded=${snapshot.isEnded}" + } entriesByVideoId[session.videoId] = entry if (useTraktProgress) { @@ -403,7 +508,9 @@ object WatchProgressRepository { syncScope.launch { runCatching { val profileId = ProfileRepository.activeProfileId + log.d { "pushScrobbleToServer profileId=$profileId entry=${entry.debugSummary()}" } syncAdapter.push(profileId = profileId, entries = listOf(entry)) + log.d { "pushScrobbleToServer complete profileId=$profileId video=${entry.videoId}" } }.onFailure { e -> log.e(e) { "Failed to push watch progress scrobble" } } @@ -416,7 +523,12 @@ object WatchProgressRepository { runCatching { if (entries.isEmpty()) return@runCatching val profileId = ProfileRepository.activeProfileId + log.d { + "pushDeleteToServer profileId=$profileId entries=${entries.size} " + + "items=${entries.debugWatchProgressEntrySummary()}" + } syncAdapter.delete(profileId = profileId, entries = entries) + log.d { "pushDeleteToServer complete profileId=$profileId entries=${entries.size}" } }.onFailure { e -> log.e(e) { "Failed to push watch progress delete" } } @@ -426,8 +538,18 @@ object WatchProgressRepository { private fun publish() { val entries = currentEntries() val sortedEntries = entries.sortedByDescending { it.lastUpdatedEpochMs } + log.d { + "publish source=${if (shouldUseTraktProgress()) "trakt" else "nuvio_sync"} " + + "entries=${sortedEntries.size} cw=${sortedEntries.continueWatchingEntries().size} " + + "sources=${sortedEntries.debugSourceCounts()} items=${sortedEntries.debugWatchProgressEntrySummary()}" + } _uiState.value = WatchProgressUiState( entries = sortedEntries, + hasLoadedRemoteProgress = if (shouldUseTraktProgress()) { + TraktProgressRepository.uiState.value.hasLoadedRemoteProgress + } else { + hasLoaded + }, ) } @@ -453,3 +575,67 @@ object WatchProgressRepository { } } + +private fun ProgressSyncRecord.debugSummary(): String = + buildString { + append(contentType) + append(":") + append(contentId) + if (season != null || episode != null) { + append(" s=") + append(season) + append(" e=") + append(episode) + } + append(" video=") + append(videoId) + append(" pos=") + append(position) + append(" dur=") + append(duration) + append(" last=") + append(lastWatched) + } + +private fun Collection.debugProgressRecordSummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" } + +private fun WatchProgressEntry.debugSummary(): String = + buildString { + append(parentMetaType) + append(":") + append(parentMetaId) + if (seasonNumber != null || episodeNumber != null) { + append(" s=") + append(seasonNumber) + append(" e=") + append(episodeNumber) + } + append(" video=") + append(videoId) + append(" pos=") + append(lastPositionMs) + append(" dur=") + append(durationMs) + append(" pct=") + append(progressPercent) + append(" completed=") + append(isCompleted) + append(" effectiveCompleted=") + append(isEffectivelyCompleted) + append(" src=") + append(source) + append(" last=") + append(lastUpdatedEpochMs) + } + +private fun Collection.debugWatchProgressEntrySummary(limit: Int = 10): String = + take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" } + +private fun Collection.debugSourceCounts(): String = + groupingBy { it.source } + .eachCount() + .entries + .sortedBy { it.key } + .joinToString(separator = ",") { "${it.key}=${it.value}" } + .ifBlank { "none" }