From facf7741ce727e250532bf314aebffb3d5b3f189 Mon Sep 17 00:00:00 2001 From: wukko Date: Thu, 22 Aug 2024 17:37:31 +0600 Subject: [PATCH] api/stream: standardize stream types & clean up related functions --- api/src/config.js | 1 - api/src/processing/match-action.js | 32 +++-- api/src/processing/services/instagram.js | 4 +- api/src/processing/services/reddit.js | 2 +- api/src/processing/services/twitter.js | 12 +- api/src/processing/services/youtube.js | 4 +- api/src/stream/internal-hls.js | 4 +- api/src/stream/internal.js | 8 +- api/src/stream/manage.js | 9 +- api/src/stream/stream.js | 40 ++++--- api/src/stream/types.js | 144 ++++++++++++----------- 11 files changed, 134 insertions(+), 126 deletions(-) diff --git a/api/src/config.js b/api/src/config.js index 1fd35427..9792729e 100644 --- a/api/src/config.js +++ b/api/src/config.js @@ -3,7 +3,6 @@ const supportedAudio = ["mp3", "ogg", "wav", "opus"]; const ffmpegArgs = { webm: ["-c:v", "copy", "-c:a", "copy"], mp4: ["-c:v", "copy", "-c:a", "copy", "-movflags", "faststart+frag_keyframe+empty_moov"], - copy: ["-c:a", "copy"], audio: ["-ar", "48000", "-ac", "2", "-b:a", "320k"], m4a: ["-movflags", "frag_keyframe+empty_moov"], gif: ["-vf", "scale=-1:-1:flags=lanczos,split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse", "-loop", "0"] diff --git a/api/src/processing/match-action.js b/api/src/processing/match-action.js index 4620d232..f14d57dc 100644 --- a/api/src/processing/match-action.js +++ b/api/src/processing/match-action.js @@ -1,9 +1,9 @@ -import { supportedAudio } from "../config.js"; -import { audioIgnore, services } from "./service-config.js"; - -import { createResponse } from "./request.js"; import createFilename from "./create-filename.js"; + +import { supportedAudio } from "../config.js"; +import { createResponse } from "./request.js"; import { createStream } from "../stream/manage.js"; +import { audioIgnore, services } from "./service-config.js"; export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disableMetadata, filenameStyle, twitterGif, requestIP }) { let action, @@ -29,9 +29,9 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab if (action === "picker" || action === "audio") { if (!r.filenameAttributes) defaultParams.filename = r.audioFilename; - defaultParams.isAudioOnly = true; defaultParams.audioFormat = audioFormat; } + if (isAudioMuted && !r.filenameAttributes) { defaultParams.filename = r.filename.replace('.', '_mute.') } @@ -47,12 +47,12 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab break; case "gif": - params = { type: "gif" } + params = { type: "gif" }; break; case "m3u8": params = { - type: Array.isArray(r.urls) ? "render" : "remux" + type: Array.isArray(r.urls) ? "merge" : "remux" } break; @@ -63,8 +63,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab } params = { type: muteType, - u: Array.isArray(r.urls) ? r.urls[0] : r.urls, - mute: true + u: Array.isArray(r.urls) ? r.urls[0] : r.urls } if (host === "reddit" && r.typeId === "redirect") responseType = "redirect"; @@ -79,7 +78,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab params = { picker: r.picker }; break; case "tiktok": - let audioStreamType = "render"; + let audioStreamType = "audio"; if (r.bestAudio === "mp3" && (audioFormat === "mp3" || audioFormat === "best")) { audioFormat = "mp3"; audioStreamType = "proxy" @@ -94,8 +93,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab filename: r.audioFilename, isAudioOnly: true, audioFormat, - }), - copy: audioFormat === "best" + }) } } break; @@ -103,7 +101,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab case "video": switch (host) { case "bilibili": - params = { type: "render" }; + params = { type: "merge" }; break; case "youtube": params = { type: r.type }; @@ -114,7 +112,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab break; case "vimeo": if (Array.isArray(r.urls)) { - params = { type: "render" } + params = { type: "merge" } } else { responseType = "redirect"; } @@ -153,7 +151,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab }) } - let processType = "render", + let processType = "audio", copy = false; if (!supportedAudio.includes(audioFormat)) { @@ -174,7 +172,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab audioFormat = serviceBestAudio; processType = "proxy"; if (isSoundCloud || (isTiktok && audioFormat === "m4a")) { - processType = "render" + processType = "audio" copy = true } } else if (isBestAudio && !isSoundCloud) { @@ -189,7 +187,7 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab if (r.isM3U8 || host === "vimeo") { copy = false; - processType = "render" + processType = "audio" } params = { diff --git a/api/src/processing/services/instagram.js b/api/src/processing/services/instagram.js index 9a43a0d9..66c1ad23 100644 --- a/api/src/processing/services/instagram.js +++ b/api/src/processing/services/instagram.js @@ -177,7 +177,7 @@ export default function(obj) { ** set to `same-origin`, so we need to proxy them */ thumb: createStream({ service: "instagram", - type: "default", + type: "proxy", u: e.node?.display_url, filename: "image.jpg" }) @@ -219,7 +219,7 @@ export default function(obj) { ** set to `same-origin`, so we need to proxy them */ thumb: createStream({ service: "instagram", - type: "default", + type: "proxy", u: imageUrl, filename: "image.jpg" }) diff --git a/api/src/processing/services/reddit.js b/api/src/processing/services/reddit.js index cd712aa6..607a78b6 100644 --- a/api/src/processing/services/reddit.js +++ b/api/src/processing/services/reddit.js @@ -123,7 +123,7 @@ export default async function(obj) { return { typeId: "stream", - type: "render", + type: "merge", urls: [video, audioFileLink], audioFilename: `reddit_${id}_audio`, filename: `reddit_${id}.mp4` diff --git a/api/src/processing/services/twitter.js b/api/src/processing/services/twitter.js index 974db53c..51e9fa5d 100644 --- a/api/src/processing/services/twitter.js +++ b/api/src/processing/services/twitter.js @@ -166,14 +166,14 @@ export default async function({ id, index, toGif, dispatcher }) { case 1: if (media[0].type === "photo") { return { - type: "normal", + type: "proxy", isPhoto: true, urls: `${media[0].media_url_https}?name=4096x4096` } } return { - type: needsFixing(media[0]) ? "remux" : "normal", + type: needsFixing(media[0]) ? "remux" : "proxy", urls: bestQuality(media[0].video_info.variants), filename: `twitter_${id}.mp4`, audioFilename: `twitter_${id}_audio`, @@ -183,7 +183,7 @@ export default async function({ id, index, toGif, dispatcher }) { const proxyThumb = (url) => createStream({ service: "twitter", - type: "default", + type: "proxy", u: url, filename: `image.${new URL(url).pathname.split(".", 2)[1]}` }) @@ -199,15 +199,15 @@ export default async function({ id, index, toGif, dispatcher }) { } let url = bestQuality(content.video_info.variants); - const shouldRenderGif = content.type === 'animated_gif' && toGif; + const shouldRenderGif = content.type === "animated_gif" && toGif; let type = "video"; if (shouldRenderGif) type = "gif"; if (needsFixing(content) || shouldRenderGif) { url = createStream({ - service: 'twitter', - type: shouldRenderGif ? 'gif' : 'remux', + service: "twitter", + type: shouldRenderGif ? "gif" : "remux", u: url, filename: `twitter_${id}_${i + 1}.mp4` }) diff --git a/api/src/processing/services/youtube.js b/api/src/processing/services/youtube.js index 59d409e2..c7d31555 100644 --- a/api/src/processing/services/youtube.js +++ b/api/src/processing/services/youtube.js @@ -263,7 +263,7 @@ export default async function(o) { } if (audio && o.isAudioOnly) return { - type: "render", + type: "audio", isAudioOnly: true, urls: audio.decipher(yt.session.player), filenameAttributes: filenameAttributes, @@ -290,7 +290,7 @@ export default async function(o) { if (!match && video && audio) { match = video; - type = "render"; + type = "merge"; urls = [ video.decipher(yt.session.player), audio.decipher(yt.session.player) diff --git a/api/src/stream/internal-hls.js b/api/src/stream/internal-hls.js index 3ed7e17b..07fcebde 100644 --- a/api/src/stream/internal-hls.js +++ b/api/src/stream/internal-hls.js @@ -1,5 +1,5 @@ -import { createInternalStream } from './manage.js'; -import HLS from 'hls-parser'; +import HLS from "hls-parser"; +import { createInternalStream } from "./manage.js"; function getURL(url) { try { diff --git a/api/src/stream/internal.js b/api/src/stream/internal.js index 253f98bb..51552d4c 100644 --- a/api/src/stream/internal.js +++ b/api/src/stream/internal.js @@ -1,7 +1,7 @@ -import { request } from 'undici'; -import { Readable } from 'node:stream'; -import { closeRequest, getHeaders, pipe } from './shared.js'; -import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; +import { request } from "undici"; +import { Readable } from "node:stream"; +import { closeRequest, getHeaders, pipe } from "./shared.js"; +import { handleHlsPlaylist, isHlsRequest } from "./internal-hls.js"; const CHUNK_SIZE = BigInt(8e6); // 8 MB const min = (a, b) => a < b ? a : b; diff --git a/api/src/stream/manage.js b/api/src/stream/manage.js index 07ee7f07..90a341bb 100644 --- a/api/src/stream/manage.js +++ b/api/src/stream/manage.js @@ -1,12 +1,13 @@ import NodeCache from "node-cache"; -import { randomBytes } from "crypto"; + import { nanoid } from "nanoid"; +import { randomBytes } from "crypto"; +import { strict as assert } from "assert"; import { setMaxListeners } from "node:events"; -import { decryptStream, encryptStream, generateHmac } from "../misc/crypto.js"; import { env } from "../config.js"; -import { strict as assert } from "assert"; import { closeRequest } from "./shared.js"; +import { decryptStream, encryptStream, generateHmac } from "../misc/crypto.js"; // optional dependency const freebind = env.freebindCIDR && await import('freebind').catch(() => {}); @@ -37,10 +38,8 @@ export function createStream(obj) { service: obj.service, filename: obj.filename, audioFormat: obj.audioFormat, - isAudioOnly: !!obj.isAudioOnly, headers: obj.headers, copy: !!obj.copy, - mute: !!obj.mute, metadata: obj.fileMetadata || false, requestIP: obj.requestIP }; diff --git a/api/src/stream/stream.js b/api/src/stream/stream.js index 51cb4376..a6d41200 100644 --- a/api/src/stream/stream.js +++ b/api/src/stream/stream.js @@ -1,31 +1,33 @@ -import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, convertToGif } from "./types.js"; -import { internalStream } from './internal.js'; +import stream from "./types.js"; + import { closeResponse } from "./shared.js"; +import { internalStream } from "./internal.js"; export default async function(res, streamInfo) { try { - if (streamInfo.isAudioOnly && streamInfo.type !== "proxy") { - streamAudioOnly(streamInfo, res); - return; - } switch (streamInfo.type) { + case "proxy": + return await stream.proxy(streamInfo, res); + case "internal": - return await internalStream(streamInfo, res); - case "render": - await streamLiveRender(streamInfo, res); - break; - case "gif": - convertToGif(streamInfo, res); - break; + return internalStream(streamInfo, res); + + case "merge": + return stream.merge(streamInfo, res); + case "remux": case "mute": - streamVideoOnly(streamInfo, res); - break; - default: - await streamDefault(streamInfo, res); - break; + return stream.remux(streamInfo, res); + + case "audio": + return stream.convertAudio(streamInfo, res); + + case "gif": + return stream.convertGif(streamInfo, res); } + + closeResponse(res); } catch { - closeResponse(res) + closeResponse(res); } } diff --git a/api/src/stream/types.js b/api/src/stream/types.js index 01b79dc1..575e522d 100644 --- a/api/src/stream/types.js +++ b/api/src/stream/types.js @@ -9,30 +9,29 @@ import { destroyInternalStream } from "./manage.js"; import { hlsExceptions } from "../processing/service-config.js"; import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js"; -function toRawHeaders(headers) { +const toRawHeaders = (headers) => { return Object.entries(headers) .map(([key, value]) => `${key}: ${value}\r\n`) .join(''); } -function killProcess(p) { - // ask the process to terminate itself gracefully - p?.kill('SIGTERM'); +const killProcess = (p) => { + p?.kill('SIGTERM'); // ask the process to terminate itself gracefully + setTimeout(() => { if (p?.exitCode === null) - // brutally murder the process if it didn't quit - p?.kill('SIGKILL'); + p?.kill('SIGKILL'); // brutally murder the process if it didn't quit }, 5000); } -function getCommand(args) { +const getCommand = (args) => { if (typeof env.processingPriority === 'number' && !isNaN(env.processingPriority)) { return ['nice', ['-n', env.processingPriority.toString(), ffmpeg, ...args]] } return [ffmpeg, args] } -export async function streamDefault(streamInfo, res) { +const proxy = async (streamInfo, res) => { const abortController = new AbortController(); const shutdown = () => ( closeRequest(abortController), @@ -42,7 +41,7 @@ export async function streamDefault(streamInfo, res) { try { let filename = streamInfo.filename; - if (streamInfo.isAudioOnly) { + if (streamInfo.audioFormat) { filename = `${streamInfo.filename}.${streamInfo.audioFormat}` } @@ -67,7 +66,7 @@ export async function streamDefault(streamInfo, res) { } } -export function streamLiveRender(streamInfo, res) { +const merge = (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -127,61 +126,7 @@ export function streamLiveRender(streamInfo, res) { } } -export function streamAudioOnly(streamInfo, res) { - let process; - const shutdown = () => ( - killProcess(process), - closeResponse(res), - destroyInternalStream(streamInfo.urls) - ); - - try { - let args = [ - '-loglevel', '-8', - '-headers', toRawHeaders(getHeaders(streamInfo.service)), - ] - - if (streamInfo.service === "twitter") { - args.push('-seekable', '0'); - } - - args.push( - '-i', streamInfo.urls, - '-vn' - ) - - if (streamInfo.metadata) { - args = args.concat(metadataManager(streamInfo.metadata)) - } - - args = args.concat(ffmpegArgs[streamInfo.copy ? 'copy' : 'audio']); - if (ffmpegArgs[streamInfo.audioFormat]) { - args = args.concat(ffmpegArgs[streamInfo.audioFormat]) - } - - args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3'); - - process = spawn(...getCommand(args), { - windowsHide: true, - stdio: [ - 'inherit', 'inherit', 'inherit', - 'pipe' - ], - }); - - const [,,, muxOutput] = process.stdio; - - res.setHeader('Connection', 'keep-alive'); - res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); - - pipe(muxOutput, res, shutdown); - res.on('finish', shutdown); - } catch { - shutdown(); - } -} - -export function streamVideoOnly(streamInfo, res) { +const remux = (streamInfo, res) => { let process; const shutdown = () => ( killProcess(process), @@ -204,7 +149,7 @@ export function streamVideoOnly(streamInfo, res) { '-c', 'copy' ) - if (streamInfo.mute) { + if (streamInfo.type === "mute") { args.push('-an') } @@ -241,7 +186,64 @@ export function streamVideoOnly(streamInfo, res) { } } -export function convertToGif(streamInfo, res) { +const convertAudio = (streamInfo, res) => { + let process; + const shutdown = () => ( + killProcess(process), + closeResponse(res), + destroyInternalStream(streamInfo.urls) + ); + + try { + let args = [ + '-loglevel', '-8', + '-headers', toRawHeaders(getHeaders(streamInfo.service)), + ] + + if (streamInfo.service === "twitter") { + args.push('-seekable', '0'); + } + + args.push( + '-i', streamInfo.urls, + '-vn' + ) + + if (streamInfo.metadata) { + args = args.concat(metadataManager(streamInfo.metadata)) + } + + args = args.concat( + streamInfo.copy ? ["-c:a", "copy"] : ffmpegArgs.audio + ); + + if (ffmpegArgs[streamInfo.audioFormat]) { + args = args.concat(ffmpegArgs[streamInfo.audioFormat]) + } + + args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3'); + + process = spawn(...getCommand(args), { + windowsHide: true, + stdio: [ + 'inherit', 'inherit', 'inherit', + 'pipe' + ], + }); + + const [,,, muxOutput] = process.stdio; + + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); + + pipe(muxOutput, res, shutdown); + res.on('finish', shutdown); + } catch { + shutdown(); + } +} + +const convertGif = (streamInfo, res) => { let process; const shutdown = () => (killProcess(process), closeResponse(res)); @@ -279,3 +281,11 @@ export function convertToGif(streamInfo, res) { shutdown(); } } + +export default { + proxy, + merge, + remux, + convertAudio, + convertGif, +}