diff --git a/api/src/cobalt.js b/api/src/cobalt.js index 363930ba..4c4d3c4e 100644 --- a/api/src/cobalt.js +++ b/api/src/cobalt.js @@ -1,12 +1,14 @@ import "dotenv/config"; import express from "express"; +import cluster from "node:cluster"; import path from "path"; import { fileURLToPath } from "url"; -import { env } from "./config.js" +import { env, isCluster } from "./config.js" import { Red } from "./misc/console-text.js"; +import { initCluster } from "./misc/cluster.js"; const app = express(); @@ -17,7 +19,12 @@ app.disable("x-powered-by"); if (env.apiURL) { const { runAPI } = await import("./core/api.js"); - runAPI(express, app, __dirname) + + if (cluster.isPrimary && isCluster) { + initCluster(); + } + + runAPI(express, app, __dirname, cluster.isPrimary); } else { console.log( Red("API_URL env variable is missing, cobalt api can't start.") diff --git a/api/src/config.js b/api/src/config.js index 9d2df3c6..899dfe49 100644 --- a/api/src/config.js +++ b/api/src/config.js @@ -1,5 +1,6 @@ import { getVersion } from "@imput/version-info"; import { services } from "./processing/service-config.js"; +import { supportsReusePort } from "./misc/cluster.js"; const version = await getVersion(); @@ -46,7 +47,7 @@ const env = { apiKeyURL: process.env.API_KEY_URL && new URL(process.env.API_KEY_URL), authRequired: process.env.API_AUTH_REQUIRED === '1', redisURL: process.env.API_REDIS_URL, - + instanceCount: (process.env.API_INSTANCE_COUNT && parseInt(process.env.API_INSTANCE_COUNT)) || 1, keyReloadInterval: 900, enabledServices, @@ -57,9 +58,18 @@ const cobaltUserAgent = `cobalt/${version} (+https://github.com/imputnet/cobalt) export let tunnelPort = env.apiPort; export const setTunnelPort = (port) => tunnelPort = port; +export const isCluster = env.instanceCount > 1; if (env.sessionEnabled && env.jwtSecret.length < 16) { throw new Error("JWT_SECRET env is too short (must be at least 16 characters long)"); +} else if (env.instanceCount > 1 && !env.redisURL) { + throw new Error("API_REDIS_URL is required when API_INSTANCE_COUNT is >= 2"); +} else if (env.instanceCount > 1 && !await supportsReusePort()) { + console.error('API_INSTANCE_COUNT is not supported in your environment. to use this env, your node.js'); + console.error('version must be >= 23.1.0, and you must be running a recent enough version of linux'); + console.error('(or other OS that supports it). for more info, see `reusePort` option on'); + console.error('https://nodejs.org/api/net.html#serverlistenoptions-callback'); + throw new Error('SO_REUSEPORT is not supported'); } export { diff --git a/api/src/core/api.js b/api/src/core/api.js index dc5c4ae1..9ef0c691 100644 --- a/api/src/core/api.js +++ b/api/src/core/api.js @@ -1,4 +1,5 @@ import cors from "cors"; +import http from "node:http"; import rateLimit from "express-rate-limit"; import { setGlobalDispatcher, ProxyAgent } from "undici"; import { getCommit, getBranch, getRemote, getVersion } from "@imput/version-info"; @@ -7,9 +8,9 @@ import jwt from "../security/jwt.js"; import stream from "../stream/stream.js"; import match from "../processing/match.js"; -import { env } from "../config.js"; +import { env, setTunnelPort } from "../config.js"; import { extract } from "../processing/url.js"; -import { Bright, Cyan } from "../misc/console-text.js"; +import { Green, Bright, Cyan } from "../misc/console-text.js"; import { generateHmac, generateSalt } from "../misc/crypto.js"; import { randomizeCiphers } from "../misc/randomize-ciphers.js"; import { verifyTurnstileToken } from "../security/turnstile.js"; @@ -40,7 +41,7 @@ const fail = (res, code, context) => { res.status(status).json(body); } -export const runAPI = (express, app, __dirname) => { +export const runAPI = (express, app, __dirname, isPrimary = true) => { const startTime = new Date(); const startTimestamp = startTime.getTime(); @@ -288,7 +289,7 @@ export const runAPI = (express, app, __dirname) => { return stream(res, streamInfo); }) - app.get('/itunnel', (req, res) => { + const itunnelHandler = (req, res) => { if (!req.ip.endsWith('127.0.0.1')) { return res.sendStatus(403); } @@ -308,7 +309,9 @@ export const runAPI = (express, app, __dirname) => { ]); return stream(res, { type: 'internal', ...streamInfo }); - }) + }; + + app.get('/itunnel', itunnelHandler); app.get('/', (_, res) => { res.type('json'); @@ -339,21 +342,27 @@ export const runAPI = (express, app, __dirname) => { setGlobalDispatcher(new ProxyAgent(env.externalProxy)) } - app.listen(env.apiPort, env.listenAddress, () => { - console.log(`\n` + - Bright(Cyan("cobalt ")) + Bright("API ^ω⁠^") + "\n" + + http.createServer(app).listen({ + port: env.apiPort, + host: env.listenAddress, + reusePort: env.instanceCount > 1 || undefined + }, () => { + if (isPrimary) { + console.log(`\n` + + Bright(Cyan("cobalt ")) + Bright("API ^ω⁠^") + "\n" + - "~~~~~~\n" + - Bright("version: ") + version + "\n" + - Bright("commit: ") + git.commit + "\n" + - Bright("branch: ") + git.branch + "\n" + - Bright("remote: ") + git.remote + "\n" + - Bright("start time: ") + startTime.toUTCString() + "\n" + - "~~~~~~\n" + + "~~~~~~\n" + + Bright("version: ") + version + "\n" + + Bright("commit: ") + git.commit + "\n" + + Bright("branch: ") + git.branch + "\n" + + Bright("remote: ") + git.remote + "\n" + + Bright("start time: ") + startTime.toUTCString() + "\n" + + "~~~~~~\n" + - Bright("url: ") + Bright(Cyan(env.apiURL)) + "\n" + - Bright("port: ") + env.apiPort + "\n" - ); + Bright("url: ") + Bright(Cyan(env.apiURL)) + "\n" + + Bright("port: ") + env.apiPort + "\n" + ); + } if (env.apiKeyURL) { APIKeys.setup(env.apiKeyURL); @@ -362,5 +371,19 @@ export const runAPI = (express, app, __dirname) => { if (env.cookiePath) { Cookies.setup(env.cookiePath); } - }) + }); + + if (!isPrimary) { + const istreamer = express(); + istreamer.get('/itunnel', itunnelHandler); + const server = istreamer.listen({ + port: 0, + host: '127.0.0.1', + exclusive: true + }, () => { + const { port } = server.address(); + console.log(`${Green('[✓]')} cobalt sub-instance running on 127.0.0.1:${port}`); + setTunnelPort(port); + }); + } } diff --git a/api/src/misc/cluster.js b/api/src/misc/cluster.js new file mode 100644 index 00000000..86ef604d --- /dev/null +++ b/api/src/misc/cluster.js @@ -0,0 +1,29 @@ +import net from "node:net"; +import cluster from "node:cluster"; +import { isCluster } from "../config.js"; + +export const supportsReusePort = async () => { + try { + await new Promise((resolve, reject) => { + const server = net.createServer().listen({ port: 0, reusePort: true }); + server.on('listening', () => server.close(resolve)); + server.on('error', (err) => (server.close(), reject(err))); + }); + + return true; + } catch { + return false; + } +} + +export const initCluster = async () => { + const { getSalt } = await import("../stream/manage.js"); + const salt = getSalt(); + + for (let i = 1; i < env.instanceCount; ++i) { + const worker = cluster.fork(); + worker.once('message', () => { + worker.send({ salt }); + }); + } +} diff --git a/api/src/stream/manage.js b/api/src/stream/manage.js index 50badce6..46fff483 100644 --- a/api/src/stream/manage.js +++ b/api/src/stream/manage.js @@ -4,8 +4,9 @@ import { nanoid } from "nanoid"; import { randomBytes } from "crypto"; import { strict as assert } from "assert"; import { setMaxListeners } from "node:events"; +import cluster from "node:cluster"; -import { env, tunnelPort } from "../config.js"; +import { env, tunnelPort, isCluster } from "../config.js"; import { closeRequest } from "./shared.js"; import { decryptStream, encryptStream, generateHmac, generateSalt } from "../misc/crypto.js"; @@ -15,7 +16,26 @@ const freebind = env.freebindCIDR && await import('freebind').catch(() => {}); const streamCache = new Store('streams'); const internalStreamCache = new Map(); -const hmacSalt = generateSalt(); +let hmacSalt = cluster.isPrimary ? generateSalt() : null; +let _saltRead = false; + +export const getSalt = () => { + if (!isCluster) throw "salt can only be read on multi-process instances"; + if (!cluster.isPrimary) throw "only primary cluster can read salt"; + if (_saltRead) throw "salt was already read"; + + _saltRead = true; + return hmacSalt; +} + +if (cluster.isWorker) { + process.send({ ready: true }); + process.once('message', (message) => { + if (message.salt && !hmacSalt) { + hmacSalt = message.salt; + } + }); +} export function createStream(obj) { const streamID = nanoid(), diff --git a/docs/run-an-instance.md b/docs/run-an-instance.md index f3fdc630..66a7d51d 100644 --- a/docs/run-an-instance.md +++ b/docs/run-an-instance.md @@ -78,6 +78,7 @@ sudo service nscd start | `API_KEY_URL` | ➖ | `file://keys.json` | the location of the api key database. for loading API keys, cobalt supports HTTP(S) urls, or local files by specifying a local path using the `file://` protocol. see the "api key file format" below for more details. | | `API_AUTH_REQUIRED` | ➖ | `1` | when set to `1`, the user always needs to be authenticated in some way before they can access the API (either via an api key or via turnstile, if enabled). | | `API_REDIS_URL` | ➖ | `redis://localhost:6379` | when set, cobalt uses redis instead of internal memory for the tunnel cache. | +| `API_INSTANCE_COUNT` | ➖ | `2` | supported only on Linux and node.js `>=23.1.0`. when configured, cobalt will spawn multiple sub-instances amongst which requests will be balanced. | \* the higher the nice value, the lower the priority. [read more here](https://en.wikipedia.org/wiki/Nice_(Unix)).