This commit is contained in:
TheTank10 2026-02-08 17:06:49 +03:00 committed by GitHub
commit 7ae0377931
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 220 additions and 3 deletions

View file

@ -2,4 +2,4 @@
"printWidth": 120,
"trailingComma": "all",
"singleQuote": true
}
}

View file

@ -1,6 +1,6 @@
{
"name": "@p-stream/providers",
"version": "3.2.0",
"version": "3.2.1",
"description": "Package that contains all the providers of p-stream",
"type": "module",
"main": "./lib/index.js",

View file

@ -6,7 +6,7 @@ import { makeFetcher } from '@/fetchers/common';
import { Fetcher } from '@/fetchers/types';
import { Embed, EmbedOutput, Sourcerer, SourcererOutput } from '@/providers/base';
import { scrapeIndividualEmbed, scrapeInvidualSource } from '@/runners/individualRunner';
import { RunOutput, runAllProviders } from '@/runners/runner';
import { RunOutput, runAllProviders, runAllProvidersConcurrent } from '@/runners/runner';
export interface ProviderControlsInput {
fetcher: Fetcher;
@ -15,6 +15,7 @@ export interface ProviderControlsInput {
sources: Sourcerer[];
embeds: Embed[];
proxyStreams?: boolean; // temporary
concurrency?: number;
}
export interface RunnerOptions {
@ -26,6 +27,9 @@ export interface RunnerOptions {
// any omitted ids are added to the end in order of rank (highest first)
embedOrder?: string[];
// how many sources to run concurrently when running runAllConcurrent
concurrency?: number;
// object of event functions
events?: FullScraperEvents;
@ -72,6 +76,9 @@ export interface ProviderControls {
// returns the stream, or null if none found
runAll(runnerOps: RunnerOptions): Promise<RunOutput | null>;
// Run all providers concurrently, in order of rank (highest first)
runAllConcurrent(runnerOps: RunnerOptions): Promise<RunOutput | null>;
// Run a specific source scraper
runSourceScraper(runnerOps: SourceRunnerOptions): Promise<SourcererOutput>;
@ -99,6 +106,7 @@ export function makeControls(ops: ProviderControlsInput): ProviderControls {
fetcher: makeFetcher(ops.fetcher),
proxiedFetcher: makeFetcher(ops.proxiedFetcher ?? ops.fetcher),
proxyStreams: ops.proxyStreams,
concurrency: ops.concurrency ?? 3, // default to 3 workers
};
return {
@ -108,6 +116,12 @@ export function makeControls(ops: ProviderControlsInput): ProviderControls {
...runnerOps,
});
},
runAllConcurrent(runnerOps) {
return runAllProvidersConcurrent(list, {
...providerRunnerOps,
...runnerOps,
});
},
runSourceScraper(runnerOps) {
return scrapeInvidualSource(list, {
...providerRunnerOps,

View file

@ -37,6 +37,7 @@ export type ProviderRunnerOptions = {
events?: FullScraperEvents;
media: ScrapeMedia;
proxyStreams?: boolean; // temporary
concurrency?: number;
};
export async function runAllProviders(list: ProviderList, ops: ProviderRunnerOptions): Promise<RunOutput | null> {
@ -201,3 +202,205 @@ export async function runAllProviders(list: ProviderList, ops: ProviderRunnerOpt
// no providers or embeds returns streams
return null;
}
/**
 * Runs source scrapers concurrently using a small worker pool and resolves
 * with the first playable stream found. Workers are started in rank order
 * (highest first), but because they run in parallel, whichever source
 * finishes first with a playable stream wins.
 *
 * @param list - provider list containing all available sources and embeds
 * @param ops - runner options; `ops.concurrency` caps how many sources are
 *   scraped in parallel (defaults to 3)
 * @returns the first playable stream output, or null if no source produced one
 */
export async function runAllProvidersConcurrent(
  list: ProviderList,
  ops: ProviderRunnerOptions,
): Promise<RunOutput | null> {
  const concurrency = ops.concurrency ?? 3;
  // keep only sources that can scrape the requested media type
  const sources = reorderOnIdList(ops.sourceOrder ?? [], list.sources).filter((source) => {
    if (ops.media.type === 'movie') return !!source.scrapeMovie;
    if (ops.media.type === 'show') return !!source.scrapeShow;
    return false;
  });
  if (sources.length === 0) return null;
  const embeds = reorderOnIdList(ops.embedOrder ?? [], list.embeds);
  const embedIds = embeds.map((embed) => embed.id);
  const contextBase: ScrapeContext = {
    fetcher: ops.fetcher,
    proxiedFetcher: ops.proxiedFetcher,
    features: ops.features,
    progress(val) {
      // NOTE(review): concurrent workers share this one context, so per-source
      // ids are not available here — progress events carry an empty id
      ops.events?.update?.({
        id: '',
        percentage: val,
        status: 'pending',
      });
    },
  };
  ops.events?.init?.({
    sourceIds: sources.map((v) => v.id),
  });
  // Scrapes a single source (and, on fallback, its embeds) to completion.
  // Returns the first playable stream or null; scraper errors are reported
  // through ops.events and swallowed so one failure never rejects the pool.
  // Parameter is typed from the local `sources` array instead of `any` so
  // member access on `source` stays checked.
  const processSource = async (source: (typeof sources)[number]): Promise<RunOutput | null> => {
    ops.events?.start?.(source.id);
    let output: SourcererOutput | null = null;
    try {
      if (ops.media.type === 'movie' && source.scrapeMovie)
        output = await source.scrapeMovie({
          ...contextBase,
          media: ops.media,
        });
      else if (ops.media.type === 'show' && source.scrapeShow)
        output = await source.scrapeShow({
          ...contextBase,
          media: ops.media,
        });
      if (output) {
        // drop invalid streams and those not allowed by the enabled features
        output.stream = (output.stream ?? [])
          .filter(isValidStream)
          .filter((stream) => flagsAllowedInFeatures(ops.features, stream.flags));
        // wrap streams behind the proxy when required and enabled
        output.stream = output.stream.map((stream) =>
          requiresProxy(stream) && ops.proxyStreams ? setupProxy(stream) : stream,
        );
      }
      if (!output || (!output.stream?.length && !output.embeds.length)) {
        throw new NotFoundError('No streams found');
      }
    } catch (error) {
      const updateParams: UpdateEvent = {
        id: source.id,
        percentage: 100,
        status: error instanceof NotFoundError ? 'notfound' : 'failure',
        reason: error instanceof NotFoundError ? error.message : undefined,
        error: error instanceof NotFoundError ? undefined : error,
      };
      ops.events?.update?.(updateParams);
      return null;
    }
    if (!output) return null;
    // try the source's own (direct) stream first
    if (output.stream?.[0]) {
      try {
        const playableStream = await validatePlayableStream(output.stream[0], ops, source.id);
        if (playableStream) {
          return {
            sourceId: source.id,
            stream: playableStream,
          };
        }
      } catch (error) {
        const updateParams: UpdateEvent = {
          id: source.id,
          percentage: 100,
          status: error instanceof NotFoundError ? 'notfound' : 'failure',
          reason: error instanceof NotFoundError ? error.message : 'Stream validation failed',
          error: error instanceof NotFoundError ? undefined : error,
        };
        ops.events?.update?.(updateParams);
      }
    }
    // fall back to the source's embeds: enabled ones only, in configured order
    const sortedEmbeds = output.embeds
      .filter((embed) => {
        const e = list.embeds.find((v) => v.id === embed.embedId);
        return e && !e.disabled;
      })
      .sort((a, b) => embedIds.indexOf(a.embedId) - embedIds.indexOf(b.embedId));
    if (sortedEmbeds.length > 0) {
      ops.events?.discoverEmbeds?.({
        embeds: sortedEmbeds.map((embed, i) => ({
          id: [source.id, i].join('-'),
          embedScraperId: embed.embedId,
        })),
        sourceId: source.id,
      });
    }
    for (const [ind, embed] of sortedEmbeds.entries()) {
      const scraper = embeds.find((v) => v.id === embed.embedId);
      if (!scraper) continue;
      const id = [source.id, ind].join('-');
      ops.events?.start?.(id);
      try {
        const embedOutput = await scraper.scrape({
          ...contextBase,
          url: embed.url,
        });
        embedOutput.stream = embedOutput.stream
          .filter(isValidStream)
          .filter((stream) => flagsAllowedInFeatures(ops.features, stream.flags));
        embedOutput.stream = embedOutput.stream.map((stream) =>
          requiresProxy(stream) && ops.proxyStreams ? setupProxy(stream) : stream,
        );
        if (embedOutput.stream.length === 0) {
          throw new NotFoundError('No streams found');
        }
        const playableStream = await validatePlayableStream(embedOutput.stream[0], ops, embed.embedId);
        if (playableStream) {
          return {
            sourceId: source.id,
            embedId: scraper.id,
            stream: playableStream,
          };
        }
      } catch (error) {
        const updateParams: UpdateEvent = {
          id,
          percentage: 100,
          status: error instanceof NotFoundError ? 'notfound' : 'failure',
          reason: error instanceof NotFoundError ? error.message : undefined,
          error: error instanceof NotFoundError ? undefined : error,
        };
        ops.events?.update?.(updateParams);
      }
    }
    return null;
  };
  // Worker pool: up to `concurrency` sources are scraped at once. The first
  // worker to return a result resolves the pool; in-flight workers are not
  // cancelled (their later results are ignored via the `resolved` flag).
  return new Promise<RunOutput | null>((resolve) => {
    let sourceIndex = 0;
    let activeWorkers = 0;
    let resolved = false;
    const startWorker = () => {
      // nothing to do once a result was found or all sources were claimed
      if (resolved || sourceIndex >= sources.length) return;
      const source = sources[sourceIndex++];
      activeWorkers++;
      processSource(source)
        .then((result) => {
          if (resolved) return;
          if (result) {
            resolved = true;
            resolve(result);
          } else {
            activeWorkers--;
            startWorker();
            // last worker finished and no sources remain: give up
            if (activeWorkers === 0 && sourceIndex >= sources.length) {
              resolve(null);
            }
          }
        })
        .catch(() => {
          // defensive: processSource swallows its own errors, but never let an
          // unexpected rejection stall the pool
          activeWorkers--;
          startWorker();
          if (activeWorkers === 0 && sourceIndex >= sources.length) {
            resolve(null);
          }
        });
    };
    // seed the pool; startWorker self-perpetuates as workers finish
    for (let i = 0; i < Math.min(concurrency, sources.length); i++) {
      startWorker();
    }
  });
}