From ef97ff06af7a7d77de943712cfd7176dfe8e5f90 Mon Sep 17 00:00:00 2001 From: jj Date: Sat, 22 Jun 2024 12:57:30 +0200 Subject: [PATCH] stream: fix some memory leaks in internal stream handling (#581) --- src/modules/stream/internal.js | 23 ++++++++++++++--------- src/modules/stream/manage.js | 11 ++++++++++- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/modules/stream/internal.js b/src/modules/stream/internal.js index 535bba2d..ef8ec6f7 100644 --- a/src/modules/stream/internal.js +++ b/src/modules/stream/internal.js @@ -1,6 +1,5 @@ import { request } from 'undici'; import { Readable } from 'node:stream'; -import { assert } from 'console'; import { getHeaders, pipe } from './shared.js'; import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js'; @@ -36,21 +35,17 @@ async function* readChunks(streamInfo, size) { read += received; } -} - -function chunkedStream(streamInfo, size) { - assert(streamInfo.controller instanceof AbortController); - const stream = Readable.from(readChunks(streamInfo, size)); - return stream; } async function handleYoutubeStream(streamInfo, res) { + const { signal } = streamInfo.controller; + try { const req = await fetch(streamInfo.url, { headers: getHeaders('youtube'), method: 'HEAD', dispatcher: streamInfo.dispatcher, - signal: streamInfo.controller.signal + signal }); streamInfo.url = req.url; @@ -60,7 +55,16 @@ async function handleYoutubeStream(streamInfo, res) { return res.end(); } - const stream = chunkedStream(streamInfo, size); + const generator = readChunks(streamInfo, size); + + const abortGenerator = () => { + generator.return(); + signal.removeEventListener('abort', abortGenerator); + } + + signal.addEventListener('abort', abortGenerator); + + const stream = Readable.from(generator); for (const headerName of ['content-type', 'content-length']) { const headerValue = req.headers.get(headerName); @@ -69,6 +73,7 @@ async function handleYoutubeStream(streamInfo, res) { pipe(stream, res, () => res.end()); } catch { + signal.abort(); res.end(); } } diff --git a/src/modules/stream/manage.js b/src/modules/stream/manage.js index 0dec1972..b02c6084 100644 --- a/src/modules/stream/manage.js +++ b/src/modules/stream/manage.js @@ -78,16 +78,25 @@ export function createInternalStream(url, obj = {}) { } const streamID = nanoid(); + const controller = new AbortController(); internalStreamCache[streamID] = { url, service: obj.service, headers: obj.headers, - controller: new AbortController(), + controller, dispatcher }; let streamLink = new URL('/api/istream', `http://127.0.0.1:${env.apiPort}`); streamLink.searchParams.set('id', streamID); + + const cleanup = () => { + destroyInternalStream(streamLink); + controller.signal.removeEventListener('abort', cleanup); + } + + controller.signal.addEventListener('abort', cleanup); + return streamLink.toString(); }