Compare commits

...

2 commits

Author   SHA1        Message                                          Date
yflory   75875d684d  Read history messages from the new worker       2024-07-04 18:47:29 +02:00
yflory   faf4421452  Create new type of worker and add storeMessage  2024-07-03 14:35:21 +02:00
4 changed files with 418 additions and 102 deletions
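
Together, the two commits generalize the existing db-worker pool and introduce a dedicated storage worker: storeMessage and history reads move out of the main process and are reached over Node's fork() IPC channel, with every request matched to its response by a txid. A minimal sketch of that txid-matched RPC pattern, using only Node core APIs (the worker path, pending map, and runTask name are illustrative, not CryptPad's actual identifiers):

// Sketch: txid-matched RPC between a parent process and a forked worker.
// CryptPad's real version (Util.response, sendCommand) is in lib/workers/index.js.
const { fork } = require('child_process');
const worker = fork('worker.js'); // hypothetical worker script
const pending = {}; // txid -> callback awaiting a response

const runTask = (msg, cb) => {
    const txid = Math.random().toString(36).slice(2);
    pending[txid] = cb;
    worker.send(Object.assign({ txid: txid }, msg));
};

worker.on('message', (res) => {
    const cb = pending[res.txid];
    if (typeof(cb) !== 'function') { return; } // unknown or expired txid
    delete pending[res.txid];
    cb(res.error, res.value);
});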

View file

@@ -191,6 +191,25 @@ module.exports.create = function (Env, cb) {
archiveRetentionTime: Env.archiveRetentionTime,
accountRetentionTime: Env.accountRetentionTime,
maxWorkers: Env.maxWorkers,
}, w(function (err) {
if (err) {
throw new Error(err);
}
}));
Workers.initializeStore(Env, {
blobPath: Env.paths.blob,
blobStagingPath: Env.paths.staging,
taskPath: Env.paths.task,
pinPath: Env.paths.pin,
filePath: Env.paths.data,
archivePath: Env.paths.archive,
blockPath: Env.paths.block,
inactiveTime: Env.inactiveTime,
archiveRetentionTime: Env.archiveRetentionTime,
accountRetentionTime: Env.accountRetentionTime,
maxWorkers: Env.maxWorkers,
}, w(function (err) {
if (err) {
throw new Error(err);
}
}));

View file

@@ -45,7 +45,7 @@ const EPHEMERAL_CHANNEL_LENGTH = HK.EPHEMERAL_CHANNEL_LENGTH = 34;
// Temporary channels are archived X ms after everyone has left them
const TEMPORARY_CHANNEL_LIFETIME = 30 * 1000;
-const tryParse = HK.tryParse = function (Env, str) {
+HK.tryParse = function (Env, str) {
try {
return JSON.parse(str);
} catch (err) {
@@ -316,11 +316,12 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, tim
if (typeof(cb) !== "function") { cb = function () {}; }
Env.queueStorage(id, function (next) {
-const msgBin = Buffer.from(msg + '\n', 'utf8');
+let msgBin;
// Store the message first, and update the index only once it's stored.
// store.messageBin can be async so updating the index first may
// result in a wrong cpIndex
nThen((waitFor) => {
/*
Env.store.messageBin(id, msgBin, waitFor(function (err) {
if (err) {
waitFor.abort();
@@ -336,6 +337,22 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, tim
return void next();
}
}));
*/
Env.storeWorker.storeMessage(Env, id, msg, waitFor((err, _msgBin) => {
if (err) {
waitFor.abort();
Log.error("HK_STORE_MESSAGE_ERROR", err.message);
// this error is critical, but there's not much we can do at the moment
// proceed with more messages, but they'll probably fail too
// at least you won't have a memory leak
// TODO make it possible to respond to clients with errors so they know
// their message wasn't stored
cb(err);
return void next();
}
msgBin = _msgBin;
}));
}).nThen((waitFor) => {
/* TODO we can skip updating the index if there's nobody in the channel.
Populating it might actually be the cause of a memory leak.
@@ -518,8 +535,6 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
*/
const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, cb) => {
-const store = Env.store;
let offset = -1;
nThen((waitFor) => {
getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => {
@@ -534,19 +549,11 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, cb) => {
}
offset = os;
}));
-}).nThen((waitFor) => {
+}).nThen(() => {
if (offset === -1) {
return void cb(new Error('EUNKNOWN'));
}
-const start = (beforeHash) ? 0 : offset;
-store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
-if (beforeHash && msgObj.offset >= offset) { return void abort(); }
-const parsed = tryParse(Env, msgObj.buff.toString('utf8'));
-if (!parsed) { return void readMore(); }
-handler(parsed, readMore);
-}, waitFor(function (err, reason) {
-return void cb(err, reason);
-}));
+Env.storeWorker.readHistoryMessages(Env, channelName, offset, beforeHash, handler, cb);
});
};
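
With this change getHistoryAsync no longer reads the channel file itself. The storage worker sends one parsed message per IPC response, and the parent asks for the next one by replying { readMore: true } under the same txid, which gives the stream simple backpressure. A rough sketch of the parent's side of that handshake (streamHistory and onMessage are hypothetical names; the real wiring is in lib/workers/index.js below):

// Sketch: streamed history read with an explicit readMore acknowledgement.
const streamHistory = (worker, txid, onMessage, onDone) => {
    worker.on('message', (res) => {
        if (res.txid !== txid) { return; } // some other task's response
        // a final response (error, or no handler flag) ends the stream
        if (res.error || !res.handler) { return void onDone(res.error); }
        onMessage(res.value, function readMore () {
            // request the next message for this txid
            worker.send({ txid: txid, readMore: true });
        });
    });
};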

View file

@@ -11,49 +11,104 @@ const PID = process.pid;
const Block = require("../storage/block");
const DB_PATH = 'lib/workers/db-worker';
const STORAGE_PATH = 'lib/workers/storage-worker';
const MAX_JOBS = 16;
const DEFAULT_QUERY_TIMEOUT = 60000 * 15; // increased from three to fifteen minutes because queries for very large files were taking as long as seven minutes
-Workers.initialize = function (Env, config, _cb) {
-var cb = Util.once(Util.mkAsync(_cb));
let Log = {
error: () => {}
};
const handleLog = function (level, label, info) {
if (typeof(Log[level]) !== 'function') { return; }
Log[level](label, info);
};
const response = Util.response(function (errLabel, info) {
if (!Log || !Log.error) { return; }
Log.error('HK_WORKER__' + errLabel, info);
});
// pick ids that aren't already in use...
const guid = function () {
var id = Util.uid();
return response.expected(id)? guid(): id;
};
-var incrementTime = function (command, start) {
-if (!command) { return; }
-var end = +new Date();
-var T = Env.commandTimers;
-var diff = (end - start);
-T[command] = (T[command] || 0) + (diff / 1000);
const initWorker = function (Env, path, workers, config, handlers, cb) {
const txid = guid();
const { sendCommand, handleResponse } = handlers;
// Spawn a worker
const worker = fork(path);
const state = {
worker: worker,
tasks: {},
pid: worker.pid, // store the child process's id in an easily accessible location
};
-const workers = [];
response.expect(txid, function (err) {
if (err) { return void cb(err); }
workers.push(state);
cb(void 0, state);
}, 15000);
-const response = Util.response(function (errLabel, info) {
-Env.Log.error('HK_DB_WORKER__' + errLabel, info);
worker.send({
pid: PID,
txid: txid,
config: config,
});
const Log = Env.Log;
const handleLog = function (level, label, info) {
if (typeof(Log[level]) !== 'function') { return; }
Log[level](label, info);
};
worker.on('message', function (res) {
handleResponse(state, res);
});
var substituteWorker = Util.once(function () {
Env.Log.info("SUBSTITUTE_WORKER", path);
var idx = workers.indexOf(state);
if (idx !== -1) {
workers.splice(idx, 1);
}
Object.keys(state.tasks).forEach(function (txid) {
const cb = response.expectation(txid);
if (typeof(cb) !== 'function') { return; }
const task = state.tasks[txid];
if (!(task && task.msg)) { return; }
response.clear(txid);
Log.info('WORKER_RESEND', path, task.msg);
sendCommand(task.msg, cb);
});
initWorker(Env, path, workers, config, handlers, function (err, state) {
if (err) {
throw new Error(err);
}
workers.push(state);
});
});
worker.on('exit', substituteWorker);
worker.on('close', substituteWorker);
worker.on('error', function (err) {
substituteWorker();
Env.Log.error("DB_WORKER_ERROR", {
error: err,
});
});
};
const initWorkersHandler = (Env, incrementTime, workers) => {
Log = Env.Log;
var isWorker = function (value) {
return value && value.worker && typeof(value.worker.send) === 'function';
};
// pick ids that aren't already in use...
const guid = function () {
var id = Util.uid();
return response.expected(id)? guid(): id;
};
const countWorkerTasks = function (/* index */) {
return 0; // FIXME this disables all queueing until it can be proven correct
//return Object.keys(workers[index].tasks || {}).length;
};
var workerOffset = -1;
var queue = [];
var getAvailableWorkerIndex = function () {
// If there is already a backlog of tasks you can avoid some work
// by going to the end of the line
@@ -104,6 +159,10 @@ Workers.initialize = function (Env, config, _cb) {
opt = opt || {};
var index = getAvailableWorkerIndex();
// optional function that may be called multiple times
// e.g. for each line processed by the worker on a file
var handler = opt.handler;
var state = workers[index];
// if there is no worker available:
if (!isWorker(state)) {
@@ -111,6 +170,7 @@ Workers.initialize = function (Env, config, _cb) {
queue.push({
msg: msg,
cb: _cb,
opt
});
if (drained) {
drained = false;
@@ -118,7 +178,6 @@ Workers.initialize = function (Env, config, _cb) {
workers: workers.length,
});
}
return;
}
@@ -145,6 +204,8 @@ Workers.initialize = function (Env, config, _cb) {
// include the relevant worker process id in messages so that it will be logged
// in the event that the message times out or fails in other ways.
msg.worker = state.pid;
msg.handler = handler;
msg.hasHandler = Boolean(handler); // functions are removed when sent to worker
// track which worker is doing which jobs
state.tasks[txid] = msg;
@@ -168,8 +229,27 @@ Workers.initialize = function (Env, config, _cb) {
return void Log.error("WRONG_PID", res);
}
if (!res.txid) { return; }
-response.handle(res.txid, [res.error, res.value]);
+let txid = res.txid;
+if (!txid) { return; }
// if this is a line handler, run the handler and callback a "readMore"
// to this txid
if (res.handler && state.tasks[txid] && state.tasks[txid].handler) {
state.tasks[txid].handler(res.value, () => {
state.worker.send({
txid,
pid: state.worker.pid,
readMore: true
});
});
return;
}
try {
response.handle(res.txid, [res.error, res.value]);
} catch (e) {
Log.error("WORKER_TASK_CB_ERROR", e);
}
delete state.tasks[res.txid];
if (!queue.length) {
if (!drained) {
@@ -198,69 +278,23 @@ Workers.initialize = function (Env, config, _cb) {
to the back because the following msg took its place. OR, in an
even worse scenario, we cycle through the queue but don't run anything.
*/
-sendCommand(nextMsg.msg, nextMsg.cb);
+sendCommand(nextMsg.msg, nextMsg.cb, nextMsg.opt);
};
-const initWorker = function (worker, cb) {
-const txid = guid();
-const state = {
-worker: worker,
-tasks: {},
-pid: worker.pid, // store the child process's id in an easily accessible location
-};
-response.expect(txid, function (err) {
-if (err) { return void cb(err); }
-workers.push(state);
-cb(void 0, state);
-}, 15000);
-worker.send({
-pid: PID,
-txid: txid,
-config: config,
-});
-worker.on('message', function (res) {
-handleResponse(state, res);
-});
-var substituteWorker = Util.once(function () {
-Env.Log.info("SUBSTITUTE_DB_WORKER", '');
-var idx = workers.indexOf(state);
-if (idx !== -1) {
-workers.splice(idx, 1);
-}
-Object.keys(state.tasks).forEach(function (txid) {
-const cb = response.expectation(txid);
-if (typeof(cb) !== 'function') { return; }
-const task = state.tasks[txid];
-if (!(task && task.msg)) { return; }
-response.clear(txid);
-Log.info('DB_WORKER_RESEND', task.msg);
-sendCommand(task.msg, cb);
-});
-var w = fork(DB_PATH);
-initWorker(w, function (err, state) {
-if (err) {
-throw new Error(err);
-}
-workers.push(state);
-});
-});
-worker.on('exit', substituteWorker);
-worker.on('close', substituteWorker);
-worker.on('error', function (err) {
-substituteWorker();
-Env.Log.error("DB_WORKER_ERROR", {
-error: err,
-});
-});
return {
sendCommand,
handleResponse,
};
};
Workers.initializeStore = function (Env, config, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
const workers = [];
const handlers = initWorkersHandler(Env, function () {}, workers);
const { sendCommand } = handlers;
let env = Env.storeWorker = {};
nThen(function (w) {
var limit = Env.maxWorkers;
@@ -275,7 +309,67 @@ Workers.initialize = function (Env, config, _cb) {
return;
}
-initWorker(fork(DB_PATH), w(function (err) {
+initWorker(Env, STORAGE_PATH, workers, config, handlers, w(function (err) {
if (!err) { return; }
w.abort();
return void cb(err);
}));
});
}).nThen(function () {
env.storeMessage = function (Env, channel, msg, cb) {
Env.store.getWeakLock(channel, function (next) {
sendCommand({
command: 'STORE_MESSAGE',
channel: channel,
msg: msg,
}, Util.both(next, cb));
});
};
env.readHistoryMessages = function (Env, channel, offset, beforeHash, h, cb) {
Env.store.getWeakLock(channel, function (next) {
sendCommand({
command: 'READ_HISTORY_MESSAGES',
channel,
offset,
beforeHash
}, Util.both(next, cb), { handler: h });
});
};
cb(void 0);
});
};
Workers.initialize = function (Env, config, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
var incrementTime = function (command, start) {
if (!command) { return; }
var end = +new Date();
var T = Env.commandTimers;
var diff = (end - start);
T[command] = (T[command] || 0) + (diff / 1000);
};
const workers = [];
const handlers = initWorkersHandler(Env, incrementTime, workers);
const { sendCommand } = handlers;
nThen(function (w) {
var limit = Env.maxWorkers;
var logged;
OS.cpus().forEach(function (cpu, index) {
if (limit && index >= limit) {
if (!logged) {
logged = true;
Log.info('WORKER_LIMIT', "(Opting not to use available CPUs beyond " + index + ')');
}
return;
}
initWorker(Env, DB_PATH, workers, config, handlers, w(function (err) {
if (!err) { return; }
w.abort();
return void cb(err);
}));
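
A notable property of the refactored pool is crash recovery: substituteWorker drops the dead child from the pool, replays any tasks that were still in flight, and forks a replacement via initWorker. The recovery step in miniature (recover, resend, and respawn are hypothetical names standing in for the closures above):

// Sketch: remove a dead worker, resend its pending tasks, respawn a replacement.
const recover = (workers, state, resend, respawn) => {
    const idx = workers.indexOf(state);
    if (idx !== -1) { workers.splice(idx, 1); } // forget the dead worker
    Object.keys(state.tasks).forEach((txid) => {
        resend(state.tasks[txid]); // replay whatever this worker never finished
    });
    respawn(); // fork a new child and push its state back into the pool
};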

View file

@@ -0,0 +1,196 @@
// SPDX-FileCopyrightText: 2023 XWiki CryptPad Team <contact@cryptpad.org> and contributors
//
// SPDX-License-Identifier: AGPL-3.0-or-later
const Util = require("../common-util");
const Store = require("../storage/file");
const BlobStore = require("../storage/blob");
const Tasks = require("../storage/tasks");
const nThen = require("nthen");
const Logger = require("../log");
const HK = require('../hk-util');
const Env = {
Log: {},
};
// support the usual log API but pass it to the main process
Logger.levels.forEach(function (level) {
Env.Log[level] = function (label, info) {
process.send({
log: level,
label: label,
info: info,
});
};
});
var ready = false;
var store;
var pinStore;
var blobStore;
const init = function (config, _cb) {
const cb = Util.once(Util.mkAsync(_cb));
if (!config) {
return void cb('E_INVALID_CONFIG');
}
Env.paths = {
pin: config.pinPath,
block: config.blockPath,
};
Env.inactiveTime = config.inactiveTime;
Env.archiveRetentionTime = config.archiveRetentionTime;
Env.accountRetentionTime = config.accountRetentionTime;
nThen(function (w) {
Store.create(config, w(function (err, _store) {
if (err) {
w.abort();
return void cb(err);
}
Env.store = store = _store;
}));
Store.create({
filePath: config.pinPath,
archivePath: config.archivePath,
// important to initialize the pinstore with its own volume id
// otherwise archived pin logs will get mixed in with channels
volumeId: 'pins',
}, w(function (err, _pinStore) {
if (err) {
w.abort();
return void cb(err);
}
Env.pinStore = pinStore = _pinStore;
}));
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function () {},
}, w(function (err, blob) {
if (err) {
w.abort();
return void cb(err);
}
Env.blobStore = blobStore = blob;
}));
}).nThen(function (w) {
Tasks.create({
log: Env.Log,
taskPath: config.taskPath,
store: store,
}, w(function (err, tasks) {
if (err) {
w.abort();
return void cb(err);
}
Env.tasks = tasks;
}));
}).nThen(function () {
cb();
});
};
const storeMessage = (data, cb) => {
const id = data.channel;
const msg = data.msg;
const Log = Env.Log;
const msgBin = Buffer.from(msg + '\n', 'utf8');
store.messageBin(id, msgBin, function (err) {
if (err) { return void cb(err); }
cb(void 0, msgBin);
});
};
const readHistoryMessages = (data, cb) => {
const { channel, offset, beforeHash } = data;
const start = (beforeHash) ? 0 : offset;
store.readMessagesBin(channel, start, (msgObj, readMore, abort) => {
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
const parsed = HK.tryParse(Env, msgObj.buff.toString('utf8'));
if (!parsed) console.log(channel);
if (!parsed) { return void readMore(); }
// 3rd argument: this is a handler with a readMore cb
cb(void 0, parsed, () => {
readMore();
});
}, (err, reason) => {
cb(err, reason);
});
};
const COMMANDS = {
STORE_MESSAGE: storeMessage,
READ_HISTORY_MESSAGES: readHistoryMessages
};
// Store readMore callback for a given txid
const readMore = {};
process.on('message', function (data) {
if (!data || !data.txid || !data.pid) {
return void process.send({
error:'E_INVAL',
data: data,
});
}
// If this is a readMore command, call its associated readMore function
if (data.readMore) {
if (typeof(readMore[data.txid]) === "function") { readMore[data.txid](); }
return;
}
// Otherwise treat it as a first command
const cb = function (err, value, isHandler) {
if (isHandler) {
readMore[data.txid] = isHandler;
} else {
delete readMore[data.txid];
}
process.send({
error: Util.serializeError(err),
txid: data.txid,
pid: data.pid,
value: value,
handler: Boolean(data.hasHandler && isHandler)
});
};
if (!ready) {
return void init(data.config, function (err) {
if (err) { return void cb(Util.serializeError(err)); }
ready = true;
cb();
});
}
const command = COMMANDS[data.command];
if (typeof(command) !== 'function') {
return void cb("E_BAD_COMMAND");
}
command(data, cb);
});
process.on('uncaughtException', function (err) {
console.error('[%s] UNCAUGHT EXCEPTION IN DB WORKER', new Date());
console.error(err);
console.error("TERMINATING");
process.exit(1);
});
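
For reference, the IPC exchange for READ_HISTORY_MESSAGES implied by the code above looks roughly like this; the field values are invented, but the message shapes follow storeMessage/readHistoryMessages and the readMore registry:

// parent -> worker: initial command (hasHandler marks a streaming request)
// { txid: 'abc123', pid: 99, command: 'READ_HISTORY_MESSAGES',
//   channel: '...', offset: 512, beforeHash: false, hasHandler: true }
//
// worker -> parent: one parsed message; handler: true means more may follow
// { txid: 'abc123', pid: 99, value: { /* parsed message */ }, handler: true }
//
// parent -> worker: acknowledge and request the next message
// { txid: 'abc123', pid: 99, readMore: true }
//
// worker -> parent: final response (handler: false) ends the stream
// { txid: 'abc123', pid: 99, error: null, handler: false }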