38c1700173
(instead of waiting until you've read an unbounded number of pin logs while queries back up in memory) Also replace instances of 'publicKey' with 'safeKey' or 'unsafeKey' to clearly and correctly indicate their format.
196 lines
6 KiB
JavaScript
196 lines
6 KiB
JavaScript
/*jshint esversion: 6 */
|
|
|
|
var Pins = module.exports;
|
|
|
|
const Fs = require("fs");
|
|
const Path = require("path");
|
|
const Util = require("./common-util");
|
|
const Plan = require("./plan");
|
|
|
|
/* Accepts a reference to an object, and...
|
|
either a string describing which log is being processed (backwards compatibility),
|
|
or a function which will log the error with all relevant data
|
|
*/
|
|
var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) {
|
|
var fileName;
|
|
if (typeof(errorHandler) === 'string') {
|
|
fileName = errorHandler;
|
|
errorHandler = function (label, data) {
|
|
console.error(label, {
|
|
log: fileName,
|
|
data: data,
|
|
});
|
|
};
|
|
}
|
|
|
|
// passing the reference to an object allows us to overwrite accumulated pins
|
|
// make sure to get ref.pins as the result
|
|
// it's a weird API but it's faster than unpinning manually
|
|
var pins = ref.pins = {};
|
|
ref.index = 0;
|
|
ref.latest = 0; // the latest message (timestamp in ms)
|
|
ref.surplus = 0; // how many lines exist behind a reset
|
|
return function (line) {
|
|
ref.index++;
|
|
if (!Boolean(line)) { return; }
|
|
|
|
var l;
|
|
try {
|
|
l = JSON.parse(line);
|
|
} catch (e) {
|
|
return void errorHandler('PIN_LINE_PARSE_ERROR', line);
|
|
}
|
|
|
|
if (!Array.isArray(l)) {
|
|
return void errorHandler('PIN_LINE_NOT_FORMAT_ERROR', l);
|
|
}
|
|
|
|
if (typeof(l[2]) === 'number') {
|
|
ref.latest = l[2]; // date
|
|
}
|
|
|
|
switch (l[0]) {
|
|
case 'RESET': {
|
|
pins = ref.pins = {};
|
|
if (l[1] && l[1].length) { l[1].forEach((x) => { ref.pins[x] = 1; }); }
|
|
ref.surplus = ref.index;
|
|
//jshint -W086
|
|
// fallthrough
|
|
}
|
|
case 'PIN': {
|
|
l[1].forEach((x) => { pins[x] = 1; });
|
|
break;
|
|
}
|
|
case 'UNPIN': {
|
|
l[1].forEach((x) => { delete pins[x]; });
|
|
break;
|
|
}
|
|
default:
|
|
errorHandler("PIN_LINE_UNSUPPORTED_COMMAND", l);
|
|
}
|
|
};
|
|
};
|
|
|
|
/*
|
|
takes contents of a pinFile (UTF8 string)
|
|
and the pin file's name
|
|
returns an array of of channel ids which are pinned
|
|
|
|
throw errors on pin logs with invalid pin data
|
|
*/
|
|
Pins.calculateFromLog = function (pinFile, fileName) {
|
|
var ref = {};
|
|
var handler = createLineHandler(ref, fileName);
|
|
|
|
pinFile.split('\n').forEach(handler);
|
|
return Object.keys(ref.pins);
|
|
};
|
|
|
|
/*
|
|
pins/
|
|
pins/A+/
|
|
pins/A+/A+hyhrQLrgYixOomZYxpuEhwfiVzKk1bBp+arH-zbgo=.ndjson
|
|
*/
|
|
|
|
const getSafeKeyFromPath = function (path) {
|
|
return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
|
|
};
|
|
|
|
const addUserPinToState = Pins.addUserPinToState = function (state, safeKey, itemId) {
|
|
(state[itemId] = state[itemId] || {})[safeKey] = 1;
|
|
};
|
|
|
|
Pins.list = function (_done, config) {
|
|
// allow for a configurable pin store location
|
|
const pinPath = config.pinPath || './data/pins';
|
|
|
|
// allow for a configurable amount of parallelism
|
|
const plan = Plan(config.workers || 5);
|
|
|
|
// run a supplied handler whenever you finish reading a log
|
|
// or noop if not supplied.
|
|
const handler = config.handler || function () {};
|
|
|
|
// use and mutate a supplied object for state if it's passed
|
|
const pinned = config.pinned || {};
|
|
|
|
var isDone = false;
|
|
// ensure that 'done' is only called once
|
|
// that it calls back asynchronously
|
|
// and that it sets 'isDone' to true, so that pending processes
|
|
// know to abort
|
|
const done = Util.once(Util.both(Util.mkAsync(_done), function () {
|
|
isDone = true;
|
|
}));
|
|
|
|
const errorHandler = function (label, info) {
|
|
console.log(label, info);
|
|
};
|
|
|
|
// TODO replace this with lib-readline?
|
|
const streamFile = function (path, cb) {
|
|
const id = getSafeKeyFromPath(path);
|
|
|
|
return void Fs.readFile(path, 'utf8', function (err, body) {
|
|
if (err) { return void cb(err); }
|
|
const ref = {};
|
|
const pinHandler = createLineHandler(ref, errorHandler);
|
|
var lines = body.split('\n');
|
|
lines.forEach(pinHandler);
|
|
handler(ref, id, pinned);
|
|
cb(void 0, ref);
|
|
});
|
|
};
|
|
|
|
const scanDirectory = function (path, cb) {
|
|
Fs.readdir(path, function (err, list) {
|
|
if (err) {
|
|
return void cb(err);
|
|
}
|
|
cb(void 0, list.map(function (item) {
|
|
return {
|
|
path: Path.join(path, item),
|
|
id: item.replace(/\.ndjson$/, ''),
|
|
};
|
|
}));
|
|
});
|
|
};
|
|
|
|
scanDirectory(pinPath, function (err, dirs) {
|
|
if (err) {
|
|
if (err.code === 'ENOENT') { return void done(void 0, {}); }
|
|
return void done(err);
|
|
}
|
|
dirs.forEach(function (dir) {
|
|
plan.job(1, function (next) {
|
|
if (isDone) { return void next(); }
|
|
scanDirectory(dir.path, function (nested_err, logs) {
|
|
if (nested_err) {
|
|
return void done(err);
|
|
}
|
|
logs.forEach(function (log) {
|
|
if (!/\.ndjson$/.test(log.path)) { return; }
|
|
plan.job(0, function (next) {
|
|
if (isDone) { return void next(); }
|
|
streamFile(log.path, function (err, ref) {
|
|
if (err) { return void done(err); }
|
|
|
|
var set = ref.pins;
|
|
for (var item in set) {
|
|
addUserPinToState(pinned, log.id, item);
|
|
}
|
|
next();
|
|
});
|
|
});
|
|
});
|
|
next();
|
|
});
|
|
});
|
|
});
|
|
|
|
plan.done(function () {
|
|
// err ?
|
|
done(void 0, pinned);
|
|
}).start();
|
|
});
|
|
};
|