improve stream shutdown handling

merge pull request #225 from dumbmoron/stream-close-handling
This commit is contained in:
wukko 2023-11-06 06:56:11 +06:00 committed by GitHub
commit 4f47a68c17
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 74 deletions

View file

@ -25,6 +25,7 @@
}, },
"homepage": "https://github.com/wukko/cobalt#readme", "homepage": "https://github.com/wukko/cobalt#readme",
"dependencies": { "dependencies": {
"abort-controller": "3.0.0",
"content-disposition-header": "0.6.0", "content-disposition-header": "0.6.0",
"cors": "^2.8.5", "cors": "^2.8.5",
"dotenv": "^16.0.1", "dotenv": "^16.0.1",

View file

@ -2,40 +2,71 @@ import { spawn } from "child_process";
import ffmpeg from "ffmpeg-static"; import ffmpeg from "ffmpeg-static";
import { ffmpegArgs, genericUserAgent } from "../config.js"; import { ffmpegArgs, genericUserAgent } from "../config.js";
import { getThreads, metadataManager } from "../sub/utils.js"; import { getThreads, metadataManager } from "../sub/utils.js";
import { request } from 'undici'; import { request } from "undici";
import { create as contentDisposition } from "content-disposition-header"; import { create as contentDisposition } from "content-disposition-header";
import { AbortController } from "abort-controller"
function fail(res) { function closeRequest(controller) {
try { controller.abort() } catch {}
}
function closeResponse(res) {
if (!res.headersSent) res.sendStatus(500); if (!res.headersSent) res.sendStatus(500);
return res.destroy(); return res.destroy();
} }
function killProcess(p) {
// ask the process to terminate itself gracefully
p?.kill('SIGTERM');
setTimeout(() => {
if (p?.exitCode === null)
// brutally murder the process if it didn't quit
p?.kill('SIGKILL');
}, 5000);
}
function pipe(from, to, done) {
from.on('error', done)
.on('close', done);
to.on('error', done)
.on('close', done);
from.pipe(to);
}
export async function streamDefault(streamInfo, res) { export async function streamDefault(streamInfo, res) {
const abortController = new AbortController();
const shutdown = () => (closeRequest(abortController), closeResponse(res));
try { try {
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1]; const filename = streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename;
res.setHeader('Content-disposition', contentDisposition(streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename)); res.setHeader('Content-disposition', contentDisposition(filename));
const { body: stream, headers } = await request(streamInfo.urls, { const { body: stream, headers } = await request(streamInfo.urls, {
headers: { 'user-agent': genericUserAgent }, headers: { 'user-agent': genericUserAgent },
signal: abortController.signal,
maxRedirections: 16 maxRedirections: 16
}); });
res.setHeader('content-type', headers['content-type']); res.setHeader('content-type', headers['content-type']);
res.setHeader('content-length', headers['content-length']); res.setHeader('content-length', headers['content-length']);
stream.pipe(res).on('error', () => fail(res)); pipe(stream, res, shutdown);
stream.on('error', () => fail(res)); } catch {
stream.on('aborted', () => fail(res)); shutdown();
} catch (e) {
fail(res);
} }
} }
export async function streamLiveRender(streamInfo, res) {
try {
if (streamInfo.urls.length !== 2) return fail(res);
let { body: audio } = await request(streamInfo.urls[1], { export async function streamLiveRender(streamInfo, res) {
maxRedirections: 16 let abortController = new AbortController(), process;
const shutdown = () => (closeRequest(abortController), killProcess(process), closeResponse(res));
try {
if (streamInfo.urls.length !== 2) return shutdown();
const { body: audio } = await request(streamInfo.urls[1], {
maxRedirections: 16, signal: abortController.signal
}); });
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1],
@ -51,58 +82,41 @@ export async function streamLiveRender(streamInfo, res) {
args = args.concat(ffmpegArgs[format]); args = args.concat(ffmpegArgs[format]);
if (streamInfo.metadata) args = args.concat(metadataManager(streamInfo.metadata)); if (streamInfo.metadata) args = args.concat(metadataManager(streamInfo.metadata));
args.push('-f', format, 'pipe:4'); args.push('-f', format, 'pipe:4');
let ffmpegProcess = spawn(ffmpeg, args, {
process = spawn(ffmpeg, args, {
windowsHide: true, windowsHide: true,
stdio: [ stdio: [
'inherit', 'inherit', 'inherit', 'inherit', 'inherit', 'inherit',
'pipe', 'pipe' 'pipe', 'pipe'
], ],
}); });
const [,,, audioInput, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
res.on('error', () => {
ffmpegProcess.kill();
fail(res);
});
ffmpegProcess.stdio[4].pipe(res).on('error', () => {
ffmpegProcess.kill();
fail(res);
});
audio.pipe(ffmpegProcess.stdio[3]).on('error', () => {
ffmpegProcess.kill();
fail(res);
});
audio.on('error', () => {
ffmpegProcess.kill();
fail(res);
});
audio.on('aborted', () => {
ffmpegProcess.kill();
fail(res);
});
ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); pipe(audio, audioInput, shutdown);
ffmpegProcess.on('close', () => ffmpegProcess.kill()); pipe(muxOutput, res, shutdown);
ffmpegProcess.on('exit', () => ffmpegProcess.kill());
res.on('finish', () => ffmpegProcess.kill());
res.on('close', () => ffmpegProcess.kill());
ffmpegProcess.on('error', () => {
ffmpegProcess.kill();
fail(res);
});
} catch (e) { process.on('close', shutdown);
fail(res); res.on('finish', shutdown);
} catch {
shutdown();
} }
} }
export function streamAudioOnly(streamInfo, res) { export function streamAudioOnly(streamInfo, res) {
let process;
const shutdown = () => (killProcess(process), closeResponse(res));
try { try {
let args = [ let args = [
'-loglevel', '-8', '-loglevel', '-8',
'-threads', `${getThreads()}`, '-threads', `${getThreads()}`,
'-i', streamInfo.urls '-i', streamInfo.urls
] ]
if (streamInfo.metadata) { if (streamInfo.metadata) {
if (streamInfo.metadata.cover) { // currently corrupts the audio if (streamInfo.metadata.cover) { // currently corrupts the audio
args.push('-i', streamInfo.metadata.cover, '-map', '0:a', '-map', '1:0') args.push('-i', streamInfo.metadata.cover, '-map', '0:a', '-map', '1:0')
@ -113,39 +127,39 @@ export function streamAudioOnly(streamInfo, res) {
} else { } else {
args.push('-vn') args.push('-vn')
} }
let arg = streamInfo.copy ? ffmpegArgs["copy"] : ffmpegArgs["audio"]; let arg = streamInfo.copy ? ffmpegArgs["copy"] : ffmpegArgs["audio"];
args = args.concat(arg); args = args.concat(arg);
if (ffmpegArgs[streamInfo.audioFormat]) args = args.concat(ffmpegArgs[streamInfo.audioFormat]); if (ffmpegArgs[streamInfo.audioFormat]) args = args.concat(ffmpegArgs[streamInfo.audioFormat]);
args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3'); args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3');
const ffmpegProcess = spawn(ffmpeg, args, { process = spawn(ffmpeg, args, {
windowsHide: true, windowsHide: true,
stdio: [ stdio: [
'inherit', 'inherit', 'inherit', 'inherit', 'inherit', 'inherit',
'pipe' 'pipe'
], ],
}); });
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`)); res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`));
ffmpegProcess.stdio[3].pipe(res);
ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); pipe(muxOutput, res, shutdown);
ffmpegProcess.on('close', () => ffmpegProcess.kill()); res.on('finish', shutdown);
ffmpegProcess.on('exit', () => ffmpegProcess.kill()); } catch {
res.on('finish', () => ffmpegProcess.kill()); shutdown();
res.on('close', () => ffmpegProcess.kill());
ffmpegProcess.on('error', () => {
ffmpegProcess.kill();
fail(res);
});
} catch (e) {
fail(res);
} }
} }
export function streamVideoOnly(streamInfo, res) { export function streamVideoOnly(streamInfo, res) {
let process;
const shutdown = () => (killProcess(process), closeResponse(res));
try { try {
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], args = [ let args = [
'-loglevel', '-8', '-loglevel', '-8',
'-threads', `${getThreads()}`, '-threads', `${getThreads()}`,
'-i', streamInfo.urls, '-i', streamInfo.urls,
@ -153,29 +167,29 @@ export function streamVideoOnly(streamInfo, res) {
] ]
if (streamInfo.mute) args.push('-an'); if (streamInfo.mute) args.push('-an');
if (streamInfo.service === "vimeo" || streamInfo.service === "rutube") args.push('-bsf:a', 'aac_adtstoasc'); if (streamInfo.service === "vimeo" || streamInfo.service === "rutube") args.push('-bsf:a', 'aac_adtstoasc');
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
if (format === "mp4") args.push('-movflags', 'faststart+frag_keyframe+empty_moov'); if (format === "mp4") args.push('-movflags', 'faststart+frag_keyframe+empty_moov');
args.push('-f', format, 'pipe:3'); args.push('-f', format, 'pipe:3');
const ffmpegProcess = spawn(ffmpeg, args, {
process = spawn(ffmpeg, args, {
windowsHide: true, windowsHide: true,
stdio: [ stdio: [
'inherit', 'inherit', 'inherit', 'inherit', 'inherit', 'inherit',
'pipe' 'pipe'
], ],
}); });
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename)); res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
ffmpegProcess.stdio[3].pipe(res);
ffmpegProcess.on('disconnect', () => ffmpegProcess.kill()); pipe(muxOutput, res, shutdown);
ffmpegProcess.on('close', () => ffmpegProcess.kill());
ffmpegProcess.on('exit', () => ffmpegProcess.kill()); process.on('close', shutdown);
res.on('finish', () => ffmpegProcess.kill()); res.on('finish', shutdown);
res.on('close', () => ffmpegProcess.kill()); } catch {
ffmpegProcess.on('error', () => { shutdown();
ffmpegProcess.kill();
fail(res);
});
} catch (e) {
fail(res);
} }
} }