Compare commits
2 commits
main
...
offloading
Author | SHA1 | Date | |
---|---|---|---|
|
75875d684d | ||
|
faf4421452 |
4 changed files with 418 additions and 102 deletions
|
@ -191,6 +191,25 @@ module.exports.create = function (Env, cb) {
|
|||
archiveRetentionTime: Env.archiveRetentionTime,
|
||||
accountRetentionTime: Env.accountRetentionTime,
|
||||
|
||||
maxWorkers: Env.maxWorkers,
|
||||
}, w(function (err) {
|
||||
if (err) {
|
||||
throw new Error(err);
|
||||
}
|
||||
}));
|
||||
Workers.initializeStore(Env, {
|
||||
blobPath: Env.paths.blob,
|
||||
blobStagingPath: Env.paths.staging,
|
||||
taskPath: Env.paths.task,
|
||||
pinPath: Env.paths.pin,
|
||||
filePath: Env.paths.data,
|
||||
archivePath: Env.paths.archive,
|
||||
blockPath: Env.paths.block,
|
||||
|
||||
inactiveTime: Env.inactiveTime,
|
||||
archiveRetentionTime: Env.archiveRetentionTime,
|
||||
accountRetentionTime: Env.accountRetentionTime,
|
||||
|
||||
maxWorkers: Env.maxWorkers,
|
||||
}, w(function (err) {
|
||||
if (err) {
|
||||
|
|
|
@ -45,7 +45,7 @@ const EPHEMERAL_CHANNEL_LENGTH = HK.EPHEMERAL_CHANNEL_LENGTH = 34;
|
|||
// Temporary channels are archived X ms after everyone has left them
|
||||
const TEMPORARY_CHANNEL_LIFETIME = 30 * 1000;
|
||||
|
||||
const tryParse = HK.tryParse = function (Env, str) {
|
||||
HK.tryParse = function (Env, str) {
|
||||
try {
|
||||
return JSON.parse(str);
|
||||
} catch (err) {
|
||||
|
@ -316,11 +316,12 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, tim
|
|||
if (typeof(cb) !== "function") { cb = function () {}; }
|
||||
|
||||
Env.queueStorage(id, function (next) {
|
||||
const msgBin = Buffer.from(msg + '\n', 'utf8');
|
||||
let msgBin;
|
||||
// Store the message first, and update the index only once it's stored.
|
||||
// store.messageBin can be async so updating the index first may
|
||||
// result in a wrong cpIndex
|
||||
nThen((waitFor) => {
|
||||
/*
|
||||
Env.store.messageBin(id, msgBin, waitFor(function (err) {
|
||||
if (err) {
|
||||
waitFor.abort();
|
||||
|
@ -336,6 +337,22 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, tim
|
|||
return void next();
|
||||
}
|
||||
}));
|
||||
*/
|
||||
Env.storeWorker.storeMessage(Env, id, msg, waitFor((err, _msgBin) => {
|
||||
if (err) {
|
||||
waitFor.abort();
|
||||
Log.error("HK_STORE_MESSAGE_ERROR", err.message);
|
||||
// this error is critical, but there's not much we can do at the moment
|
||||
// proceed with more messages, but they'll probably fail too
|
||||
// at least you won't have a memory leak
|
||||
|
||||
// TODO make it possible to respond to clients with errors so they know
|
||||
// their message wasn't stored
|
||||
cb(err);
|
||||
return void next();
|
||||
}
|
||||
msgBin = _msgBin;
|
||||
}));
|
||||
}).nThen((waitFor) => {
|
||||
/* TODO we can skip updating the index if there's nobody in the channel.
|
||||
Populating it might actually be the cause of a memory leak.
|
||||
|
@ -518,8 +535,6 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
|
|||
|
||||
*/
|
||||
const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, cb) => {
|
||||
const store = Env.store;
|
||||
|
||||
let offset = -1;
|
||||
nThen((waitFor) => {
|
||||
getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => {
|
||||
|
@ -534,19 +549,11 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
|
|||
}
|
||||
offset = os;
|
||||
}));
|
||||
}).nThen((waitFor) => {
|
||||
}).nThen(() => {
|
||||
if (offset === -1) {
|
||||
return void cb(new Error('EUNKNOWN'));
|
||||
}
|
||||
const start = (beforeHash) ? 0 : offset;
|
||||
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
|
||||
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
|
||||
const parsed = tryParse(Env, msgObj.buff.toString('utf8'));
|
||||
if (!parsed) { return void readMore(); }
|
||||
handler(parsed, readMore);
|
||||
}, waitFor(function (err, reason) {
|
||||
return void cb(err, reason);
|
||||
}));
|
||||
Env.storeWorker.readHistoryMessages(Env, channelName, offset, beforeHash, handler, cb);
|
||||
});
|
||||
};
|
||||
|
||||
|
|
|
@ -11,49 +11,104 @@ const PID = process.pid;
|
|||
const Block = require("../storage/block");
|
||||
|
||||
const DB_PATH = 'lib/workers/db-worker';
|
||||
const STORAGE_PATH = 'lib/workers/storage-worker';
|
||||
|
||||
const MAX_JOBS = 16;
|
||||
const DEFAULT_QUERY_TIMEOUT = 60000 * 15; // increased from three to fifteen minutes because queries for very large files were taking as long as seven minutes
|
||||
|
||||
Workers.initialize = function (Env, config, _cb) {
|
||||
var cb = Util.once(Util.mkAsync(_cb));
|
||||
let Log = {
|
||||
error: () => {}
|
||||
};
|
||||
const handleLog = function (level, label, info) {
|
||||
if (typeof(Log[level]) !== 'function') { return; }
|
||||
Log[level](label, info);
|
||||
};
|
||||
const response = Util.response(function (errLabel, info) {
|
||||
if (!Log || !Log.error) { return; }
|
||||
Log.error('HK_WORKER__' + errLabel, info);
|
||||
});
|
||||
// pick ids that aren't already in use...
|
||||
const guid = function () {
|
||||
var id = Util.uid();
|
||||
return response.expected(id)? guid(): id;
|
||||
};
|
||||
|
||||
var incrementTime = function (command, start) {
|
||||
if (!command) { return; }
|
||||
var end = +new Date();
|
||||
var T = Env.commandTimers;
|
||||
var diff = (end - start);
|
||||
T[command] = (T[command] || 0) + (diff / 1000);
|
||||
const initWorker = function (Env, path, workers, config, handlers, cb) {
|
||||
const txid = guid();
|
||||
const { sendCommand, handleResponse } = handlers;
|
||||
|
||||
// Spawn a worker
|
||||
const worker = fork(path);
|
||||
|
||||
const state = {
|
||||
worker: worker,
|
||||
tasks: {},
|
||||
pid: worker.pid, // store the child process's id in an easily accessible location
|
||||
};
|
||||
|
||||
const workers = [];
|
||||
response.expect(txid, function (err) {
|
||||
if (err) { return void cb(err); }
|
||||
workers.push(state);
|
||||
cb(void 0, state);
|
||||
}, 15000);
|
||||
|
||||
const response = Util.response(function (errLabel, info) {
|
||||
Env.Log.error('HK_DB_WORKER__' + errLabel, info);
|
||||
worker.send({
|
||||
pid: PID,
|
||||
txid: txid,
|
||||
config: config,
|
||||
});
|
||||
|
||||
const Log = Env.Log;
|
||||
const handleLog = function (level, label, info) {
|
||||
if (typeof(Log[level]) !== 'function') { return; }
|
||||
Log[level](label, info);
|
||||
};
|
||||
worker.on('message', function (res) {
|
||||
handleResponse(state, res);
|
||||
});
|
||||
|
||||
var substituteWorker = Util.once(function () {
|
||||
Env.Log.info("SUBSTITUTE_WORKER", path);
|
||||
var idx = workers.indexOf(state);
|
||||
if (idx !== -1) {
|
||||
workers.splice(idx, 1);
|
||||
}
|
||||
|
||||
Object.keys(state.tasks).forEach(function (txid) {
|
||||
const cb = response.expectation(txid);
|
||||
if (typeof(cb) !== 'function') { return; }
|
||||
const task = state.tasks[txid];
|
||||
if (!(task && task.msg)) { return; }
|
||||
response.clear(txid);
|
||||
Log.info('WORKER_RESEND', path, task.msg);
|
||||
sendCommand(task.msg, cb);
|
||||
});
|
||||
|
||||
initWorker(Env, path, workers, config, handlers, function (err, state) {
|
||||
if (err) {
|
||||
throw new Error(err);
|
||||
}
|
||||
workers.push(state);
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('exit', substituteWorker);
|
||||
worker.on('close', substituteWorker);
|
||||
worker.on('error', function (err) {
|
||||
substituteWorker();
|
||||
Env.Log.error("DB_WORKER_ERROR", {
|
||||
error: err,
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
const initWorkersHandler = (Env, incrementTime, workers) => {
|
||||
Log = Env.Log;
|
||||
var isWorker = function (value) {
|
||||
return value && value.worker && typeof(value.worker.send) === 'function';
|
||||
};
|
||||
|
||||
// pick ids that aren't already in use...
|
||||
const guid = function () {
|
||||
var id = Util.uid();
|
||||
return response.expected(id)? guid(): id;
|
||||
};
|
||||
|
||||
const countWorkerTasks = function (/* index */) {
|
||||
return 0; // FIXME this disables all queueing until it can be proven correct
|
||||
//return Object.keys(workers[index].tasks || {}).length;
|
||||
};
|
||||
|
||||
var workerOffset = -1;
|
||||
var queue = [];
|
||||
|
||||
var getAvailableWorkerIndex = function () {
|
||||
// If there is already a backlog of tasks you can avoid some work
|
||||
// by going to the end of the line
|
||||
|
@ -104,6 +159,10 @@ Workers.initialize = function (Env, config, _cb) {
|
|||
opt = opt || {};
|
||||
var index = getAvailableWorkerIndex();
|
||||
|
||||
// optional function that may be called multiple times
|
||||
// e.g. for each line processed by the worker on a file
|
||||
var handler = opt.handler;
|
||||
|
||||
var state = workers[index];
|
||||
// if there is no worker available:
|
||||
if (!isWorker(state)) {
|
||||
|
@ -111,6 +170,7 @@ Workers.initialize = function (Env, config, _cb) {
|
|||
queue.push({
|
||||
msg: msg,
|
||||
cb: _cb,
|
||||
opt
|
||||
});
|
||||
if (drained) {
|
||||
drained = false;
|
||||
|
@ -118,7 +178,6 @@ Workers.initialize = function (Env, config, _cb) {
|
|||
workers: workers.length,
|
||||
});
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -145,6 +204,8 @@ Workers.initialize = function (Env, config, _cb) {
|
|||
// include the relevant worker process id in messages so that it will be logged
|
||||
// in the event that the message times out or fails in other ways.
|
||||
msg.worker = state.pid;
|
||||
msg.handler = handler;
|
||||
msg.hasHandler = Boolean(handler); // functions are removed when sent to worker
|
||||
|
||||
// track which worker is doing which jobs
|
||||
state.tasks[txid] = msg;
|
||||
|
@ -168,8 +229,27 @@ Workers.initialize = function (Env, config, _cb) {
|
|||
return void Log.error("WRONG_PID", res);
|
||||
}
|
||||
|
||||
if (!res.txid) { return; }
|
||||
response.handle(res.txid, [res.error, res.value]);
|
||||
let txid = res.txid;
|
||||
if (!txid) { return; }
|
||||
|
||||
// if this is a line handler, run the handler and callback a "readMore"
|
||||
// to this txid
|
||||
if (res.handler && state.tasks[txid] && state.tasks[txid].handler) {
|
||||
state.tasks[txid].handler(res.value, () => {
|
||||
state.worker.send({
|
||||
txid,
|
||||
pid: state.worker.pid,
|
||||
readMore: true
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
response.handle(res.txid, [res.error, res.value]);
|
||||
} catch (e) {
|
||||
Log.error("WORKER_TASK_CB_ERROR", e);
|
||||
}
|
||||
delete state.tasks[res.txid];
|
||||
if (!queue.length) {
|
||||
if (!drained) {
|
||||
|
@ -198,69 +278,23 @@ Workers.initialize = function (Env, config, _cb) {
|
|||
to the back because the following msg took its place. OR, in an
|
||||
even worse scenario, we cycle through the queue but don't run anything.
|
||||
*/
|
||||
sendCommand(nextMsg.msg, nextMsg.cb);
|
||||
sendCommand(nextMsg.msg, nextMsg.cb, nextMsg.opt);
|
||||
};
|
||||
|
||||
const initWorker = function (worker, cb) {
|
||||
const txid = guid();
|
||||
|
||||
const state = {
|
||||
worker: worker,
|
||||
tasks: {},
|
||||
pid: worker.pid, // store the child process's id in an easily accessible location
|
||||
};
|
||||
|
||||
response.expect(txid, function (err) {
|
||||
if (err) { return void cb(err); }
|
||||
workers.push(state);
|
||||
cb(void 0, state);
|
||||
}, 15000);
|
||||
|
||||
worker.send({
|
||||
pid: PID,
|
||||
txid: txid,
|
||||
config: config,
|
||||
});
|
||||
|
||||
worker.on('message', function (res) {
|
||||
handleResponse(state, res);
|
||||
});
|
||||
|
||||
var substituteWorker = Util.once(function () {
|
||||
Env.Log.info("SUBSTITUTE_DB_WORKER", '');
|
||||
var idx = workers.indexOf(state);
|
||||
if (idx !== -1) {
|
||||
workers.splice(idx, 1);
|
||||
}
|
||||
|
||||
Object.keys(state.tasks).forEach(function (txid) {
|
||||
const cb = response.expectation(txid);
|
||||
if (typeof(cb) !== 'function') { return; }
|
||||
const task = state.tasks[txid];
|
||||
if (!(task && task.msg)) { return; }
|
||||
response.clear(txid);
|
||||
Log.info('DB_WORKER_RESEND', task.msg);
|
||||
sendCommand(task.msg, cb);
|
||||
});
|
||||
|
||||
var w = fork(DB_PATH);
|
||||
initWorker(w, function (err, state) {
|
||||
if (err) {
|
||||
throw new Error(err);
|
||||
}
|
||||
workers.push(state);
|
||||
});
|
||||
});
|
||||
|
||||
worker.on('exit', substituteWorker);
|
||||
worker.on('close', substituteWorker);
|
||||
worker.on('error', function (err) {
|
||||
substituteWorker();
|
||||
Env.Log.error("DB_WORKER_ERROR", {
|
||||
error: err,
|
||||
});
|
||||
});
|
||||
return {
|
||||
sendCommand,
|
||||
handleResponse,
|
||||
};
|
||||
};
|
||||
|
||||
Workers.initializeStore = function (Env, config, _cb) {
|
||||
var cb = Util.once(Util.mkAsync(_cb));
|
||||
const workers = [];
|
||||
|
||||
const handlers = initWorkersHandler(Env, function () {}, workers);
|
||||
const { sendCommand } = handlers;
|
||||
|
||||
let env = Env.storeWorker = {};
|
||||
|
||||
nThen(function (w) {
|
||||
var limit = Env.maxWorkers;
|
||||
|
@ -275,7 +309,67 @@ Workers.initialize = function (Env, config, _cb) {
|
|||
return;
|
||||
}
|
||||
|
||||
initWorker(fork(DB_PATH), w(function (err) {
|
||||
initWorker(Env, STORAGE_PATH, workers, config, handlers, w(function (err) {
|
||||
if (!err) { return; }
|
||||
w.abort();
|
||||
return void cb(err);
|
||||
}));
|
||||
});
|
||||
}).nThen(function () {
|
||||
env.storeMessage = function (Env, channel, msg, cb) {
|
||||
Env.store.getWeakLock(channel, function (next) {
|
||||
sendCommand({
|
||||
command: 'STORE_MESSAGE',
|
||||
channel: channel,
|
||||
msg: msg,
|
||||
}, Util.both(next, cb));
|
||||
});
|
||||
};
|
||||
env.readHistoryMessages = function (Env, channel, offset, beforeHash, h, cb) {
|
||||
Env.store.getWeakLock(channel, function (next) {
|
||||
sendCommand({
|
||||
command: 'READ_HISTORY_MESSAGES',
|
||||
channel,
|
||||
offset,
|
||||
beforeHash
|
||||
}, Util.both(next, cb), { handler: h });
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
cb(void 0);
|
||||
});
|
||||
};
|
||||
|
||||
Workers.initialize = function (Env, config, _cb) {
|
||||
var cb = Util.once(Util.mkAsync(_cb));
|
||||
|
||||
var incrementTime = function (command, start) {
|
||||
if (!command) { return; }
|
||||
var end = +new Date();
|
||||
var T = Env.commandTimers;
|
||||
var diff = (end - start);
|
||||
T[command] = (T[command] || 0) + (diff / 1000);
|
||||
};
|
||||
|
||||
const workers = [];
|
||||
const handlers = initWorkersHandler(Env, incrementTime, workers);
|
||||
const { sendCommand } = handlers;
|
||||
|
||||
nThen(function (w) {
|
||||
var limit = Env.maxWorkers;
|
||||
var logged;
|
||||
|
||||
OS.cpus().forEach(function (cpu, index) {
|
||||
if (limit && index >= limit) {
|
||||
if (!logged) {
|
||||
logged = true;
|
||||
Log.info('WORKER_LIMIT', "(Opting not to use available CPUs beyond " + index + ')');
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
initWorker(Env, DB_PATH, workers, config, handlers, w(function (err) {
|
||||
if (!err) { return; }
|
||||
w.abort();
|
||||
return void cb(err);
|
||||
|
|
196
lib/workers/storage-worker.js
Normal file
196
lib/workers/storage-worker.js
Normal file
|
@ -0,0 +1,196 @@
|
|||
// SPDX-FileCopyrightText: 2023 XWiki CryptPad Team <contact@cryptpad.org> and contributors
|
||||
//
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
const Util = require("../common-util");
|
||||
const Store = require("../storage/file");
|
||||
const BlobStore = require("../storage/blob");
|
||||
const Tasks = require("../storage/tasks");
|
||||
const nThen = require("nthen");
|
||||
const Logger = require("../log");
|
||||
const HK = require('../hk-util');
|
||||
|
||||
const Env = {
|
||||
Log: {},
|
||||
};
|
||||
|
||||
// support the usual log API but pass it to the main process
|
||||
Logger.levels.forEach(function (level) {
|
||||
Env.Log[level] = function (label, info) {
|
||||
process.send({
|
||||
log: level,
|
||||
label: label,
|
||||
info: info,
|
||||
});
|
||||
};
|
||||
});
|
||||
|
||||
var ready = false;
|
||||
var store;
|
||||
var pinStore;
|
||||
var blobStore;
|
||||
|
||||
const init = function (config, _cb) {
|
||||
const cb = Util.once(Util.mkAsync(_cb));
|
||||
if (!config) {
|
||||
return void cb('E_INVALID_CONFIG');
|
||||
}
|
||||
|
||||
Env.paths = {
|
||||
pin: config.pinPath,
|
||||
block: config.blockPath,
|
||||
};
|
||||
|
||||
Env.inactiveTime = config.inactiveTime;
|
||||
Env.archiveRetentionTime = config.archiveRetentionTime;
|
||||
Env.accountRetentionTime = config.accountRetentionTime;
|
||||
|
||||
nThen(function (w) {
|
||||
Store.create(config, w(function (err, _store) {
|
||||
if (err) {
|
||||
w.abort();
|
||||
return void cb(err);
|
||||
}
|
||||
Env.store = store = _store;
|
||||
}));
|
||||
Store.create({
|
||||
filePath: config.pinPath,
|
||||
archivePath: config.archivePath,
|
||||
// important to initialize the pinstore with its own volume id
|
||||
// otherwise archived pin logs will get mixed in with channels
|
||||
volumeId: 'pins',
|
||||
}, w(function (err, _pinStore) {
|
||||
if (err) {
|
||||
w.abort();
|
||||
return void cb(err);
|
||||
}
|
||||
Env.pinStore = pinStore = _pinStore;
|
||||
}));
|
||||
BlobStore.create({
|
||||
blobPath: config.blobPath,
|
||||
blobStagingPath: config.blobStagingPath,
|
||||
archivePath: config.archivePath,
|
||||
getSession: function () {},
|
||||
}, w(function (err, blob) {
|
||||
if (err) {
|
||||
w.abort();
|
||||
return void cb(err);
|
||||
}
|
||||
Env.blobStore = blobStore = blob;
|
||||
}));
|
||||
}).nThen(function (w) {
|
||||
Tasks.create({
|
||||
log: Env.Log,
|
||||
taskPath: config.taskPath,
|
||||
store: store,
|
||||
}, w(function (err, tasks) {
|
||||
if (err) {
|
||||
w.abort();
|
||||
return void cb(err);
|
||||
}
|
||||
Env.tasks = tasks;
|
||||
}));
|
||||
}).nThen(function () {
|
||||
cb();
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
const storeMessage = (data, cb) => {
|
||||
const id = data.channel;
|
||||
const msg = data.msg;
|
||||
const Log = Env.Log;
|
||||
|
||||
const msgBin = Buffer.from(msg + '\n', 'utf8');
|
||||
store.messageBin(id, msgBin, function (err) {
|
||||
if (err) { return void cb(err); }
|
||||
cb(void 0, msgBin);
|
||||
});
|
||||
};
|
||||
|
||||
const readHistoryMessages = (data, cb) => {
|
||||
const { channel, offset, beforeHash } = data;
|
||||
|
||||
const start = (beforeHash) ? 0 : offset;
|
||||
store.readMessagesBin(channel, start, (msgObj, readMore, abort) => {
|
||||
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
|
||||
const parsed = HK.tryParse(Env, msgObj.buff.toString('utf8'));
|
||||
if (!parsed) console.log(channel);
|
||||
if (!parsed) { return void readMore(); }
|
||||
|
||||
// 3rd argument: this is a handler with a readMore cb
|
||||
cb(void 0, parsed, () => {
|
||||
readMore();
|
||||
});
|
||||
}, (err, reason) => {
|
||||
cb(err, reason);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
const COMMANDS = {
|
||||
STORE_MESSAGE: storeMessage,
|
||||
READ_HISTORY_MESSAGES: readHistoryMessages
|
||||
};
|
||||
|
||||
|
||||
|
||||
// Store readMore callback for a given txid
|
||||
const readMore = {};
|
||||
|
||||
|
||||
process.on('message', function (data) {
|
||||
if (!data || !data.txid || !data.pid) {
|
||||
return void process.send({
|
||||
error:'E_INVAL',
|
||||
data: data,
|
||||
});
|
||||
}
|
||||
|
||||
// If this is a readMore command, call its associated readMore function
|
||||
if (data.readMore) {
|
||||
if (typeof(readMore[data.txid]) === "function") { readMore[data.txid](); }
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise treat it as a first command
|
||||
|
||||
const cb = function (err, value, isHandler) {
|
||||
if (isHandler) {
|
||||
readMore[data.txid] = isHandler;
|
||||
} else {
|
||||
delete readMore[data.txid];
|
||||
}
|
||||
process.send({
|
||||
error: Util.serializeError(err),
|
||||
txid: data.txid,
|
||||
pid: data.pid,
|
||||
value: value,
|
||||
handler: Boolean(data.hasHandler && isHandler)
|
||||
});
|
||||
};
|
||||
|
||||
if (!ready) {
|
||||
return void init(data.config, function (err) {
|
||||
if (err) { return void cb(Util.serializeError(err)); }
|
||||
ready = true;
|
||||
cb();
|
||||
});
|
||||
}
|
||||
|
||||
const command = COMMANDS[data.command];
|
||||
if (typeof(command) !== 'function') {
|
||||
return void cb("E_BAD_COMMAND");
|
||||
}
|
||||
command(data, cb);
|
||||
});
|
||||
|
||||
process.on('uncaughtException', function (err) {
|
||||
console.error('[%s] UNCAUGHT EXCEPTION IN DB WORKER', new Date());
|
||||
console.error(err);
|
||||
console.error("TERMINATING");
|
||||
process.exit(1);
|
||||
});
|
Loading…
Reference in a new issue