From d48cc8fc07cd8f3cddae2f6ef2306ee1b2c779bc Mon Sep 17 00:00:00 2001 From: jj Date: Fri, 1 Nov 2024 16:43:01 +0000 Subject: [PATCH] api/cookie: implement cluster synchronization --- api/src/misc/cluster.js | 31 ++++++++++++++++++ api/src/processing/cookie/manager.js | 49 ++++++++++++++++++++++++++-- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/api/src/misc/cluster.js b/api/src/misc/cluster.js index ea43aaa9..56664d15 100644 --- a/api/src/misc/cluster.js +++ b/api/src/misc/cluster.js @@ -38,3 +38,34 @@ export const broadcast = (message) => { worker.send(message); } } + +export const send = (message) => { + if (!isCluster) { + return; + } + + if (cluster.isPrimary) { + return broadcast(message); + } else { + return process.send(message); + } +} + +export const waitFor = (key) => { + return new Promise(resolve => { + const listener = (message) => { + if (key in message) { + process.off('message', listener); + return resolve(message); + } + } + + process.on('message', listener); + }); +} + +export const mainOnMessage = (cb) => { + for (const worker of Object.values(cluster.workers)) { + worker.on('message', cb); + } +} diff --git a/api/src/processing/cookie/manager.js b/api/src/processing/cookie/manager.js index 074d0045..5ea3f6ca 100644 --- a/api/src/processing/cookie/manager.js +++ b/api/src/processing/cookie/manager.js @@ -3,6 +3,8 @@ import Cookie from './cookie.js'; import { readFile, writeFile } from 'fs/promises'; import { Green, Yellow } from '../../misc/console-text.js'; import { parse as parseSetCookie, splitCookiesString } from 'set-cookie-parser'; +import * as cluster from '../../misc/cluster.js'; +import { isCluster } from '../../config.js'; const WRITE_INTERVAL = 60000; let cookies = {}, dirty = false, intervalId; @@ -16,18 +18,55 @@ function writeChanges(cookiePath) { }) } -export const setup = async (cookiePath) => { +const setupMain = async (cookiePath) => { try { cookies = await readFile(cookiePath, 'utf8'); cookies = JSON.parse(cookies); intervalId = setInterval(() => writeChanges(cookiePath), WRITE_INTERVAL); - console.log(`${Green('[✓]')} cookies loaded successfully!`) + + cluster.broadcast({ cookies }); + + console.log(`${Green('[✓]')} cookies loaded successfully!`); } catch(e) { console.error(`${Yellow('[!]')} failed to load cookies.`); console.error('error:', e); } } +const setupWorker = async () => { + cookies = (await cluster.waitFor('cookies')).cookies; +} + +export const setup = async (cookiePath) => { + if (cluster.isPrimary) { + await setupMain(cookiePath); + } else if (cluster.isWorker) { + await setupWorker(); + } + + if (isCluster) { + const messageHandler = (message) => { + if ('cookieUpdate' in message) { + const { cookieUpdate } = message; + + if (cluster.isPrimary) { + dirty = true; + cluster.broadcast({ cookieUpdate }); + } + + const { service, idx, cookie } = cookieUpdate; + cookies[service][idx] = cookie; + } + } + + if (cluster.isPrimary) { + cluster.mainOnMessage(messageHandler); + } else { + process.on('message', messageHandler); + } + } +} + export function getCookie(service) { if (!cookies[service] || !cookies[service].length) return; @@ -38,6 +77,7 @@ export function getCookie(service) { cookies[service][idx] = Cookie.fromString(cookie); } + cookies[service][idx].meta = { service, idx }; return cookies[service][idx]; } @@ -50,6 +90,10 @@ export function updateCookieValues(cookie, values) { if (changed) { dirty = true; + if (isCluster && cookie.meta) { + const message = { cookieUpdate: { ...cookie.meta, cookie } }; + cluster.send(message); + } } return changed; @@ -65,5 +109,6 @@ export function updateCookie(cookie, headers) { cookie.unset(parsed.filter(c => c.expires < new Date()).map(c => c.name)); parsed.filter(c => !c.expires || c.expires > new Date()).forEach(c => values[c.name] = c.value); + updateCookieValues(cookie, values); }