feat: parallel cache check

This commit is contained in:
tapframe 2026-05-20 23:26:08 +05:30
parent 41b0bd95ad
commit 782f65aaff
3 changed files with 163 additions and 119 deletions

View file

@ -162,6 +162,7 @@ object PlayerStreamsRepository {
val installedAddonNames = installedAddons.map { it.displayTitle }.toSet()
PlayerSettingsRepository.ensureLoaded()
val playerSettings = PlayerSettingsRepository.uiState.value
val debridSettings = DebridSettingsRepository.snapshot()
val pluginScrapers = if (AppFeaturePolicy.pluginsEnabled) {
PluginRepository.initialize()
PluginRepository.getEnabledScrapersForType(type)
@ -234,6 +235,61 @@ object PlayerStreamsRepository {
val job = scope.launch {
val pendingStreamAddons = streamAddons.filterNot { it.addonId in warmedAddonIds }
val installedAddonIds = streamAddons.map { it.addonId }.toSet()
val debridAvailabilityJobs = mutableListOf<Job>()
fun emptyStateReason(groups: List<AddonStreamGroup>, anyLoading: Boolean) =
if (!anyLoading && groups.all { it.streams.isEmpty() }) {
if (groups.all { !it.error.isNullOrBlank() }) {
com.nuvio.app.features.streams.StreamsEmptyStateReason.StreamFetchFailed
} else {
com.nuvio.app.features.streams.StreamsEmptyStateReason.NoStreamsFound
}
} else {
null
}
fun presentDebridGroup(group: AddonStreamGroup): AddonStreamGroup =
DebridStreamPresentation.apply(
groups = listOf(group),
settings = debridSettings,
).firstOrNull() ?: group
fun publishStreamGroup(group: AddonStreamGroup) {
stateFlow.update { current ->
val updated = StreamAutoPlaySelector.orderAddonStreams(
groups = current.groups.map { currentGroup ->
if (currentGroup.addonId == group.addonId) group else currentGroup
},
installedOrder = installedAddonOrder,
)
val anyLoading = updated.any { it.isLoading }
current.copy(
groups = updated,
isAnyLoading = anyLoading,
emptyStateReason = emptyStateReason(updated, anyLoading),
)
}
}
fun launchDebridAvailability(group: AddonStreamGroup) {
if (group.addonId !in installedAddonIds || group.streams.isEmpty()) return
val eligibleGroupIds = setOf(group.addonId)
val checkingGroup = TorboxAvailabilityService.markChecking(
groups = listOf(group),
eligibleGroupIds = eligibleGroupIds,
).firstOrNull() ?: group
publishStreamGroup(checkingGroup)
val availabilityJob = launch {
val availabilityGroup = TorboxAvailabilityService.annotateCachedAvailability(
groups = listOf(checkingGroup),
eligibleGroupIds = eligibleGroupIds,
).firstOrNull() ?: checkingGroup
publishStreamGroup(presentDebridGroup(availabilityGroup))
}
debridAvailabilityJobs += availabilityJob
}
val addonJobs = pendingStreamAddons.map { addon ->
async {
val url = buildAddonResourceUrl(
@ -301,64 +357,33 @@ object PlayerStreamsRepository {
completions.send(deferred.await())
}
}
var debridPreparationLaunched = false
repeat(jobs.size) {
val result = completions.receive()
stateFlow.update { current ->
val updated = StreamAutoPlaySelector.orderAddonStreams(
groups = current.groups.map { g -> if (g.addonId == result.addonId) result else g },
installedOrder = installedAddonOrder,
)
val anyLoading = updated.any { it.isLoading }
current.copy(
groups = updated,
isAnyLoading = anyLoading,
emptyStateReason = if (!anyLoading && updated.all { it.streams.isEmpty() }) {
if (updated.all { !it.error.isNullOrBlank() }) {
com.nuvio.app.features.streams.StreamsEmptyStateReason.StreamFetchFailed
} else {
com.nuvio.app.features.streams.StreamsEmptyStateReason.NoStreamsFound
}
} else null,
)
}
publishStreamGroup(result)
launchDebridAvailability(result)
}
if (!debridPreparationLaunched) {
debridPreparationLaunched = true
val checkingGroups = TorboxAvailabilityService.markChecking(
groups = stateFlow.value.groups,
eligibleGroupIds = installedAddonIds,
)
stateFlow.update { current -> current.copy(groups = checkingGroups) }
val availabilityGroups = TorboxAvailabilityService.annotateCachedAvailability(
groups = stateFlow.value.groups,
eligibleGroupIds = installedAddonIds,
)
val presentedGroups = DebridStreamPresentation.apply(
groups = availabilityGroups,
settings = DebridSettingsRepository.snapshot(),
)
stateFlow.update { current -> current.copy(groups = presentedGroups) }
launch {
DirectDebridStreamPreparer.prepare(
streams = stateFlow.value.groups
.filter { it.addonId in installedAddonIds }
.flatMap { it.streams },
season = season,
episode = episode,
playerSettings = playerSettings,
installedAddonNames = installedAddonNames,
) { original, prepared ->
stateFlow.update { current ->
current.copy(
groups = DirectDebridStreamPreparer.replacePreparedStream(
groups = current.groups,
original = original,
prepared = prepared,
eligibleGroupIds = installedAddonIds,
),
)
}
for (availabilityJob in debridAvailabilityJobs) {
availabilityJob.join()
}
launch {
DirectDebridStreamPreparer.prepare(
streams = stateFlow.value.groups
.filter { it.addonId in installedAddonIds }
.flatMap { it.streams },
season = season,
episode = episode,
playerSettings = playerSettings,
installedAddonNames = installedAddonNames,
) { original, prepared ->
stateFlow.update { current ->
current.copy(
groups = DirectDebridStreamPreparer.replacePreparedStream(
groups = current.groups,
original = original,
prepared = prepared,
eligibleGroupIds = installedAddonIds,
),
)
}
}
}

View file

@ -110,14 +110,28 @@ object AddonStreamWarmupRepository {
val targets = key.addonTargets
if (targets.isEmpty()) return emptyList()
val addonIds = targets.map { it.addonId }.toSet()
val orderedGroups = coroutineScope {
targets.map { target ->
async {
fetchAddonStreams(
val group = fetchAddonStreams(
target = target,
type = key.type,
videoId = key.videoId,
)
val eligibleGroupIds = setOf(group.addonId)
val checkingGroup = TorboxAvailabilityService.markChecking(
groups = listOf(group),
eligibleGroupIds = eligibleGroupIds,
).firstOrNull() ?: group
val availabilityGroup = TorboxAvailabilityService.annotateCachedAvailability(
groups = listOf(checkingGroup),
eligibleGroupIds = eligibleGroupIds,
).firstOrNull() ?: checkingGroup
DebridStreamPresentation.apply(
groups = listOf(availabilityGroup),
settings = key.settings,
).firstOrNull() ?: availabilityGroup
}
}.awaitAll()
}.let { groups ->
@ -127,18 +141,7 @@ object AddonStreamWarmupRepository {
)
}
val addonIds = targets.map { it.addonId }.toSet()
val availabilityGroups = TorboxAvailabilityService.annotateCachedAvailability(
groups = TorboxAvailabilityService.markChecking(
groups = orderedGroups,
eligibleGroupIds = addonIds,
),
eligibleGroupIds = addonIds,
)
var preparedGroups = DebridStreamPresentation.apply(
groups = availabilityGroups,
settings = key.settings,
)
var preparedGroups = orderedGroups
PlayerSettingsRepository.ensureLoaded()
DirectDebridStreamPreparer.prepare(

View file

@ -230,14 +230,56 @@ object StreamsRepository {
val installedAddonNames = installedAddonOrder.toSet()
val installedAddonIds = streamAddons.map { it.addonId }.toSet()
val debridAvailabilityJobs = mutableListOf<Job>()
var autoSelectTriggered = false
var timeoutElapsed = false
var debridPreparationLaunched = false
fun publishCompletion(completion: StreamLoadCompletion) {
if (completions.trySend(completion).isFailure) {
log.d { "Ignoring late stream load completion after channel close" }
}
}
fun presentDebridGroup(group: AddonStreamGroup): AddonStreamGroup =
DebridStreamPresentation.apply(
groups = listOf(group),
settings = debridSettings,
).firstOrNull() ?: group
fun publishAddonGroup(group: AddonStreamGroup) {
_uiState.update { current ->
val updated = StreamAutoPlaySelector.orderAddonStreams(
groups = current.groups.map { currentGroup ->
if (currentGroup.addonId == group.addonId) group else currentGroup
},
installedOrder = installedAddonOrder,
)
val anyLoading = updated.any { it.isLoading }
current.copy(
groups = updated,
isAnyLoading = anyLoading,
emptyStateReason = updated.toEmptyStateReason(anyLoading),
)
}
}
fun launchDebridAvailability(group: AddonStreamGroup) {
if (group.addonId !in installedAddonIds || group.streams.isEmpty()) return
val eligibleGroupIds = setOf(group.addonId)
val checkingGroup = TorboxAvailabilityService.markChecking(
groups = listOf(group),
eligibleGroupIds = eligibleGroupIds,
).firstOrNull() ?: group
publishAddonGroup(checkingGroup)
val availabilityJob = launch {
val availabilityGroup = TorboxAvailabilityService.annotateCachedAvailability(
groups = listOf(checkingGroup),
eligibleGroupIds = eligibleGroupIds,
).firstOrNull() ?: checkingGroup
publishAddonGroup(presentDebridGroup(availabilityGroup))
}
debridAvailabilityJobs += availabilityJob
}
val timeoutJob = if (isAutoPlayEnabled) {
val timeoutMs = playerSettings.streamAutoPlayTimeoutSeconds * 1_000L
@ -370,20 +412,8 @@ object StreamsRepository {
when (val completion = completions.receive()) {
is StreamLoadCompletion.Addon -> {
val result = completion.group
_uiState.update { current ->
val updated = StreamAutoPlaySelector.orderAddonStreams(
groups = current.groups.map { group ->
if (group.addonId == result.addonId) result else group
},
installedOrder = installedAddonOrder,
)
val anyLoading = updated.any { it.isLoading }
current.copy(
groups = updated,
isAnyLoading = anyLoading,
emptyStateReason = updated.toEmptyStateReason(anyLoading),
)
}
publishAddonGroup(result)
launchDebridAvailability(result)
}
is StreamLoadCompletion.PluginScraper -> {
@ -431,42 +461,28 @@ object StreamsRepository {
}
}
if (!debridPreparationLaunched) {
debridPreparationLaunched = true
val checkingGroups = TorboxAvailabilityService.markChecking(
groups = _uiState.value.groups,
eligibleGroupIds = installedAddonIds,
)
_uiState.update { current -> current.copy(groups = checkingGroups) }
val availabilityGroups = TorboxAvailabilityService.annotateCachedAvailability(
groups = _uiState.value.groups,
eligibleGroupIds = installedAddonIds,
)
val presentedGroups = DebridStreamPresentation.apply(
groups = availabilityGroups,
settings = debridSettings,
)
_uiState.update { current -> current.copy(groups = presentedGroups) }
launch {
DirectDebridStreamPreparer.prepare(
streams = _uiState.value.groups
.filter { it.addonId in installedAddonIds }
.flatMap { it.streams },
season = season,
episode = episode,
playerSettings = playerSettings,
installedAddonNames = installedAddonNames,
) { original, prepared ->
_uiState.update { current ->
current.copy(
groups = DirectDebridStreamPreparer.replacePreparedStream(
groups = current.groups,
original = original,
prepared = prepared,
eligibleGroupIds = installedAddonIds,
),
)
}
for (availabilityJob in debridAvailabilityJobs) {
availabilityJob.join()
}
launch {
DirectDebridStreamPreparer.prepare(
streams = _uiState.value.groups
.filter { it.addonId in installedAddonIds }
.flatMap { it.streams },
season = season,
episode = episode,
playerSettings = playerSettings,
installedAddonNames = installedAddonNames,
) { original, prepared ->
_uiState.update { current ->
current.copy(
groups = DirectDebridStreamPreparer.replacePreparedStream(
groups = current.groups,
original = original,
prepared = prepared,
eligibleGroupIds = installedAddonIds,
),
)
}
}
}