diff --git a/package.json b/package.json index 104bbe0d..ff28a75d 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,6 @@ }, "homepage": "https://github.com/wukko/cobalt#readme", "dependencies": { - "abort-controller": "3.0.0", "content-disposition-header": "0.6.0", "cors": "^2.8.5", "dotenv": "^16.0.1", diff --git a/src/modules/stream/types.js b/src/modules/stream/types.js index bf6d7594..e979e347 100644 --- a/src/modules/stream/types.js +++ b/src/modules/stream/types.js @@ -2,71 +2,40 @@ import { spawn } from "child_process"; import ffmpeg from "ffmpeg-static"; import { ffmpegArgs, genericUserAgent } from "../config.js"; import { getThreads, metadataManager } from "../sub/utils.js"; -import { request } from "undici"; +import { request } from 'undici'; import { create as contentDisposition } from "content-disposition-header"; -import { AbortController } from "abort-controller" -function closeRequest(controller) { - try { controller.abort() } catch {} -} - -function closeResponse(res) { +function fail(res) { if (!res.headersSent) res.sendStatus(500); return res.destroy(); } -function killProcess(p) { - // ask the process to terminate itself gracefully - p?.kill('SIGTERM'); - setTimeout(() => { - if (p?.exitCode === null) - // brutally murder the process if it didn't quit - p?.kill('SIGKILL'); - }, 5000); -} - -function pipe(from, to, done) { - from.on('error', done) - .on('close', done); - - to.on('error', done) - .on('close', done); - - from.pipe(to); -} - export async function streamDefault(streamInfo, res) { - const abortController = new AbortController(); - const shutdown = () => (closeRequest(abortController), closeResponse(res)); - try { - const filename = streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename; - res.setHeader('Content-disposition', contentDisposition(filename)); + let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; + res.setHeader('Content-disposition', contentDisposition(streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename)); const { body: stream, headers } = await request(streamInfo.urls, { headers: { 'user-agent': genericUserAgent }, - signal: abortController.signal, maxRedirections: 16 }); res.setHeader('content-type', headers['content-type']); res.setHeader('content-length', headers['content-length']); - pipe(stream, res, shutdown); - } catch { - shutdown(); + stream.pipe(res).on('error', () => fail(res)); + stream.on('error', () => fail(res)); + stream.on('aborted', () => fail(res)); + } catch (e) { + fail(res); } } - export async function streamLiveRender(streamInfo, res) { - let abortController = new AbortController(), process; - const shutdown = () => (closeRequest(abortController), killProcess(process), closeResponse(res)); - try { - if (streamInfo.urls.length !== 2) return shutdown(); + if (streamInfo.urls.length !== 2) return fail(res); - const { body: audio } = await request(streamInfo.urls[1], { - maxRedirections: 16, signal: abortController.signal + let { body: audio } = await request(streamInfo.urls[1], { + maxRedirections: 16 }); let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], @@ -82,41 +51,58 @@ export async function streamLiveRender(streamInfo, res) { args = args.concat(ffmpegArgs[format]); if (streamInfo.metadata) args = args.concat(metadataManager(streamInfo.metadata)); args.push('-f', format, 'pipe:4'); - - process = spawn(ffmpeg, args, { + let ffmpegProcess = spawn(ffmpeg, args, { windowsHide: true, stdio: [ 'inherit', 'inherit', 'inherit', 'pipe', 'pipe' ], }); - - const [,,, audioInput, muxOutput] = process.stdio; - res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + res.on('error', () => { + ffmpegProcess.kill(); + fail(res); + }); + ffmpegProcess.stdio[4].pipe(res).on('error', () => { + ffmpegProcess.kill(); + fail(res); + }); + audio.pipe(ffmpegProcess.stdio[3]).on('error', () => { + ffmpegProcess.kill(); + fail(res); + }); + + audio.on('error', () => { + ffmpegProcess.kill(); + fail(res); + }); + audio.on('aborted', () => { + ffmpegProcess.kill(); + fail(res); + }); - pipe(audio, audioInput, shutdown); - pipe(muxOutput, res, shutdown); + ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); + ffmpegProcess.on('close', () => ffmpegProcess.kill()); + ffmpegProcess.on('exit', () => ffmpegProcess.kill()); + res.on('finish', () => ffmpegProcess.kill()); + res.on('close', () => ffmpegProcess.kill()); + ffmpegProcess.on('error', () => { + ffmpegProcess.kill(); + fail(res); + }); - process.on('close', shutdown); - res.on('finish', shutdown); - } catch { - shutdown(); + } catch (e) { + fail(res); } } - export function streamAudioOnly(streamInfo, res) { - let process; - const shutdown = () => (killProcess(process), closeResponse(res)); - try { let args = [ '-loglevel', '-8', '-threads', `${getThreads()}`, '-i', streamInfo.urls ] - if (streamInfo.metadata) { if (streamInfo.metadata.cover) { // currently corrupts the audio args.push('-i', streamInfo.metadata.cover, '-map', '0:a', '-map', '1:0') @@ -127,39 +113,39 @@ export function streamAudioOnly(streamInfo, res) { } else { args.push('-vn') } - let arg = streamInfo.copy ? ffmpegArgs["copy"] : ffmpegArgs["audio"]; args = args.concat(arg); if (ffmpegArgs[streamInfo.audioFormat]) args = args.concat(ffmpegArgs[streamInfo.audioFormat]); args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3'); - process = spawn(ffmpeg, args, { + const ffmpegProcess = spawn(ffmpeg, args, { windowsHide: true, stdio: [ 'inherit', 'inherit', 'inherit', 'pipe' ], }); - - const [,,, muxOutput] = process.stdio; - res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); + ffmpegProcess.stdio[3].pipe(res); - pipe(muxOutput, res, shutdown); - res.on('finish', shutdown); - } catch { - shutdown(); + ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); + ffmpegProcess.on('close', () => ffmpegProcess.kill()); + ffmpegProcess.on('exit', () => ffmpegProcess.kill()); + res.on('finish', () => ffmpegProcess.kill()); + res.on('close', () => ffmpegProcess.kill()); + ffmpegProcess.on('error', () => { + ffmpegProcess.kill(); + fail(res); + }); + } catch (e) { + fail(res); } } - export function streamVideoOnly(streamInfo, res) { - let process; - const shutdown = () => (killProcess(process), closeResponse(res)); - try { - let args = [ + let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], args = [ '-loglevel', '-8', '-threads', `${getThreads()}`, '-i', streamInfo.urls, @@ -167,29 +153,29 @@ export function streamVideoOnly(streamInfo, res) { ] if (streamInfo.mute) args.push('-an'); if (streamInfo.service === "vimeo" || streamInfo.service === "rutube") args.push('-bsf:a', 'aac_adtstoasc'); - - let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; if (format === "mp4") args.push('-movflags', 'faststart+frag_keyframe+empty_moov'); args.push('-f', format, 'pipe:3'); - - process = spawn(ffmpeg, args, { + const ffmpegProcess = spawn(ffmpeg, args, { windowsHide: true, stdio: [ 'inherit', 'inherit', 'inherit', 'pipe' ], }); - - const [,,, muxOutput] = process.stdio; - res.setHeader('Connection', 'keep-alive'); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); + ffmpegProcess.stdio[3].pipe(res); - pipe(muxOutput, res, shutdown); - - process.on('close', shutdown); - res.on('finish', shutdown); - } catch { - shutdown(); + ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); + ffmpegProcess.on('close', () => ffmpegProcess.kill()); + ffmpegProcess.on('exit', () => ffmpegProcess.kill()); + res.on('finish', () => ffmpegProcess.kill()); + res.on('close', () => ffmpegProcess.kill()); + ffmpegProcess.on('error', () => { + ffmpegProcess.kill(); + fail(res); + }); + } catch (e) { + fail(res); } }