From 708e36b3ee3f96af4c1440cf34d603460e9c81f1 Mon Sep 17 00:00:00 2001 From: yflory Date: Tue, 5 Sep 2023 16:31:04 +0200 Subject: [PATCH] New admin command to archive an account --- lib/archive-account.js | 294 ++++++++++++++++++++++++++++++++++++++ lib/commands/admin-rpc.js | 62 ++++++++ 2 files changed, 356 insertions(+) create mode 100644 lib/archive-account.js diff --git a/lib/archive-account.js b/lib/archive-account.js new file mode 100644 index 000000000..da59c96f5 --- /dev/null +++ b/lib/archive-account.js @@ -0,0 +1,294 @@ +/* jshint esversion: 6, node: true */ +const nThen = require('nthen'); +const Pins = require('./pins'); +const Environment = require("./env"); +const Util = require("./common-util"); +const Store = require('./storage/file.js'); +const BlobStore = require("./storage/blob"); +const BlockStore = require("./storage/block"); +const Core = require("./commands/core"); +const Metadata = require("./commands/metadata"); +const Meta = require("./metadata"); +const Logger = require("./log"); + +const Path = require("path"); +const Fse = require("fs-extra"); + +const { parentPort } = require('node:worker_threads'); + +const config = require('./load-config'); +const Env = Environment.create(config); +const COMMANDS = {}; +let Log; + +Env.computeMetadata = function (channel, cb) { + const ref = {}; + const lineHandler = Meta.createLineHandler(ref, (err) => { console.log(err); }); + return void Env.store.readChannelMetadata(channel, lineHandler, function (err) { + if (err) { + // stream errors? + return void cb(err); + } + cb(void 0, ref.meta); + }); +}; + +const mkReportPath = function (Env, safeKey) { + return Path.join(Env.paths.archive, 'accounts', safeKey); +}; +const storeReport = (Env, report, cb) => { + let path = mkReportPath(Env, report.key); + let s_data; + try { + s_data = JSON.stringify(report); + Fse.outputFile(path, s_data, cb); + } catch (err) { + return void cb(err); + } +}; +const readReport = (Env, key, cb) => { + let path = mkReportPath(Env, key); + Fse.readJson(path, cb); +}; +const deleteReport = (Env, key, cb) => { + let path = mkReportPath(Env, key); + Fse.remove(path, cb); +}; + +const init = (cb) => { + nThen((waitFor) => { + Logger.create(config, waitFor(function (_) { + Log = Env.Log = _; + })); + Store.create(config, waitFor(function (err, _store) { + if (err) { + waitFor.abort(); + return void cb(err); + } + Env.store = _store; + })); + Store.create({ + filePath: config.pinPath, + archivePath: config.archivePath, + // archive pin logs to their own subpath + volumeId: 'pins', + }, waitFor(function (err, _) { + if (err) { + waitFor.abort(); + throw err; + } + Env.pinStore = _; + })); + BlobStore.create({ + blobPath: config.blobPath, + blobStagingPath: config.blobStagingPath, + archivePath: config.archivePath, + getSession: function () {}, + }, waitFor(function (err, blob) { + if (err) { + waitFor.abort(); + return void cb(err); + } + Env.blobStore = blob; + })); + }).nThen(cb); +}; + +COMMANDS.start = (edPublic, blockId) => { + const safeKey = Util.escapeKeyCharacters(edPublic); + let ref = {}; + let blobsToArchive = []; + let channelsToArchive = []; + let deletedChannels = []; + let deletedBlobs = []; + nThen((waitFor) => { + init(waitFor()); + }).nThen((waitFor) => { + let lineHandler = Pins.createLineHandler(ref, (err) => { console.log(err); }); + Env.pinStore.getMessages(safeKey, lineHandler, waitFor(() => {})); + }).nThen((waitFor) => { + Log.info('MODERATION_ACCOUNT_ARCHIVAL_START', edPublic, waitFor()); + var n = nThen; + Object.keys(ref.pins || {}).forEach((chanId) => { + n = n((w) => { + // Blobs + if (Env.blobStore.isFileId(chanId)) { + return void Env.blobStore.isOwnedBy(safeKey, chanId, w((err, owned) => { + if (err || !owned) { return; } + blobsToArchive.push(chanId); + })); + } + // Pads + Metadata.getMetadata(Env, chanId, w((err, metadata) => { + if (err) { return; } // Can't read metadata? Don't archive + if (!Core.hasOwners(metadata)) { return; } // No owner, don't archive + if (Core.isOwner(metadata, edPublic) && metadata.owners.length === 1) { + channelsToArchive.push(chanId); // Only owner: archive + } + })); + }).nThen; + }); + n(waitFor()); + }).nThen((waitFor) => { + Log.info('MODERATION_ACCOUNT_ARCHIVAL_LISTED', JSON.stringify({ + pads: channelsToArchive.length, + blobs: blobsToArchive.length + }), waitFor()); + + var n = nThen; + // Archive the pads + channelsToArchive.forEach((chanId) => { + n = n((w) => { + Env.store.archiveChannel(chanId, w(function (err) { + if (err) { + return Log.error('MODERATION_CHANNEL_ARCHIVAL_ERROR', { + error: err, + channel: chanId, + }, w()); + } + deletedChannels.push(chanId); + Log.info('MODERATION_CHANNEL_ARCHIVAL', chanId, w()); + })); + }).nThen; + }); + // Archive the blobs + blobsToArchive.forEach((blobId) => { + n = n((w) => { + Env.blobStore.archive.blob(blobId, w(function (err) { + if (err) { + return Log.error('MODERATION_BLOB_ARCHIVAL_ERROR', { + error: err, + item: blobId, + }, w()); + } + deletedBlobs.push(blobId); + Log.info('MODERATION_BLOB_ARCHIVAL', blobId, w()); + })); + }).nThen; + }); + n(waitFor(() => { + // Archive the pin log + Env.pinStore.archiveChannel(safeKey, waitFor(function (err) { + if (err) { + return Log.error('MODERATION_ACCOUNT_PIN_LOG', err, waitFor()); + } + Log.info('MODERATION_ACCOUNT_LOG', safeKey, waitFor()); + })); + if (!blockId) { return; } + BlockStore.archive(Env, blockId, waitFor(function (err) { + if (err) { + blockId = undefined; + return Log.error('MODERATION_ACCOUNT_BLOCK', err, waitFor()); + } + Log.info('MODERATION_ACCOUNT_BLOCK', safeKey, waitFor()); + })); + })); + }).nThen((waitFor) => { + var report = { + key: safeKey, + channels: deletedChannels, + blobs: deletedBlobs, + blockId: blockId + }; + storeReport(Env, report, waitFor((err) => { + if (err) { + return Log.error('MODERATION_ACCOUNT_REPORT', report, waitFor()); + } + })); + }).nThen(() => { + parentPort.postMessage(JSON.stringify(deletedChannels)); + process.exit(0); + }); +}; + + +COMMANDS.restore = (edPublic) => { + const safeKey = Util.escapeKeyCharacters(edPublic); + let pads, blobs; + let blockId; + let errors = []; + nThen((waitFor) => { + init(waitFor()); + }).nThen((waitFor) => { + Log.info('MODERATION_ACCOUNT_RESTORE_START', edPublic, waitFor()); + readReport(Env, safeKey, waitFor((err, report) => { + if (err) { throw new Error(err); } + pads = report.channels; + blobs = report.blobs; + blockId = report.blockId; + })); + }).nThen((waitFor) => { + Log.info('MODERATION_ACCOUNT_RESTORE_LISTED', JSON.stringify({ + pads: pads.length, + blobs: blobs.length + }), waitFor()); + var n = nThen; + pads.forEach((chanId) => { + n = n((w) => { + Env.store.restoreArchivedChannel(chanId, w(function (err) { + if (err) { + errors.push(chanId); + return Log.error('MODERATION_CHANNEL_RESTORE_ERROR', { + error: err, + channel: chanId, + }, w()); + } + Log.info('MODERATION_CHANNEL_RESTORE', chanId, w()); + })); + }).nThen; + }); + blobs.forEach((blobId) => { + n = n((w) => { + Env.blobStore.restore.blob(blobId, w(function (err) { + if (err) { + errors.push(blobId); + return Log.error('MODERATION_BLOB_RESTORE_ERROR', { + error: err, + item: blobId, + }, w()); + } + Log.info('MODERATION_BLOB_RESTORE', blobId, w()); + })); + }).nThen; + }); + n(waitFor(() => { + // remove the pin logs of inactive accounts if inactive account removal is configured + Env.pinStore.restoreArchivedChannel(safeKey, waitFor(function (err) { + if (err) { + return Log.error('MODERATION_ACCOUNT_PIN_LOG_RESTORE', err, waitFor()); + } + Log.info('MODERATION_ACCOUNT_LOG_RESTORE', safeKey, waitFor()); + })); + if (!blockId) { return; } + BlockStore.restore(Env, blockId, waitFor(function (err) { + if (err) { + blockId = undefined; + return Log.error('MODERATION_ACCOUNT_BLOCK_RESTORE', err, waitFor()); + } + Log.info('MODERATION_ACCOUNT_BLOCK_RESTORE', safeKey, waitFor()); + })); + })); + }).nThen((waitFor) => { + deleteReport(Env, safeKey, waitFor((err) => { + if (err) { + return Log.error('MODERATION_ACCOUNT_REPORT_DELETE', safeKey, waitFor()); + } + })); + }).nThen(() => { + parentPort.postMessage(JSON.stringify(errors)); + process.exit(0); + }); + +}; + +parentPort.on('message', (message) => { + let parsed = message; //JSON.parse(message); + let command = parsed.command; + let content = parsed.content; + let block = parsed.block; + COMMANDS[command](content, block); +}); + +parentPort.postMessage('READY'); + + diff --git a/lib/commands/admin-rpc.js b/lib/commands/admin-rpc.js index 071ce31f0..0daf2c38d 100644 --- a/lib/commands/admin-rpc.js +++ b/lib/commands/admin-rpc.js @@ -10,6 +10,9 @@ const Core = require("./core"); const Channel = require("./channel"); const BlockStore = require("../storage/block"); const MFA = require("../storage/mfa"); +/* jshint ignore:start */ +const { Worker } = require('node:worker_threads'); +/* jshint ignore:end */ var Fs = require("fs"); @@ -279,6 +282,62 @@ var restoreArchivedDocument = function (Env, Server, cb, data) { } }; +// CryptPad_AsyncStore.rpc.send('ADMIN', ['ARCHIVE_ACCOUNT', edPublic], console.log) +var archiveAccount = function (Env, Server, _cb, data) { + const cb = Util.once(_cb); + const worker = new Worker('./lib/archive-account.js'); + worker.on('message', message => { + if (message === 'READY') { + return worker.postMessage({ + command: 'start', + content: data && data[1], + block: data && data[2] + }); + } + // DONE: disconnect all users from these channels + var deletedChannels = Util.tryParse(message); + if (Array.isArray(deletedChannels)) { + let n = nThen; + deletedChannels.forEach((chanId) => { + n = n((w) => { + setTimeout(w(() => { + Channel.disconnectChannelMembers(Env, Server, chanId, 'EDELETED', () => {}); + }), 10); + }).nThen; + }); + } + cb(void 0, { state: true }); + }); + worker.on('error', (err) => { + console.error(err); + cb(err); + }); + worker.on('exit', () => { worker.unref(); }); +}; +var restoreAccount = function (Env, Server, _cb, data) { + const cb = Util.once(_cb); + const worker = new Worker('./lib/archive-account.js'); + worker.on('message', message => { + if (message === 'READY') { + return worker.postMessage({ + command: 'restore', + content: data && data[1] + }); + } + // Response + cb(void 0, { + state: true, + errors: Util.tryParse(message) + }); + }); + worker.on('error', (err) => { + console.error(err); + cb(err); + }); + worker.on('exit', () => { worker.unref(); }); + +}; + // CryptPad_AsyncStore.rpc.send('ADMIN', ['CLEAR_CACHED_CHANNEL_INDEX', documentID], console.log) var clearChannelIndex = function (Env, Server, cb, data) { var id = Array.isArray(data) && data[1]; @@ -780,6 +839,9 @@ var commands = { ARCHIVE_DOCUMENT: archiveDocument, RESTORE_ARCHIVED_DOCUMENT: restoreArchivedDocument, + ARCHIVE_ACCOUNT: archiveAccount, + RESTORE_ACCOUNT: restoreAccount, + CLEAR_CACHED_CHANNEL_INDEX: clearChannelIndex, GET_CACHED_CHANNEL_INDEX: getChannelIndex, // TODO implement admin historyTrim