Compare commits

...

25 commits

Author SHA1 Message Date
yflory
3722d09021 Merge branch 'sodium' into metrics 2024-07-11 11:09:14 +02:00
yflory
8c11b8c4c6 lint compliance 2024-07-11 11:08:44 +02:00
yflory
d5d8b0dcab Add new metrics 2024-07-11 11:06:10 +02:00
yflory
36d7d917a7 Merge branch 'monitoring' into metrics 2024-07-11 10:56:11 +02:00
yflory
4d3f592731 Get frequency of metrics instead of absolute number 2024-07-08 15:01:03 +02:00
yflory
80b279489e Add metrics for getHistoryAsync 2024-07-04 11:32:23 +02:00
yflory
fda74533d7 Merge branch 'main' into monitoring 2024-07-02 11:54:16 +02:00
yflory
2068b0a6a3 Merge branch 'sodium' into monitoring 2024-07-02 11:54:12 +02:00
yflory
57be683695 Add metrics about commands usage 2024-07-01 14:39:37 +02:00
yflory
2f22b9484b Merge branch '2024.6-rc' into monitoring 2024-06-28 11:51:32 +02:00
yflory
411039abeb Add support for a serverside crypto plugin 2024-06-27 16:26:57 +02:00
yflory
8d3500268a Merge branch '2024.6-rc' into monitoring 2024-06-25 15:58:00 +02:00
yflory
1d639a7653 update package.json and package-lock.json 2024-06-12 15:32:26 +02:00
yflory
1310979994 lint compliance 2024-06-12 15:01:57 +02:00
yflory
2de0a0d4b9 Merge branch 'stats' into monitoring 2024-06-12 14:59:55 +02:00
yflory
73891d819e Add new monitoring data 2024-05-23 16:01:38 +02:00
yflory
eb1249b6cd Fix cpu usage monitoring 2024-05-17 11:22:32 +02:00
yflory
332edec162 Merge branch 'main' into monitoring 2024-05-17 10:45:37 +02:00
yflory
bde6ef8044 Add CPU monitoring 2024-05-17 10:30:39 +02:00
yflory
9b8e487b70 Merge branch 'main' into monitoring 2024-05-16 17:19:40 +02:00
yflory
99f1cf650e Fix package.json 2024-05-16 17:18:00 +02:00
yflory
9c22a25ba2 Merge branch 'main' into monitoring 2024-05-16 17:15:09 +02:00
yflory
fff4e7581e Add heap memory data to the monitoring tools 2023-08-30 16:13:09 +02:00
yflory
49af0533b5 lint compliance 2023-08-22 18:36:48 +02:00
yflory
4f2a48a72e Add memory monitoring tools 2023-08-22 18:09:22 +02:00
13 changed files with 1000 additions and 1155 deletions

View file

