untested implementation of trimHistory

This commit is contained in:
ansuz 2020-01-23 17:58:24 -05:00
parent c388641479
commit 9cdf54aff2
3 changed files with 211 additions and 41 deletions

View file

@ -10,29 +10,13 @@ const Meta = require("./metadata");
const WriteQueue = require("./write-queue");
const BatchRead = require("./batch-read");
const Extras = require("./hk-util.js");
let Log;
const now = function () { return (new Date()).getTime(); };
const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds
/* getHash
 * slices off the leading 64 characters of a message, which are very
 * likely to be unique
 * these "hashes" identify particular messages in a channel's history
 * clients keep "hashes" in memory or in their drive and use them to
 * query for new messages:
 * when reconnecting to a pad
 * when connecting to chat or a mailbox
 * consequently, changing this function would invalidate client data which:
 * is encrypted clientside
 * can't be easily migrated
 * don't break it!
 */
const getHash = function (msg) {
    // the common case: a valid string message
    if (typeof(msg) === 'string') { return msg.slice(0, 64); }
    // anything else is a caller bug; log it and fall back to an empty hash
    Log.warn('HK_GET_HASH', 'getHash() called on ' + typeof(msg) + ': ' + msg);
    return '';
};
const getHash = Extras.getHash;
const tryParse = function (str) {
try {
@ -185,7 +169,7 @@ module.exports.create = function (cfg) {
if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
// msgObj.offset is API guaranteed by our storage module
// it should always be a valid positive integer
offsetByHash[getHash(msg[4])] = msgObj.offset;
offsetByHash[getHash(msg[4], Log)] = msgObj.offset;
}
// There is a trailing \n at the end of the file
size = msgObj.offset + msgObj.buff.length + 1;
@ -502,7 +486,7 @@ module.exports.create = function (cfg) {
msgStruct.push(now());
// storeMessage
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log));
});
};
@ -601,7 +585,7 @@ module.exports.create = function (cfg) {
const msg = tryParse(msgObj.buff.toString('utf8'));
// if it was undefined then go onto the next message
if (typeof msg === "undefined") { return readMore(); }
if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4])) {
if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4], Log)) {
return void readMore();
}
offset = msgObj.offset;
@ -672,7 +656,7 @@ module.exports.create = function (cfg) {
var content = parsed[4];
if (typeof(content) !== 'string') { return; }
var hash = getHash(content);
var hash = getHash(content, Log);
if (hash === oldestKnownHash) {
found = true;
}

lib/hk-util.js — new file, 24 lines added

@ -0,0 +1,24 @@
var HK = module.exports;

/* getHash
 * slices off the leading 64 characters of a message, which are very
 * likely to be unique
 * these "hashes" identify particular messages in a channel's history
 * clients keep "hashes" in memory or in their drive and use them to
 * query for new messages:
 * when reconnecting to a pad
 * when connecting to chat or a mailbox
 * consequently, changing this function would invalidate client data which:
 * is encrypted clientside
 * can't be easily migrated
 * don't break it!
 */
HK.getHash = function (msg, Log) {
    // the common case: a valid string message
    if (typeof(msg) === 'string') { return msg.slice(0, 64); }
    // anything else is a caller bug; warn if a logger was provided,
    // then fall back to an empty hash
    if (Log) {
        Log.warn('HK_GET_HASH', 'getHash() called on ' + typeof(msg) + ': ' + msg);
    }
    return '';
};

View file

@ -7,6 +7,8 @@ var Path = require("path");
var nThen = require("nthen");
var Semaphore = require("saferphore");
var Util = require("../lib/common-util");
var Meta = require("../lib/metadata");
var Extras = require("../lib/hk-util");
const Schedule = require("../lib/schedule");
const Readline = require("readline");
@ -39,6 +41,10 @@ var mkArchiveMetadataPath = function (env, channelId) {
return Path.join(env.archiveRoot, 'datastore', channelId.slice(0, 2), channelId) + '.metadata.ndjson';
};
// Path of the scratch file used while rewriting a channel's log in place.
var mkTempPath = function (env, channelId) {
    var productionPath = mkPath(env, channelId);
    return productionPath + '.temp';
};
// pass in the path so we can reuse the same function for archived files
var channelExists = function (filepath, cb) {
Fs.stat(filepath, function (err, stat) {
@ -868,6 +874,178 @@ var getMessages = function (env, chanName, handler, cb) {
});
};
/*  trimChannel
    removes all messages which precede the message identified by `hash`
    from a channel's log, leaving the matching message as the first entry
    of the trimmed log.

    this function is queued as a blocking action for the relevant channel,
    so nothing else should read or write the channel while it runs.

    calls back with an Error (or the string 'HASH_NOT_FOUND') on failure,
    otherwise with no arguments. the temp buffer file is removed on failure.
*/
var trimChannel = function (env, channelName, hash, _cb) {
    var cb = Util.once(Util.mkAsync(_cb));

    // derive temporary file paths for metadata and log buffers
    var tempChannelPath = mkTempPath(env, channelName);

    // derive production db paths
    var channelPath = mkPath(env, channelName);
    var metadataPath = mkMetadataPath(env, channelName);

    // derive archive paths
    var archiveChannelPath = mkArchivePath(env, channelName);
    var archiveMetadataPath = mkArchiveMetadataPath(env, channelName);

    var metadataReference = {};

    var tempStream;
    var ABORT;

    // close the temp stream if open and remove the temp file (best-effort)
    var cleanUp = function (cb) {
        if (tempStream && !tempStream.closed) {
            try {
                tempStream.close();
            } catch (err) { }
        }

        Fse.unlink(tempChannelPath, function (err) {
            // proceed if deleted or if there was nothing to delete
            if (!err || err.code === 'ENOENT') { return cb(); }
            // else abort and call back with the error
            cb(err);
        });
    };

    nThen(function (w) {
        // close the file descriptor if it is open
        closeChannel(env, channelName, w(function (err) {
            if (err) {
                w.abort();
                return void cb(err);
            }
        }));
    }).nThen(function (w) {
        // remove any stale temp file left over from a previous failed trim
        cleanUp(w(function (err) {
            if (err) {
                w.abort();
                cb(err);
            }
        }));
    }).nThen(function (w) {
        // eat errors since loading the logger here would create a cyclical dependency
        var lineHandler = Meta.createLineHandler(metadataReference, Util.noop);
        readMetadata(env, channelName, lineHandler, w(function (err) {
            if (err) {
                w.abort();
                return void cb(err);
            }
            // if there were no errors just fall through to the next block
        }));
    }).nThen(function (w) {
        // create temp buffer writeStream
        tempStream = Fs.createWriteStream(tempChannelPath, {
            flags: 'a',
        });
        tempStream.on('open', w());
        tempStream.on('error', function (err) {
            w.abort();
            ABORT = true;
            cleanUp(function () {
                cb(err);
            });
        });
    }).nThen(function (w) {
        var i = 0;
        var retain = false;

        var handler = function (msgObj, readMore, abort) {
            if (ABORT) { return void abort(); }
            // the first message might be metadata... skip it if so, but
            // keep reading (previously this returned without calling
            // readMore, which stalled the read)
            if (i++ === 0 && msgObj.buff.indexOf('{') === 0) {
                return void readMore();
            }
            if (retain) {
                // if this flag is set then you've already found
                // the message you were looking for.
                // write it to your temp buffer and keep going
                return void tempStream.write(msgObj.buff, function () {
                    readMore();
                });
            }

            var msg = Util.tryParse(msgObj.buff.toString('utf8'));
            // skip unparseable lines rather than throwing on msg[4]
            if (typeof(msg) === 'undefined') { return void readMore(); }
            var msgHash = Extras.getHash(msg[4]);

            if (msgHash === hash) {
                // everything from this point on should be retained,
                // including the matching message itself, which becomes
                // the first entry of the trimmed log
                retain = true;
                return void tempStream.write(msgObj.buff, function () {
                    readMore();
                });
            }
            // not yet at the cut-off point; drop this message and continue
            readMore();
        };

        readMessagesBin(env, channelName, 0, handler, w(function (err) {
            if (err) {
                w.abort();
                return void cleanUp(function () {
                    // intentionally call back with main error
                    // not the cleanup error
                    cb(err);
                });
            }

            if (!retain) {
                // you never found the message you were looking for
                // this whole operation is invalid...
                // clean up, abort, and call back with an error
                w.abort();
                cleanUp(function () {
                    // intentionally call back with main error
                    // not the cleanup error
                    cb('HASH_NOT_FOUND');
                });
            }
        }));
    }).nThen(function (w) {
        // copy existing channel to the archive
        Fse.copy(channelPath, archiveChannelPath, w(function (err) {
            if (!err || err.code === 'ENOENT') { return; }
            w.abort();
            cleanUp(function () {
                cb(err);
            });
        }));

        // copy existing metadata to the archive
        Fse.copy(metadataPath, archiveMetadataPath, w(function (err) {
            if (!err || err.code === 'ENOENT') { return; }
            w.abort();
            cleanUp(function () {
                cb(err);
            });
        }));
    }).nThen(function (w) {
        // overwrite the existing metadata log with the current metadata state
        // NOTE(review): if the channel had no metadata, metadataReference.meta
        // is undefined and this writes the literal string "undefined" — confirm
        // whether channels without metadata can reach this code path
        Fs.writeFile(metadataPath, JSON.stringify(metadataReference.meta) + '\n', w(function (err) {
            // this shouldn't happen, but if it does your channel might be messed up :(
            if (err) {
                w.abort();
                cb(err);
            }
        }));

        // overwrite the existing channel with the temp log
        Fse.move(tempChannelPath, channelPath, {
            overwrite: true,
        }, w(function (err) {
            // this shouldn't happen, but if it does your channel might be messed up :(
            if (err) {
                w.abort();
                cb(err);
            }
        }));
    }).nThen(function () {
        // clean up and call back with no error
        // TODO: this should also trigger a historyKeeper index cache eviction
        cleanUp(function () {
            cb();
        });
    });
};
module.exports.create = function (conf, cb) {
var env = {
root: conf.filePath || './datastore',
@ -987,25 +1165,9 @@ module.exports.create = function (conf, cb) {
});
},
trimChannel: function (channelName, hash, cb) {
    // Remove every message preceding `hash` from the channel's log.
    // The heavy lifting (temp buffers, archiving, file swaps, cleanup
    // on error) is done by the module-level trimChannel function;
    // here we only validate the channel id and serialize the work.
    if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
    // schedule.blocking queues this as an exclusive action for the
    // channel so nothing reads or writes it mid-trim; `next` releases
    // the lock once the trim calls back (Util.both invokes both).
    schedule.blocking(channelName, function (next) {
        trimChannel(env, channelName, hash, Util.both(cb, next));
    });
},