diff --git a/.prettierrc b/.prettierrc
index a0d1c9a..4ddba9a 100644
--- a/.prettierrc
+++ b/.prettierrc
@@ -2,4 +2,4 @@
   "printWidth": 120,
   "trailingComma": "all",
   "singleQuote": true
-}
+}
\ No newline at end of file
diff --git a/package.json b/package.json
index 91c49b1..db0df2b 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@p-stream/providers",
-  "version": "3.2.0",
+  "version": "3.2.1",
   "description": "Package that contains all the providers of p-stream",
   "type": "module",
   "main": "./lib/index.js",
diff --git a/src/entrypoint/controls.ts b/src/entrypoint/controls.ts
index 6599f28..6be2683 100644
--- a/src/entrypoint/controls.ts
+++ b/src/entrypoint/controls.ts
@@ -6,7 +6,7 @@ import { makeFetcher } from '@/fetchers/common';
 import { Fetcher } from '@/fetchers/types';
 import { Embed, EmbedOutput, Sourcerer, SourcererOutput } from '@/providers/base';
 import { scrapeIndividualEmbed, scrapeInvidualSource } from '@/runners/individualRunner';
-import { RunOutput, runAllProviders } from '@/runners/runner';
+import { RunOutput, runAllProviders, runAllProvidersConcurrent } from '@/runners/runner';
 
 export interface ProviderControlsInput {
   fetcher: Fetcher;
@@ -15,6 +15,7 @@ export interface ProviderControlsInput {
   sources: Sourcerer[];
   embeds: Embed[];
   proxyStreams?: boolean; // temporary
+  concurrency?: number;
 }
 
 export interface RunnerOptions {
@@ -26,6 +27,9 @@ export interface RunnerOptions {
   // any omitted ids are in added to the end in order of rank (highest first)
   embedOrder?: string[];
 
+  // how many sources to run concurrently when running runAllConcurrent
+  concurrency?: number;
+
   // object of event functions
   events?: FullScraperEvents;
 
@@ -72,6 +76,9 @@ export interface ProviderControls {
   // returns the stream, or null if none found
   runAll(runnerOps: RunnerOptions): Promise<RunOutput | null>;
 
+  // Run providers concurrently, started in order of rank (highest first); returns the first stream found, or null
+  runAllConcurrent(runnerOps: RunnerOptions): Promise<RunOutput | null>;
+
   // Run a specific source scraper
   runSourceScraper(runnerOps: SourceRunnerOptions): Promise<SourcererOutput>;
 
@@ -99,6 +106,7 @@ export function makeControls(ops: ProviderControlsInput): ProviderControls {
     fetcher: makeFetcher(ops.fetcher),
     proxiedFetcher: makeFetcher(ops.proxiedFetcher ?? ops.fetcher),
     proxyStreams: ops.proxyStreams,
+    concurrency: ops.concurrency ?? 3, // default to 3 workers
   };
 
   return {
@@ -108,6 +116,12 @@ export function makeControls(ops: ProviderControlsInput): ProviderControls {
         ...runnerOps,
       });
     },
+    runAllConcurrent(runnerOps) {
+      return runAllProvidersConcurrent(list, {
+        ...providerRunnerOps,
+        ...runnerOps,
+      });
+    },
     runSourceScraper(runnerOps) {
       return scrapeInvidualSource(list, {
         ...providerRunnerOps,
diff --git a/src/runners/runner.ts b/src/runners/runner.ts
index 1f1bcb6..59f3a3a 100644
--- a/src/runners/runner.ts
+++ b/src/runners/runner.ts
@@ -37,6 +37,7 @@ export type ProviderRunnerOptions = {
   events?: FullScraperEvents;
   media: ScrapeMedia;
   proxyStreams?: boolean; // temporary
+  concurrency?: number;
 };
 
 export async function runAllProviders(list: ProviderList, ops: ProviderRunnerOptions): Promise<RunOutput | null> {
@@ -201,3 +202,221 @@ export async function runAllProviders(list: ProviderList, ops: ProviderRunnerOpt
   // no providers or embeds returns streams
   return null;
 }
+
+/**
+ * Runs the source scrapers through a bounded worker pool and resolves with the first playable stream found.
+ *
+ * Sources are started in the order produced by `reorderOnIdList(ops.sourceOrder, list.sources)`, at most
+ * `ops.concurrency` at a time (default 3; NaN or values below 1 fall back to a single worker). For each source
+ * its direct stream is validated first, then its embeds are scraped one by one. Scrapers that are still running
+ * when a stream is found are not cancelled: they keep running and emitting events, but their result is ignored.
+ *
+ * @param list - the available sources and embeds
+ * @param ops - runner options (media, fetchers, features, ordering, events, concurrency)
+ * @returns the first playable stream found, or null when no source or embed produced one
+ */
+export async function runAllProvidersConcurrent(
+  list: ProviderList,
+  ops: ProviderRunnerOptions,
+): Promise<RunOutput | null> {
+  // a pool size of 0, a negative number or NaN would start no worker and leave the returned promise pending forever
+  const requestedConcurrency = Math.floor(ops.concurrency ?? 3);
+  const concurrency = requestedConcurrency >= 1 ? requestedConcurrency : 1;
+
+  const sources = reorderOnIdList(ops.sourceOrder ?? [], list.sources).filter((source) => {
+    if (ops.media.type === 'movie') return !!source.scrapeMovie;
+    if (ops.media.type === 'show') return !!source.scrapeShow;
+    return false;
+  });
+
+  if (sources.length === 0) return null;
+
+  const embeds = reorderOnIdList(ops.embedOrder ?? [], list.embeds);
+  const embedIds = embeds.map((embed) => embed.id);
+
+  // every scrape gets its own context so progress events carry the id of the scraper that emitted them;
+  // with several scrapers in flight a single shared context cannot tell them apart
+  const makeContext = (id: string): ScrapeContext => ({
+    fetcher: ops.fetcher,
+    proxiedFetcher: ops.proxiedFetcher,
+    features: ops.features,
+    progress(val) {
+      ops.events?.update?.({
+        id,
+        percentage: val,
+        status: 'pending',
+      });
+    },
+  });
+
+  // reports a scraper that ended without a stream: NotFoundError becomes 'notfound', anything else 'failure'
+  const reportError = (id: string, error: unknown, fallbackReason?: string): void => {
+    const updateParams: UpdateEvent = {
+      id,
+      percentage: 100,
+      status: error instanceof NotFoundError ? 'notfound' : 'failure',
+      reason: error instanceof NotFoundError ? error.message : fallbackReason,
+      error: error instanceof NotFoundError ? undefined : error,
+    };
+    ops.events?.update?.(updateParams);
+  };
+
+  ops.events?.init?.({
+    sourceIds: sources.map((v) => v.id),
+  });
+
+  // scrapes one source (direct stream first, then its embeds); resolves with null when nothing playable was found
+  const processSource = async (source: (typeof sources)[number]): Promise<RunOutput | null> => {
+    ops.events?.start?.(source.id);
+    const sourceContext = makeContext(source.id);
+
+    let output: SourcererOutput | null = null;
+    try {
+      if (ops.media.type === 'movie' && source.scrapeMovie)
+        output = await source.scrapeMovie({
+          ...sourceContext,
+          media: ops.media,
+        });
+      else if (ops.media.type === 'show' && source.scrapeShow)
+        output = await source.scrapeShow({
+          ...sourceContext,
+          media: ops.media,
+        });
+
+      if (output) {
+        output.stream = (output.stream ?? [])
+          .filter(isValidStream)
+          .filter((stream) => flagsAllowedInFeatures(ops.features, stream.flags));
+
+        output.stream = output.stream.map((stream) =>
+          requiresProxy(stream) && ops.proxyStreams ? setupProxy(stream) : stream,
+        );
+      }
+
+      if (!output || (!output.stream?.length && !output.embeds.length)) {
+        throw new NotFoundError('No streams found');
+      }
+    } catch (error) {
+      reportError(source.id, error);
+      return null;
+    }
+
+    if (!output) return null;
+
+    if (output.stream?.[0]) {
+      try {
+        const playableStream = await validatePlayableStream(output.stream[0], ops, source.id);
+        if (playableStream) {
+          return {
+            sourceId: source.id,
+            stream: playableStream,
+          };
+        }
+      } catch (error) {
+        // a failed validation is not fatal for the source: report it, then fall through to its embeds
+        reportError(source.id, error, 'Stream validation failed');
+      }
+    }
+
+    const sortedEmbeds = output.embeds
+      .filter((embed) => {
+        const e = list.embeds.find((v) => v.id === embed.embedId);
+        return e && !e.disabled;
+      })
+      .sort((a, b) => embedIds.indexOf(a.embedId) - embedIds.indexOf(b.embedId));
+
+    if (sortedEmbeds.length > 0) {
+      ops.events?.discoverEmbeds?.({
+        embeds: sortedEmbeds.map((embed, i) => ({
+          id: [source.id, i].join('-'),
+          embedScraperId: embed.embedId,
+        })),
+        sourceId: source.id,
+      });
+    }
+
+    for (const [ind, embed] of sortedEmbeds.entries()) {
+      const scraper = embeds.find((v) => v.id === embed.embedId);
+      if (!scraper) continue;
+
+      const id = [source.id, ind].join('-');
+      ops.events?.start?.(id);
+
+      try {
+        const embedOutput = await scraper.scrape({
+          ...makeContext(id),
+          url: embed.url,
+        });
+        embedOutput.stream = embedOutput.stream
+          .filter(isValidStream)
+          .filter((stream) => flagsAllowedInFeatures(ops.features, stream.flags));
+        embedOutput.stream = embedOutput.stream.map((stream) =>
+          requiresProxy(stream) && ops.proxyStreams ? setupProxy(stream) : stream,
+        );
+        if (embedOutput.stream.length === 0) {
+          throw new NotFoundError('No streams found');
+        }
+        const playableStream = await validatePlayableStream(embedOutput.stream[0], ops, embed.embedId);
+        if (playableStream) {
+          return {
+            sourceId: source.id,
+            embedId: scraper.id,
+            stream: playableStream,
+          };
+        }
+      } catch (error) {
+        reportError(id, error);
+      }
+    }
+
+    return null;
+  };
+
+  // worker pool: each finished worker pulls the next queued source until a stream is found or none are left
+  return new Promise<RunOutput | null>((resolve) => {
+    let sourceIndex = 0;
+    let activeWorkers = 0;
+    let resolved = false;
+
+    const finish = (result: RunOutput | null): void => {
+      resolved = true;
+      resolve(result);
+    };
+
+    const startWorker = (): void => {
+      if (resolved || sourceIndex >= sources.length) return;
+
+      const source = sources[sourceIndex++];
+      activeWorkers++;
+
+      processSource(source)
+        .catch((error: unknown) => {
+          // processSource reports expected failures itself; surface unexpected ones instead of dropping them
+          try {
+            reportError(source.id, error);
+          } catch {
+            // a throwing event handler must not stall the pool
+          }
+          return null;
+        })
+        .then((result) => {
+          activeWorkers--;
+          if (resolved) return;
+
+          if (result) {
+            finish(result);
+            return;
+          }
+
+          // hand the freed slot to the next queued source; give up once nothing is queued or running
+          startWorker();
+          if (activeWorkers === 0 && sourceIndex >= sources.length) finish(null);
+        });
+    };
+
+    // run workers
+    for (let i = 0; i < Math.min(concurrency, sources.length); i++) {
+      startWorker();
+    }
+  });
+}