@ -108,7 +108,7 @@ nThen(function (w) {
};
// spawn ws server and attach netflux event handlers
let Server = NetfluxSrv.create(new WebSocketServer({ server: Env.httpServer}))
let Server = Env.Server = NetfluxSrv.create(new WebSocketServer({ server: Env.httpServer}))
.on('channelClose', historyKeeper.channelClose)
.on('channelMessage', historyKeeper.channelMessage)
.on('channelOpen', historyKeeper.channelOpen)

24
lib/crypto.js Normal file
View file

@ -0,0 +1,24 @@
// Pluggable signature-verification primitives for the server.
// tweetnacl is the default backend; a SODIUM plugin may override
// either function with a faster native implementation.
const Nacl = require('tweetnacl/nacl-fast');
const CPCrypto = module.exports;
const plugins = require('./plugin-manager');

// Build the crypto API and hand it to `cb(err, crypto)`.
// Kept asynchronous so a future libsodium backend can await its
// `ready` promise without changing any caller.
CPCrypto.init = (cb) => {
    const crypto = {
        // Verify an attached signature; returns the message or null.
        open: (signedMsg, validateKey) => Nacl.sign.open(signedMsg, validateKey),
        // Verify a detached signature; returns a boolean.
        detachedVerify: (signedBuffer, signatureBuffer, validateKey) =>
            Nacl.sign.detached.verify(signedBuffer, signatureBuffer, validateKey)
    };

    // Let the SODIUM plugin replace either primitive when provided.
    const override = plugins.SODIUM && plugins.SODIUM.crypto;
    if (override) {
        if (override.open) { crypto.open = override.open; }
        if (override.detachedVerify) { crypto.detachedVerify = override.detachedVerify; }
    }

    // Deliver asynchronously (see note above about libsodium's promise).
    // libsodium.ready.then(() => {});
    setTimeout(() => {
        cb(void 0, crypto);
    });
};

View file

@ -252,6 +252,7 @@ module.exports.create = function (config) {
curvePublic: Nacl.util.encodeBase64(curve.publicKey),
selfDestructTo: {},
monitoring: {}
};
(function () {
@ -415,6 +416,7 @@ const BAD = [
'limits',
'customLimits',
'scheduleDecree',
'monitoring',
'httpServer',

View file

@ -8,6 +8,7 @@ const nThen = require('nthen');
const Util = require("./common-util");
const MetaRPC = require("./commands/metadata");
const Nacl = require('tweetnacl/nacl-fast');
const Monitoring = require('./monitoring');
const now = function () { return (new Date()).getTime(); };
const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds
@ -320,6 +321,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, tim
// Store the message first, and update the index only once it's stored.
// store.messageBin can be async so updating the index first may
// result in a wrong cpIndex
Monitoring.increment('storeMessage');
nThen((waitFor) => {
Env.store.messageBin(id, msgBin, waitFor(function (err) {
if (err) {
@ -391,6 +393,8 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, tim
var msgLength = msgBin.length;
index.size += msgLength;
Monitoring.increment('broadcastMessage', (channel.length-1));
// handle the next element in the queue
next();
@ -521,6 +525,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
const store = Env.store;
let offset = -1;
Monitoring.increment('getHistoryAsync');
nThen((waitFor) => {
getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => {
if (err) {

View file

@ -19,6 +19,9 @@ const BlobStore = require("./storage/blob");
const BlockStore = require("./storage/block");
const plugins = require("./plugin-manager");
const Prometheus = require('prom-client');
const Monitoring = require('./monitoring');
const DEFAULT_QUERY_TIMEOUT = 5000;
const PID = process.pid;
@ -66,6 +69,153 @@ Env.incrementBytesWritten = function () {};
const EVENTS = {};
// XXX Store in monitoring.js
// --- Per-process memory gauges (labelled by pid and process type) ---
const rssMetric = new Prometheus.Gauge({
name: `memory_rss`,
help: 'The amount of space occupied in the main memory device for the process.',
labelNames: ['pid', 'type']
});
const heapTotalMetric = new Prometheus.Gauge({
name: `memory_heap_total`,
help: "Total heap memory.",
labelNames: ['pid', 'type']
});
const heapUsedMetric = new Prometheus.Gauge({
name: `memory_heap_used`,
help: 'Used heap memory.',
labelNames: ['pid', 'type']
});
const externalMetric = new Prometheus.Gauge({
name: `memory_external`,
help: 'Memory usage of C++ objects bound to JavaScript objects managed by V8.',
labelNames: ['pid', 'type']
});
const arrayBufferMetric = new Prometheus.Gauge({
name: `memory_array_buffers`,
help: 'Memory allocated for ArrayBuffers and SharedArrayBuffers.',
labelNames: ['pid', 'type']
});
// --- Per-process CPU gauges, derived from interval-sampled cpuUsage() ---
const cpuUserMetric = new Prometheus.Gauge({
name: `process_cpu_user_seconds_total`,
help: 'Total user CPU time spent in seconds during the configured interval.',
labelNames: ['pid', 'type']
});
const cpuSystemMetric = new Prometheus.Gauge({
name: `process_cpu_system_seconds_total`,
help: 'Total system CPU time spent in seconds during the configured interval.',
labelNames: ['pid', 'type']
});
const cpuTotalMetric = new Prometheus.Gauge({
name: `process_cpu_seconds_total`,
help: 'Total user and system CPU time spent in seconds during the configured interval',
labelNames: ['pid', 'type']
});
const cpuPercentMetric = new Prometheus.Gauge({
name: `process_cpu_percent`,
help: 'Total user and system CPU time spent divided by the interval duration',
labelNames: ['pid', 'type']
});
// --- Instance-wide gauges, reported only by the main process ---
const wsMetric = new Prometheus.Gauge({
name: `active_websockets`,
help: 'Number of active websocket connections',
});
const regMetric = new Prometheus.Gauge({
name: `active_registered_users`,
help: 'Number of registered users online',
});
const chanMetric = new Prometheus.Gauge({
name: `active_channels`,
help: 'Number of active pads',
});
// Lazily-created per-command Prometheus gauges, one per counter key.
const callsMetrics = {};
// Last observed counter values, used to derive call frequencies.
const callsFreq = {};
// Every 5s, turn the monotonic counters stored in callsFreq into
// calls-per-second frequencies and publish them as gauges.
setInterval(() => {
Object.keys(callsFreq).forEach(key => {
let last = callsFreq[key] = callsFreq[key] || {};
last.value = last.value || 0;
// First sample for this key: record a baseline, report nothing yet.
if (!last.time) {
last.time = +new Date();
last.oldValue = last.value;
return;
}
// last.time exists, we can get a frequency
let now = +new Date();
let diffTime = (now - last.time)/1000;
let diffValue = last.value - (last.oldValue || 0);
// Floor to an integer rate; `|| 0` guards against NaN.
let freq = Math.floor(diffValue/diffTime) || 0;
last.time = now;
last.oldValue = last.value;
// Update metrics
// Create the gauge for this key on first use.
let m = callsMetrics[key];
if (!m) {
m = callsMetrics[key] = new Prometheus.Gauge({
name: key,
help: key
});
}
m.set(freq);
});
}, 5000);
// Receive a monitoring snapshot (one entry per process, keyed by pid)
// and publish it to the Prometheus gauges declared above.
EVENTS.MONITORING = function (data) {
    /*
    {
        main: {
            rss: 1234
            ...
        },
        pid1: {
            rss: 234
            ...
        }
    }
    */
    const totals = {};
    Object.keys(data).forEach(pid => {
        const entry = data[pid];
        const type = entry.type;
        const labels = { pid, type };

        // Memory figures (bytes), defaulting to 0 when absent.
        rssMetric.set(labels, entry.mem?.rss || 0);
        heapTotalMetric.set(labels, entry.mem?.heapTotal || 0);
        heapUsedMetric.set(labels, entry.mem?.heapUsed || 0);
        externalMetric.set(labels, entry.mem?.external || 0);
        arrayBufferMetric.set(labels, entry.mem?.arrayBuffers || 0);

        // cpuUsage() reports microseconds; convert to seconds.
        const userSeconds = (entry.cpu?.user || 0) / 1000000;
        const systemSeconds = (entry.cpu?.system || 0) / 1000000;
        cpuUserMetric.set(labels, userSeconds);
        cpuSystemMetric.set(labels, systemSeconds);
        const totalSeconds = userSeconds + systemSeconds;
        cpuTotalMetric.set(labels, totalSeconds);
        // Fraction of the sampling interval spent on CPU.
        cpuPercentMetric.set(labels, totalSeconds / (Monitoring.interval/1000));

        // Only the main process reports instance-wide counters.
        if (type === 'main') {
            wsMetric.set(entry.ws || 0);
            regMetric.set(entry.registered || 0);
            chanMetric.set(entry.channels || 0);
        }

        // Aggregate per-command counters, namespaced by process type.
        if (entry.calls) {
            const prefix = (type === 'main') ? 'main' : 'worker';
            Object.keys(entry.calls).forEach(key => {
                const k = `${prefix}_${key}`;
                totals[k] = (totals[k] || 0) + entry.calls[key];
            });
        }
    });
    // Store aggregated counters; the interval above converts them
    // into per-second frequencies.
    Object.keys(totals).forEach(key => {
        const f = callsFreq[key] = callsFreq[key] || {};
        f.value = totals[key];
    });
};
EVENTS.ENV_UPDATE = function (data /*, cb */) {
try {
Env = JSON.parse(data);
@ -219,6 +369,13 @@ const wsProxy = createProxyMiddleware({
app.use('/cryptpad_websocket', wsProxy);
app.get('/metrics', (req, res) => {
Prometheus.register.metrics().then((data) => {
res.set('Content-Type', Prometheus.register.contentType);
res.send(data);
});
});
app.use('/ssoauth', (req, res, next) => {
if (SSOUtils && req && req.body && req.body.SAMLResponse) {
req.method = 'GET';
@ -799,7 +956,14 @@ nThen(function (w) {
}));
}).nThen(function () {
// TODO inform the parent process that this worker is ready
setInterval(() => {
sendMessage({
command: 'MONITORING',
data: Monitoring.getData('http-worker')
}, () => {
// Done
});
}, Monitoring.interval);
});
process.on('uncaughtException', function (err) {

55
lib/monitoring.js Normal file
View file

@ -0,0 +1,55 @@
/*
globals process
*/
const VALUES = {};
VALUES.mem = () => {
return process.memoryUsage();
};
let oldCpu;
VALUES.cpu = () => {
if (!oldCpu) {
oldCpu = process.cpuUsage();
return {user:0,system:0};
}
let val = process.cpuUsage(oldCpu);
oldCpu = process.cpuUsage();
return val;
};
const calls = {};
VALUES.calls = () => {
return calls;
};
const applyToEnv = (Env, data) => {
if (!Env) { return; }
Env.monitoring[data.pid] = data;
};
const getData = (type) => {
const value = {
pid: process.pid,
type: type
};
Object.keys(VALUES).forEach(key => {
value[key] = VALUES[key]();
});
return value;
};
const remove = (Env, pid) => {
if (Env && Env.monitoring && pid && Env.monitoring[pid]) {
delete Env.monitoring[pid];
}
};
const increment = (key, value) => {
if (typeof(value) !== "number") { value = 1; }
calls[key] = (calls[key] || 0) + value;
};
module.exports = {
interval: 5000,
applyToEnv,
increment,
getData,
remove
};

View file

@ -170,6 +170,15 @@ var rpc = function (Env, Server, userId, data, respond) {
var command = msg[1];
/*
// TODO get data from lib/upload.js to be able to get the size of the uploaded file
if (command === 'UPLOAD_COMPLETE' || command === 'OWNED_UPLOAD_COMPLETE') {
let m = Env.monitoring = Env.monitoring || {};
let b = m.upload = m.upload || {};
let id = msg[2];
if (id) { b[id] = +new Date(); }
}
*/
if (command === 'UPLOAD') {
// UPLOAD is a special case that skips signature validation
// intentional fallthrough behaviour

View file

@ -16,6 +16,8 @@ const Logger = require("../log");
const Tasks = require("../storage/tasks");
const Nacl = require('tweetnacl/nacl-fast');
const Eviction = require("../eviction");
const Monitoring = require('../monitoring');
const CPCrypto = require('../crypto');
const Env = {
Log: {},
@ -56,6 +58,13 @@ const init = function (config, _cb) {
Env.archiveRetentionTime = config.archiveRetentionTime;
Env.accountRetentionTime = config.accountRetentionTime;
setInterval(() => {
process.send({
monitoring: true,
data: Monitoring.getData('db-worker')
});
}, Monitoring.interval);
nThen(function (w) {
Store.create(config, w(function (err, _store) {
if (err) {
@ -101,6 +110,10 @@ const init = function (config, _cb) {
}
Env.tasks = tasks;
}));
}).nThen(function (w) {
CPCrypto.init(w(function (err, crypto) {
Env.crypto = crypto;
}));
}).nThen(function () {
cb();
});
@ -272,6 +285,8 @@ const computeIndex = function (data, cb) {
const channelName = data.channel;
const CB = Util.once(cb);
Monitoring.increment('computeIndex');
var start = 0;
nThen(function (w) {
store.getOffset(channelName, w(function (err, obj) {
@ -293,6 +308,7 @@ const computeIndex = function (data, cb) {
});
}
w.abort();
Monitoring.increment('computeIndexFromOffset');
CB(err, index);
}));
}).nThen(function (w) {
@ -301,6 +317,7 @@ const computeIndex = function (data, cb) {
store.clearOffset(channelName, w());
}).nThen(function () {
// now get the history as though it were the first time
Monitoring.increment('computeIndexFromStart');
computeIndexFromOffset(channelName, 0, CB);
});
};
@ -308,6 +325,7 @@ const computeIndex = function (data, cb) {
const computeMetadata = function (data, cb) {
const ref = {};
const lineHandler = Meta.createLineHandler(ref, Env.Log.error);
Monitoring.increment('computeMetadata');
return void store.readChannelMetadata(data.channel, lineHandler, function (err) {
if (err) {
// stream errors?
@ -388,6 +406,7 @@ const getPinState = function (data, cb) {
var lineHandler = Pins.createLineHandler(ref, Env.Log.error);
// if channels aren't in memory. load them from disk
Monitoring.increment('getPin');
pinStore.readMessagesBin(safeKey, 0, (msgObj, readMore) => {
lineHandler(msgObj.buff.toString('utf8'));
readMore();
@ -444,6 +463,7 @@ const _iterateFiles = function (channels, handler, cb) {
const getTotalSize = function (data, cb) {
var bytes = 0;
Monitoring.increment('getTotalSize');
_iterateFiles(data.channels, function (channel, next) {
_getFileSize(channel, function (err, size) {
if (!err) { bytes += size; }
@ -489,6 +509,7 @@ const getHashOffset = function (data, cb) {
const lastKnownHash = data.hash;
if (typeof(lastKnownHash) !== 'string') { return void cb("INVALID_HASH"); }
Monitoring.increment('getHashOffset');
var offset = -1;
store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {
// tryParse return a parsed message or undefined
@ -587,6 +608,8 @@ const completeUpload = function (data, cb) {
var arg = data.arg;
var size = data.size;
Monitoring.increment('uploadedBlob');
var method;
var label;
if (owned) {
@ -666,6 +689,7 @@ const COMMANDS = {
};
COMMANDS.INLINE = function (data, cb) {
Monitoring.increment('inlineValidation');
var signedMsg;
try {
signedMsg = Nacl.util.decodeBase64(data.msg);
@ -680,7 +704,8 @@ COMMANDS.INLINE = function (data, cb) {
return void cb("E_BADKEY");
}
// validate the message
const validated = Nacl.sign.open(signedMsg, validateKey);
//const validated = Nacl.sign.open(signedMsg, validateKey);
const validated = Env.crypto.open(signedMsg, validateKey);
if (!validated) {
return void cb("FAILED");
}
@ -689,6 +714,7 @@ COMMANDS.INLINE = function (data, cb) {
const checkDetachedSignature = function (signedMsg, signature, publicKey) {
if (!(signedMsg && publicKey)) { return false; }
Monitoring.increment('detachedValidation');
var signedBuffer;
var pubBuffer;
@ -720,7 +746,8 @@ const checkDetachedSignature = function (signedMsg, signature, publicKey) {
throw new Error("INVALID_SIGNATURE_LENGTH");
}
if (Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer) !== true) {
//if (Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer) !== true) {
if (Env.crypto.detachedVerify(signedBuffer, signatureBuffer, pubBuffer) !== true) {
throw new Error("FAILED");
}
};
@ -751,10 +778,12 @@ COMMANDS.HASH_CHANNEL_LIST = function (data, cb) {
};
COMMANDS.VALIDATE_ANCESTOR_PROOF = function (data, cb) {
Monitoring.increment('validateAncestorProof');
Block.validateAncestorProof(Env, data && data.proof, cb);
};
COMMANDS.VALIDATE_LOGIN_BLOCK = function (data, cb) {
Monitoring.increment('validateLoginBlock');
Block.validateLoginBlock(Env, data.publicKey, data.signature, data.block, cb);
};

View file

@ -9,6 +9,7 @@ const { fork } = require('child_process');
const Workers = module.exports;
const PID = process.pid;
const Block = require("../storage/block");
const Monitoring = require('../monitoring');
const DB_PATH = 'lib/workers/db-worker';
const MAX_JOBS = 16;
@ -162,6 +163,13 @@ Workers.initialize = function (Env, config, _cb) {
if (res.log) {
return void handleLog(res.log, res.label, res.info);
}
// handle monitoring data
if (res.monitoring) {
Monitoring.applyToEnv(Env, res.data);
return;
}
// but don't bother handling things addressed to other processes
// since it's basically guaranteed not to work
if (res.pid !== PID) {
@ -226,7 +234,9 @@ Workers.initialize = function (Env, config, _cb) {
handleResponse(state, res);
});
let pid = worker.pid;
var substituteWorker = Util.once(function () {
Monitoring.remove(Env, pid);
Env.Log.info("SUBSTITUTE_DB_WORKER", '');
var idx = workers.indexOf(state);
if (idx !== -1) {

1764
package-lock.json generated

File diff suppressed because it is too large. Load diff

View file

@ -22,7 +22,7 @@
"chainpad-crypto": "^0.2.5",
"chainpad-listmap": "^1.1.0",
"chainpad-netflux": "^1.2.0",
"chainpad-server": "^5.2.0",
"chainpad-server": "^5.2.1",
"ckeditor": "npm:ckeditor4@~4.22.1",
"codemirror": "^5.19.0",
"components-font-awesome": "^4.6.3",
@ -50,6 +50,7 @@
"open-sans-fontface": "^1.4.0",
"openid-client": "^5.4.2",
"pako": "^2.1.0",
"prom-client": "^14.2.0",
"prompt-confirm": "^2.0.4",
"pull-stream": "^3.6.1",
"require-css": "0.1.10",

61
scripts/testcrypto.js Normal file
View file

@ -0,0 +1,61 @@
// Micro-benchmark comparing the signature-verification backends usable by
// lib/crypto.js: sodium-native vs libsodium-wrappers vs tweetnacl.
const SodiumNative = require('sodium-native');
const Nacl = require('tweetnacl/nacl-fast');
const LibSodium = require('libsodium-wrappers');

// One signed test message shared by all three benchmarks.
const msgStr = "This is a test";
const keys = Nacl.sign.keyPair();
const pub = keys.publicKey;
const msg = Nacl.util.decodeUTF8(msgStr);
const signedMsg = Nacl.sign(msg, keys.secretKey);
const sig = signedMsg.subarray(0, 64); // ed25519 detached signature: first 64 bytes

// libsodium-wrappers must finish its WASM initialization first.
LibSodium.ready.then(() => {
    /* sanity checks kept for reference:
    console.log('tweetnacl open');
    console.log(!!Nacl.sign.open(signedMsg, pub));
    console.log('tweetnacl detached');
    console.log(Nacl.sign.detached.verify(msg, sig, pub));
    console.log('sodium-native open');
    console.log(SodiumNative.crypto_sign_open(msg, signedMsg, pub));
    console.log('sodium-native detached');
    console.log(SodiumNative.crypto_sign_verify_detached(sig, msg, pub));
    LibSodium.ready.then(() => {
        console.log('libsodium open');
        console.log(!!LibSodium.crypto_sign_open(signedMsg, pub));
        console.log('libsodium detached');
        console.log(LibSodium.crypto_sign_verify_detached(sig, msg, pub));
    });
    */
    const n = 10000;
    let a;

    // Each backend runs n attached-open + detached-verify pairs.
    // (`let i` is scoped per loop; the original redeclared `var i` three
    // times in the same scope, which fails lint.)
    console.log('start sodium-native');
    a = +new Date();
    for (let i = 0; i < n; i++) {
        SodiumNative.crypto_sign_open(msg, signedMsg, pub);
        SodiumNative.crypto_sign_verify_detached(sig, msg, pub);
    }
    console.log('end sodium-native ', (+new Date() - a), ' ms');

    console.log('start libsodium');
    a = +new Date();
    for (let i = 0; i < n; i++) {
        LibSodium.crypto_sign_open(signedMsg, pub);
        LibSodium.crypto_sign_verify_detached(sig, msg, pub);
    }
    console.log('end libsodium ', (+new Date() - a), ' ms');

    console.log('start tweetnacl');
    a = +new Date();
    for (let i = 0; i < n; i++) {
        Nacl.sign.open(signedMsg, pub);
        Nacl.sign.detached.verify(msg, sig, pub);
    }
    console.log('end tweetnacl ', (+new Date() - a), ' ms');
});

View file

@ -15,6 +15,7 @@ var config = require("./lib/load-config");
var Environment = require("./lib/env");
var Env = Environment.create(config);
var Default = require("./lib/defaults");
var Monitoring = require('./lib/monitoring');
var app = Express();
@ -49,6 +50,11 @@ COMMANDS.GET_PROFILING_DATA = function (msg, cb) {
cb(void 0, Env.bytesWritten);
};
COMMANDS.MONITORING = function (msg, cb) {
Monitoring.applyToEnv(Env, msg.data);
cb();
};
nThen(function (w) {
require("./lib/log").create(config, w(function (_log) {
Env.Log = _log;
@ -93,6 +99,7 @@ nThen(function (w) {
var launchWorker = (online) => {
var worker = Cluster.fork(workerState);
var pid = worker.process.pid;
worker.on('online', () => {
online();
});
@ -122,6 +129,7 @@ nThen(function (w) {
});
worker.on('exit', (code, signal) => {
Monitoring.remove(Env, pid);
if (!signal && code === 0) { return; }
// relaunch http workers if they crash
Env.Log.error('HTTP_WORKER_EXIT', {
@ -163,6 +171,19 @@ nThen(function (w) {
broadcast('FLUSH_CACHE', Env.FRESH_KEY);
}, 250);
setInterval(() => {
// Add main process data to monitoring
let monitoring = Monitoring.getData('main');
let Server = Env.Server;
let stats = Server.getSessionStats();
monitoring.ws = stats.total;
monitoring.channels = Server.getActiveChannelCount();
monitoring.registered = Object.keys(Env.netfluxUsers).length;
// Send updated values
Monitoring.applyToEnv(Env, monitoring);
broadcast('MONITORING', Env.monitoring);
}, Monitoring.interval);
Env.envUpdated.reg(throttledEnvChange);
Env.cacheFlushed.reg(throttledCacheFlush);