feat: optimize cw enrichmnet logic

This commit is contained in:
tapframe 2026-05-22 20:03:00 +05:30
parent 96010101b8
commit a789d7b455
4 changed files with 200 additions and 59 deletions

View file

@ -48,6 +48,7 @@ import com.nuvio.app.features.watched.WatchedRepository
import com.nuvio.app.features.watchprogress.CachedInProgressItem
import com.nuvio.app.features.watchprogress.CachedNextUpItem
import com.nuvio.app.features.watchprogress.ContinueWatchingEnrichmentCache
import com.nuvio.app.features.watchprogress.ContinueWatchingLimit
import com.nuvio.app.features.watchprogress.CurrentDateProvider
import com.nuvio.app.features.watchprogress.ContinueWatchingPreferencesRepository
import com.nuvio.app.features.watchprogress.ContinueWatchingItem
@ -81,6 +82,7 @@ import kotlinx.coroutines.sync.withPermit
import com.nuvio.app.features.home.components.ContinueWatchingLayout
import com.nuvio.app.features.home.components.homeSectionHorizontalPaddingForWidth
import com.nuvio.app.features.home.components.rememberContinueWatchingLayout
import kotlinx.coroutines.CancellationException
import nuvio.composeapp.generated.resources.*
import org.jetbrains.compose.resources.stringResource
@ -371,6 +373,9 @@ fun HomeScreen(
val enabledAddons = remember(addonsUiState.addons) {
addonsUiState.addons.enabledAddons()
}
val isRefreshingEnabledAddons = remember(enabledAddons) {
enabledAddons.any { addon -> addon.isRefreshing }
}
val availableManifests = remember(enabledAddons) {
enabledAddons.mapNotNull { addon -> addon.manifest }
}
@ -413,8 +418,10 @@ fun HomeScreen(
metaProviderKey,
continueWatchingPreferences.showUnairedNextUp,
continueWatchingPreferences.upNextFromFurthestEpisode,
isRefreshingEnabledAddons,
watchProgressUiState.entries,
watchedUiState.items,
watchedUiState.isLoaded,
) {
if (completedSeriesCandidates.isEmpty()) {
nextUpItemsBySeries = emptyMap()
@ -422,6 +429,14 @@ fun HomeScreen(
return@LaunchedEffect
}
if (!isTraktProgressActive && !watchedUiState.isLoaded) {
return@LaunchedEffect
}
if (isRefreshingEnabledAddons) {
return@LaunchedEffect
}
val cachedResolvedNextUpItems = completedSeriesCandidates.mapNotNull { candidate ->
val cached = cachedNextUpItems[candidate.content.id] ?: return@mapNotNull null
val item = cached.second
@ -436,6 +451,7 @@ fun HomeScreen(
val candidatesToResolve = completedSeriesCandidates.filter { candidate ->
candidate.content.id !in cachedResolvedNextUpItems
}
val resolutionCandidates = candidatesToResolve.take(NEXT_UP_INITIAL_RESOLUTION_LIMIT)
if (candidatesToResolve.isEmpty()) {
nextUpItemsBySeries = cachedResolvedNextUpItems
processedNextUpContentIds = completedSeriesCandidates.mapTo(mutableSetOf()) { candidate ->
@ -454,46 +470,54 @@ fun HomeScreen(
}
val todayIsoDate = CurrentDateProvider.todayIsoDate()
val semaphore = Semaphore(4)
val freshResults = candidatesToResolve.map { completedEntry ->
async {
semaphore.withPermit {
val meta = MetaDetailsRepository.fetch(
type = completedEntry.content.type,
id = completedEntry.content.id,
)
if (meta == null) {
return@withPermit null
val semaphore = Semaphore(NEXT_UP_RESOLUTION_CONCURRENCY)
val freshResults = mutableMapOf<String, Pair<Long, ContinueWatchingItem>>()
val processedFreshContentIds = mutableSetOf<String>()
val candidateBatches = resolutionCandidates.chunked(NEXT_UP_RESOLUTION_BATCH_SIZE)
for (batch in candidateBatches) {
val batchResults = batch.map { completedEntry ->
async {
semaphore.withPermit {
resolveHomeNextUpCandidate(
completedEntry = completedEntry,
watchProgressEntries = watchProgressUiState.entries,
watchedItems = watchedUiState.items,
todayIsoDate = todayIsoDate,
preferFurthestEpisode = continueWatchingPreferences.upNextFromFurthestEpisode,
showUnairedNextUp = continueWatchingPreferences.showUnairedNextUp,
dismissedNextUpKeys = continueWatchingPreferences.dismissedNextUpKeys,
)
}
val action = meta.seriesPrimaryAction(
content = completedEntry.content,
entries = watchProgressUiState.entries,
watchedItems = watchedUiState.items,
todayIsoDate = todayIsoDate,
preferFurthestEpisode = continueWatchingPreferences.upNextFromFurthestEpisode,
showUnairedNextUp = continueWatchingPreferences.showUnairedNextUp,
)
if (action?.resumePositionMs != null) {
return@withPermit null
}
val nextEpisode = action?.let { meta.videoForSeriesAction(it) }
if (nextEpisode == null) {
return@withPermit null
}
val item = completedEntry.toContinueWatchingSeed(meta)
.toUpNextContinueWatchingItem(nextEpisode)
if (nextUpDismissKey(item.parentMetaId, item.nextUpSeedSeasonNumber, item.nextUpSeedEpisodeNumber) in continueWatchingPreferences.dismissedNextUpKeys) {
return@withPermit null
}
completedEntry.content.id to (completedEntry.markedAtEpochMs to item)
}
}.awaitAll()
batch.forEach { candidate -> processedFreshContentIds += candidate.content.id }
val resolvedBeforeBatch = freshResults.size
batchResults.filterNotNull().forEach { (contentId, item) ->
freshResults[contentId] = item
}
}.awaitAll().filterNotNull().toMap()
val batchResolvedCount = freshResults.size - resolvedBeforeBatch
if (batchResolvedCount > 0) {
val progressiveResults = cachedResolvedNextUpItems + freshResults
nextUpItemsBySeries = progressiveResults
processedNextUpContentIds = (
cachedResolvedNextUpItems.keys +
processedFreshContentIds
).toSet()
}
if (cachedResolvedNextUpItems.size + freshResults.size >= ContinueWatchingLimit) {
break
}
}
val results = cachedResolvedNextUpItems + freshResults
nextUpItemsBySeries = results
processedNextUpContentIds = completedSeriesCandidates.mapTo(mutableSetOf()) { candidate ->
candidate.content.id
}
processedNextUpContentIds = (
cachedResolvedNextUpItems.keys +
processedFreshContentIds
).toSet()
saveContinueWatchingSnapshots(
nextUpItemsBySeries = results,
@ -730,6 +754,9 @@ fun HomeScreen(
private const val HOME_CATALOG_PREVIEW_LIMIT = 18
private const val MILLIS_PER_DAY = 24L * 60L * 60L * 1000L
private const val OPTIMISTIC_NEXT_UP_SEED_WINDOW_MS = 3L * 60L * 1000L
private const val NEXT_UP_INITIAL_RESOLUTION_LIMIT = ContinueWatchingLimit * 2
private const val NEXT_UP_RESOLUTION_CONCURRENCY = 8
private const val NEXT_UP_RESOLUTION_BATCH_SIZE = NEXT_UP_RESOLUTION_CONCURRENCY
internal fun filterEntriesForTraktContinueWatchingWindow(
entries: List<WatchProgressEntry>,
@ -824,6 +851,49 @@ internal fun filterNextUpItemsByCurrentSeeds(
item.nextUpSeedEpisodeNumber == currentSeed.second
}
private suspend fun resolveHomeNextUpCandidate(
completedEntry: CompletedSeriesCandidate,
watchProgressEntries: List<WatchProgressEntry>,
watchedItems: List<WatchedItem>,
todayIsoDate: String,
preferFurthestEpisode: Boolean,
showUnairedNextUp: Boolean,
dismissedNextUpKeys: Set<String>,
): Pair<String, Pair<Long, ContinueWatchingItem>>? {
val contentId = completedEntry.content.id
val meta = try {
MetaDetailsRepository.fetch(
type = completedEntry.content.type,
id = contentId,
)
} catch (error: Throwable) {
if (error is CancellationException) throw error
null
}
if (meta == null) return null
val action = meta.seriesPrimaryAction(
content = completedEntry.content,
entries = watchProgressEntries,
watchedItems = watchedItems,
todayIsoDate = todayIsoDate,
preferFurthestEpisode = preferFurthestEpisode,
showUnairedNextUp = showUnairedNextUp,
)
if (action == null) return null
if (action.resumePositionMs != null) return null
val nextEpisode = meta.videoForSeriesAction(action)
if (nextEpisode == null) return null
val item = completedEntry.toContinueWatchingSeed(meta)
.toUpNextContinueWatchingItem(nextEpisode)
if (nextUpDismissKey(item.parentMetaId, item.nextUpSeedSeasonNumber, item.nextUpSeedEpisodeNumber) in dismissedNextUpKeys) {
return null
}
return contentId to (completedEntry.markedAtEpochMs to item)
}
private fun MetaDetails.videoForSeriesAction(action: SeriesPrimaryAction): MetaVideo? {
if (action.seasonNumber != null && action.episodeNumber != null) {
videos.firstOrNull { video ->

View file

@ -70,7 +70,9 @@ internal object ContinueWatchingEnrichmentCache {
fun getSnapshots(): Pair<List<CachedNextUpItem>, List<CachedInProgressItem>> {
val payload = loadPayload()
return (payload?.nextUp ?: emptyList()) to (payload?.inProgress ?: emptyList())
val nextUp = payload?.nextUp ?: emptyList()
val inProgress = payload?.inProgress ?: emptyList()
return nextUp to inProgress
}
fun saveSnapshots(
@ -80,7 +82,9 @@ internal object ContinueWatchingEnrichmentCache {
) {
val payload = CachedEnrichmentPayload(nextUp = nextUp, inProgress = inProgress)
val payloadHash = payload.hashCode()
if (!force && lastPayloadHash == payloadHash) return
if (!force && lastPayloadHash == payloadHash) {
return
}
val encoded = runCatching {
json.encodeToString(payload)

View file

@ -4,6 +4,7 @@ import co.touchlab.kermit.Logger
import com.nuvio.app.core.auth.AuthRepository
import com.nuvio.app.core.auth.AuthState
import com.nuvio.app.features.addons.AddonRepository
import com.nuvio.app.features.details.MetaDetails
import com.nuvio.app.features.details.MetaDetailsRepository
import com.nuvio.app.features.player.PlayerPlaybackSnapshot
import com.nuvio.app.features.profiles.ProfileRepository
@ -20,15 +21,28 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withTimeoutOrNull
private const val NUVIO_SYNC_PERIODIC_INTERVAL_MS = 5L * 60L * 1000L
private const val WATCH_PROGRESS_METADATA_RESOLUTION_CONCURRENCY = 4
private data class RemoteMetadataResolutionResult(
val key: Pair<String, String>,
val entries: List<WatchProgressEntry>,
val meta: MetaDetails?,
)
object WatchProgressRepository {
private val syncScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
@ -189,6 +203,11 @@ object WatchProgressRepository {
sinceLastWatched = sinceLastWatched,
)
val isIncrementalPull = sinceLastWatched != null
if (isIncrementalPull && serverEntries.isEmpty()) {
hasLoaded = true
hasCompletedInitialNuvioSyncPull = true
return@runCatching
}
val oldLocal = entriesByVideoId.toMap()
val newMap = if (isIncrementalPull) {
entriesByVideoId.toMutableMap()
@ -245,8 +264,10 @@ object WatchProgressRepository {
)
private fun resolveRemoteMetadata() {
val needsResolution = entriesByVideoId.values
val missingMetadataEntries = entriesByVideoId.values
.filter { it.poster.isNullOrBlank() || it.background.isNullOrBlank() }
val entriesToResolve = missingMetadataEntries.continueWatchingEntries(limit = ContinueWatchingLimit)
val needsResolution = entriesToResolve
.groupBy { it.parentMetaId to it.contentType }
if (needsResolution.isEmpty()) {
@ -262,41 +283,79 @@ object WatchProgressRepository {
return@launch
}
for ((key, entries) in needsResolution) {
val (metaId, metaType) = key
val meta = runCatching {
MetaDetailsRepository.fetch(metaType, metaId)
}.getOrNull()
var resolvedEntries = 0
val semaphore = Semaphore(WATCH_PROGRESS_METADATA_RESOLUTION_CONCURRENCY)
val resolutionResults = coroutineScope {
needsResolution.map { (key, entries) ->
async {
semaphore.withPermit {
fetchRemoteMetadataGroup(key = key, entries = entries)
}
}
}.awaitAll()
}
for (result in resolutionResults) {
ensureActive()
val meta = result.meta
if (meta == null) {
continue
}
for (entry in entries) {
val episodeVideo = if (entry.seasonNumber != null && entry.episodeNumber != null) {
var appliedEntries = 0
for (entry in result.entries) {
val current = entriesByVideoId[entry.videoId] ?: continue
val episodeVideo = if (current.seasonNumber != null && current.episodeNumber != null) {
meta.videos.find { v ->
v.season == entry.seasonNumber && v.episode == entry.episodeNumber
v.season == current.seasonNumber && v.episode == current.episodeNumber
}
} else null
entriesByVideoId[entry.videoId] = entry.copy(
entriesByVideoId[current.videoId] = current.copy(
title = meta.name,
poster = meta.poster,
background = meta.background,
logo = meta.logo,
episodeTitle = episodeVideo?.title ?: entry.episodeTitle,
episodeThumbnail = episodeVideo?.thumbnail ?: entry.episodeThumbnail,
episodeTitle = episodeVideo?.title ?: current.episodeTitle,
episodeThumbnail = episodeVideo?.thumbnail ?: current.episodeThumbnail,
pauseDescription = episodeVideo?.overview
?: meta.description
?: entry.pauseDescription,
?: current.pauseDescription,
)
appliedEntries += 1
}
if (appliedEntries == 0) {
continue
}
publish()
resolvedEntries += appliedEntries
}
if (resolvedEntries > 0) {
publish()
persist()
}
persist()
}
}
private suspend fun fetchRemoteMetadataGroup(
key: Pair<String, String>,
entries: List<WatchProgressEntry>,
): RemoteMetadataResolutionResult {
val (metaId, metaType) = key
val meta = try {
MetaDetailsRepository.fetch(metaType, metaId)
} catch (error: CancellationException) {
throw error
} catch (_: Throwable) {
null
}
return RemoteMetadataResolutionResult(
key = key,
entries = entries,
meta = meta,
)
}
fun upsertPlaybackProgress(
session: WatchProgressPlaybackSession,
snapshot: PlayerPlaybackSnapshot,
@ -494,13 +553,14 @@ object WatchProgressRepository {
private fun publish() {
val entries = currentEntries()
val sortedEntries = entries.sortedByDescending { it.lastUpdatedEpochMs }
val hasLoadedRemoteProgress = if (shouldUseTraktProgress()) {
TraktProgressRepository.uiState.value.hasLoadedRemoteProgress
} else {
hasLoaded
}
_uiState.value = WatchProgressUiState(
entries = sortedEntries,
hasLoadedRemoteProgress = if (shouldUseTraktProgress()) {
TraktProgressRepository.uiState.value.hasLoadedRemoteProgress
} else {
hasLoaded
},
hasLoadedRemoteProgress = hasLoadedRemoteProgress,
)
}

View file

@ -128,7 +128,14 @@ fi
# ── Colour-coding function ──────────────────────────────────────────────────
colorize_line() {
local line="$1"
local level="${line:0:1}"
local level=""
if [[ "$line" =~ ^[[:space:]]*[0-9]{2}-[0-9]{2}[[:space:]]+[0-9:.]+[[:space:]]+([VDIWEF])/ ]]; then
level="${BASH_REMATCH[1]}"
elif [[ "$line" =~ ^([VDIWEF])/ ]]; then
level="${BASH_REMATCH[1]}"
else
level="${line:0:1}"
fi
local clr=""
case "$level" in
V) clr="$CLR_V" ;;
@ -160,7 +167,7 @@ stream_logcat_colored() {
local value="$2"
local noise_re="$NOISE_TAGS"
"${ADB[@]}" logcat -v brief "$mode" "$value" 2>/dev/null \
"${ADB[@]}" logcat -v time "$mode" "$value" 2>/dev/null \
| while IFS= read -r raw_line; do
if [[ "$raw_line" =~ $noise_re ]]; then
continue