Merge branch 'metrics' into load-metrics
This commit is contained in:
commit
2ad7e2f9a9
6 changed files with 646 additions and 1157 deletions
|
@ -8,6 +8,7 @@ const nThen = require('nthen');
|
|||
const Util = require("./common-util");
|
||||
const MetaRPC = require("./commands/metadata");
|
||||
const Nacl = require('tweetnacl/nacl-fast');
|
||||
const Monitoring = require('./monitoring');
|
||||
const now = function () { return (new Date()).getTime(); };
|
||||
const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds
|
||||
|
||||
|
@ -320,6 +321,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, tim
|
|||
// Store the message first, and update the index only once it's stored.
|
||||
// store.messageBin can be async so updating the index first may
|
||||
// result in a wrong cpIndex
|
||||
Monitoring.increment('storeMessage');
|
||||
nThen((waitFor) => {
|
||||
Env.store.messageBin(id, msgBin, waitFor(function (err) {
|
||||
if (err) {
|
||||
|
@ -521,6 +523,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
|
|||
const store = Env.store;
|
||||
|
||||
let offset = -1;
|
||||
Monitoring.increment('getHistoryAsync');
|
||||
nThen((waitFor) => {
|
||||
getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => {
|
||||
if (err) {
|
||||
|
|
|
@ -127,6 +127,7 @@ const chanMetric = new Prometheus.Gauge({
|
|||
name: `active_channels`,
|
||||
help: 'Number of active pads',
|
||||
});
|
||||
const callsMetrics = {};
|
||||
EVENTS.MONITORING = function (data) {
|
||||
/*
|
||||
{
|
||||
|
@ -140,6 +141,7 @@ EVENTS.MONITORING = function (data) {
|
|||
}
|
||||
}
|
||||
*/
|
||||
let calls = {};
|
||||
Object.keys(data).forEach(pid => {
|
||||
let val = data[pid];
|
||||
let type = val.type;
|
||||
|
@ -156,13 +158,37 @@ EVENTS.MONITORING = function (data) {
|
|||
let percent = sum / (Monitoring.interval/1000);
|
||||
cpuTotalMetric.set({pid, type}, sum);
|
||||
cpuPercentMetric.set({pid, type}, percent);
|
||||
|
||||
if (type === 'main') {
|
||||
wsMetric.set(val.ws || 0);
|
||||
regMetric.set(val.registered || 0);
|
||||
chanMetric.set(val.channels || 0);
|
||||
}
|
||||
|
||||
if (val.calls) {
|
||||
Object.keys(val.calls).forEach(key => {
|
||||
let k = key;
|
||||
if (type === 'main') {
|
||||
k = `main_${key}`;
|
||||
} else {
|
||||
k = `worker_${key}`;
|
||||
}
|
||||
calls[k] = calls[k] || 0;
|
||||
calls[k] += val.calls[key];
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
Object.keys(calls).forEach(key => {
|
||||
let m = callsMetrics[key];
|
||||
if (!m) {
|
||||
m = callsMetrics[key] = new Prometheus.Gauge({
|
||||
name: key,
|
||||
help: key
|
||||
});
|
||||
}
|
||||
m.set(calls[key]);
|
||||
});
|
||||
};
|
||||
|
||||
EVENTS.ENV_UPDATE = function (data /*, cb */) {
|
||||
|
|
|
@ -15,6 +15,10 @@ VALUES.cpu = () => {
|
|||
oldCpu = process.cpuUsage();
|
||||
return val;
|
||||
};
|
||||
const calls = {};
|
||||
VALUES.calls = () => {
|
||||
return calls;
|
||||
};
|
||||
|
||||
const applyToEnv = (Env, data) => {
|
||||
if (!Env) { return; }
|
||||
|
@ -37,9 +41,15 @@ const remove = (Env, pid) => {
|
|||
}
|
||||
};
|
||||
|
||||
const increment = (key) => {
|
||||
calls[key] = calls[key] || 0;
|
||||
calls[key]++;
|
||||
};
|
||||
|
||||
module.exports = {
|
||||
interval: 5000,
|
||||
applyToEnv,
|
||||
increment,
|
||||
getData,
|
||||
remove
|
||||
};
|
||||
|
|
|
@ -285,6 +285,8 @@ const computeIndex = function (data, cb) {
|
|||
const channelName = data.channel;
|
||||
const CB = Util.once(cb);
|
||||
|
||||
Monitoring.increment('computeIndex');
|
||||
|
||||
var start = 0;
|
||||
nThen(function (w) {
|
||||
store.getOffset(channelName, w(function (err, obj) {
|
||||
|
@ -306,6 +308,7 @@ const computeIndex = function (data, cb) {
|
|||
});
|
||||
}
|
||||
w.abort();
|
||||
Monitoring.increment('computeIndexFromOffset');
|
||||
CB(err, index);
|
||||
}));
|
||||
}).nThen(function (w) {
|
||||
|
@ -314,6 +317,7 @@ const computeIndex = function (data, cb) {
|
|||
store.clearOffset(channelName, w());
|
||||
}).nThen(function () {
|
||||
// now get the history as though it were the first time
|
||||
Monitoring.increment('computeIndexFromStart');
|
||||
computeIndexFromOffset(channelName, 0, CB);
|
||||
});
|
||||
};
|
||||
|
@ -321,6 +325,7 @@ const computeIndex = function (data, cb) {
|
|||
const computeMetadata = function (data, cb) {
|
||||
const ref = {};
|
||||
const lineHandler = Meta.createLineHandler(ref, Env.Log.error);
|
||||
Monitoring.increment('computeMetadata');
|
||||
return void store.readChannelMetadata(data.channel, lineHandler, function (err) {
|
||||
if (err) {
|
||||
// stream errors?
|
||||
|
@ -401,6 +406,7 @@ const getPinState = function (data, cb) {
|
|||
var lineHandler = Pins.createLineHandler(ref, Env.Log.error);
|
||||
|
||||
// if channels aren't in memory. load them from disk
|
||||
Monitoring.increment('getPin');
|
||||
pinStore.readMessagesBin(safeKey, 0, (msgObj, readMore) => {
|
||||
lineHandler(msgObj.buff.toString('utf8'));
|
||||
readMore();
|
||||
|
@ -457,6 +463,7 @@ const _iterateFiles = function (channels, handler, cb) {
|
|||
|
||||
const getTotalSize = function (data, cb) {
|
||||
var bytes = 0;
|
||||
Monitoring.increment('getTotalSize');
|
||||
_iterateFiles(data.channels, function (channel, next) {
|
||||
_getFileSize(channel, function (err, size) {
|
||||
if (!err) { bytes += size; }
|
||||
|
@ -502,6 +509,7 @@ const getHashOffset = function (data, cb) {
|
|||
const lastKnownHash = data.hash;
|
||||
if (typeof(lastKnownHash) !== 'string') { return void cb("INVALID_HASH"); }
|
||||
|
||||
Monitoring.increment('getHashOffset');
|
||||
var offset = -1;
|
||||
store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {
|
||||
// tryParse return a parsed message or undefined
|
||||
|
@ -600,6 +608,8 @@ const completeUpload = function (data, cb) {
|
|||
var arg = data.arg;
|
||||
var size = data.size;
|
||||
|
||||
Monitoring.increment('uploadedBlob');
|
||||
|
||||
var method;
|
||||
var label;
|
||||
if (owned) {
|
||||
|
@ -679,6 +689,7 @@ const COMMANDS = {
|
|||
};
|
||||
|
||||
COMMANDS.INLINE = function (data, cb) {
|
||||
Monitoring.increment('inlineValidation');
|
||||
var signedMsg;
|
||||
try {
|
||||
signedMsg = Nacl.util.decodeBase64(data.msg);
|
||||
|
@ -703,6 +714,7 @@ COMMANDS.INLINE = function (data, cb) {
|
|||
|
||||
const checkDetachedSignature = function (signedMsg, signature, publicKey) {
|
||||
if (!(signedMsg && publicKey)) { return false; }
|
||||
Monitoring.increment('detachedValidation');
|
||||
|
||||
var signedBuffer;
|
||||
var pubBuffer;
|
||||
|
@ -766,10 +778,12 @@ COMMANDS.HASH_CHANNEL_LIST = function (data, cb) {
|
|||
};
|
||||
|
||||
COMMANDS.VALIDATE_ANCESTOR_PROOF = function (data, cb) {
|
||||
Monitoring.increment('validateAncestorProof');
|
||||
Block.validateAncestorProof(Env, data && data.proof, cb);
|
||||
};
|
||||
|
||||
COMMANDS.VALIDATE_LOGIN_BLOCK = function (data, cb) {
|
||||
Monitoring.increment('validateLoginBlock');
|
||||
Block.validateLoginBlock(Env, data.publicKey, data.signature, data.block, cb);
|
||||
};
|
||||
|
||||
|
|
1748
package-lock.json
generated
1748
package-lock.json
generated
File diff suppressed because it is too large
Load diff
|
@ -22,7 +22,7 @@
|
|||
"chainpad-crypto": "^0.2.5",
|
||||
"chainpad-listmap": "^1.1.0",
|
||||
"chainpad-netflux": "^1.2.0",
|
||||
"chainpad-server": "^5.2.0",
|
||||
"chainpad-server": "^5.2.1",
|
||||
"ckeditor": "npm:ckeditor4@~4.22.1",
|
||||
"codemirror": "^5.19.0",
|
||||
"components-font-awesome": "^4.6.3",
|
||||
|
|
Loading…
Reference in a new issue