Merge branch 'soon' into staging
This commit is contained in:
commit
ae5a592298
3 changed files with 165 additions and 5 deletions
160
lib/eviction.js
160
lib/eviction.js
|
@ -3,6 +3,10 @@ var Bloom = require("@mcrowe/minibloom");
|
|||
var Util = require("../lib/common-util");
|
||||
var Pins = require("../lib/pins");
|
||||
var Keys = require("./keys");
|
||||
var Path = require('node:path');
|
||||
var config = require("./load-config");
|
||||
var Fs = require("node:fs");
|
||||
var Fse = require("fs-extra");
|
||||
|
||||
var getNewestTime = function (stats) {
|
||||
return stats[['atime', 'ctime', 'mtime'].reduce(function (a, b) {
|
||||
|
@ -32,6 +36,7 @@ Env = {
|
|||
|
||||
// the number of ms artificially introduced between CPU-intensive operations
|
||||
var THROTTLE_FACTOR = 10;
|
||||
var PROGRESS_FACTOR = 1000;
|
||||
|
||||
var evictArchived = function (Env, cb) {
|
||||
var Log;
|
||||
|
@ -70,6 +75,103 @@ var evictArchived = function (Env, cb) {
|
|||
blobs = Env.blobStore;
|
||||
};
|
||||
|
||||
var migrateBlobRoot = function (from, to) {
|
||||
// only migrate subpaths, leave everything else alone
|
||||
if (!Path.dirname(from).startsWith(Path.dirname(to))) { return; }
|
||||
|
||||
// expects a directory
|
||||
var recurse = function (relativePath) {
|
||||
var src = Path.join(from, relativePath);
|
||||
var children;
|
||||
try {
|
||||
children = Fs.readdirSync(src);
|
||||
} catch (err) {
|
||||
if (err.code === 'ENOENT') { return; }
|
||||
// if you can't read a directory's contents
|
||||
// then nothing else will work, so just abort
|
||||
Log.verbose("EVICT_ARCHIVED_NOT_DIRECTORY", {
|
||||
error: err,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
var dest;
|
||||
if (children.length === 0) {
|
||||
try {
|
||||
Fse.removeSync(src);
|
||||
} catch (err2) {
|
||||
Log.error('EVICT_ARCHIVED_EMPTY_DIR_REMOVAL', {
|
||||
error: err2,
|
||||
});
|
||||
// removal is non-essential, so we can continue
|
||||
}
|
||||
} else {
|
||||
// make an equivalent path in the target directory
|
||||
dest = Path.join(to, relativePath);
|
||||
|
||||
try {
|
||||
Fse.mkdirpSync(dest);
|
||||
} catch (err3) {
|
||||
Log.error("EVICT_ARCHIVED_BLOB_MIGRATION", {
|
||||
error: err3,
|
||||
});
|
||||
|
||||
// failure to create the host directory
|
||||
// will cause problems when we try to move
|
||||
// so bail out here
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
children.forEach(function (child) {
|
||||
var childSrcPath = Path.join(src, child);
|
||||
var stat = Fs.statSync(childSrcPath);
|
||||
if (stat.isDirectory()) {
|
||||
return void recurse(Path.join(relativePath, child));
|
||||
}
|
||||
|
||||
var childDestPath = Path.join(dest, child);
|
||||
|
||||
try {
|
||||
Log.verbose("EVICT_ARCHIVED_MOVE_FROM_DEPRECATED_PATH", {
|
||||
from: childSrcPath,
|
||||
to: childDestPath,
|
||||
});
|
||||
Fse.moveSync(childSrcPath, childDestPath, {
|
||||
overwrite: false,
|
||||
});
|
||||
} catch (err4) {
|
||||
Log.error('EVICT_ARCHIVED_MOVE_FAILURE', {
|
||||
error: err4,
|
||||
});
|
||||
}
|
||||
});
|
||||
};
|
||||
recurse('');
|
||||
};
|
||||
|
||||
/* In CryptPad 5.2.0 we merged a patch which converted
|
||||
all of CryptPad's root filepaths to their absolute form,
|
||||
rather than the relative paths we'd been using until then.
|
||||
Unfortunately, we overlooked a case where two absolute
|
||||
paths were concatenated together, resulting in blobs being
|
||||
archived to an incorrect path.
|
||||
|
||||
This migration detects evidence of incorrect archivals
|
||||
and moves such archived files to their intended location
|
||||
before continuing with the normal eviction procedure.
|
||||
*/
|
||||
var migrateIncorrectBlobs = function () {
|
||||
var incorrectPaths = [
|
||||
Path.join(Env.paths.archive, config.blobPath),
|
||||
Path.join(Env.paths.archive, Path.resolve(config.blobPath))
|
||||
];
|
||||
var correctPath = Path.join(Env.paths.archive, 'blob');
|
||||
incorrectPaths.forEach(root => {
|
||||
migrateBlobRoot(root, correctPath);
|
||||
});
|
||||
};
|
||||
|
||||
var removeArchivedChannels = function (w) {
|
||||
// this block will iterate over archived channels and removes them
|
||||
// if they've been in cold storage for longer than your configured archive time
|
||||
|
@ -186,6 +288,7 @@ var evictArchived = function (Env, cb) {
|
|||
};
|
||||
|
||||
nThen(loadStorage)
|
||||
.nThen(migrateIncorrectBlobs)
|
||||
.nThen(removeArchivedChannels)
|
||||
.nThen(removeArchivedBlobProofs)
|
||||
.nThen(removeArchivedBlobs)
|
||||
|
@ -289,6 +392,12 @@ module.exports = function (Env, cb) {
|
|||
var active = 0;
|
||||
var handler = function (err, item, cb) {
|
||||
channels++;
|
||||
if (channels % PROGRESS_FACTOR === 0) {
|
||||
Log.info('EVICT_CHANNEL_CATEGORIZATION_PROGRESS', {
|
||||
channels: channels,
|
||||
});
|
||||
}
|
||||
|
||||
if (err) {
|
||||
Log.error('EVICT_CHANNEL_CATEGORIZATION', err);
|
||||
return void cb();
|
||||
|
@ -315,6 +424,7 @@ module.exports = function (Env, cb) {
|
|||
});
|
||||
};
|
||||
|
||||
Log.info('EVICT_CHANNEL_ACTIVITY_START', 'Assessing channel activity');
|
||||
store.listChannels(handler, w(done));
|
||||
};
|
||||
|
||||
|
@ -322,9 +432,16 @@ module.exports = function (Env, cb) {
|
|||
var n_blobs = 0;
|
||||
var active = 0;
|
||||
|
||||
Log.info('EVICT_BLOBS_ACTIVITY_START', 'Assessing blob activity');
|
||||
blobs.list.blobs(function (err, item, next) {
|
||||
next = Util.mkAsync(next, THROTTLE_FACTOR);
|
||||
n_blobs++;
|
||||
if (n_blobs % PROGRESS_FACTOR === 0) {
|
||||
Log.info('EVICT_BLOB_CATEGORIZATION_PROGRESS', {
|
||||
blobs: n_blobs,
|
||||
});
|
||||
}
|
||||
|
||||
if (err) {
|
||||
Log.error("EVICT_BLOB_CATEGORIZATION", err);
|
||||
return void next();
|
||||
|
@ -393,6 +510,11 @@ module.exports = function (Env, cb) {
|
|||
var handler = function (content, id, next) {
|
||||
next = Util.mkAsync(next, THROTTLE_FACTOR);
|
||||
accounts++;
|
||||
if (accounts % PROGRESS_FACTOR === 0) {
|
||||
Log.info('EVICT_ACCOUNT_CATEGORIZATION_PROGRESS', {
|
||||
accounts: accounts,
|
||||
});
|
||||
}
|
||||
|
||||
var mtime = content.latest;
|
||||
var pinList = Object.keys(content.pins);
|
||||
|
@ -449,6 +571,7 @@ module.exports = function (Env, cb) {
|
|||
});
|
||||
};
|
||||
|
||||
Log.info('EVICT_ACCOUNTS_ACTIVITY_START', 'Assessing account activity');
|
||||
Pins.load(w(done), {
|
||||
pinPath: Env.paths.pin,
|
||||
handler: handler,
|
||||
|
@ -460,6 +583,8 @@ module.exports = function (Env, cb) {
|
|||
// if they have not been accessed within the specified retention time
|
||||
var removed = 0;
|
||||
var total = 0;
|
||||
|
||||
Log.info('EVICT_BLOB_START', {});
|
||||
blobs.list.blobs(function (err, item, next) {
|
||||
next = Util.mkAsync(next, THROTTLE_FACTOR);
|
||||
if (err) {
|
||||
|
@ -471,6 +596,12 @@ module.exports = function (Env, cb) {
|
|||
return void Log.error('EVICT_BLOB_LIST_BLOBS_NO_ITEM', item);
|
||||
}
|
||||
total++;
|
||||
if (total % PROGRESS_FACTOR === 0) {
|
||||
Log.info("EVICT_BLOB_PROGRESS", {
|
||||
blobs: total,
|
||||
});
|
||||
}
|
||||
|
||||
if (pinnedDocs.test(item.blobId)) { return void next(); }
|
||||
if (activeDocs.test(item.blobId)) { return void next(); }
|
||||
|
||||
|
@ -504,6 +635,9 @@ module.exports = function (Env, cb) {
|
|||
// iterate over blob proofs and remove them
|
||||
// if they don't correspond to a pinned or active file
|
||||
var removed = 0;
|
||||
var total = 0;
|
||||
|
||||
Log.info("EVICT_ARCHIVE_INACTIVE_BLOB_PROOFS_START", {});
|
||||
blobs.list.proofs(function (err, item, next) {
|
||||
next = Util.mkAsync(next, THROTTLE_FACTOR);
|
||||
if (err) {
|
||||
|
@ -514,6 +648,14 @@ module.exports = function (Env, cb) {
|
|||
next();
|
||||
return void Log.error('EVICT_BLOB_LIST_PROOFS_NO_ITEM', item);
|
||||
}
|
||||
total++;
|
||||
|
||||
if (total % PROGRESS_FACTOR === 0) {
|
||||
Log.info('EVICT_BLOB_PROOF_PROGRESS', {
|
||||
proofs: total,
|
||||
});
|
||||
}
|
||||
|
||||
if (pinnedDocs.test(item.blobId)) { return void next(); }
|
||||
if (getNewestTime(item) > inactiveTime) { return void next(); }
|
||||
nThen(function (w) {
|
||||
|
@ -539,7 +681,10 @@ module.exports = function (Env, cb) {
|
|||
});
|
||||
});
|
||||
}, w(function () {
|
||||
Log.info("EVICT_BLOB_PROOFS_REMOVED", removed);
|
||||
Log.info("EVICT_BLOB_PROOFS_REMOVED", {
|
||||
removed,
|
||||
total,
|
||||
});
|
||||
}));
|
||||
};
|
||||
|
||||
|
@ -550,6 +695,13 @@ module.exports = function (Env, cb) {
|
|||
var handler = function (err, item, cb) {
|
||||
cb = Util.mkAsync(cb, THROTTLE_FACTOR);
|
||||
channels++;
|
||||
if (channels % PROGRESS_FACTOR === 0) {
|
||||
Log.info('EVICT_INACTIVE_CHANNELS_PROGRESS', {
|
||||
channels,
|
||||
archived,
|
||||
});
|
||||
}
|
||||
|
||||
if (err) {
|
||||
Log.error('EVICT_CHANNEL_ITERATION', err);
|
||||
return void cb();
|
||||
|
@ -606,9 +758,13 @@ module.exports = function (Env, cb) {
|
|||
|
||||
var done = function () {
|
||||
report.channelsArchived = archived;
|
||||
return void Log.info('EVICT_CHANNELS_ARCHIVED', archived);
|
||||
return void Log.info('EVICT_CHANNELS_ARCHIVED', {
|
||||
channels,
|
||||
archived,
|
||||
});
|
||||
};
|
||||
|
||||
Log.info('EVICT_INACTIVE_CHANNELS_START', {});
|
||||
store.listChannels(handler, w(done), true); // using a hacky "fast mode" since we only need the channel id
|
||||
};
|
||||
|
||||
|
|
|
@ -19,7 +19,11 @@ var isValidId = function (id) {
|
|||
// helpers
|
||||
|
||||
var prependArchive = function (Env, path) {
|
||||
return Path.join(Env.archivePath, path);
|
||||
// Env has an absolute path to the blob storage
|
||||
// we want the path to the blob relative to that
|
||||
var relativePathToBlob = Path.relative(Env.blobPath, path);
|
||||
// the new path structure is the same, but relative to the blob archive root
|
||||
return Path.join(Env.archivePath, 'blob', relativePathToBlob);
|
||||
};
|
||||
|
||||
// /blob/<safeKeyPrefix>/<safeKey>/<blobPrefix>/<blobId>
|
||||
|
@ -492,7 +496,7 @@ BlobStore.create = function (config, _cb) {
|
|||
if (e) { CB(e); }
|
||||
}));
|
||||
|
||||
Fse.mkdirp(Path.join(Env.archivePath, Env.blobPath), w(function (e) {
|
||||
Fse.mkdirp(Path.join(Env.archivePath, './blob'), w(function (e) {
|
||||
if (e) { CB(e); }
|
||||
}));
|
||||
}).nThen(function (w) {
|
||||
|
|
|
@ -2993,7 +2993,7 @@ Uncaught TypeError: Cannot read property 'calculatedType' of null
|
|||
var m = metadataMgr.getChannelMembers().filter(function (str) {
|
||||
return str.length === 32;
|
||||
}).length;
|
||||
if ((m - v) === 1 && !readOnly) {
|
||||
if ((m - v) === 1 && !readOnly && common.isLoggedIn()) {
|
||||
var needCp = ooChannel.queue.length > CHECKPOINT_INTERVAL;
|
||||
APP.initCheckpoint = needCp;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue