fix: multiple trakt cw stale cache and pull logic

This commit is contained in:
tapframe 2026-05-19 01:11:16 +05:30
parent 4094151108
commit cff9512d47
5 changed files with 1027 additions and 101 deletions

View file

@ -14,6 +14,7 @@ import androidx.compose.ui.Modifier
import androidx.compose.ui.unit.Dp
import androidx.compose.ui.unit.dp
import androidx.lifecycle.compose.collectAsStateWithLifecycle
import co.touchlab.kermit.Logger
import com.nuvio.app.core.network.NetworkCondition
import com.nuvio.app.core.network.NetworkStatusRepository
import com.nuvio.app.core.ui.LocalNuvioBottomNavigationOverlayPadding
@ -35,6 +36,7 @@ import com.nuvio.app.features.trakt.TRAKT_CONTINUE_WATCHING_DAYS_CAP_ALL
import com.nuvio.app.features.trakt.TraktSettingsRepository
import com.nuvio.app.features.trakt.normalizeTraktContinueWatchingDaysCap
import com.nuvio.app.features.trakt.shouldUseTraktProgress
import com.nuvio.app.features.watched.WatchedItem
import com.nuvio.app.features.watched.WatchedRepository
import com.nuvio.app.features.watchprogress.CachedInProgressItem
import com.nuvio.app.features.watchprogress.CachedNextUpItem
@ -45,13 +47,19 @@ import com.nuvio.app.features.watchprogress.ContinueWatchingItem
import com.nuvio.app.features.watchprogress.ContinueWatchingSortMode
import com.nuvio.app.features.watchprogress.isSeriesTypeForContinueWatching
import com.nuvio.app.features.watchprogress.nextUpDismissKey
import com.nuvio.app.features.watchprogress.shouldTreatAsInProgressForContinueWatching
import com.nuvio.app.features.watchprogress.shouldUseAsCompletedSeedForContinueWatching
import com.nuvio.app.features.watchprogress.WatchProgressClock
import com.nuvio.app.features.watchprogress.WatchProgressEntry
import com.nuvio.app.features.watchprogress.WatchProgressRepository
import com.nuvio.app.features.watchprogress.WatchProgressSourceLocal
import com.nuvio.app.features.watchprogress.WatchProgressSourceTraktHistory
import com.nuvio.app.features.watchprogress.WatchProgressSourceTraktPlayback
import com.nuvio.app.features.watchprogress.WatchProgressSourceTraktShowProgress
import com.nuvio.app.features.watchprogress.buildContinueWatchingEpisodeSubtitle
import com.nuvio.app.features.watchprogress.continueWatchingEntries
import com.nuvio.app.features.watchprogress.toContinueWatchingItem
import com.nuvio.app.features.watchprogress.toUpNextContinueWatchingItem
import com.nuvio.app.features.watching.application.WatchingState
import com.nuvio.app.features.watching.domain.WatchingContentRef
import com.nuvio.app.features.watching.domain.isReleasedBy
import com.nuvio.app.features.collection.CollectionRepository
@ -164,46 +172,100 @@ fun HomeScreen(
if (isTraktProgressActive) emptyList() else watchedUiState.items
}
val latestCompletedBySeries = remember(effectiveWatchProgressEntries, effectiveWatchedItems, continueWatchingPreferences.upNextFromFurthestEpisode) {
WatchingState.latestCompletedBySeries(
progressEntries = effectiveWatchProgressEntries,
val allNextUpSeedEntries = remember(
watchProgressUiState.entries,
effectiveWatchedItems,
isTraktProgressActive,
continueWatchingPreferences.upNextFromFurthestEpisode,
) {
buildTvParityNextUpSeedEntries(
progressEntries = watchProgressUiState.entries,
watchedItems = effectiveWatchedItems,
isTraktProgressActive = isTraktProgressActive,
preferFurthestEpisode = continueWatchingPreferences.upNextFromFurthestEpisode,
nowEpochMs = WatchProgressClock.nowEpochMs(),
)
}
val completedSeriesCandidates = remember(latestCompletedBySeries) {
latestCompletedBySeries.map { (content, completed) ->
val recentNextUpSeedEntries = remember(
allNextUpSeedEntries,
isTraktProgressActive,
traktSettingsUiState.continueWatchingDaysCap,
) {
filterEntriesForTraktContinueWatchingWindow(
entries = allNextUpSeedEntries,
isTraktProgressActive = isTraktProgressActive,
daysCap = traktSettingsUiState.continueWatchingDaysCap,
nowEpochMs = WatchProgressClock.nowEpochMs(),
)
}
val activeNextUpSeedContentIds = remember(allNextUpSeedEntries) {
allNextUpSeedEntries.mapTo(mutableSetOf()) { entry -> entry.parentMetaId }
}
val currentNextUpSeedByContentId = remember(allNextUpSeedEntries) {
allNextUpSeedEntries.mapNotNull { entry ->
val season = entry.seasonNumber ?: return@mapNotNull null
val episode = entry.episodeNumber ?: return@mapNotNull null
entry.parentMetaId to (season to episode)
}.toMap()
}
val visibleContinueWatchingEntries = remember(effectiveWatchProgressEntries) {
effectiveWatchProgressEntries.continueWatchingEntries()
}
val latestCompletedAtBySeries = remember(allNextUpSeedEntries) {
allNextUpSeedEntries
.groupBy { entry -> entry.parentMetaId }
.mapValues { (_, entries) -> entries.maxOfOrNull { entry -> entry.lastUpdatedEpochMs } ?: Long.MIN_VALUE }
}
val nextUpSuppressedSeriesIds = remember(visibleContinueWatchingEntries, latestCompletedAtBySeries) {
visibleContinueWatchingEntries
.asSequence()
.filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() }
.filter { entry ->
shouldTreatAsActiveInProgressForNextUpSuppression(
progress = entry,
latestCompletedAt = latestCompletedAtBySeries[entry.parentMetaId],
)
}
.map { entry -> entry.parentMetaId }
.filter(String::isNotBlank)
.toSet()
}
val completedSeriesCandidates = remember(recentNextUpSeedEntries, nextUpSuppressedSeriesIds) {
recentNextUpSeedEntries.mapNotNull { seed ->
val season = seed.seasonNumber ?: return@mapNotNull null
val episode = seed.episodeNumber ?: return@mapNotNull null
if (season == 0 || seed.parentMetaId in nextUpSuppressedSeriesIds) return@mapNotNull null
CompletedSeriesCandidate(
content = content,
seasonNumber = completed.seasonNumber,
episodeNumber = completed.episodeNumber,
markedAtEpochMs = completed.markedAtEpochMs,
content = WatchingContentRef(type = seed.parentMetaType, id = seed.parentMetaId),
seasonNumber = season,
episodeNumber = episode,
markedAtEpochMs = seed.lastUpdatedEpochMs,
)
}
}
val completedSeriesContentIds = remember(completedSeriesCandidates) {
completedSeriesCandidates.mapTo(mutableSetOf()) { candidate -> candidate.content.id }
}
val visibleContinueWatchingEntries = remember(
effectiveWatchProgressEntries,
latestCompletedBySeries,
) {
WatchingState.visibleContinueWatchingEntries(
progressEntries = effectiveWatchProgressEntries,
latestCompletedBySeries = latestCompletedBySeries,
)
}
val profileState by ProfileRepository.state.collectAsStateWithLifecycle()
val activeProfileId = profileState.activeProfile?.profileIndex ?: 1
var nextUpItemsBySeries by remember(activeProfileId) { mutableStateOf<Map<String, Pair<Long, ContinueWatchingItem>>>(emptyMap()) }
var processedNextUpContentIds by remember(activeProfileId) { mutableStateOf<Set<String>>(emptySet()) }
val cachedSnapshots = remember(activeProfileId) { ContinueWatchingEnrichmentCache.getSnapshots() }
val cachedNextUpItems = remember(
cachedSnapshots.first,
continueWatchingPreferences.dismissedNextUpKeys,
completedSeriesContentIds,
activeNextUpSeedContentIds,
currentNextUpSeedByContentId,
isTraktProgressActive,
watchProgressUiState.hasLoadedRemoteProgress,
processedNextUpContentIds,
nextUpItemsBySeries,
continueWatchingPreferences.showUnairedNextUp,
watchedUiState.isLoaded,
) {
@ -211,7 +273,32 @@ fun HomeScreen(
if (
!isTraktProgressActive &&
watchedUiState.isLoaded &&
cached.contentId !in completedSeriesContentIds
cached.contentId !in activeNextUpSeedContentIds
) {
return@mapNotNull null
}
if (
isTraktProgressActive &&
watchProgressUiState.hasLoadedRemoteProgress &&
cached.contentId !in activeNextUpSeedContentIds
) {
return@mapNotNull null
}
val currentSeed = currentNextUpSeedByContentId[cached.contentId]
if (
currentSeed != null &&
cached.seedSeason != null &&
cached.seedEpisode != null
) {
val (currentSeason, currentEpisode) = currentSeed
val seedChanged = currentSeason != cached.seedSeason || currentEpisode != cached.seedEpisode
if (seedChanged) return@mapNotNull null
}
if (
isTraktProgressActive &&
watchProgressUiState.hasLoadedRemoteProgress &&
cached.contentId in processedNextUpContentIds &&
cached.contentId !in nextUpItemsBySeries.keys
) {
return@mapNotNull null
}
@ -257,16 +344,55 @@ fun HomeScreen(
visibleContinueWatchingEntries,
cachedInProgressItems,
effectivNextUpItems,
nextUpSuppressedSeriesIds,
continueWatchingPreferences.sortMode,
) {
buildHomeContinueWatchingItems(
visibleEntries = visibleContinueWatchingEntries,
cachedInProgressByVideoId = cachedInProgressItems,
nextUpItemsBySeries = effectivNextUpItems,
nextUpSuppressedSeriesIds = nextUpSuppressedSeriesIds,
sortMode = continueWatchingPreferences.sortMode,
todayIsoDate = CurrentDateProvider.todayIsoDate(),
)
}
LaunchedEffect(
isTraktProgressActive,
traktSettingsUiState.continueWatchingDaysCap,
watchProgressUiState.hasLoadedRemoteProgress,
continueWatchingPreferences.upNextFromFurthestEpisode,
watchProgressUiState.entries,
effectiveWatchProgressEntries,
allNextUpSeedEntries,
recentNextUpSeedEntries,
nextUpSuppressedSeriesIds,
visibleContinueWatchingEntries,
completedSeriesCandidates,
cachedInProgressItems,
cachedNextUpItems,
nextUpItemsBySeries,
processedNextUpContentIds,
effectivNextUpItems,
continueWatchingItems,
) {
homeCwLog.d {
"build summary source=${if (isTraktProgressActive) "trakt" else "nuvio_sync"} " +
"remoteLoaded=${watchProgressUiState.hasLoadedRemoteProgress} " +
"daysCap=${traktSettingsUiState.continueWatchingDaysCap} " +
"raw=${watchProgressUiState.entries.size} rawSources=${watchProgressUiState.entries.debugSourceCounts()} " +
"effective=${effectiveWatchProgressEntries.size} seedAll=${allNextUpSeedEntries.size} " +
"seedRecent=${recentNextUpSeedEntries.size} seedSuppressed=${nextUpSuppressedSeriesIds.size} " +
"useFurthest=${continueWatchingPreferences.upNextFromFurthestEpisode} " +
"visibleInProgress=${visibleContinueWatchingEntries.size} " +
"completedCandidates=${completedSeriesCandidates.size} cachedInProgress=${cachedInProgressItems.size} " +
"cachedNextUp=${cachedNextUpItems.size} liveNextUp=${nextUpItemsBySeries.size} " +
"processedNextUp=${processedNextUpContentIds.size} " +
"effectiveNextUp=${effectivNextUpItems.size} final=${continueWatchingItems.size} " +
"rawItems=${watchProgressUiState.entries.debugWatchProgressSummary()} " +
"completed=${completedSeriesCandidates.debugCompletedSeriesSummary()} " +
"finalItems=${continueWatchingItems.debugContinueWatchingSummary()}"
}
}
val availableManifests = remember(addonsUiState.addons) {
addonsUiState.addons.mapNotNull { addon -> addon.manifest }
}
@ -308,37 +434,75 @@ fun HomeScreen(
continueWatchingPreferences.showUnairedNextUp,
) {
if (completedSeriesCandidates.isEmpty()) {
homeCwLog.d {
"next-up resolve skipped: no completed series candidates " +
"entries=${effectiveWatchProgressEntries.size} sources=${effectiveWatchProgressEntries.debugSourceCounts()}"
}
nextUpItemsBySeries = emptyMap()
processedNextUpContentIds = emptySet()
return@LaunchedEffect
}
if (metaProviderKey.isEmpty()) return@LaunchedEffect
if (metaProviderKey.isEmpty()) {
homeCwLog.d {
"next-up resolve deferred: no meta providers candidates=${completedSeriesCandidates.size} " +
"candidates=${completedSeriesCandidates.debugCompletedSeriesSummary()}"
}
return@LaunchedEffect
}
val todayIsoDate = CurrentDateProvider.todayIsoDate()
val semaphore = Semaphore(4)
homeCwLog.d {
"next-up resolve start candidates=${completedSeriesCandidates.size} " +
"showUnaired=${continueWatchingPreferences.showUnairedNextUp} " +
"metaProviders=${metaProviderKey.size} candidates=${completedSeriesCandidates.debugCompletedSeriesSummary()}"
}
val results = completedSeriesCandidates.map { completedEntry ->
async {
semaphore.withPermit {
val meta = MetaDetailsRepository.fetch(
type = completedEntry.content.type,
id = completedEntry.content.id,
) ?: return@withPermit null
)
if (meta == null) {
homeCwLog.d {
"next-up meta miss content=${completedEntry.debugSummary()} " +
"type=${completedEntry.content.type} id=${completedEntry.content.id}"
}
return@withPermit null
}
val nextEpisode = meta.nextReleasedEpisodeAfter(
seasonNumber = completedEntry.seasonNumber,
episodeNumber = completedEntry.episodeNumber,
todayIsoDate = todayIsoDate,
showUnairedNextUp = continueWatchingPreferences.showUnairedNextUp,
) ?: return@withPermit null
)
if (nextEpisode == null) {
homeCwLog.d {
"next-up no next episode content=${completedEntry.debugSummary()} " +
"videos=${meta.videos.size}"
}
return@withPermit null
}
val item = completedEntry.toContinueWatchingSeed(meta)
.toUpNextContinueWatchingItem(nextEpisode)
if (nextUpDismissKey(item.parentMetaId, item.nextUpSeedSeasonNumber, item.nextUpSeedEpisodeNumber) in continueWatchingPreferences.dismissedNextUpKeys) {
homeCwLog.d { "next-up dismissed item=${item.debugSummary()}" }
return@withPermit null
}
homeCwLog.d {
"next-up built seed=${completedEntry.debugSummary()} item=${item.debugSummary()} " +
"released=${nextEpisode.released}"
}
completedEntry.content.id to (completedEntry.markedAtEpochMs to item)
}
}
}.awaitAll().filterNotNull().toMap()
nextUpItemsBySeries = results
processedNextUpContentIds = completedSeriesCandidates.mapTo(mutableSetOf()) { candidate ->
candidate.content.id
}
val nextUpCache = results.mapNotNull { (contentId, pair) ->
val item = pair.second
@ -389,6 +553,10 @@ fun HomeScreen(
nextUp = nextUpCache,
inProgress = inProgressCache,
)
homeCwLog.d {
"next-up resolve complete results=${results.size} nextUpCache=${nextUpCache.size} " +
"inProgressCache=${inProgressCache.size} items=${results.values.map { it.second }.debugContinueWatchingSummary()}"
}
}
val hasActiveAddons = addonsUiState.addons.any { it.manifest != null }
@ -615,6 +783,8 @@ fun HomeScreen(
private const val HOME_CATALOG_PREVIEW_LIMIT = 18
private const val MILLIS_PER_DAY = 24L * 60L * 60L * 1000L
private const val OPTIMISTIC_NEXT_UP_SEED_WINDOW_MS = 3L * 60L * 1000L
private val homeCwLog = Logger.withTag("HomeCW")
internal fun filterEntriesForTraktContinueWatchingWindow(
entries: List<WatchProgressEntry>,
@ -630,6 +800,169 @@ internal fun filterEntriesForTraktContinueWatchingWindow(
return entries.filter { entry -> entry.lastUpdatedEpochMs >= cutoffMs }
}
private fun buildTvParityNextUpSeedEntries(
progressEntries: List<WatchProgressEntry>,
watchedItems: List<WatchedItem>,
isTraktProgressActive: Boolean,
preferFurthestEpisode: Boolean,
nowEpochMs: Long,
): List<WatchProgressEntry> {
val rawSeeds = if (isTraktProgressActive) {
progressEntries.asSequence()
.filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() }
.filter { entry -> entry.seasonNumber != null && entry.episodeNumber != null && entry.seasonNumber != 0 }
.filter { entry -> shouldUseAsTraktNextUpSeed(entry, nowEpochMs) }
.toList()
} else {
watchedItems.asSequence()
.filter { item -> item.type.isSeriesTypeForContinueWatching() }
.filter { item -> item.season != null && item.episode != null && item.season != 0 }
.filter { item -> !isMalformedNextUpSeedContentId(item.id) }
.map { item -> item.toNextUpSeedEntry() }
.toList()
}
return if (isTraktProgressActive) {
mergeTvTraktNextUpSeeds(rawSeeds)
} else {
rawSeeds
.groupBy { entry -> nextUpSeedKey(entry) }
.mapNotNull { (_, entries) ->
choosePreferredNextUpSeed(
entries = entries,
preferFurthestEpisode = preferFurthestEpisode,
)
}
.sortedByDescending { entry -> entry.lastUpdatedEpochMs }
}
}
private fun shouldUseAsTraktNextUpSeed(
entry: WatchProgressEntry,
nowEpochMs: Long,
): Boolean {
if (!entry.shouldUseAsCompletedSeedForContinueWatching()) return false
if (entry.source != WatchProgressSourceTraktPlayback) return true
val ageMs = nowEpochMs - entry.lastUpdatedEpochMs
return ageMs in 0..OPTIMISTIC_NEXT_UP_SEED_WINDOW_MS
}
private fun WatchedItem.toNextUpSeedEntry(): WatchProgressEntry =
WatchProgressEntry(
contentType = type,
parentMetaId = id,
parentMetaType = type,
videoId = id,
title = name,
poster = poster,
seasonNumber = season,
episodeNumber = episode,
lastPositionMs = 1L,
durationMs = 1L,
lastUpdatedEpochMs = markedAtEpochMs,
isCompleted = true,
progressPercent = 100f,
source = WatchProgressSourceLocal,
)
private fun nextUpSeedKey(entry: WatchProgressEntry): String =
entry.parentMetaId.trim()
private fun mergeTvTraktNextUpSeeds(entries: List<WatchProgressEntry>): List<WatchProgressEntry> {
val merged = linkedMapOf<String, WatchProgressEntry>()
entries
.filter { entry -> entry.source == WatchProgressSourceTraktShowProgress }
.forEach { seed ->
merged[nextUpSeedKey(seed)] = seed
}
entries
.filter { entry -> entry.source == WatchProgressSourceTraktHistory || entry.source == WatchProgressSourceTraktPlayback }
.forEach { seed ->
val key = nextUpSeedKey(seed)
val existing = merged[key]
if (existing == null || shouldReplaceNextUpSeed(existing, seed)) {
merged[key] = seed
}
}
return merged.values.sortedByDescending { entry -> entry.lastUpdatedEpochMs }
}
private fun shouldReplaceNextUpSeed(
existing: WatchProgressEntry,
candidate: WatchProgressEntry,
): Boolean {
val candidateSeason = candidate.seasonNumber ?: -1
val candidateEpisode = candidate.episodeNumber ?: -1
val existingSeason = existing.seasonNumber ?: -1
val existingEpisode = existing.episodeNumber ?: -1
return candidateSeason > existingSeason ||
(
candidateSeason == existingSeason &&
(
candidateEpisode > existingEpisode ||
(
candidateEpisode == existingEpisode &&
candidate.lastUpdatedEpochMs >= existing.lastUpdatedEpochMs
)
)
)
}
private fun choosePreferredNextUpSeed(
entries: List<WatchProgressEntry>,
preferFurthestEpisode: Boolean,
): WatchProgressEntry? {
if (entries.isEmpty()) return null
val bestRank = entries.minOf(::nextUpSeedSourceRank)
return entries
.asSequence()
.filter { entry -> nextUpSeedSourceRank(entry) == bestRank }
.maxWithOrNull(
if (preferFurthestEpisode) {
compareBy<WatchProgressEntry>(
{ it.seasonNumber ?: -1 },
{ it.episodeNumber ?: -1 },
{ it.lastUpdatedEpochMs },
)
} else {
compareBy<WatchProgressEntry>(
{ it.lastUpdatedEpochMs },
{ it.seasonNumber ?: -1 },
{ it.episodeNumber ?: -1 },
)
},
)
}
private fun nextUpSeedSourceRank(entry: WatchProgressEntry): Int =
when (entry.source) {
WatchProgressSourceTraktPlayback,
WatchProgressSourceTraktShowProgress,
-> 0
WatchProgressSourceTraktHistory -> 1
WatchProgressSourceLocal -> 2
else -> 4
}
private fun shouldTreatAsActiveInProgressForNextUpSuppression(
progress: WatchProgressEntry,
latestCompletedAt: Long?,
): Boolean {
if (!progress.shouldTreatAsInProgressForContinueWatching()) return false
if (latestCompletedAt == null || latestCompletedAt == Long.MIN_VALUE) return true
return progress.lastUpdatedEpochMs >= latestCompletedAt
}
private fun isMalformedNextUpSeedContentId(contentId: String?): Boolean {
val trimmed = contentId?.trim().orEmpty()
if (trimmed.isEmpty()) return true
return when (trimmed.lowercase()) {
"tmdb", "imdb", "trakt", "tmdb:", "imdb:", "trakt:" -> true
else -> false
}
}
private fun heroMobileBelowSectionHeightHint(
maxWidthDp: Float,
continueWatchingVisible: Boolean,
@ -652,15 +985,17 @@ internal fun buildHomeContinueWatchingItems(
visibleEntries: List<WatchProgressEntry>,
cachedInProgressByVideoId: Map<String, ContinueWatchingItem> = emptyMap(),
nextUpItemsBySeries: Map<String, Pair<Long, ContinueWatchingItem>>,
nextUpSuppressedSeriesIds: Set<String>? = null,
sortMode: ContinueWatchingSortMode = ContinueWatchingSortMode.DEFAULT,
todayIsoDate: String = "",
): List<ContinueWatchingItem> {
val inProgressSeriesIds = visibleEntries
.asSequence()
.filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() }
.map { entry -> entry.parentMetaId }
.filter(String::isNotBlank)
.toSet()
val suppressedSeriesIds = nextUpSuppressedSeriesIds
?: visibleEntries
.asSequence()
.filter { entry -> entry.parentMetaType.isSeriesTypeForContinueWatching() }
.map { entry -> entry.parentMetaId }
.filter(String::isNotBlank)
.toSet()
val candidates = buildList {
addAll(
@ -675,7 +1010,7 @@ internal fun buildHomeContinueWatchingItems(
)
addAll(
nextUpItemsBySeries.values.mapNotNull { (lastUpdatedEpochMs, item) ->
if (item.parentMetaId in inProgressSeriesIds) return@mapNotNull null
if (item.parentMetaId in suppressedSeriesIds) return@mapNotNull null
HomeContinueWatchingCandidate(
lastUpdatedEpochMs = lastUpdatedEpochMs,
item = item,
@ -866,3 +1201,83 @@ private fun ContinueWatchingItem.withFallbackMetadata(
released = released ?: fallback.released,
)
}
private fun WatchProgressEntry.debugSummary(): String =
buildString {
append(parentMetaType)
append(":")
append(parentMetaId)
if (seasonNumber != null || episodeNumber != null) {
append(" s=")
append(seasonNumber)
append(" e=")
append(episodeNumber)
}
append(" video=")
append(videoId)
append(" pct=")
append(progressPercent)
append(" completed=")
append(isCompleted)
append(" effectiveCompleted=")
append(isEffectivelyCompleted)
append(" src=")
append(source)
append(" last=")
append(lastUpdatedEpochMs)
}
private fun Collection<WatchProgressEntry>.debugWatchProgressSummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" }
private fun Collection<WatchProgressEntry>.debugSourceCounts(): String =
groupingBy { it.source }
.eachCount()
.entries
.sortedBy { it.key }
.joinToString(separator = ",") { "${it.key}=${it.value}" }
.ifBlank { "none" }
private fun CompletedSeriesCandidate.debugSummary(): String =
buildString {
append(content.type)
append(":")
append(content.id)
append(" s=")
append(seasonNumber)
append(" e=")
append(episodeNumber)
append(" marked=")
append(markedAtEpochMs)
}
private fun Collection<CompletedSeriesCandidate>.debugCompletedSeriesSummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" }
private fun ContinueWatchingItem.debugSummary(): String =
buildString {
append(if (isNextUp) "next_up" else "in_progress")
append(":")
append(parentMetaType)
append(":")
append(parentMetaId)
if (seasonNumber != null || episodeNumber != null) {
append(" s=")
append(seasonNumber)
append(" e=")
append(episodeNumber)
}
append(" video=")
append(videoId)
append(" seed=")
append(nextUpSeedSeasonNumber)
append("x")
append(nextUpSeedEpisodeNumber)
append(" progress=")
append(progressFraction)
append(" resume=")
append(resumePositionMs)
}
private fun Collection<ContinueWatchingItem>.debugContinueWatchingSummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" }

View file

@ -13,6 +13,7 @@ import com.nuvio.app.features.watchprogress.buildPlaybackVideoId
import com.nuvio.app.features.watchprogress.shouldTreatAsInProgressForContinueWatching
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
@ -22,7 +23,9 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
@ -44,6 +47,7 @@ data class TraktProgressUiState(
val entries: List<WatchProgressEntry> = emptyList(),
val isLoading: Boolean = false,
val errorMessage: String? = null,
val hasLoadedRemoteProgress: Boolean = false,
)
object TraktProgressRepository {
@ -56,6 +60,8 @@ object TraktProgressRepository {
private var hasLoaded = false
private var refreshRequestId: Long = 0L
private val refreshJobMutex = Mutex()
private var inFlightRefresh: Deferred<Unit>? = null
fun ensureLoaded() {
if (hasLoaded) return
@ -82,14 +88,35 @@ object TraktProgressRepository {
}
suspend fun refreshNow() {
ensureLoaded()
val refresh = refreshJobMutex.withLock {
inFlightRefresh?.takeIf { it.isActive } ?: scope.async {
refreshNowInternal()
}.also { inFlightRefresh = it }
}
try {
refresh.await()
} finally {
refreshJobMutex.withLock {
if (inFlightRefresh == refresh && refresh.isCompleted) {
inFlightRefresh = null
}
}
}
}
private suspend fun refreshNowInternal() {
ensureLoaded()
val requestId = nextRefreshRequestId()
val headers = TraktAuthRepository.authorizedHeaders()
if (headers == null) {
log.d { "refreshNow request=$requestId skipped: missing authorized headers" }
_uiState.value = TraktProgressUiState()
return
}
log.d { "refreshNow request=$requestId start currentEntries=${_uiState.value.entries.size}" }
_uiState.value = _uiState.value.copy(isLoading = true, errorMessage = null)
val playbackEntries = runCatching {
@ -109,34 +136,56 @@ object TraktProgressRepository {
_uiState.value = TraktProgressUiState(
entries = playbackEntries,
isLoading = false,
isLoading = true,
errorMessage = null,
hasLoadedRemoteProgress = false,
)
log.d {
"refreshNow request=$requestId playback applied entries=${playbackEntries.size} " +
"sources=${playbackEntries.debugSourceCounts()} items=${playbackEntries.debugWatchProgressSummary()}"
}
if (playbackEntries.isNotEmpty()) {
launchHydration(requestId = requestId, entries = playbackEntries)
}
scope.launch {
val completedEntries = runCatching {
fetchHistoryEntries(headers) + fetchWatchedShowSeedEntries(headers)
}.onFailure { error ->
if (error is CancellationException) throw error
log.w { "Failed to fetch Trakt history snapshot: ${error.message}" }
}.getOrNull() ?: return@launch
val completedEntries = runCatching {
coroutineScope {
val history = async { fetchHistoryEntries(headers) }
val watchedShowSeeds = async { fetchWatchedShowSeedEntries(headers) }
history.await() + watchedShowSeeds.await()
}
}.onFailure { error ->
if (error is CancellationException) throw error
log.w { "Failed to fetch Trakt history snapshot: ${error.message}" }
}.getOrNull()
if (!isLatestRefreshRequest(requestId)) return@launch
val merged = mergeNewestByVideoId(playbackEntries + completedEntries)
if (completedEntries == null) {
_uiState.value = _uiState.value.copy(
entries = merged.sortedByDescending { it.lastUpdatedEpochMs },
isLoading = false,
errorMessage = null,
hasLoadedRemoteProgress = false,
)
return
}
if (merged.isNotEmpty()) {
launchHydration(requestId = requestId, entries = merged)
}
if (!isLatestRefreshRequest(requestId)) return
val merged = mergeNewestByVideoId(playbackEntries + completedEntries)
_uiState.value = _uiState.value.copy(
entries = merged.sortedByDescending { it.lastUpdatedEpochMs },
isLoading = false,
errorMessage = null,
hasLoadedRemoteProgress = true,
)
log.d {
"refreshNow request=$requestId completed snapshot applied " +
"completedEntries=${completedEntries.size} merged=${merged.size} " +
"sources=${merged.debugSourceCounts()} items=${merged.debugWatchProgressSummary()}"
}
if (merged.isNotEmpty()) {
launchHydration(requestId = requestId, entries = merged)
}
}
@ -163,6 +212,10 @@ object TraktProgressRepository {
isLoading = false,
errorMessage = null,
)
log.d {
"hydrate request=$requestId applied hydrated=${hydrated.size} merged=${merged.size} " +
"items=${merged.debugWatchProgressSummary()}"
}
}
}
@ -175,6 +228,10 @@ object TraktProgressRepository {
current[normalizedEntry.videoId] = normalizedEntry
}
_uiState.value = _uiState.value.copy(entries = current.values.sortedByDescending { it.lastUpdatedEpochMs })
log.d {
"optimistic progress applied entry=${normalizedEntry.debugSummary()} " +
"entries=${_uiState.value.entries.size}"
}
}
fun applyOptimisticRemoval(videoId: String) {
@ -182,6 +239,7 @@ object TraktProgressRepository {
if (videoId.isBlank()) return
val filtered = _uiState.value.entries.filterNot { it.videoId == videoId }
_uiState.value = _uiState.value.copy(entries = filtered)
log.d { "optimistic removal videoId=$videoId entries=${filtered.size}" }
}
fun applyOptimisticRemoval(
@ -202,6 +260,10 @@ object TraktProgressRepository {
}
}
_uiState.value = _uiState.value.copy(entries = filtered)
log.d {
"optimistic removal contentId=$normalizedContentId s=$seasonNumber e=$episodeNumber " +
"entries=${filtered.size}"
}
}
suspend fun removeProgress(
@ -213,6 +275,7 @@ object TraktProgressRepository {
if (normalizedContentId.isBlank()) return
val headers = TraktAuthRepository.authorizedHeaders() ?: return
log.d { "removeProgress start contentId=$normalizedContentId s=$seasonNumber e=$episodeNumber" }
applyOptimisticRemoval(
contentId = normalizedContentId,
seasonNumber = seasonNumber,
@ -280,10 +343,12 @@ object TraktProgressRepository {
}
}
log.d { "removeProgress complete contentId=$normalizedContentId refreshing" }
refreshNow()
}
private suspend fun fetchPlaybackEntries(headers: Map<String, String>): List<WatchProgressEntry> = withContext(Dispatchers.Default) {
log.d { "fetchPlaybackEntries start" }
val payloads = coroutineScope {
val moviesPayload = async {
httpGetTextWithHeaders(
@ -306,6 +371,10 @@ object TraktProgressRepository {
val moviePlayback = json.decodeFromString<List<TraktPlaybackItem>>(moviesPayload)
val episodePlayback = json.decodeFromString<List<TraktPlaybackItem>>(episodesPayload)
log.d {
"fetchPlaybackEntries raw movies=${moviePlayback.size} episodes=${episodePlayback.size} " +
"movieItems=${moviePlayback.debugPlaybackSummary()} episodeItems=${episodePlayback.debugPlaybackSummary()}"
}
val inProgressMovies = moviePlayback.mapIndexedNotNull { index, item ->
mapPlaybackMovie(item = item, fallbackIndex = index)
@ -314,10 +383,16 @@ object TraktProgressRepository {
mapPlaybackEpisode(item = item, fallbackIndex = index)
}
mergeNewestByVideoId(inProgressMovies + inProgressEpisodes)
val merged = mergeNewestByVideoId(inProgressMovies + inProgressEpisodes)
log.d {
"fetchPlaybackEntries mapped movies=${inProgressMovies.size} episodes=${inProgressEpisodes.size} " +
"merged=${merged.size} items=${merged.debugWatchProgressSummary()}"
}
merged
}
private suspend fun fetchHistoryEntries(headers: Map<String, String>): List<WatchProgressEntry> = withContext(Dispatchers.Default) {
log.d { "fetchHistoryEntries start limit=$HISTORY_LIMIT" }
val payloads = coroutineScope {
val historyPayload = async {
httpGetTextWithHeaders(
@ -339,6 +414,10 @@ object TraktProgressRepository {
val movieHistoryPayload = payloads[1]
val episodeHistory = json.decodeFromString<List<TraktHistoryEpisodeItem>>(historyPayload)
val movieHistory = json.decodeFromString<List<TraktHistoryMovieItem>>(movieHistoryPayload)
log.d {
"fetchHistoryEntries raw episodes=${episodeHistory.size} movies=${movieHistory.size} " +
"episodeItems=${episodeHistory.debugHistoryEpisodeSummary()} movieItems=${movieHistory.debugHistoryMovieSummary()}"
}
val completedEpisodes = episodeHistory
.mapIndexedNotNull { index, item -> mapHistoryEpisode(item = item, fallbackIndex = index) }
@ -347,7 +426,12 @@ object TraktProgressRepository {
.mapIndexedNotNull { index, item -> mapHistoryMovie(item = item, fallbackIndex = index) }
.distinctBy { entry -> entry.videoId }
mergeNewestByVideoId(completedEpisodes + completedMovies)
val merged = mergeNewestByVideoId(completedEpisodes + completedMovies)
log.d {
"fetchHistoryEntries mapped episodes=${completedEpisodes.size} movies=${completedMovies.size} " +
"merged=${merged.size} items=${merged.debugWatchProgressSummary()}"
}
merged
}
private suspend fun fetchWatchedShowSeedEntries(
@ -360,7 +444,11 @@ object TraktProgressRepository {
headers = headers,
)
val watchedShows = json.decodeFromString<List<TraktWatchedShowItem>>(payload)
watchedShows
log.d {
"fetchWatchedShowSeedEntries raw shows=${watchedShows.size} " +
"items=${watchedShows.debugWatchedShowSummary()}"
}
val mapped = watchedShows
.mapNotNull { item ->
mapWatchedShowSeed(
item = item,
@ -368,6 +456,11 @@ object TraktProgressRepository {
)
}
.sortedByDescending { entry -> entry.lastUpdatedEpochMs }
log.d {
"fetchWatchedShowSeedEntries mapped=${mapped.size} useFurthest=$useFurthestEpisode " +
"items=${mapped.debugWatchProgressSummary()}"
}
mapped
}
private fun mergeNewestByVideoId(entries: List<WatchProgressEntry>): List<WatchProgressEntry> {
@ -436,6 +529,8 @@ object TraktProgressRepository {
private fun invalidateInFlightRefreshes() {
refreshRequestId += 1L
inFlightRefresh?.cancel()
inFlightRefresh = null
}
private fun isLatestRefreshRequest(requestId: Long): Boolean = refreshRequestId == requestId
@ -819,3 +914,165 @@ private data class TraktEpisode(
@SerialName("number") val number: Int? = null,
@SerialName("ids") val ids: TraktExternalIds? = null,
)
private fun WatchProgressEntry.debugSummary(): String =
buildString {
append(parentMetaType)
append(":")
append(parentMetaId)
if (seasonNumber != null || episodeNumber != null) {
append(" s=")
append(seasonNumber)
append(" e=")
append(episodeNumber)
}
append(" video=")
append(videoId)
append(" pct=")
append(progressPercent)
append(" completed=")
append(isCompleted)
append(" effectiveCompleted=")
append(isEffectivelyCompleted)
append(" src=")
append(source)
append(" last=")
append(lastUpdatedEpochMs)
}
private fun Collection<WatchProgressEntry>.debugWatchProgressSummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" }
private fun Collection<WatchProgressEntry>.debugSourceCounts(): String =
groupingBy { it.source }
.eachCount()
.entries
.sortedBy { it.key }
.joinToString(separator = ",") { "${it.key}=${it.value}" }
.ifBlank { "none" }
private fun Collection<TraktPlaybackItem>.debugPlaybackSummary(limit: Int = 8): String =
take(limit).joinToString(separator = " | ") { item ->
val media = item.movie ?: item.show
val episode = item.episode
buildString {
append(media?.title ?: "unknown")
append(" ids=")
append(media?.ids.debugIds())
if (episode != null) {
append(" ep=")
append(episode.season)
append("x")
append(episode.number)
append(" epIds=")
append(episode.ids.debugIds())
}
append(" progress=")
append(item.progress)
append(" pausedAt=")
append(item.pausedAt)
append(" playbackId=")
append(item.id)
}
}.ifBlank { "none" }
private fun Collection<TraktHistoryEpisodeItem>.debugHistoryEpisodeSummary(limit: Int = 8): String =
take(limit).joinToString(separator = " | ") { item ->
buildString {
append(item.show?.title ?: "unknown")
append(" ids=")
append(item.show?.ids.debugIds())
append(" ep=")
append(item.episode?.season)
append("x")
append(item.episode?.number)
append(" epIds=")
append(item.episode?.ids.debugIds())
append(" watchedAt=")
append(item.watchedAt)
}
}.ifBlank { "none" }
private fun Collection<TraktHistoryMovieItem>.debugHistoryMovieSummary(limit: Int = 8): String =
take(limit).joinToString(separator = " | ") { item ->
buildString {
append(item.movie?.title ?: "unknown")
append(" ids=")
append(item.movie?.ids.debugIds())
append(" watchedAt=")
append(item.watchedAt)
}
}.ifBlank { "none" }
private fun Collection<TraktWatchedShowItem>.debugWatchedShowSummary(limit: Int = 8): String =
take(limit).joinToString(separator = " | ") { item ->
val episodeCount = item.seasons.orEmpty().sumOf { season ->
season.episodes.orEmpty().count { episode ->
(episode.number ?: 0) > 0 && (episode.plays ?: 1) > 0
}
}
val latest = item.seasons.orEmpty()
.flatMap { season ->
val seasonNumber = season.number
season.episodes.orEmpty().mapNotNull { episode ->
val episodeNumber = episode.number ?: return@mapNotNull null
val watchedAt = episode.lastWatchedAt ?: item.lastWatchedAt
TraktWatchedShowEpisodeSeed(
season = seasonNumber ?: 0,
episode = episodeNumber,
watchedAt = watchedAt
?.let { value ->
runCatching { TraktPlatformClock.parseIsoDateTimeToEpochMs(value) }.getOrNull()
}
?: 0L,
)
}
}
.maxWithOrNull(
compareBy<TraktWatchedShowEpisodeSeed>(
{ it.watchedAt },
{ it.season },
{ it.episode },
),
)
buildString {
append(item.show?.title ?: "unknown")
append(" ids=")
append(item.show?.ids.debugIds())
append(" episodes=")
append(episodeCount)
append(" latest=")
append(latest?.season)
append("x")
append(latest?.episode)
append(" lastWatchedAt=")
append(item.lastWatchedAt)
}
}.ifBlank { "none" }
private fun TraktExternalIds?.debugIds(): String =
if (this == null) {
"none"
} else {
buildString {
imdb?.takeIf { it.isNotBlank() }?.let {
append("imdb:")
append(it)
}
tmdb?.let {
if (isNotEmpty()) append(",")
append("tmdb:")
append(it)
}
trakt?.let {
if (isNotEmpty()) append(",")
append("trakt:")
append(it)
}
slug?.takeIf { it.isNotBlank() }?.let {
if (isNotEmpty()) append(",")
append("slug:")
append(it)
}
}.ifBlank { "none" }
}

View file

@ -1,5 +1,6 @@
package com.nuvio.app.features.watching.sync
import co.touchlab.kermit.Logger
import com.nuvio.app.core.network.SupabaseProvider
import com.nuvio.app.features.watchprogress.WatchProgressEntry
import io.github.jan.supabase.postgrest.postgrest
@ -12,12 +13,14 @@ import kotlinx.serialization.json.encodeToJsonElement
import kotlinx.serialization.json.put
object SupabaseProgressSyncAdapter : ProgressSyncAdapter {
private val log = Logger.withTag("NuvioSyncProgress")
private val json = Json {
ignoreUnknownKeys = true
encodeDefaults = true
}
override suspend fun pull(profileId: Int): List<ProgressSyncRecord> {
log.d { "pull start profileId=$profileId" }
val params = buildJsonObject { put("p_profile_id", profileId) }
val result = SupabaseProvider.client.postgrest.rpc("sync_pull_watch_progress", params)
val serverEntries = result.decodeList<WatchProgressSyncEntry>()
@ -33,6 +36,10 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter {
lastWatched = entry.lastWatched,
)
}
log.d {
"pull returned raw=${serverEntries.size} records=${records.size} " +
"items=${records.debugProgressRecordSummary()}"
}
return records
}
@ -40,6 +47,10 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter {
profileId: Int,
entries: Collection<WatchProgressEntry>,
) {
log.d {
"push start profileId=$profileId entries=${entries.size} " +
"items=${entries.debugWatchProgressEntrySummary()}"
}
val syncEntries = entries.map { entry ->
WatchProgressSyncEntry(
contentId = entry.parentMetaId,
@ -58,12 +69,17 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter {
put("p_entries", json.encodeToJsonElement(syncEntries))
}
SupabaseProvider.client.postgrest.rpc("sync_push_watch_progress", params)
log.d { "push complete profileId=$profileId entries=${syncEntries.size}" }
}
override suspend fun delete(
profileId: Int,
entries: Collection<WatchProgressEntry>,
) {
log.d {
"delete start profileId=$profileId entries=${entries.size} " +
"items=${entries.debugWatchProgressEntrySummary()}"
}
val progressKeys = entries.map { entry ->
if (entry.seasonNumber != null && entry.episodeNumber != null) {
"${entry.parentMetaId}_s${entry.seasonNumber}e${entry.episodeNumber}"
@ -76,6 +92,7 @@ object SupabaseProgressSyncAdapter : ProgressSyncAdapter {
put("p_keys", json.encodeToJsonElement(progressKeys))
}
SupabaseProvider.client.postgrest.rpc("sync_delete_watch_progress", params)
log.d { "delete complete profileId=$profileId keys=${progressKeys.joinToString(limit = 12)}" }
}
private fun progressKeyForEntry(entry: WatchProgressEntry): String =
@ -98,3 +115,53 @@ private data class WatchProgressSyncEntry(
@SerialName("last_watched") val lastWatched: Long = 0,
@SerialName("progress_key") val progressKey: String = "",
)
private fun Collection<ProgressSyncRecord>.debugProgressRecordSummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { record ->
buildString {
append(record.contentType)
append(":")
append(record.contentId)
if (record.season != null || record.episode != null) {
append(" s=")
append(record.season)
append(" e=")
append(record.episode)
}
append(" video=")
append(record.videoId)
append(" pos=")
append(record.position)
append(" dur=")
append(record.duration)
append(" last=")
append(record.lastWatched)
}
}.ifBlank { "none" }
private fun Collection<WatchProgressEntry>.debugWatchProgressEntrySummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { entry ->
buildString {
append(entry.parentMetaType)
append(":")
append(entry.parentMetaId)
if (entry.seasonNumber != null || entry.episodeNumber != null) {
append(" s=")
append(entry.seasonNumber)
append(" e=")
append(entry.episodeNumber)
}
append(" video=")
append(entry.videoId)
append(" pos=")
append(entry.lastPositionMs)
append(" dur=")
append(entry.durationMs)
append(" pct=")
append(entry.progressPercent)
append(" completed=")
append(entry.isCompleted)
append(" last=")
append(entry.lastUpdatedEpochMs)
}
}.ifBlank { "none" }

View file

@ -118,6 +118,7 @@ data class WatchProgressEntry(
data class WatchProgressUiState(
val entries: List<WatchProgressEntry> = emptyList(),
val hasLoadedRemoteProgress: Boolean = false,
) {
val byVideoId: Map<String, WatchProgressEntry>
get() = entries.associateBy { it.videoId }

View file

@ -1,6 +1,8 @@
package com.nuvio.app.features.watchprogress
import co.touchlab.kermit.Logger
import com.nuvio.app.core.auth.AuthRepository
import com.nuvio.app.core.auth.AuthState
import com.nuvio.app.features.addons.AddonRepository
import com.nuvio.app.features.details.MetaDetailsRepository
import com.nuvio.app.features.player.PlayerPlaybackSnapshot
@ -11,11 +13,14 @@ import com.nuvio.app.features.trakt.TraktSettingsRepository
import com.nuvio.app.features.trakt.shouldUseTraktProgress as shouldUseTraktProgressSource
import com.nuvio.app.features.watching.application.WatchingActions
import com.nuvio.app.features.watching.sync.ProgressSyncAdapter
import com.nuvio.app.features.watching.sync.ProgressSyncRecord
import com.nuvio.app.features.watching.sync.SupabaseProgressSyncAdapter
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
@ -23,6 +28,8 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
private const val NUVIO_SYNC_PERIODIC_INTERVAL_MS = 5L * 60L * 1000L
object WatchProgressRepository {
private val syncScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val log = Logger.withTag("WatchProgressRepository")
@ -34,6 +41,8 @@ object WatchProgressRepository {
private var currentProfileId: Int = 1
private var entriesByVideoId: MutableMap<String, WatchProgressEntry> = mutableMapOf()
private var metadataResolutionJob: Job? = null
private var isPullingNuvioSyncFromServer = false
private var hasCompletedInitialNuvioSyncPull = false
internal var syncAdapter: ProgressSyncAdapter = SupabaseProgressSyncAdapter
init {
@ -45,7 +54,10 @@ object WatchProgressRepository {
)
) {
runCatching { TraktProgressRepository.refreshNow() }
.onFailure { error -> log.w { "Failed to refresh Trakt progress after auth: ${error.message}" } }
.onFailure { error ->
if (error is CancellationException) throw error
log.w { "Failed to refresh Trakt progress after auth: ${error.message}" }
}
}
publish()
}
@ -59,7 +71,10 @@ object WatchProgressRepository {
)
) {
runCatching { TraktProgressRepository.refreshNow() }
.onFailure { error -> log.w { "Failed to refresh Trakt progress after source change: ${error.message}" } }
.onFailure { error ->
if (error is CancellationException) throw error
log.w { "Failed to refresh Trakt progress after source change: ${error.message}" }
}
}
publish()
}
@ -72,6 +87,35 @@ object WatchProgressRepository {
}
}
}
syncScope.launch {
while (true) {
delay(NUVIO_SYNC_PERIODIC_INTERVAL_MS)
TraktAuthRepository.ensureLoaded()
TraktSettingsRepository.ensureLoaded()
if (shouldUseTraktProgress()) continue
val authState = AuthRepository.state.value
if (authState !is AuthState.Authenticated || authState.isAnonymous) continue
if (!hasCompletedInitialNuvioSyncPull || isPullingNuvioSyncFromServer) continue
log.d {
"periodic NuvioSync pull start profileId=${ProfileRepository.activeProfileId} " +
"entries=${entriesByVideoId.size}"
}
runCatching { pullFromServer(ProfileRepository.activeProfileId) }
.onSuccess {
log.d {
"periodic NuvioSync pull complete profileId=${ProfileRepository.activeProfileId} " +
"entries=${entriesByVideoId.size}"
}
}
.onFailure { error ->
if (error is CancellationException) throw error
log.w { "Periodic NuvioSync pull failed: ${error.message}" }
}
}
}
}
fun ensureLoaded() {
@ -127,57 +171,93 @@ object WatchProgressRepository {
currentProfileId = profileId
val useTraktProgress = shouldUseTraktProgress()
if (useTraktProgress) {
runCatching { TraktProgressRepository.refreshNow() }
.onFailure { e -> log.e(e) { "Failed to pull Trakt progress" } }
publish()
return
log.d {
"pullFromServer start profileId=$profileId source=${if (useTraktProgress) "trakt" else "nuvio_sync"} " +
"localEntries=${entriesByVideoId.size}"
}
runCatching {
val serverEntries = syncAdapter.pull(profileId = profileId)
if (!useTraktProgress && isPullingNuvioSyncFromServer) {
log.d { "pullFromServer NuvioSync skipped: pull already in flight profileId=$profileId" }
return
}
if (!useTraktProgress) {
isPullingNuvioSyncFromServer = true
}
val oldLocal = entriesByVideoId.toMap()
val newMap = mutableMapOf<String, WatchProgressEntry>()
serverEntries.forEach { entry ->
val videoId = entry.videoId
val cached = oldLocal[videoId]
newMap[videoId] = WatchProgressEntry(
contentType = entry.contentType,
parentMetaId = entry.contentId,
parentMetaType = cached?.parentMetaType ?: entry.contentType,
videoId = videoId,
title = cached?.title?.takeIf { it.isNotBlank() } ?: entry.contentId,
logo = cached?.logo,
poster = cached?.poster,
background = cached?.background,
seasonNumber = entry.season,
episodeNumber = entry.episode,
episodeTitle = cached?.episodeTitle,
episodeThumbnail = cached?.episodeThumbnail,
lastPositionMs = entry.position,
durationMs = entry.duration,
lastUpdatedEpochMs = entry.lastWatched,
providerName = cached?.providerName,
providerAddonId = cached?.providerAddonId,
lastStreamTitle = cached?.lastStreamTitle,
lastStreamSubtitle = cached?.lastStreamSubtitle,
pauseDescription = cached?.pauseDescription,
lastSourceUrl = cached?.lastSourceUrl,
isCompleted = isWatchProgressComplete(entry.position, entry.duration, false),
)
try {
if (useTraktProgress) {
runCatching { TraktProgressRepository.refreshNow() }
.onFailure { e ->
if (e is CancellationException) throw e
log.e(e) { "Failed to pull Trakt progress" }
}
publish()
log.d {
"pullFromServer trakt complete entries=${TraktProgressRepository.uiState.value.entries.size} " +
"sources=${TraktProgressRepository.uiState.value.entries.debugSourceCounts()} " +
"items=${TraktProgressRepository.uiState.value.entries.debugWatchProgressEntrySummary()}"
}
return
}
entriesByVideoId = newMap
hasLoaded = true
publish()
persist()
runCatching {
val serverEntries = syncAdapter.pull(profileId = profileId)
log.d {
"pullFromServer NuvioSync returned ${serverEntries.size} records " +
"items=${serverEntries.debugProgressRecordSummary()}"
}
resolveRemoteMetadata()
}.onFailure { e ->
log.e(e) { "Failed to pull watch progress from server" }
val oldLocal = entriesByVideoId.toMap()
val newMap = mutableMapOf<String, WatchProgressEntry>()
serverEntries.forEach { entry ->
val videoId = entry.videoId
val cached = oldLocal[videoId]
newMap[videoId] = WatchProgressEntry(
contentType = entry.contentType,
parentMetaId = entry.contentId,
parentMetaType = cached?.parentMetaType ?: entry.contentType,
videoId = videoId,
title = cached?.title?.takeIf { it.isNotBlank() } ?: entry.contentId,
logo = cached?.logo,
poster = cached?.poster,
background = cached?.background,
seasonNumber = entry.season,
episodeNumber = entry.episode,
episodeTitle = cached?.episodeTitle,
episodeThumbnail = cached?.episodeThumbnail,
lastPositionMs = entry.position,
durationMs = entry.duration,
lastUpdatedEpochMs = entry.lastWatched,
providerName = cached?.providerName,
providerAddonId = cached?.providerAddonId,
lastStreamTitle = cached?.lastStreamTitle,
lastStreamSubtitle = cached?.lastStreamSubtitle,
pauseDescription = cached?.pauseDescription,
lastSourceUrl = cached?.lastSourceUrl,
isCompleted = isWatchProgressComplete(entry.position, entry.duration, false),
)
}
entriesByVideoId = newMap
hasLoaded = true
hasCompletedInitialNuvioSyncPull = true
publish()
persist()
log.d {
"pullFromServer NuvioSync applied entries=${entriesByVideoId.size} " +
"items=${entriesByVideoId.values.debugWatchProgressEntrySummary()}"
}
resolveRemoteMetadata()
}.onFailure { e ->
if (e is CancellationException) throw e
log.e(e) { "Failed to pull watch progress from server" }
}
} finally {
if (!useTraktProgress) {
isPullingNuvioSyncFromServer = false
}
}
}
@ -186,7 +266,15 @@ object WatchProgressRepository {
.filter { it.poster.isNullOrBlank() || it.background.isNullOrBlank() }
.groupBy { it.parentMetaId to it.contentType }
if (needsResolution.isEmpty()) return
if (needsResolution.isEmpty()) {
log.d { "resolveRemoteMetadata skipped: all entries have artwork" }
return
}
log.d {
"resolveRemoteMetadata start groups=${needsResolution.size} " +
"entries=${needsResolution.values.sumOf { it.size }} " +
"keys=${needsResolution.keys.joinToString(limit = 12) { (metaId, type) -> "$type:$metaId" }}"
}
metadataResolutionJob?.cancel()
metadataResolutionJob = syncScope.launch {
@ -201,7 +289,11 @@ object WatchProgressRepository {
val (metaId, metaType) = key
val meta = runCatching {
MetaDetailsRepository.fetch(metaType, metaId)
}.getOrNull() ?: continue
}.getOrNull()
if (meta == null) {
log.d { "resolveRemoteMetadata miss type=$metaType id=$metaId entries=${entries.size}" }
continue
}
for (entry in entries) {
val episodeVideo = if (entry.seasonNumber != null && entry.episodeNumber != null) {
@ -224,8 +316,13 @@ object WatchProgressRepository {
}
publish()
log.d {
"resolveRemoteMetadata applied type=$metaType id=$metaId entries=${entries.size} " +
"metaVideos=${meta.videos.size}"
}
}
persist()
log.d { "resolveRemoteMetadata complete entries=${entriesByVideoId.size}" }
}
}
@ -350,6 +447,10 @@ object WatchProgressRepository {
isEnded = snapshot.isEnded,
)
if (!isCompleted && !shouldStoreWatchProgress(positionMs = positionMs, durationMs = durationMs)) {
log.d {
"upsert skipped below threshold video=${session.videoId} content=${session.parentMetaId} " +
"s=${session.seasonNumber} e=${session.episodeNumber} pos=$positionMs dur=$durationMs ended=${snapshot.isEnded}"
}
return
}
@ -383,6 +484,10 @@ object WatchProgressRepository {
}
val useTraktProgress = shouldUseTraktProgress()
log.d {
"upsert progress source=${if (useTraktProgress) "trakt" else "nuvio_sync"} " +
"entry=${entry.debugSummary()} snapshotEnded=${snapshot.isEnded}"
}
entriesByVideoId[session.videoId] = entry
if (useTraktProgress) {
@ -403,7 +508,9 @@ object WatchProgressRepository {
syncScope.launch {
runCatching {
val profileId = ProfileRepository.activeProfileId
log.d { "pushScrobbleToServer profileId=$profileId entry=${entry.debugSummary()}" }
syncAdapter.push(profileId = profileId, entries = listOf(entry))
log.d { "pushScrobbleToServer complete profileId=$profileId video=${entry.videoId}" }
}.onFailure { e ->
log.e(e) { "Failed to push watch progress scrobble" }
}
@ -416,7 +523,12 @@ object WatchProgressRepository {
runCatching {
if (entries.isEmpty()) return@runCatching
val profileId = ProfileRepository.activeProfileId
log.d {
"pushDeleteToServer profileId=$profileId entries=${entries.size} " +
"items=${entries.debugWatchProgressEntrySummary()}"
}
syncAdapter.delete(profileId = profileId, entries = entries)
log.d { "pushDeleteToServer complete profileId=$profileId entries=${entries.size}" }
}.onFailure { e ->
log.e(e) { "Failed to push watch progress delete" }
}
@ -426,8 +538,18 @@ object WatchProgressRepository {
private fun publish() {
val entries = currentEntries()
val sortedEntries = entries.sortedByDescending { it.lastUpdatedEpochMs }
log.d {
"publish source=${if (shouldUseTraktProgress()) "trakt" else "nuvio_sync"} " +
"entries=${sortedEntries.size} cw=${sortedEntries.continueWatchingEntries().size} " +
"sources=${sortedEntries.debugSourceCounts()} items=${sortedEntries.debugWatchProgressEntrySummary()}"
}
_uiState.value = WatchProgressUiState(
entries = sortedEntries,
hasLoadedRemoteProgress = if (shouldUseTraktProgress()) {
TraktProgressRepository.uiState.value.hasLoadedRemoteProgress
} else {
hasLoaded
},
)
}
@ -453,3 +575,67 @@ object WatchProgressRepository {
}
}
private fun ProgressSyncRecord.debugSummary(): String =
buildString {
append(contentType)
append(":")
append(contentId)
if (season != null || episode != null) {
append(" s=")
append(season)
append(" e=")
append(episode)
}
append(" video=")
append(videoId)
append(" pos=")
append(position)
append(" dur=")
append(duration)
append(" last=")
append(lastWatched)
}
private fun Collection<ProgressSyncRecord>.debugProgressRecordSummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" }
private fun WatchProgressEntry.debugSummary(): String =
buildString {
append(parentMetaType)
append(":")
append(parentMetaId)
if (seasonNumber != null || episodeNumber != null) {
append(" s=")
append(seasonNumber)
append(" e=")
append(episodeNumber)
}
append(" video=")
append(videoId)
append(" pos=")
append(lastPositionMs)
append(" dur=")
append(durationMs)
append(" pct=")
append(progressPercent)
append(" completed=")
append(isCompleted)
append(" effectiveCompleted=")
append(isEffectivelyCompleted)
append(" src=")
append(source)
append(" last=")
append(lastUpdatedEpochMs)
}
private fun Collection<WatchProgressEntry>.debugWatchProgressEntrySummary(limit: Int = 10): String =
take(limit).joinToString(separator = " | ") { it.debugSummary() }.ifBlank { "none" }
private fun Collection<WatchProgressEntry>.debugSourceCounts(): String =
groupingBy { it.source }
.eachCount()
.entries
.sortedBy { it.key }
.joinToString(separator = ",") { "${it.key}=${it.value}" }
.ifBlank { "none" }