mirror of
https://github.com/tapframe/NuvioStreaming.git
synced 2026-03-11 17:45:38 +00:00
update sync logic
This commit is contained in:
parent
37ad5647f8
commit
81d528e0f6
2 changed files with 130 additions and 33 deletions
|
|
@ -178,8 +178,8 @@ const SyncSettingsScreen: React.FC = () => {
|
||||||
</View>
|
</View>
|
||||||
<Text style={[styles.cardText, { color: currentTheme.colors.mediumEmphasis }]}>
|
<Text style={[styles.cardText, { color: currentTheme.colors.mediumEmphasis }]}>
|
||||||
{externalSyncActive
|
{externalSyncActive
|
||||||
? `${externalSyncServices.join(' + ')} is active. Watch progress and library updates are managed by these services instead of Nuvio cloud database.`
|
? `${externalSyncServices.join(' + ')} is active. Watch progress and watched status are managed by these services instead of Nuvio cloud database. Library sync still uses Nuvio cloud.`
|
||||||
: 'If Trakt or Simkl sync is enabled, watch progress and library updates will use those services instead of Nuvio cloud database.'}
|
: 'If Trakt or Simkl sync is enabled, watch progress and watched status will use those services instead of Nuvio cloud database. Library sync still uses Nuvio cloud.'}
|
||||||
</Text>
|
</Text>
|
||||||
</View>
|
</View>
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import { catalogService, StreamingContent } from './catalogService';
|
||||||
import { storageService } from './storageService';
|
import { storageService } from './storageService';
|
||||||
import { watchedService, LocalWatchedItem } from './watchedService';
|
import { watchedService, LocalWatchedItem } from './watchedService';
|
||||||
import { TraktService } from './traktService';
|
import { TraktService } from './traktService';
|
||||||
|
import { SimklService } from './simklService';
|
||||||
|
|
||||||
const SUPABASE_SESSION_KEY = '@supabase:session';
|
const SUPABASE_SESSION_KEY = '@supabase:session';
|
||||||
const DEFAULT_SYNC_DEBOUNCE_MS = 2000;
|
const DEFAULT_SYNC_DEBOUNCE_MS = 2000;
|
||||||
|
|
@ -123,6 +124,7 @@ class SupabaseSyncService {
|
||||||
private readonly foregroundPullCooldownMs = 30000;
|
private readonly foregroundPullCooldownMs = 30000;
|
||||||
private pendingWatchProgressDeleteKeys = new Set<string>();
|
private pendingWatchProgressDeleteKeys = new Set<string>();
|
||||||
private watchProgressDeleteTimer: ReturnType<typeof setTimeout> | null = null;
|
private watchProgressDeleteTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
private watchProgressPushedSignatures = new Map<string, string>();
|
||||||
|
|
||||||
private pendingPushTimers: Record<PushTarget, ReturnType<typeof setTimeout> | null> = {
|
private pendingPushTimers: Record<PushTarget, ReturnType<typeof setTimeout> | null> = {
|
||||||
plugins: null,
|
plugins: null,
|
||||||
|
|
@ -236,6 +238,7 @@ class SupabaseSyncService {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.session = null;
|
this.session = null;
|
||||||
|
this.watchProgressPushedSignatures.clear();
|
||||||
await mmkvStorage.removeItem(SUPABASE_SESSION_KEY);
|
await mmkvStorage.removeItem(SUPABASE_SESSION_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -277,13 +280,15 @@ class SupabaseSyncService {
|
||||||
await this.pushPluginsFromLocal();
|
await this.pushPluginsFromLocal();
|
||||||
await this.pushAddonsFromLocal();
|
await this.pushAddonsFromLocal();
|
||||||
|
|
||||||
const traktConnected = await this.isTraktConnected();
|
await this.pushLibraryFromLocal();
|
||||||
if (traktConnected) {
|
const externalProgressSyncConnected = await this.isExternalProgressSyncConnected();
|
||||||
|
if (externalProgressSyncConnected) {
|
||||||
|
logger.log('[SupabaseSyncService] External sync (Trakt/Simkl) is connected; skipping watch progress/watched-items push, keeping library sync.');
|
||||||
|
logger.log('[SupabaseSyncService] pushAllLocalData: complete');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.pushWatchProgressFromLocal();
|
await this.pushWatchProgressFromLocal();
|
||||||
await this.pushLibraryFromLocal();
|
|
||||||
await this.pushWatchedItemsFromLocal();
|
await this.pushWatchedItemsFromLocal();
|
||||||
logger.log('[SupabaseSyncService] pushAllLocalData: complete');
|
logger.log('[SupabaseSyncService] pushAllLocalData: complete');
|
||||||
}
|
}
|
||||||
|
|
@ -296,13 +301,14 @@ class SupabaseSyncService {
|
||||||
await this.pullPluginsToLocal();
|
await this.pullPluginsToLocal();
|
||||||
await this.pullAddonsToLocal();
|
await this.pullAddonsToLocal();
|
||||||
|
|
||||||
const traktConnected = await this.isTraktConnected();
|
await this.pullLibraryToLocal();
|
||||||
if (traktConnected) {
|
const externalProgressSyncConnected = await this.isExternalProgressSyncConnected();
|
||||||
|
if (externalProgressSyncConnected) {
|
||||||
|
logger.log('[SupabaseSyncService] External sync (Trakt/Simkl) is connected; skipping watch progress/watched-items pull, keeping library sync.');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.pullWatchProgressToLocal();
|
await this.pullWatchProgressToLocal();
|
||||||
await this.pullLibraryToLocal();
|
|
||||||
await this.pullWatchedItemsToLocal();
|
await this.pullWatchedItemsToLocal();
|
||||||
});
|
});
|
||||||
logger.log('[SupabaseSyncService] pullAllToLocal: complete');
|
logger.log('[SupabaseSyncService] pullAllToLocal: complete');
|
||||||
|
|
@ -493,9 +499,18 @@ class SupabaseSyncService {
|
||||||
logger.warn('[SupabaseSyncService] runStartupSync: one or more pull steps failed; skipped startup push-by-design');
|
logger.warn('[SupabaseSyncService] runStartupSync: one or more pull steps failed; skipped startup push-by-design');
|
||||||
}
|
}
|
||||||
|
|
||||||
const traktConnected = await this.isTraktConnected();
|
const libraryPullOk = await this.safeRun('pull_library', async () => {
|
||||||
if (traktConnected) {
|
await this.withSuppressedPushes(async () => {
|
||||||
logger.log('[SupabaseSyncService] Trakt is connected; skipping progress/library/watched Supabase sync.');
|
await this.pullLibraryToLocal();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const externalProgressSyncConnected = await this.isExternalProgressSyncConnected();
|
||||||
|
if (externalProgressSyncConnected) {
|
||||||
|
logger.log('[SupabaseSyncService] External sync (Trakt/Simkl) is connected; skipping watch progress/watched-items Supabase sync (library still synced).');
|
||||||
|
if (!libraryPullOk) {
|
||||||
|
logger.warn('[SupabaseSyncService] runStartupSync: library pull failed while external sync priority is active');
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -505,12 +520,6 @@ class SupabaseSyncService {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
const libraryPullOk = await this.safeRun('pull_library', async () => {
|
|
||||||
await this.withSuppressedPushes(async () => {
|
|
||||||
await this.pullLibraryToLocal();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
const watchedPullOk = await this.safeRun('pull_watched_items', async () => {
|
const watchedPullOk = await this.safeRun('pull_watched_items', async () => {
|
||||||
await this.withSuppressedPushes(async () => {
|
await this.withSuppressedPushes(async () => {
|
||||||
await this.pullWatchedItemsToLocal();
|
await this.pullWatchedItemsToLocal();
|
||||||
|
|
@ -629,8 +638,8 @@ class SupabaseSyncService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const traktConnected = await this.isTraktConnected();
|
const externalProgressSyncConnected = await this.isExternalProgressSyncConnected();
|
||||||
if (traktConnected) return;
|
if (externalProgressSyncConnected) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.log(`[SupabaseSyncService] flushWatchProgressDeletes: deleting ${keys.length} keys`);
|
logger.log(`[SupabaseSyncService] flushWatchProgressDeletes: deleting ${keys.length} keys`);
|
||||||
|
|
@ -700,12 +709,12 @@ class SupabaseSyncService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const traktConnected = await this.isTraktConnected();
|
|
||||||
if (traktConnected) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (target === 'watch_progress') {
|
if (target === 'watch_progress') {
|
||||||
|
const externalProgressSyncConnected = await this.isExternalProgressSyncConnected();
|
||||||
|
if (externalProgressSyncConnected) {
|
||||||
|
logger.log('[SupabaseSyncService] executeScheduledPush: skipping watch_progress due to external sync priority (Trakt/Simkl)');
|
||||||
|
return;
|
||||||
|
}
|
||||||
await this.pushWatchProgressFromLocal();
|
await this.pushWatchProgressFromLocal();
|
||||||
logger.log(`[SupabaseSyncService] executeScheduledPush: target=${target}:done`);
|
logger.log(`[SupabaseSyncService] executeScheduledPush: target=${target}:done`);
|
||||||
return;
|
return;
|
||||||
|
|
@ -716,6 +725,12 @@ class SupabaseSyncService {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const externalProgressSyncConnected = await this.isExternalProgressSyncConnected();
|
||||||
|
if (externalProgressSyncConnected) {
|
||||||
|
logger.log('[SupabaseSyncService] executeScheduledPush: skipping watched_items due to external sync priority (Trakt/Simkl)');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
await this.pushWatchedItemsFromLocal();
|
await this.pushWatchedItemsFromLocal();
|
||||||
logger.log(`[SupabaseSyncService] executeScheduledPush: target=${target}:done`);
|
logger.log(`[SupabaseSyncService] executeScheduledPush: target=${target}:done`);
|
||||||
}
|
}
|
||||||
|
|
@ -745,6 +760,8 @@ class SupabaseSyncService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async setSession(session: SupabaseSession): Promise<void> {
|
private async setSession(session: SupabaseSession): Promise<void> {
|
||||||
|
// Reset per-entry push cache on session changes to avoid cross-account state bleed.
|
||||||
|
this.watchProgressPushedSignatures.clear();
|
||||||
this.session = session;
|
this.session = session;
|
||||||
await mmkvStorage.setItem(SUPABASE_SESSION_KEY, JSON.stringify(session));
|
await mmkvStorage.setItem(SUPABASE_SESSION_KEY, JSON.stringify(session));
|
||||||
}
|
}
|
||||||
|
|
@ -775,6 +792,7 @@ class SupabaseSyncService {
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('[SupabaseSyncService] Failed to refresh session:', error);
|
logger.error('[SupabaseSyncService] Failed to refresh session:', error);
|
||||||
this.session = null;
|
this.session = null;
|
||||||
|
this.watchProgressPushedSignatures.clear();
|
||||||
await mmkvStorage.removeItem(SUPABASE_SESSION_KEY);
|
await mmkvStorage.removeItem(SUPABASE_SESSION_KEY);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
@ -791,6 +809,7 @@ class SupabaseSyncService {
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('[SupabaseSyncService] Token refresh failed:', error);
|
logger.error('[SupabaseSyncService] Token refresh failed:', error);
|
||||||
this.session = null;
|
this.session = null;
|
||||||
|
this.watchProgressPushedSignatures.clear();
|
||||||
await mmkvStorage.removeItem(SUPABASE_SESSION_KEY);
|
await mmkvStorage.removeItem(SUPABASE_SESSION_KEY);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
@ -997,6 +1016,22 @@ class SupabaseSyncService {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private getWatchProgressEntrySignature(value: { currentTime?: number; duration?: number; lastUpdated?: number }): string {
|
||||||
|
return [
|
||||||
|
Number(value.currentTime || 0),
|
||||||
|
Number(value.duration || 0),
|
||||||
|
Number(value.lastUpdated || 0),
|
||||||
|
].join('|');
|
||||||
|
}
|
||||||
|
|
||||||
|
private buildLocalWatchProgressKey(
|
||||||
|
contentType: 'movie' | 'series',
|
||||||
|
contentId: string,
|
||||||
|
episodeId?: string
|
||||||
|
): string {
|
||||||
|
return `${contentType}:${contentId}${episodeId ? `:${episodeId}` : ''}`;
|
||||||
|
}
|
||||||
|
|
||||||
private toStreamingContent(item: LibraryRow): StreamingContent {
|
private toStreamingContent(item: LibraryRow): StreamingContent {
|
||||||
const type = item.content_type === 'movie' ? 'movie' : 'series';
|
const type = item.content_type === 'movie' ? 'movie' : 'series';
|
||||||
const posterShape = (item.poster_shape || 'POSTER').toLowerCase() as 'poster' | 'square' | 'landscape';
|
const posterShape = (item.poster_shape || 'POSTER').toLowerCase() as 'poster' | 'square' | 'landscape';
|
||||||
|
|
@ -1037,6 +1072,19 @@ class SupabaseSyncService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async isSimklConnected(): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
return await SimklService.getInstance().isAuthenticated();
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async isExternalProgressSyncConnected(): Promise<boolean> {
|
||||||
|
if (await this.isTraktConnected()) return true;
|
||||||
|
return await this.isSimklConnected();
|
||||||
|
}
|
||||||
|
|
||||||
private async pullPluginsToLocal(): Promise<void> {
|
private async pullPluginsToLocal(): Promise<void> {
|
||||||
const token = await this.getValidAccessToken();
|
const token = await this.getValidAccessToken();
|
||||||
if (!token) return;
|
if (!token) return;
|
||||||
|
|
@ -1254,11 +1302,10 @@ class SupabaseSyncService {
|
||||||
const type = row.content_type === 'movie' ? 'movie' : 'series';
|
const type = row.content_type === 'movie' ? 'movie' : 'series';
|
||||||
const season = row.season == null ? null : Number(row.season);
|
const season = row.season == null ? null : Number(row.season);
|
||||||
const episode = row.episode == null ? null : Number(row.episode);
|
const episode = row.episode == null ? null : Number(row.episode);
|
||||||
remoteSet.add(`${type}:${row.content_id}:${season ?? ''}:${episode ?? ''}`);
|
|
||||||
|
|
||||||
const episodeId = type === 'series' && season != null && episode != null
|
const episodeId = type === 'series' && season != null && episode != null
|
||||||
? `${row.content_id}:${season}:${episode}`
|
? `${row.content_id}:${season}:${episode}`
|
||||||
: undefined;
|
: undefined;
|
||||||
|
remoteSet.add(this.buildLocalWatchProgressKey(type, row.content_id, episodeId));
|
||||||
|
|
||||||
const local = await storageService.getWatchProgress(row.content_id, type, episodeId);
|
const local = await storageService.getWatchProgress(row.content_id, type, episodeId);
|
||||||
const remoteLastWatched = this.normalizeEpochMs(row.last_watched);
|
const remoteLastWatched = this.normalizeEpochMs(row.last_watched);
|
||||||
|
|
@ -1284,15 +1331,46 @@ class SupabaseSyncService {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.log(`[SupabaseSyncService] pullWatchProgressToLocal: merged ${(rows || []).length} remote entries (no local prune)`);
|
// Remote-first continue watching: remove local entries that no longer exist remotely.
|
||||||
|
// This intentionally treats the successful remote pull as authoritative.
|
||||||
|
const allLocal = await storageService.getAllWatchProgress();
|
||||||
|
let pruned = 0;
|
||||||
|
for (const [localKey] of Object.entries(allLocal)) {
|
||||||
|
if (remoteSet.has(localKey)) continue;
|
||||||
|
|
||||||
|
const parsed = this.parseWatchProgressKey(localKey);
|
||||||
|
if (!parsed) continue;
|
||||||
|
|
||||||
|
const episodeId = parsed.videoId && parsed.videoId !== parsed.contentId ? parsed.videoId : undefined;
|
||||||
|
await storageService.removeWatchProgress(parsed.contentId, parsed.contentType, episodeId);
|
||||||
|
this.watchProgressPushedSignatures.delete(localKey);
|
||||||
|
pruned += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.log(`[SupabaseSyncService] pullWatchProgressToLocal: merged=${(rows || []).length} prunedLocalMissing=${pruned}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async pushWatchProgressFromLocal(): Promise<void> {
|
private async pushWatchProgressFromLocal(): Promise<void> {
|
||||||
const all = await storageService.getAllWatchProgress();
|
const all = await storageService.getAllWatchProgress();
|
||||||
const entries: WatchProgressRow[] = Object.entries(all).reduce<WatchProgressRow[]>((acc, [key, value]) => {
|
const nextSeenKeys = new Set<string>();
|
||||||
|
const changedEntries: Array<{ key: string; row: WatchProgressRow; signature: string }> = [];
|
||||||
|
|
||||||
|
for (const [key, value] of Object.entries(all)) {
|
||||||
|
nextSeenKeys.add(key);
|
||||||
|
const signature = this.getWatchProgressEntrySignature(value);
|
||||||
|
if (this.watchProgressPushedSignatures.get(key) === signature) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
const parsed = this.parseWatchProgressKey(key);
|
const parsed = this.parseWatchProgressKey(key);
|
||||||
if (!parsed) return acc;
|
if (!parsed) {
|
||||||
acc.push({
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
changedEntries.push({
|
||||||
|
key,
|
||||||
|
signature,
|
||||||
|
row: {
|
||||||
content_id: parsed.contentId,
|
content_id: parsed.contentId,
|
||||||
content_type: parsed.contentType,
|
content_type: parsed.contentType,
|
||||||
video_id: parsed.videoId,
|
video_id: parsed.videoId,
|
||||||
|
|
@ -1302,11 +1380,30 @@ class SupabaseSyncService {
|
||||||
duration: this.secondsToMsLong(value.duration),
|
duration: this.secondsToMsLong(value.duration),
|
||||||
last_watched: this.normalizeEpochMs(value.lastUpdated || Date.now()),
|
last_watched: this.normalizeEpochMs(value.lastUpdated || Date.now()),
|
||||||
progress_key: parsed.progressKey,
|
progress_key: parsed.progressKey,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
return acc;
|
}
|
||||||
}, []);
|
|
||||||
|
|
||||||
await this.callRpc<void>('sync_push_watch_progress', { p_entries: entries });
|
// Prune signatures for entries no longer present locally (deletes are handled separately).
|
||||||
|
for (const existingKey of Array.from(this.watchProgressPushedSignatures.keys())) {
|
||||||
|
if (!nextSeenKeys.has(existingKey)) {
|
||||||
|
this.watchProgressPushedSignatures.delete(existingKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (changedEntries.length === 0) {
|
||||||
|
logger.log('[SupabaseSyncService] pushWatchProgressFromLocal: no changed entries; skipping push');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.callRpc<void>('sync_push_watch_progress', {
|
||||||
|
p_entries: changedEntries.map((entry) => entry.row),
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const entry of changedEntries) {
|
||||||
|
this.watchProgressPushedSignatures.set(entry.key, entry.signature);
|
||||||
|
}
|
||||||
|
logger.log(`[SupabaseSyncService] pushWatchProgressFromLocal: pushedChanged=${changedEntries.length} totalLocal=${Object.keys(all).length}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async pullLibraryToLocal(): Promise<void> {
|
private async pullLibraryToLocal(): Promise<void> {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue