revert "improve stream shutdown handling" pr as everything broke in prod
This commit is contained in:
commit
1391a308da
2 changed files with 72 additions and 87 deletions
|
@ -25,7 +25,6 @@
|
||||||
},
|
},
|
||||||
"homepage": "https://github.com/wukko/cobalt#readme",
|
"homepage": "https://github.com/wukko/cobalt#readme",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"abort-controller": "3.0.0",
|
|
||||||
"content-disposition-header": "0.6.0",
|
"content-disposition-header": "0.6.0",
|
||||||
"cors": "^2.8.5",
|
"cors": "^2.8.5",
|
||||||
"dotenv": "^16.0.1",
|
"dotenv": "^16.0.1",
|
||||||
|
|
|
@ -2,71 +2,40 @@ import { spawn } from "child_process";
|
||||||
import ffmpeg from "ffmpeg-static";
|
import ffmpeg from "ffmpeg-static";
|
||||||
import { ffmpegArgs, genericUserAgent } from "../config.js";
|
import { ffmpegArgs, genericUserAgent } from "../config.js";
|
||||||
import { getThreads, metadataManager } from "../sub/utils.js";
|
import { getThreads, metadataManager } from "../sub/utils.js";
|
||||||
import { request } from "undici";
|
import { request } from 'undici';
|
||||||
import { create as contentDisposition } from "content-disposition-header";
|
import { create as contentDisposition } from "content-disposition-header";
|
||||||
import { AbortController } from "abort-controller"
|
|
||||||
|
|
||||||
function closeRequest(controller) {
|
function fail(res) {
|
||||||
try { controller.abort() } catch {}
|
|
||||||
}
|
|
||||||
|
|
||||||
function closeResponse(res) {
|
|
||||||
if (!res.headersSent) res.sendStatus(500);
|
if (!res.headersSent) res.sendStatus(500);
|
||||||
return res.destroy();
|
return res.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
function killProcess(p) {
|
|
||||||
// ask the process to terminate itself gracefully
|
|
||||||
p?.kill('SIGTERM');
|
|
||||||
setTimeout(() => {
|
|
||||||
if (p?.exitCode === null)
|
|
||||||
// brutally murder the process if it didn't quit
|
|
||||||
p?.kill('SIGKILL');
|
|
||||||
}, 5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
function pipe(from, to, done) {
|
|
||||||
from.on('error', done)
|
|
||||||
.on('close', done);
|
|
||||||
|
|
||||||
to.on('error', done)
|
|
||||||
.on('close', done);
|
|
||||||
|
|
||||||
from.pipe(to);
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function streamDefault(streamInfo, res) {
|
export async function streamDefault(streamInfo, res) {
|
||||||
const abortController = new AbortController();
|
|
||||||
const shutdown = () => (closeRequest(abortController), closeResponse(res));
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const filename = streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename;
|
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
|
||||||
res.setHeader('Content-disposition', contentDisposition(filename));
|
res.setHeader('Content-disposition', contentDisposition(streamInfo.isAudioOnly ? `${streamInfo.filename}.${streamInfo.audioFormat}` : streamInfo.filename));
|
||||||
|
|
||||||
const { body: stream, headers } = await request(streamInfo.urls, {
|
const { body: stream, headers } = await request(streamInfo.urls, {
|
||||||
headers: { 'user-agent': genericUserAgent },
|
headers: { 'user-agent': genericUserAgent },
|
||||||
signal: abortController.signal,
|
|
||||||
maxRedirections: 16
|
maxRedirections: 16
|
||||||
});
|
});
|
||||||
|
|
||||||
res.setHeader('content-type', headers['content-type']);
|
res.setHeader('content-type', headers['content-type']);
|
||||||
res.setHeader('content-length', headers['content-length']);
|
res.setHeader('content-length', headers['content-length']);
|
||||||
|
|
||||||
pipe(stream, res, shutdown);
|
stream.pipe(res).on('error', () => fail(res));
|
||||||
} catch {
|
stream.on('error', () => fail(res));
|
||||||
shutdown();
|
stream.on('aborted', () => fail(res));
|
||||||
|
} catch (e) {
|
||||||
|
fail(res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function streamLiveRender(streamInfo, res) {
|
export async function streamLiveRender(streamInfo, res) {
|
||||||
let abortController = new AbortController(), process;
|
|
||||||
const shutdown = () => (closeRequest(abortController), killProcess(process), closeResponse(res));
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (streamInfo.urls.length !== 2) return shutdown();
|
if (streamInfo.urls.length !== 2) return fail(res);
|
||||||
|
|
||||||
const { body: audio } = await request(streamInfo.urls[1], {
|
let { body: audio } = await request(streamInfo.urls[1], {
|
||||||
maxRedirections: 16, signal: abortController.signal
|
maxRedirections: 16
|
||||||
});
|
});
|
||||||
|
|
||||||
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1],
|
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1],
|
||||||
|
@ -82,41 +51,58 @@ export async function streamLiveRender(streamInfo, res) {
|
||||||
args = args.concat(ffmpegArgs[format]);
|
args = args.concat(ffmpegArgs[format]);
|
||||||
if (streamInfo.metadata) args = args.concat(metadataManager(streamInfo.metadata));
|
if (streamInfo.metadata) args = args.concat(metadataManager(streamInfo.metadata));
|
||||||
args.push('-f', format, 'pipe:4');
|
args.push('-f', format, 'pipe:4');
|
||||||
|
let ffmpegProcess = spawn(ffmpeg, args, {
|
||||||
process = spawn(ffmpeg, args, {
|
|
||||||
windowsHide: true,
|
windowsHide: true,
|
||||||
stdio: [
|
stdio: [
|
||||||
'inherit', 'inherit', 'inherit',
|
'inherit', 'inherit', 'inherit',
|
||||||
'pipe', 'pipe'
|
'pipe', 'pipe'
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
const [,,, audioInput, muxOutput] = process.stdio;
|
|
||||||
|
|
||||||
res.setHeader('Connection', 'keep-alive');
|
res.setHeader('Connection', 'keep-alive');
|
||||||
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
||||||
|
res.on('error', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
ffmpegProcess.stdio[4].pipe(res).on('error', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
audio.pipe(ffmpegProcess.stdio[3]).on('error', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
|
||||||
|
audio.on('error', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
audio.on('aborted', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
|
||||||
pipe(audio, audioInput, shutdown);
|
ffmpegProcess.on('disconnect', () => ffmpegProcess.kill());
|
||||||
pipe(muxOutput, res, shutdown);
|
ffmpegProcess.on('close', () => ffmpegProcess.kill());
|
||||||
|
ffmpegProcess.on('exit', () => ffmpegProcess.kill());
|
||||||
|
res.on('finish', () => ffmpegProcess.kill());
|
||||||
|
res.on('close', () => ffmpegProcess.kill());
|
||||||
|
ffmpegProcess.on('error', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
|
||||||
process.on('close', shutdown);
|
} catch (e) {
|
||||||
res.on('finish', shutdown);
|
fail(res);
|
||||||
} catch {
|
|
||||||
shutdown();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function streamAudioOnly(streamInfo, res) {
|
export function streamAudioOnly(streamInfo, res) {
|
||||||
let process;
|
|
||||||
const shutdown = () => (killProcess(process), closeResponse(res));
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let args = [
|
let args = [
|
||||||
'-loglevel', '-8',
|
'-loglevel', '-8',
|
||||||
'-threads', `${getThreads()}`,
|
'-threads', `${getThreads()}`,
|
||||||
'-i', streamInfo.urls
|
'-i', streamInfo.urls
|
||||||
]
|
]
|
||||||
|
|
||||||
if (streamInfo.metadata) {
|
if (streamInfo.metadata) {
|
||||||
if (streamInfo.metadata.cover) { // currently corrupts the audio
|
if (streamInfo.metadata.cover) { // currently corrupts the audio
|
||||||
args.push('-i', streamInfo.metadata.cover, '-map', '0:a', '-map', '1:0')
|
args.push('-i', streamInfo.metadata.cover, '-map', '0:a', '-map', '1:0')
|
||||||
|
@ -127,39 +113,39 @@ export function streamAudioOnly(streamInfo, res) {
|
||||||
} else {
|
} else {
|
||||||
args.push('-vn')
|
args.push('-vn')
|
||||||
}
|
}
|
||||||
|
|
||||||
let arg = streamInfo.copy ? ffmpegArgs["copy"] : ffmpegArgs["audio"];
|
let arg = streamInfo.copy ? ffmpegArgs["copy"] : ffmpegArgs["audio"];
|
||||||
args = args.concat(arg);
|
args = args.concat(arg);
|
||||||
|
|
||||||
if (ffmpegArgs[streamInfo.audioFormat]) args = args.concat(ffmpegArgs[streamInfo.audioFormat]);
|
if (ffmpegArgs[streamInfo.audioFormat]) args = args.concat(ffmpegArgs[streamInfo.audioFormat]);
|
||||||
args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3');
|
args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3');
|
||||||
|
|
||||||
process = spawn(ffmpeg, args, {
|
const ffmpegProcess = spawn(ffmpeg, args, {
|
||||||
windowsHide: true,
|
windowsHide: true,
|
||||||
stdio: [
|
stdio: [
|
||||||
'inherit', 'inherit', 'inherit',
|
'inherit', 'inherit', 'inherit',
|
||||||
'pipe'
|
'pipe'
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
const [,,, muxOutput] = process.stdio;
|
|
||||||
|
|
||||||
res.setHeader('Connection', 'keep-alive');
|
res.setHeader('Connection', 'keep-alive');
|
||||||
res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`));
|
res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`));
|
||||||
|
ffmpegProcess.stdio[3].pipe(res);
|
||||||
|
|
||||||
pipe(muxOutput, res, shutdown);
|
ffmpegProcess.on('disconnect', () => ffmpegProcess.kill());
|
||||||
res.on('finish', shutdown);
|
ffmpegProcess.on('close', () => ffmpegProcess.kill());
|
||||||
} catch {
|
ffmpegProcess.on('exit', () => ffmpegProcess.kill());
|
||||||
shutdown();
|
res.on('finish', () => ffmpegProcess.kill());
|
||||||
|
res.on('close', () => ffmpegProcess.kill());
|
||||||
|
ffmpegProcess.on('error', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
fail(res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function streamVideoOnly(streamInfo, res) {
|
export function streamVideoOnly(streamInfo, res) {
|
||||||
let process;
|
|
||||||
const shutdown = () => (killProcess(process), closeResponse(res));
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let args = [
|
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1], args = [
|
||||||
'-loglevel', '-8',
|
'-loglevel', '-8',
|
||||||
'-threads', `${getThreads()}`,
|
'-threads', `${getThreads()}`,
|
||||||
'-i', streamInfo.urls,
|
'-i', streamInfo.urls,
|
||||||
|
@ -167,29 +153,29 @@ export function streamVideoOnly(streamInfo, res) {
|
||||||
]
|
]
|
||||||
if (streamInfo.mute) args.push('-an');
|
if (streamInfo.mute) args.push('-an');
|
||||||
if (streamInfo.service === "vimeo" || streamInfo.service === "rutube") args.push('-bsf:a', 'aac_adtstoasc');
|
if (streamInfo.service === "vimeo" || streamInfo.service === "rutube") args.push('-bsf:a', 'aac_adtstoasc');
|
||||||
|
|
||||||
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
|
|
||||||
if (format === "mp4") args.push('-movflags', 'faststart+frag_keyframe+empty_moov');
|
if (format === "mp4") args.push('-movflags', 'faststart+frag_keyframe+empty_moov');
|
||||||
args.push('-f', format, 'pipe:3');
|
args.push('-f', format, 'pipe:3');
|
||||||
|
const ffmpegProcess = spawn(ffmpeg, args, {
|
||||||
process = spawn(ffmpeg, args, {
|
|
||||||
windowsHide: true,
|
windowsHide: true,
|
||||||
stdio: [
|
stdio: [
|
||||||
'inherit', 'inherit', 'inherit',
|
'inherit', 'inherit', 'inherit',
|
||||||
'pipe'
|
'pipe'
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
const [,,, muxOutput] = process.stdio;
|
|
||||||
|
|
||||||
res.setHeader('Connection', 'keep-alive');
|
res.setHeader('Connection', 'keep-alive');
|
||||||
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
||||||
|
ffmpegProcess.stdio[3].pipe(res);
|
||||||
|
|
||||||
pipe(muxOutput, res, shutdown);
|
ffmpegProcess.on('disconnect', () => ffmpegProcess.kill());
|
||||||
|
ffmpegProcess.on('close', () => ffmpegProcess.kill());
|
||||||
process.on('close', shutdown);
|
ffmpegProcess.on('exit', () => ffmpegProcess.kill());
|
||||||
res.on('finish', shutdown);
|
res.on('finish', () => ffmpegProcess.kill());
|
||||||
} catch {
|
res.on('close', () => ffmpegProcess.kill());
|
||||||
shutdown();
|
ffmpegProcess.on('error', () => {
|
||||||
|
ffmpegProcess.kill();
|
||||||
|
fail(res);
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
fail(res);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue