New admin command to archive an account

This commit is contained in:
yflory 2023-09-05 16:31:04 +02:00
parent 4ca32a793b
commit 708e36b3ee
2 changed files with 356 additions and 0 deletions

294
lib/archive-account.js Normal file
View file

@ -0,0 +1,294 @@
/* jshint esversion: 6, node: true */
const nThen = require('nthen');
const Pins = require('./pins');
const Environment = require("./env");
const Util = require("./common-util");
const Store = require('./storage/file.js');
const BlobStore = require("./storage/blob");
const BlockStore = require("./storage/block");
const Core = require("./commands/core");
const Metadata = require("./commands/metadata");
const Meta = require("./metadata");
const Logger = require("./log");
const Path = require("path");
const Fse = require("fs-extra");
const { parentPort } = require('node:worker_threads');
const config = require('./load-config');
const Env = Environment.create(config);
const COMMANDS = {};
let Log;
Env.computeMetadata = function (channel, cb) {
const ref = {};
const lineHandler = Meta.createLineHandler(ref, (err) => { console.log(err); });
return void Env.store.readChannelMetadata(channel, lineHandler, function (err) {
if (err) {
// stream errors?
return void cb(err);
}
cb(void 0, ref.meta);
});
};
const mkReportPath = function (Env, safeKey) {
return Path.join(Env.paths.archive, 'accounts', safeKey);
};
const storeReport = (Env, report, cb) => {
let path = mkReportPath(Env, report.key);
let s_data;
try {
s_data = JSON.stringify(report);
Fse.outputFile(path, s_data, cb);
} catch (err) {
return void cb(err);
}
};
const readReport = (Env, key, cb) => {
let path = mkReportPath(Env, key);
Fse.readJson(path, cb);
};
const deleteReport = (Env, key, cb) => {
let path = mkReportPath(Env, key);
Fse.remove(path, cb);
};
const init = (cb) => {
nThen((waitFor) => {
Logger.create(config, waitFor(function (_) {
Log = Env.Log = _;
}));
Store.create(config, waitFor(function (err, _store) {
if (err) {
waitFor.abort();
return void cb(err);
}
Env.store = _store;
}));
Store.create({
filePath: config.pinPath,
archivePath: config.archivePath,
// archive pin logs to their own subpath
volumeId: 'pins',
}, waitFor(function (err, _) {
if (err) {
waitFor.abort();
throw err;
}
Env.pinStore = _;
}));
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function () {},
}, waitFor(function (err, blob) {
if (err) {
waitFor.abort();
return void cb(err);
}
Env.blobStore = blob;
}));
}).nThen(cb);
};
COMMANDS.start = (edPublic, blockId) => {
const safeKey = Util.escapeKeyCharacters(edPublic);
let ref = {};
let blobsToArchive = [];
let channelsToArchive = [];
let deletedChannels = [];
let deletedBlobs = [];
nThen((waitFor) => {
init(waitFor());
}).nThen((waitFor) => {
let lineHandler = Pins.createLineHandler(ref, (err) => { console.log(err); });
Env.pinStore.getMessages(safeKey, lineHandler, waitFor(() => {}));
}).nThen((waitFor) => {
Log.info('MODERATION_ACCOUNT_ARCHIVAL_START', edPublic, waitFor());
var n = nThen;
Object.keys(ref.pins || {}).forEach((chanId) => {
n = n((w) => {
// Blobs
if (Env.blobStore.isFileId(chanId)) {
return void Env.blobStore.isOwnedBy(safeKey, chanId, w((err, owned) => {
if (err || !owned) { return; }
blobsToArchive.push(chanId);
}));
}
// Pads
Metadata.getMetadata(Env, chanId, w((err, metadata) => {
if (err) { return; } // Can't read metadata? Don't archive
if (!Core.hasOwners(metadata)) { return; } // No owner, don't archive
if (Core.isOwner(metadata, edPublic) && metadata.owners.length === 1) {
channelsToArchive.push(chanId); // Only owner: archive
}
}));
}).nThen;
});
n(waitFor());
}).nThen((waitFor) => {
Log.info('MODERATION_ACCOUNT_ARCHIVAL_LISTED', JSON.stringify({
pads: channelsToArchive.length,
blobs: blobsToArchive.length
}), waitFor());
var n = nThen;
// Archive the pads
channelsToArchive.forEach((chanId) => {
n = n((w) => {
Env.store.archiveChannel(chanId, w(function (err) {
if (err) {
return Log.error('MODERATION_CHANNEL_ARCHIVAL_ERROR', {
error: err,
channel: chanId,
}, w());
}
deletedChannels.push(chanId);
Log.info('MODERATION_CHANNEL_ARCHIVAL', chanId, w());
}));
}).nThen;
});
// Archive the blobs
blobsToArchive.forEach((blobId) => {
n = n((w) => {
Env.blobStore.archive.blob(blobId, w(function (err) {
if (err) {
return Log.error('MODERATION_BLOB_ARCHIVAL_ERROR', {
error: err,
item: blobId,
}, w());
}
deletedBlobs.push(blobId);
Log.info('MODERATION_BLOB_ARCHIVAL', blobId, w());
}));
}).nThen;
});
n(waitFor(() => {
// Archive the pin log
Env.pinStore.archiveChannel(safeKey, waitFor(function (err) {
if (err) {
return Log.error('MODERATION_ACCOUNT_PIN_LOG', err, waitFor());
}
Log.info('MODERATION_ACCOUNT_LOG', safeKey, waitFor());
}));
if (!blockId) { return; }
BlockStore.archive(Env, blockId, waitFor(function (err) {
if (err) {
blockId = undefined;
return Log.error('MODERATION_ACCOUNT_BLOCK', err, waitFor());
}
Log.info('MODERATION_ACCOUNT_BLOCK', safeKey, waitFor());
}));
}));
}).nThen((waitFor) => {
var report = {
key: safeKey,
channels: deletedChannels,
blobs: deletedBlobs,
blockId: blockId
};
storeReport(Env, report, waitFor((err) => {
if (err) {
return Log.error('MODERATION_ACCOUNT_REPORT', report, waitFor());
}
}));
}).nThen(() => {
parentPort.postMessage(JSON.stringify(deletedChannels));
process.exit(0);
});
};
COMMANDS.restore = (edPublic) => {
const safeKey = Util.escapeKeyCharacters(edPublic);
let pads, blobs;
let blockId;
let errors = [];
nThen((waitFor) => {
init(waitFor());
}).nThen((waitFor) => {
Log.info('MODERATION_ACCOUNT_RESTORE_START', edPublic, waitFor());
readReport(Env, safeKey, waitFor((err, report) => {
if (err) { throw new Error(err); }
pads = report.channels;
blobs = report.blobs;
blockId = report.blockId;
}));
}).nThen((waitFor) => {
Log.info('MODERATION_ACCOUNT_RESTORE_LISTED', JSON.stringify({
pads: pads.length,
blobs: blobs.length
}), waitFor());
var n = nThen;
pads.forEach((chanId) => {
n = n((w) => {
Env.store.restoreArchivedChannel(chanId, w(function (err) {
if (err) {
errors.push(chanId);
return Log.error('MODERATION_CHANNEL_RESTORE_ERROR', {
error: err,
channel: chanId,
}, w());
}
Log.info('MODERATION_CHANNEL_RESTORE', chanId, w());
}));
}).nThen;
});
blobs.forEach((blobId) => {
n = n((w) => {
Env.blobStore.restore.blob(blobId, w(function (err) {
if (err) {
errors.push(blobId);
return Log.error('MODERATION_BLOB_RESTORE_ERROR', {
error: err,
item: blobId,
}, w());
}
Log.info('MODERATION_BLOB_RESTORE', blobId, w());
}));
}).nThen;
});
n(waitFor(() => {
// remove the pin logs of inactive accounts if inactive account removal is configured
Env.pinStore.restoreArchivedChannel(safeKey, waitFor(function (err) {
if (err) {
return Log.error('MODERATION_ACCOUNT_PIN_LOG_RESTORE', err, waitFor());
}
Log.info('MODERATION_ACCOUNT_LOG_RESTORE', safeKey, waitFor());
}));
if (!blockId) { return; }
BlockStore.restore(Env, blockId, waitFor(function (err) {
if (err) {
blockId = undefined;
return Log.error('MODERATION_ACCOUNT_BLOCK_RESTORE', err, waitFor());
}
Log.info('MODERATION_ACCOUNT_BLOCK_RESTORE', safeKey, waitFor());
}));
}));
}).nThen((waitFor) => {
deleteReport(Env, safeKey, waitFor((err) => {
if (err) {
return Log.error('MODERATION_ACCOUNT_REPORT_DELETE', safeKey, waitFor());
}
}));
}).nThen(() => {
parentPort.postMessage(JSON.stringify(errors));
process.exit(0);
});
};
parentPort.on('message', (message) => {
let parsed = message; //JSON.parse(message);
let command = parsed.command;
let content = parsed.content;
let block = parsed.block;
COMMANDS[command](content, block);
});
parentPort.postMessage('READY');

View file

@ -10,6 +10,9 @@ const Core = require("./core");
const Channel = require("./channel"); const Channel = require("./channel");
const BlockStore = require("../storage/block"); const BlockStore = require("../storage/block");
const MFA = require("../storage/mfa"); const MFA = require("../storage/mfa");
/* jshint ignore:start */
const { Worker } = require('node:worker_threads');
/* jshint ignore:end */
var Fs = require("fs"); var Fs = require("fs");
@ -279,6 +282,62 @@ var restoreArchivedDocument = function (Env, Server, cb, data) {
} }
}; };
// CryptPad_AsyncStore.rpc.send('ADMIN', ['ARCHIVE_ACCOUNT', edPublic], console.log)
var archiveAccount = function (Env, Server, _cb, data) {
const cb = Util.once(_cb);
const worker = new Worker('./lib/archive-account.js');
worker.on('message', message => {
if (message === 'READY') {
return worker.postMessage({
command: 'start',
content: data && data[1],
block: data && data[2]
});
}
// DONE: disconnect all users from these channels
var deletedChannels = Util.tryParse(message);
if (Array.isArray(deletedChannels)) {
let n = nThen;
deletedChannels.forEach((chanId) => {
n = n((w) => {
setTimeout(w(() => {
Channel.disconnectChannelMembers(Env, Server, chanId, 'EDELETED', () => {});
}), 10);
}).nThen;
});
}
cb(void 0, { state: true });
});
worker.on('error', (err) => {
console.error(err);
cb(err);
});
worker.on('exit', () => { worker.unref(); });
};
var restoreAccount = function (Env, Server, _cb, data) {
const cb = Util.once(_cb);
const worker = new Worker('./lib/archive-account.js');
worker.on('message', message => {
if (message === 'READY') {
return worker.postMessage({
command: 'restore',
content: data && data[1]
});
}
// Response
cb(void 0, {
state: true,
errors: Util.tryParse(message)
});
});
worker.on('error', (err) => {
console.error(err);
cb(err);
});
worker.on('exit', () => { worker.unref(); });
};
// CryptPad_AsyncStore.rpc.send('ADMIN', ['CLEAR_CACHED_CHANNEL_INDEX', documentID], console.log) // CryptPad_AsyncStore.rpc.send('ADMIN', ['CLEAR_CACHED_CHANNEL_INDEX', documentID], console.log)
var clearChannelIndex = function (Env, Server, cb, data) { var clearChannelIndex = function (Env, Server, cb, data) {
var id = Array.isArray(data) && data[1]; var id = Array.isArray(data) && data[1];
@ -780,6 +839,9 @@ var commands = {
ARCHIVE_DOCUMENT: archiveDocument, ARCHIVE_DOCUMENT: archiveDocument,
RESTORE_ARCHIVED_DOCUMENT: restoreArchivedDocument, RESTORE_ARCHIVED_DOCUMENT: restoreArchivedDocument,
ARCHIVE_ACCOUNT: archiveAccount,
RESTORE_ACCOUNT: restoreAccount,
CLEAR_CACHED_CHANNEL_INDEX: clearChannelIndex, CLEAR_CACHED_CHANNEL_INDEX: clearChannelIndex,
GET_CACHED_CHANNEL_INDEX: getChannelIndex, GET_CACHED_CHANNEL_INDEX: getChannelIndex,
// TODO implement admin historyTrim // TODO implement admin historyTrim