api: cluster support

still missing synchronization for some structures
This commit is contained in:
jj 2024-10-31 22:59:06 +00:00
parent 2d6d406f48
commit 40d6a02b61
No known key found for this signature in database
6 changed files with 114 additions and 24 deletions

View file

@ -1,12 +1,14 @@
import "dotenv/config"; import "dotenv/config";
import express from "express"; import express from "express";
import cluster from "node:cluster";
import path from "path"; import path from "path";
import { fileURLToPath } from "url"; import { fileURLToPath } from "url";
import { env } from "./config.js" import { env, isCluster } from "./config.js"
import { Red } from "./misc/console-text.js"; import { Red } from "./misc/console-text.js";
import { initCluster } from "./misc/cluster.js";
const app = express(); const app = express();
@ -17,7 +19,12 @@ app.disable("x-powered-by");
if (env.apiURL) { if (env.apiURL) {
const { runAPI } = await import("./core/api.js"); const { runAPI } = await import("./core/api.js");
runAPI(express, app, __dirname)
if (cluster.isPrimary && isCluster) {
initCluster();
}
runAPI(express, app, __dirname, cluster.isPrimary);
} else { } else {
console.log( console.log(
Red("API_URL env variable is missing, cobalt api can't start.") Red("API_URL env variable is missing, cobalt api can't start.")

View file

@ -1,5 +1,6 @@
import { getVersion } from "@imput/version-info"; import { getVersion } from "@imput/version-info";
import { services } from "./processing/service-config.js"; import { services } from "./processing/service-config.js";
import { supportsReusePort } from "./misc/cluster.js";
const version = await getVersion(); const version = await getVersion();
@ -46,7 +47,7 @@ const env = {
apiKeyURL: process.env.API_KEY_URL && new URL(process.env.API_KEY_URL), apiKeyURL: process.env.API_KEY_URL && new URL(process.env.API_KEY_URL),
authRequired: process.env.API_AUTH_REQUIRED === '1', authRequired: process.env.API_AUTH_REQUIRED === '1',
redisURL: process.env.API_REDIS_URL, redisURL: process.env.API_REDIS_URL,
instanceCount: (process.env.API_INSTANCE_COUNT && parseInt(process.env.API_INSTANCE_COUNT)) || 1,
keyReloadInterval: 900, keyReloadInterval: 900,
enabledServices, enabledServices,
@ -57,9 +58,18 @@ const cobaltUserAgent = `cobalt/${version} (+https://github.com/imputnet/cobalt)
export let tunnelPort = env.apiPort; export let tunnelPort = env.apiPort;
export const setTunnelPort = (port) => tunnelPort = port; export const setTunnelPort = (port) => tunnelPort = port;
export const isCluster = env.instanceCount > 1;
if (env.sessionEnabled && env.jwtSecret.length < 16) { if (env.sessionEnabled && env.jwtSecret.length < 16) {
throw new Error("JWT_SECRET env is too short (must be at least 16 characters long)"); throw new Error("JWT_SECRET env is too short (must be at least 16 characters long)");
} else if (env.instanceCount > 1 && !env.redisURL) {
throw new Error("API_REDIS_URL is required when API_INSTANCE_COUNT is >= 2");
} else if (env.instanceCount > 1 && !await supportsReusePort()) {
console.error('API_INSTANCE_COUNT is not supported in your environment. to use this env, your node.js');
console.error('version must be >= 23.1.0, and you must be running a recent enough version of linux');
console.error('(or other OS that supports it). for more info, see `reusePort` option on');
console.error('https://nodejs.org/api/net.html#serverlistenoptions-callback');
throw new Error('SO_REUSEPORT is not supported');
} }
export { export {

View file

@ -1,4 +1,5 @@
import cors from "cors"; import cors from "cors";
import http from "node:http";
import rateLimit from "express-rate-limit"; import rateLimit from "express-rate-limit";
import { setGlobalDispatcher, ProxyAgent } from "undici"; import { setGlobalDispatcher, ProxyAgent } from "undici";
import { getCommit, getBranch, getRemote, getVersion } from "@imput/version-info"; import { getCommit, getBranch, getRemote, getVersion } from "@imput/version-info";
@ -7,9 +8,9 @@ import jwt from "../security/jwt.js";
import stream from "../stream/stream.js"; import stream from "../stream/stream.js";
import match from "../processing/match.js"; import match from "../processing/match.js";
import { env } from "../config.js"; import { env, setTunnelPort } from "../config.js";
import { extract } from "../processing/url.js"; import { extract } from "../processing/url.js";
import { Bright, Cyan } from "../misc/console-text.js"; import { Green, Bright, Cyan } from "../misc/console-text.js";
import { generateHmac, generateSalt } from "../misc/crypto.js"; import { generateHmac, generateSalt } from "../misc/crypto.js";
import { randomizeCiphers } from "../misc/randomize-ciphers.js"; import { randomizeCiphers } from "../misc/randomize-ciphers.js";
import { verifyTurnstileToken } from "../security/turnstile.js"; import { verifyTurnstileToken } from "../security/turnstile.js";
@ -40,7 +41,7 @@ const fail = (res, code, context) => {
res.status(status).json(body); res.status(status).json(body);
} }
export const runAPI = (express, app, __dirname) => { export const runAPI = (express, app, __dirname, isPrimary = true) => {
const startTime = new Date(); const startTime = new Date();
const startTimestamp = startTime.getTime(); const startTimestamp = startTime.getTime();
@ -288,7 +289,7 @@ export const runAPI = (express, app, __dirname) => {
return stream(res, streamInfo); return stream(res, streamInfo);
}) })
app.get('/itunnel', (req, res) => { const itunnelHandler = (req, res) => {
if (!req.ip.endsWith('127.0.0.1')) { if (!req.ip.endsWith('127.0.0.1')) {
return res.sendStatus(403); return res.sendStatus(403);
} }
@ -308,7 +309,9 @@ export const runAPI = (express, app, __dirname) => {
]); ]);
return stream(res, { type: 'internal', ...streamInfo }); return stream(res, { type: 'internal', ...streamInfo });
}) };
app.get('/itunnel', itunnelHandler);
app.get('/', (_, res) => { app.get('/', (_, res) => {
res.type('json'); res.type('json');
@ -339,21 +342,27 @@ export const runAPI = (express, app, __dirname) => {
setGlobalDispatcher(new ProxyAgent(env.externalProxy)) setGlobalDispatcher(new ProxyAgent(env.externalProxy))
} }
app.listen(env.apiPort, env.listenAddress, () => { http.createServer(app).listen({
console.log(`\n` + port: env.apiPort,
Bright(Cyan("cobalt ")) + Bright("API ^ω⁠^") + "\n" + host: env.listenAddress,
reusePort: env.instanceCount > 1 || undefined
}, () => {
if (isPrimary) {
console.log(`\n` +
Bright(Cyan("cobalt ")) + Bright("API ^ω⁠^") + "\n" +
"~~~~~~\n" + "~~~~~~\n" +
Bright("version: ") + version + "\n" + Bright("version: ") + version + "\n" +
Bright("commit: ") + git.commit + "\n" + Bright("commit: ") + git.commit + "\n" +
Bright("branch: ") + git.branch + "\n" + Bright("branch: ") + git.branch + "\n" +
Bright("remote: ") + git.remote + "\n" + Bright("remote: ") + git.remote + "\n" +
Bright("start time: ") + startTime.toUTCString() + "\n" + Bright("start time: ") + startTime.toUTCString() + "\n" +
"~~~~~~\n" + "~~~~~~\n" +
Bright("url: ") + Bright(Cyan(env.apiURL)) + "\n" + Bright("url: ") + Bright(Cyan(env.apiURL)) + "\n" +
Bright("port: ") + env.apiPort + "\n" Bright("port: ") + env.apiPort + "\n"
); );
}
if (env.apiKeyURL) { if (env.apiKeyURL) {
APIKeys.setup(env.apiKeyURL); APIKeys.setup(env.apiKeyURL);
@ -362,5 +371,19 @@ export const runAPI = (express, app, __dirname) => {
if (env.cookiePath) { if (env.cookiePath) {
Cookies.setup(env.cookiePath); Cookies.setup(env.cookiePath);
} }
}) });
if (!isPrimary) {
const istreamer = express();
istreamer.get('/itunnel', itunnelHandler);
const server = istreamer.listen({
port: 0,
host: '127.0.0.1',
exclusive: true
}, () => {
const { port } = server.address();
console.log(`${Green('[✓]')} cobalt sub-instance running on 127.0.0.1:${port}`);
setTunnelPort(port);
});
}
} }

29
api/src/misc/cluster.js Normal file
View file

@ -0,0 +1,29 @@
import net from "node:net";
import cluster from "node:cluster";
import { isCluster } from "../config.js";
export const supportsReusePort = async () => {
try {
await new Promise((resolve, reject) => {
const server = net.createServer().listen({ port: 0, reusePort: true });
server.on('listening', () => server.close(resolve));
server.on('error', (err) => (server.close(), reject(err)));
});
return true;
} catch {
return false;
}
}
export const initCluster = async () => {
const { getSalt } = await import("../stream/manage.js");
const salt = getSalt();
for (let i = 1; i < env.instanceCount; ++i) {
const worker = cluster.fork();
worker.once('message', () => {
worker.send({ salt });
});
}
}

View file

@ -4,8 +4,9 @@ import { nanoid } from "nanoid";
import { randomBytes } from "crypto"; import { randomBytes } from "crypto";
import { strict as assert } from "assert"; import { strict as assert } from "assert";
import { setMaxListeners } from "node:events"; import { setMaxListeners } from "node:events";
import cluster from "node:cluster";
import { env, tunnelPort } from "../config.js"; import { env, tunnelPort, isCluster } from "../config.js";
import { closeRequest } from "./shared.js"; import { closeRequest } from "./shared.js";
import { decryptStream, encryptStream, generateHmac, generateSalt } from "../misc/crypto.js"; import { decryptStream, encryptStream, generateHmac, generateSalt } from "../misc/crypto.js";
@ -15,7 +16,26 @@ const freebind = env.freebindCIDR && await import('freebind').catch(() => {});
const streamCache = new Store('streams'); const streamCache = new Store('streams');
const internalStreamCache = new Map(); const internalStreamCache = new Map();
const hmacSalt = generateSalt(); let hmacSalt = cluster.isPrimary ? generateSalt() : null;
let _saltRead = false;
export const getSalt = () => {
if (!isCluster) throw "salt can only be read on multi-process instances";
if (!cluster.isPrimary) throw "only primary cluster can read salt";
if (_saltRead) throw "salt was already read";
_saltRead = true;
return hmacSalt;
}
if (cluster.isWorker) {
process.send({ ready: true });
process.once('message', (message) => {
if (message.salt && !hmacSalt) {
hmacSalt = message.salt;
}
});
}
export function createStream(obj) { export function createStream(obj) {
const streamID = nanoid(), const streamID = nanoid(),

View file

@ -78,6 +78,7 @@ sudo service nscd start
| `API_KEY_URL` | | `file://keys.json` | the location of the api key database. for loading API keys, cobalt supports HTTP(S) urls, or local files by specifying a local path using the `file://` protocol. see the "api key file format" below for more details. | | `API_KEY_URL` | | `file://keys.json` | the location of the api key database. for loading API keys, cobalt supports HTTP(S) urls, or local files by specifying a local path using the `file://` protocol. see the "api key file format" below for more details. |
| `API_AUTH_REQUIRED` | | `1` | when set to `1`, the user always needs to be authenticated in some way before they can access the API (either via an api key or via turnstile, if enabled). | | `API_AUTH_REQUIRED` | | `1` | when set to `1`, the user always needs to be authenticated in some way before they can access the API (either via an api key or via turnstile, if enabled). |
| `API_REDIS_URL` | | `redis://localhost:6379` | when set, cobalt uses redis instead of internal memory for the tunnel cache. | | `API_REDIS_URL` | | `redis://localhost:6379` | when set, cobalt uses redis instead of internal memory for the tunnel cache. |
| `API_INSTANCE_COUNT` | | `2` | supported only on Linux and node.js `>=23.1.0`. when configured, cobalt will spawn multiple sub-instances amongst which requests will be balanced. |
\* the higher the nice value, the lower the priority. [read more here](https://en.wikipedia.org/wiki/Nice_(Unix)). \* the higher the nice value, the lower the priority. [read more here](https://en.wikipedia.org/wiki/Nice_(Unix)).