Merge branch 'soon' into staging

This commit is contained in:
ansuz 2020-10-15 13:23:53 +05:30
commit 713c8c574b
6 changed files with 88 additions and 18 deletions

View file

@ -211,6 +211,21 @@ the server adds two pieces of information to the supplied decree:
Decrees.write(Env, decree, cb);
};
// CryptPad_AsyncStore.rpc.send('ADMIN', ['SET_LAST_EVICTION', 0], console.log)
var setLastEviction = function (Env, Server, cb, data, unsafeKey) {
var time = data && data[1];
if (typeof(time) !== 'number') {
return void cb('INVALID_ARGS');
}
Env.lastEviction = time;
cb();
Env.Log.info('LAST_EVICTION_TIME_SET', {
author: unsafeKey,
time: time,
});
};
// CryptPad_AsyncStore.rpc.send('ADMIN', ['INSTANCE_STATUS], console.log)
var instanceStatus = function (Env, Server, cb) {
cb(void 0, {
@ -225,8 +240,8 @@ var instanceStatus = function (Env, Server, cb) {
defaultStorageLimit: Env.defaultStorageLimit,
lastEviction: Env.lastEviction,
// FIXME eviction is run in a worker and this isn't returned
//knownActiveAccounts: Env.knownActiveAccounts,
evictionReport: Env.evictionReport,
disableIntegratedEviction: Env.disableIntegratedEviction,
disableIntegratedTasks: Env.disableIntegratedTasks,
@ -257,6 +272,7 @@ var commands = {
ADMIN_DECREE: adminDecree,
INSTANCE_STATUS: instanceStatus,
GET_LIMITS: getLimits,
SET_LAST_EVICTION: setLastEviction,
};
Admin.command = function (Env, safeKey, data, _cb, Server) {

View file

@ -112,9 +112,6 @@ commands.SET_PREMIUM_UPLOAD_SIZE = makeIntegerSetter('premiumUploadSize');
// CryptPad_AsyncStore.rpc.send('ADMIN', [ 'ADMIN_DECREE', ['UPDATE_DEFAULT_STORAGE', [100 * 1024 * 1024]]], console.log)
commands.UPDATE_DEFAULT_STORAGE = makeIntegerSetter('defaultStorageLimit');
// CryptPad_AsyncStore.rpc.send('ADMIN', [ 'ADMIN_DECREE', ['SET_LAST_EVICTION', [0]]], console.log)
commands.SET_LAST_EVICTION = makeIntegerSetter('lastEviction');
// CryptPad_AsyncStore.rpc.send('ADMIN', [ 'ADMIN_DECREE', ['SET_INACTIVE_TIME', [90]]], console.log)
commands.SET_INACTIVE_TIME = makeIntegerSetter('inactiveTime');

View file

@ -92,7 +92,7 @@ module.exports.create = function (config) {
disableIntegratedTasks: config.disableIntegratedTasks || false,
disableIntegratedEviction: config.disableIntegratedEviction || false,
lastEviction: +new Date(),
knownActiveAccounts: 0,
evictionReport: {},
};
(function () {

View file

@ -32,6 +32,26 @@ Env = {
module.exports = function (Env, cb) {
var complete = Util.once(Util.mkAsync(cb));
var report = {
// archivedChannelsRemoved,
// archivedAccountsRemoved,
// archivedBlobProofsRemoved,
// archivedBlobsRemoved,
// totalChannels,
// activeChannels,
// totalBlobs,
// activeBlobs,
// totalAccounts,
// activeAccounts,
// channelsArchived,
launchTime: +new Date(),
// runningTime,
};
// the administrator should have set an 'inactiveTime' in their config
// if they didn't, just exit.
@ -81,7 +101,9 @@ module.exports = function (Env, cb) {
TODO make this configurable ?
*/
var BLOOM_CAPACITY = (1 << 20) - 1; // over a million items
var BLOOM_ERROR = 1 / 1000; // an error rate of one in a thousand
var BLOOM_ERROR = 1 / 10000; // an error rate of one in a thousand
// the number of ms artificially introduced between CPU-intensive operations
var THROTTLE_FACTOR = 10;
// we'll use one filter for the set of active documents
var activeDocs = Bloom.optimalFilter(BLOOM_CAPACITY, BLOOM_ERROR);
@ -152,6 +174,8 @@ module.exports = function (Env, cb) {
if (err) {
return Log.error('EVICT_ARCHIVED_FINAL_ERROR', err);
}
report.archivedChannelsRemoved = removed;
report.archivedAccountsRemoved = accounts;
Log.info('EVICT_ARCHIVED_CHANNELS_REMOVED', removed);
Log.info('EVICT_ARCHIVED_ACCOUNTS_REMOVED', accounts);
};
@ -165,6 +189,7 @@ module.exports = function (Env, cb) {
// if they are older than the specified retention time
var removed = 0;
blobs.list.archived.proofs(function (err, item, next) {
next = Util.mkAsync(next, THROTTLE_FACTOR);
if (err) {
Log.error("EVICT_BLOB_LIST_ARCHIVED_PROOF_ERROR", err);
return void next();
@ -180,6 +205,7 @@ module.exports = function (Env, cb) {
next();
}));
}, w(function () {
report.archivedBlobProofsRemoved = removed;
Log.info('EVICT_ARCHIVED_BLOB_PROOFS_REMOVED', removed);
}));
};
@ -190,6 +216,7 @@ module.exports = function (Env, cb) {
// if they are older than the specified retention time
var removed = 0;
blobs.list.archived.blobs(function (err, item, next) {
next = Util.mkAsync(next, THROTTLE_FACTOR);
if (err) {
Log.error("EVICT_BLOB_LIST_ARCHIVED_BLOBS_ERROR", err);
return void next();
@ -205,6 +232,7 @@ module.exports = function (Env, cb) {
next();
});
}, w(function () {
report.archivedBlobsRemoved = removed;
Log.info('EVICT_ARCHIVED_BLOBS_REMOVED', removed);
}));
};
@ -232,6 +260,8 @@ module.exports = function (Env, cb) {
};
var done = function () {
report.activeChannels = active;
report.totalChannels = channels;
Log.info('EVICT_CHANNELS_CATEGORIZED', {
active: active,
channels: channels,
@ -246,6 +276,7 @@ module.exports = function (Env, cb) {
var active = 0;
blobs.list.blobs(function (err, item, next) {
next = Util.mkAsync(next, THROTTLE_FACTOR);
n_blobs++;
if (err) {
Log.error("EVICT_BLOB_CATEGORIZATION", err);
@ -262,6 +293,8 @@ module.exports = function (Env, cb) {
}
next();
}, w(function () {
report.totalBlobs = n_blobs;
report.activeBlobs = active;
Log.info('EVICT_BLOBS_CATEGORIZED', {
active: active,
blobs: n_blobs,
@ -311,6 +344,7 @@ module.exports = function (Env, cb) {
// otherwise, we'll only retain data from active accounts
// so we need more heuristics
var handler = function (content, id, next) {
next = Util.mkAsync(next, THROTTLE_FACTOR);
accounts++;
var mtime = content.latest;
@ -327,6 +361,10 @@ module.exports = function (Env, cb) {
// we plan to delete them, because it may be interesting information
inactive++;
if (PRESERVE_INACTIVE_ACCOUNTS) {
Log.info('EVICT_INACTIVE_ACCOUNT_PRESERVED', {
id: id,
mtime: mtime,
});
pinAll(pinList);
return void next();
}
@ -356,9 +394,8 @@ module.exports = function (Env, cb) {
"EVICT_COUNT_ACCOUNTS":
"EVICT_INACTIVE_ACCOUNTS";
// update the number of known active accounts in Env for statistics
Env.knownActiveAccounts = accounts - inactive;
report.totalAccounts = accounts;
report.activeAccounts = accounts - inactive;
Log.info(label, {
accounts: accounts,
inactive: inactive,
@ -375,7 +412,9 @@ module.exports = function (Env, cb) {
// iterate over blobs and remove them
// if they have not been accessed within the specified retention time
var removed = 0;
var total = 0;
blobs.list.blobs(function (err, item, next) {
next = Util.mkAsync(next, THROTTLE_FACTOR);
if (err) {
Log.error("EVICT_BLOB_LIST_BLOBS_ERROR", err);
return void next();
@ -384,6 +423,7 @@ module.exports = function (Env, cb) {
next();
return void Log.error('EVICT_BLOB_LIST_BLOBS_NO_ITEM', item);
}
total++;
if (pinnedDocs.test(item.blobId)) { return void next(); }
if (activeDocs.test(item.blobId)) { return void next(); }
@ -392,6 +432,7 @@ module.exports = function (Env, cb) {
// unless we address this race condition with this last-minute double-check
if (getNewestTime(item) > inactiveTime) { return void next(); }
removed++;
blobs.archive.blob(item.blobId, function (err) {
if (err) {
Log.error("EVICT_ARCHIVE_BLOB_ERROR", {
@ -403,10 +444,11 @@ module.exports = function (Env, cb) {
Log.info("EVICT_ARCHIVE_BLOB", {
item: item,
});
removed++;
next();
});
}, w(function () {
report.totalBlobs = total;
report.activeBlobs = total - removed;
Log.info('EVICT_BLOBS_REMOVED', removed);
}));
};
@ -416,6 +458,7 @@ module.exports = function (Env, cb) {
// if they don't correspond to a pinned or active file
var removed = 0;
blobs.list.proofs(function (err, item, next) {
next = Util.mkAsync(next, THROTTLE_FACTOR);
if (err) {
next();
return void Log.error("EVICT_BLOB_LIST_PROOFS_ERROR", err);
@ -458,6 +501,7 @@ module.exports = function (Env, cb) {
var archived = 0;
var handler = function (err, item, cb) {
cb = Util.mkAsync(cb, THROTTLE_FACTOR);
channels++;
if (err) {
Log.error('EVICT_CHANNEL_ITERATION', err);
@ -514,6 +558,7 @@ module.exports = function (Env, cb) {
};
var done = function () {
report.channelsArchived = archived;
return void Log.info('EVICT_CHANNELS_ARCHIVED', archived);
};
@ -539,8 +584,9 @@ module.exports = function (Env, cb) {
.nThen(archiveInactiveBlobProofs)
.nThen(archiveInactiveChannels)
.nThen(function () {
Log.info("EVICT_TIME_TO_RUN_SCRIPT", msSinceStart());
var runningTime = report.runningTime = msSinceStart();
Log.info("EVICT_TIME_TO_RUN_SCRIPT", runningTime);
}).nThen(function () {
complete();
complete(void 0, report);
});
};

View file

@ -191,13 +191,17 @@ module.exports.create = function (Env, cb) {
// evict inactive data once per day
if ((now - ONE_DAY) < Env.lastEviction) { return; }
active = true;
Env.evictInactive(function (err) {
Env.evictInactive(function (err, report) {
if (err) {
// NO_INACTIVE_TIME
Log.error('EVICT_INACTIVE_MAIN_ERROR', err);
}
active = false;
Env.lastEviction = now;
if (report) {
Log.info('EVICT_INACTIVE_REPORT', report);
}
Env.evictionReport = report || {};
});
}, 60 * 1000);
}).nThen(function () {

View file

@ -81,7 +81,8 @@ Workers.initialize = function (Env, config, _cb) {
};
var drained = true;
var sendCommand = function (msg, _cb) {
var sendCommand = function (msg, _cb, opt) {
opt = opt || {};
var index = getAvailableWorkerIndex();
var state = workers[index];
@ -119,7 +120,9 @@ Workers.initialize = function (Env, config, _cb) {
delete state.tasks[txid];
})));
response.expect(txid, cb, 180000);
// default to timing out affter 180s if no explicit timeout is passed
var timeout = typeof(opt.timeout) !== 'undefined'? opt.timeout: 180000;
response.expect(txid, cb, timeout);
state.worker.send(msg);
};
@ -354,13 +357,17 @@ Workers.initialize = function (Env, config, _cb) {
Env.evictInactive = function (cb) {
sendCommand({
command: 'EVICT_INACTIVE',
}, cb);
}, cb, {
timeout: 1000 * 60 * 300, // time out after 300 minutes (5 hours)
});
};
Env.runTasks = function (cb) {
sendCommand({
command: 'RUN_TASKS',
}, cb);
}, cb, {
timeout: 1000 * 60 * 10, // time out after 10 minutes
});
};
Env.writeTask = function (time, command, args, cb) {