From 7229d2221594ef43ff28305e8b3f3a13c3bbb1d5 Mon Sep 17 00:00:00 2001
From: dumbmoron
Date: Fri, 16 Aug 2024 20:58:57 +0000
Subject: [PATCH] web/libav: initial implementation of transcoding with
 WebCodecs

---
 pnpm-lock.yaml                      |   9 +
 web/package.json                    |   3 +-
 web/src/lib/buffer-stream.ts        |  32 +++
 web/src/lib/libav.ts                | 351 +++++++++++++++++++++++++++-
 web/src/lib/types/libav.ts          |  38 +++
 web/src/routes/convert/+page.svelte |  69 ++++++
 6 files changed, 499 insertions(+), 3 deletions(-)
 create mode 100644 web/src/lib/buffer-stream.ts
 create mode 100644 web/src/routes/convert/+page.svelte

diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 8e054cbc..6709382e 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -118,6 +118,9 @@ importers:
       ts-deepmerge:
         specifier: ^7.0.0
         version: 7.0.1
+      yocto-queue:
+        specifier: ^1.1.1
+        version: 1.1.1
     devDependencies:
       '@eslint/js':
         specifier: ^9.5.0
@@ -2291,6 +2294,10 @@ packages:
     resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
     engines: {node: '>=10'}
 
+  yocto-queue@1.1.1:
+    resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==}
+    engines: {node: '>=12.20'}
+
   youtubei.js@10.3.0:
     resolution: {integrity: sha512-tLmeJCECK2xF2hZZtF2nEqirdKVNLFSDpa0LhTaXY3tngtL7doQXyy7M2CLueramDTlmCnFaW+rctHirTPFaRQ==}
 
@@ -4207,6 +4214,8 @@ snapshots:
 
   yocto-queue@0.1.0: {}
 
+  yocto-queue@1.1.1: {}
+
   youtubei.js@10.3.0:
     dependencies:
       jintr: 2.1.1
diff --git a/web/package.json b/web/package.json
index 5e09009b..2d41e9e0 100644
--- a/web/package.json
+++ b/web/package.json
@@ -57,6 +57,7 @@
     "libavjs-webcodecs-bridge": "^0.1.0",
     "mime": "^4.0.4",
     "sveltekit-i18n": "^2.4.2",
-    "ts-deepmerge": "^7.0.0"
+    "ts-deepmerge": "^7.0.0",
+    "yocto-queue": "^1.1.1"
   }
 }
diff --git a/web/src/lib/buffer-stream.ts b/web/src/lib/buffer-stream.ts
new file mode 100644
index 00000000..9e0502ff
--- /dev/null
+++ b/web/src/lib/buffer-stream.ts
@@ -0,0 +1,32 @@
+import Queue from 'yocto-queue';
+
+export class BufferStream<T> extends ReadableStream<T> {
+    queue = new Queue<T | null>();
+    res?: () => void;
+
+    constructor() {
+        super({
+            pull: async (controller) => {
+                while (!this.queue.size) {
+                    await new Promise<void>(res => this.res = res);
+                }
+                const next = this.queue.dequeue();
+                if (next !== null)
+                    controller.enqueue(next);
+                else
+                    controller.close();
+            }
+        });
+
+    }
+
+    push(next: T | null) {
+        this.queue.enqueue(next);
+
+        if (this.res) {
+            const res = this.res;
+            this.res = undefined;
+            res();
+        }
+    }
+}
diff --git a/web/src/lib/libav.ts b/web/src/lib/libav.ts
index 7e50d911..343d7e01 100644
--- a/web/src/lib/libav.ts
+++ b/web/src/lib/libav.ts
@@ -1,9 +1,14 @@
 import mime from "mime";
-import LibAV, { type LibAV as LibAVInstance } from "@imput/libav.js-remux-cli";
-import type { FFmpegProgressCallback, FFmpegProgressEvent, FFmpegProgressStatus, FileInfo, RenderParams } from "./types/libav";
+import LibAV, { type LibAV as LibAVInstance, type Packet, type Stream } from "@imput/libav.js-encode-cli";
+import type { Chunk, ChunkMetadata, Decoder, FFmpegProgressCallback, FFmpegProgressEvent, FFmpegProgressStatus, FileInfo, OutputStream, RenderingPipeline, RenderParams } from "./types/libav";
 import type { FfprobeData } from "fluent-ffmpeg";
+import * as LibAVWebCodecs from "libavjs-webcodecs-bridge";
+import { BufferStream } from "./buffer-stream";
 import { browser } from "$app/environment";
 
+const QUEUE_THRESHOLD_MIN = 16;
+const QUEUE_THRESHOLD_MAX = 128;
+
 export default class LibAVWrapper {
     libav: Promise<LibAVInstance> | null;
     concurrency: number;
@@ -188,6 +193,348 @@ export default class LibAVWrapper {
         }
     }
 
+    async transcode(blob: Blob) {
+        const libav = await this.#get();
+        let fmtctx;
+
+        await libav.mkreadaheadfile('input', blob);
+        try {
+            const [ fmt_ctx, streams ] = await libav.ff_init_demuxer_file('input');
+            fmtctx = fmt_ctx;
+
+            const pipes: RenderingPipeline[] = [];
+            const output_streams: OutputStream[] = [];
+            for (const stream of streams) {
+                const {
+                    pipe,
+                    stream: ostream
+                } = await this.#createEncoder(stream, 'avc1.64083e');
+
+                pipes.push({
+                    decoder: await this.#createDecoder(stream),
+                    encoder: pipe
+                } as RenderingPipeline);
+                output_streams.push(ostream);
+            }
+
+            await Promise.all([
+                this.#decodeStreams(fmt_ctx, pipes, streams),
+                this.#encodeStreams(pipes),
+                this.#mux(pipes, output_streams)
+            ]);
+        } catch(e) {
+            console.error(e);
+        } finally {
+            await libav.unlinkreadaheadfile('input');
+
+            if (fmtctx) {
+                await libav.avformat_close_input_js(fmtctx);
+            }
+        }
+    }
+
+    async #decodeStreams(fmt_ctx: number, pipes: RenderingPipeline[], streams: Stream[]) {
+        for await (const { index, packet } of this.#demux(fmt_ctx)) {
+            const { decoder } = pipes[index];
+
+            this.#decodePacket(decoder.instance, packet, streams[index]);
+
+            let currentSize = decoder.instance.decodeQueueSize + decoder.output.queue.size;
+            if (currentSize >= QUEUE_THRESHOLD_MAX) {
+                while (currentSize > QUEUE_THRESHOLD_MIN) {
+                    await new Promise(res => {
+                        if (decoder.instance.decodeQueueSize)
+                            decoder.instance.addEventListener("dequeue", res, { once: true });
+                        else
+                            setTimeout(res, 100);
+                    });
+                    currentSize = decoder.instance.decodeQueueSize + decoder.output.queue.size;
+                }
+            }
+        }
+
+        for (const { decoder } of pipes) {
+            await decoder.instance.flush();
+            decoder.instance.close();
+            decoder.output.push(null);
+        }
+    }
+
+    async #encodeStream(
+        frames: RenderingPipeline['decoder']['output'],
+        { instance: encoder, output }: RenderingPipeline['encoder']
+    ) {
+        const reader = frames.getReader();
+
+        while (true) {
+            const { done, value } = await reader.read();
+            if (done) break;
+
+            let currentSize = encoder.encodeQueueSize + output.queue.size;
+
+            if (currentSize >= QUEUE_THRESHOLD_MAX) {
+                while (currentSize > QUEUE_THRESHOLD_MIN) {
+                    await new Promise(res => {
+                        if (encoder.encodeQueueSize)
+                            encoder.addEventListener("dequeue", res, { once: true });
+                        else
+                            setTimeout(res, 100);
+                    });
+                    currentSize = encoder.encodeQueueSize + output.queue.size;
+                }
+            }
+
+            // FIXME: figure out how to make typescript happy without this monstrosity
+            if (value instanceof AudioData && encoder instanceof AudioEncoder) {
+                encoder.encode(value);
+            } else if (value instanceof VideoFrame && encoder instanceof VideoEncoder) {
+                encoder.encode(value);
+            }
+
+            value.close();
+        }
+
+        await encoder.flush();
+        encoder.close();
+        output.push(null);
+    }
+
+    async #encodeStreams(pipes: RenderingPipeline[]) {
+        return Promise.all(
+            pipes.map(
+                ({ decoder, encoder }) => {
+                    return this.#encodeStream(decoder.output, encoder);
+                }
+            )
+        );
+    }
+
+    async #processChunk({ chunk, metadata }: { chunk: Chunk, metadata: ChunkMetadata }, ostream: OutputStream, index: number) {
+        const libav = await this.#get();
+
+        let convertToPacket;
+        if (chunk instanceof EncodedVideoChunk) {
+            convertToPacket = LibAVWebCodecs.encodedVideoChunkToPacket;
+        } else {
+            convertToPacket = LibAVWebCodecs.encodedAudioChunkToPacket;
+        }
+
+        return await convertToPacket(libav, chunk, metadata, ostream, index);
+    }
+
+    async #mux(pipes: RenderingPipeline[], ostreams: OutputStream[]) {
+        const libav = await this.#get();
+        const write_pkt = await libav.av_packet_alloc();
+
+        let writer_ctx = 0, output_ctx = 0;
+
+        try {
+            const starterPackets = [], readers: ReadableStreamDefaultReader[] = [];
+
+            for (let i = 0; i < ostreams.length; ++i) {
+                readers[i] = pipes[i].encoder.output.getReader();
+
+                const { done, value } = await readers[i].read();
+                if (done) throw "this should not happen";
+
+                starterPackets.push(
+                    await this.#processChunk(value, ostreams[i], i)
+                );
+            }
+
+            let writtenData = new Uint8Array(0);
+            libav.onwrite = (_, pos, data) => {
+                const newLen = Math.max(pos + data.length, writtenData.length);
+                if (newLen > writtenData.length) {
+                    const newData = new Uint8Array(newLen);
+                    newData.set(writtenData);
+                    writtenData = newData;
+                }
+
+                writtenData.set(data, pos);
+            };
+
+            await libav.mkwriterdev("output.mp4");
+            [ output_ctx,, writer_ctx ] = await libav.ff_init_muxer(
+                {
+                    format_name: 'mp4',
+                    filename: 'output.mp4',
+                    device: true,
+                    open: true,
+                    codecpars: true
+                }, ostreams
+            );
+
+            await libav.avformat_write_header(output_ctx, 0);
+            await libav.ff_write_multi(output_ctx, write_pkt, starterPackets);
+
+            let writePromise = Promise.resolve();
+            await Promise.all(pipes.map(async (_, i) => {
+                while (true) {
+                    const { done, value } = await readers[i].read();
+                    if (done) break;
+
+                    writePromise = writePromise.then(async () => {
+                        const packet = await this.#processChunk(value, ostreams[i], i);
+                        await libav.ff_write_multi(output_ctx, write_pkt, [ packet ]);
+                    });
+                }
+            }));
+
+            await writePromise;
+            await libav.av_write_trailer(output_ctx);
+
+            const renderBlob = new Blob(
+                [ writtenData ],
+                { type: "video/mp4" }
+            );
+
+            window.open(URL.createObjectURL(renderBlob), '_blank');
+        } finally {
+            try {
+                await libav.unlink('output.mp4');
+            } catch {}
+
+            await libav.av_packet_free(write_pkt);
+            if (output_ctx && writer_ctx) {
+                await libav.ff_free_muxer(output_ctx, writer_ctx);
+            }
+        }
+    }
+
+    #decodePacket(decoder: Decoder, packet: Packet, stream: Stream) {
+        let chunk;
+        if (decoder instanceof VideoDecoder) {
+            chunk = LibAVWebCodecs.packetToEncodedVideoChunk(packet, stream);
+        } else if (decoder instanceof AudioDecoder) {
+            chunk = LibAVWebCodecs.packetToEncodedAudioChunk(packet, stream);
+        }
+
+        decoder.decode(chunk);
+    }
+
+    async* #demux(fmt_ctx: number) {
+        const libav = await this.#get();
+        const read_pkt = await libav.av_packet_alloc();
+
+        try {
+            while (true) {
+                const [ ret, packets ] = await libav.ff_read_frame_multi(fmt_ctx, read_pkt, { limit: 1 });
+
+                if (ret !== -libav.EAGAIN &&
+                    ret !== 0 &&
+                    ret !== libav.AVERROR_EOF) {
+                    break;
+                }
+
+                for (const index in packets) {
+                    for (const packet of packets[index]) {
+                        yield { index: Number(index), packet };
+                    }
+                }
+
+                if (ret === libav.AVERROR_EOF)
+                    break;
+            }
+        } finally {
+            await libav.av_packet_free(read_pkt);
+        }
+    }
+
+    async #createEncoder(stream: Stream, codec: string) {
+        const libav = await this.#get();
+
+        let streamToConfig, configToStream, Encoder;
+
+        if (stream.codec_type === libav.AVMEDIA_TYPE_VIDEO) {
+            streamToConfig = LibAVWebCodecs.videoStreamToConfig;
+            configToStream = LibAVWebCodecs.configToVideoStream;
+            Encoder = VideoEncoder;
+        } else if (stream.codec_type === libav.AVMEDIA_TYPE_AUDIO) {
+            streamToConfig = LibAVWebCodecs.audioStreamToConfig;
+            configToStream = LibAVWebCodecs.configToAudioStream;
+            Encoder = AudioEncoder;
+            codec = 'mp4a.40.29';
+        } else throw "Unknown type: " + stream.codec_type;
+
+        const config = await streamToConfig(libav, stream);
+        if (config === null) {
+            throw "could not make encoder config";
+        }
+
+        const encoderConfig = {
+            codec,
+            width: config.codedWidth,
+            height: config.codedHeight,
+            numberOfChannels: config.numberOfChannels,
+            sampleRate: config.sampleRate
+        };
+
+        let { supported } = await Encoder.isConfigSupported(encoderConfig);
+        if (!supported) {
+            throw "cannot encode " + codec;
+        }
+
+        const output = new BufferStream<
+            { chunk: Chunk, metadata: ChunkMetadata }
+        >();
+
+        const encoder = new Encoder({
+            output: (chunk, metadata = {}) => {
+                output.push({ chunk, metadata });
+            },
+            error: console.error
+        });
+
+        encoder.configure(encoderConfig);
+
+        const c2s = await configToStream(libav, encoderConfig);
+
+        // FIXME: figure out a proper way to handle timescale
+        // (preferably without killing self)
+        c2s[1] = 1;
+        c2s[2] = 60000;
+
+        return {
+            pipe: { instance: encoder, output },
+            stream: c2s
+        };
+    }
+
+    async #createDecoder(stream: Stream) {
+        const libav = await this.#get();
+
+        let streamToConfig, Decoder;
+
+        if (stream.codec_type === libav.AVMEDIA_TYPE_VIDEO) {
+            streamToConfig = LibAVWebCodecs.videoStreamToConfig;
+            Decoder = VideoDecoder;
+        } else if (stream.codec_type === libav.AVMEDIA_TYPE_AUDIO) {
+            streamToConfig = LibAVWebCodecs.audioStreamToConfig;
+            Decoder = AudioDecoder;
+        } else throw "Unknown type: " + stream.codec_type;
+
+        const config = await streamToConfig(libav, stream);
+
+        if (config === null) {
+            throw "could not make decoder config";
+        }
+
+        let { supported } = await Decoder.isConfigSupported(config);
+        if (!supported) {
+            throw "cannot decode " + config.codec;
+        }
+
+        const output = new BufferStream<VideoFrame | AudioData>();
+        const decoder = new Decoder({
+            output: frame => output.push(frame),
+            error: console.error
+        });
+
+        decoder.configure(config);
+        return { instance: decoder, output };
+    }
+
     #emitProgress(data: Uint8Array | Int8Array) {
         if (!this.onProgress) return;
diff --git a/web/src/lib/types/libav.ts b/web/src/lib/types/libav.ts
index eed54edf..797aa866 100644
--- a/web/src/lib/types/libav.ts
+++ b/web/src/lib/types/libav.ts
@@ -1,3 +1,5 @@
+import { BufferStream } from "$lib/buffer-stream";
+
 export type InputFileKind = "video" | "audio";
 
 export type FileInfo = {
@@ -26,3 +28,39 @@ export type FFmpegProgressEvent = {
 }
 
 export type FFmpegProgressCallback = (info: FFmpegProgressEvent) => void;
+export type Decoder = VideoDecoder | AudioDecoder;
+export type Encoder = VideoEncoder | AudioEncoder;
+
+export type ChunkMetadata = EncodedVideoChunkMetadata | EncodedAudioChunkMetadata;
+export type Chunk = EncodedVideoChunk | EncodedAudioChunk;
+
+export type AudioPipeline = {
+    decoder: {
+        instance: AudioDecoder,
+        output: BufferStream<AudioData>
+    },
+    encoder: {
+        instance: AudioEncoder,
+        output: BufferStream<{
+            chunk: EncodedAudioChunk,
+            metadata: EncodedAudioChunkMetadata
+        }>
+    }
+};
+
+export type VideoPipeline = {
+    decoder: {
+        instance: VideoDecoder,
+        output: BufferStream<VideoFrame>
+    },
+    encoder: {
+        instance: VideoEncoder,
+        output: BufferStream<{
+            chunk: EncodedVideoChunk,
+            metadata: EncodedVideoChunkMetadata
+        }>
+    }
+}
+
+export type RenderingPipeline = AudioPipeline | VideoPipeline;
+export type OutputStream = [number, number, number];
diff --git a/web/src/routes/convert/+page.svelte b/web/src/routes/convert/+page.svelte
new file mode 100644
index 00000000..f54a899e
--- /dev/null
+++ b/web/src/routes/convert/+page.svelte
@@ -0,0 +1,69 @@
+<!-- placeholder: the 69 lines of Svelte markup for this page were lost in
+     extraction; only the localized string below is recoverable -->
+    {$t("remux.description")}
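
A short, hedged sketch of how the new transcode() entry point could be driven from page code. The constructor-with-progress-callback shape mirrors the existing LibAVWrapper usage; the file-picker wiring is illustrative and not part of this patch:

    import LibAVWrapper from "$lib/libav";

    // illustrative driver, not part of the patch
    const ff = new LibAVWrapper(progress => console.log(progress));

    async function onFileSelected(file: File) {
        // demuxes the input, re-encodes every stream through WebCodecs,
        // remuxes to mp4 and (in this WIP state) opens the result in a new tab
        await ff.transcode(file);
    }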
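
BufferStream (added above) is a small push-to-pull adapter: producer callbacks push() values as they arrive, push(null) terminates the stream, and consumers read through a standard ReadableStream reader, which is how #encodeStream() and #mux() consume decoder and encoder output. A minimal, self-contained sketch of that contract (the number payload is illustrative; assumes a top-level-await module):

    import { BufferStream } from "$lib/buffer-stream";

    const values = new BufferStream<number>();

    // producer side: push as data becomes available, null closes the stream
    values.push(1);
    values.push(2);
    values.push(null);

    // consumer side: a plain ReadableStream reader
    const reader = values.getReader();
    for (let r = await reader.read(); !r.done; r = await reader.read()) {
        console.log(r.value); // 1, then 2
    }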
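
The encoder currently hard-codes 'avc1.64083e' for video and 'mp4a.40.29' for audio. If codec selection ever needs to be smarter, WebCodecs can be probed up front; a hedged sketch (the candidate list is illustrative, not part of the patch):

    // returns the first candidate the browser can actually encode, or null
    async function pickVideoCodec(width: number, height: number): Promise<string | null> {
        const candidates = ["avc1.64083e", "vp09.00.10.08", "av01.0.04M.08"];
        for (const codec of candidates) {
            const { supported } = await VideoEncoder.isConfigSupported({ codec, width, height });
            if (supported) return codec;
        }
        return null;
    }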