Compare commits

...

1 commit

Author SHA1 Message Date
yflory
9be21c21c4 WIP blob metadata 2023-08-29 15:19:34 +02:00
4 changed files with 57 additions and 2 deletions

View file

@ -10,7 +10,8 @@ Data.getMetadataRaw = function (Env, channel /* channelName */, _cb) {
const cb = Util.once(Util.mkAsync(_cb));
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== HK.STANDARD_CHANNEL_LENGTH &&
channel.length !== HK.ADMIN_CHANNEL_LENGTH) { return cb("INVALID_CHAN_LENGTH"); }
channel.length !== HK.ADMIN_CHANNEL_LENGTH &&
channel.length !== HK.BLOB_ID_LENGTH) { return cb("INVALID_CHAN_LENGTH"); }
// return synthetic metadata for admin broadcast channels as a safety net
// in case anybody manages to write metadata
@ -76,6 +77,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
var channel = data.channel;
var command = data.command;
// XXX BLOBMD allow blobs
if (!channel || !Core.isValidId(channel)) { return void cb ('INVALID_CHAN'); }
if (!command || typeof (command) !== 'string') { return void cb('INVALID_COMMAND'); }
if (Meta.commands.indexOf(command) === -1) { return void cb('UNSUPPORTED_COMMAND'); }
@ -137,6 +139,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
cb(void 0, metadata);
return void next();
}
// XXX BLOBMD use correct store for blobs
Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) {
if (e) {
cb(e);
@ -152,6 +155,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
// update the cached metadata
metadata_cache[channel] = metadata;
Env.checkCache(channel); // XXX ???
// it's easy to check if the channel is restricted
const isRestricted = metadata.restricted;

View file

@ -40,6 +40,8 @@ const ADMIN_CHANNEL_LENGTH = HK.ADMIN_CHANNEL_LENGTH = 33;
// with a 34 character id
const EPHEMERAL_CHANNEL_LENGTH = HK.EPHEMERAL_CHANNEL_LENGTH = 34;
const BLOB_ID_LENGTH = HK.BLOB_ID_LENGTH = 48;
// Temporary channels are archived X ms after everyone has left them
const TEMPORARY_CHANNEL_LIFETIME = 30 * 1000;

View file

@ -7,6 +7,11 @@ var BlobStore = module.exports;
var nThen = require("nthen");
var Semaphore = require("saferphore");
var Util = require("../common-util");
var Meta = require("../metadata");
const BatchRead = require("../batch-read");
const readFileBin = require("../stream-file").readFileBin;
const Schedule = require("../schedule");
var isValidSafeKey = function (safeKey) {
return typeof(safeKey) === 'string' && !/\//.test(safeKey) && safeKey.length === 44;
@ -26,11 +31,16 @@ var prependArchive = function (Env, path) {
return Path.join(Env.archivePath, 'blob', relativePathToBlob);
};
// /blob/<safeKeyPrefix>/<safeKey>/<blobPrefix>/<blobId>
// /blob/<blobPrefix>/<blobId>
var makeBlobPath = function (Env, blobId) {
return Path.join(Env.blobPath, blobId.slice(0, 2), blobId);
};
// /blob/<blobPrefix>/<blobId>.metadata.ndjson
var mkMetadataPath = function (env, channelId) {
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.metadata.ndjson';
};
// /blobstate/<safeKeyPrefix>/<safeKey>
var makeStagePath = function (Env, safeKey) {
return Path.join(Env.blobStagingPath, safeKey.slice(0, 2), safeKey);
@ -355,6 +365,43 @@ var restoreProof = function (Env, safeKey, blobId, cb) {
Fse.move(archivePath, proofPath, cb);
};
var getDedicatedMetadata = function (env, blobId, handler, _cb) {
var metadataPath = mkMetadataPath(env, blobId);
var stream = Fs.createReadStream(metadataPath, {start: 0});
const collector = createIdleStreamCollector(stream);
var cb = Util.both(_cb, collector);
readFileBin(stream, function (msgObj, readMore) {
collector.keepAlive();
var line = msgObj.buff.toString('utf8');
try {
var parsed = JSON.parse(line);
handler(null, parsed);
} catch (err) {
handler(err, line);
}
readMore();
}, function (err) {
// ENOENT => there is no metadata log
if (!err || err.code === 'ENOENT') { return void cb(); }
// otherwise stream errors?
cb(err);
});
};
/* readMetadata
Load the log of metadata amendments.
*/
var readMetadata = function (Env, blobId, handler, cb) {
getDedicatedMetadata(env, channelId, handler, function (err) {
if (err) {
// stream errors?
return void cb(err);
}
cb();
});
};
var makeWalker = function (n, handleChild, done) {
if (!n || typeof(n) !== 'number' || n < 2) { n = 2; }
@ -486,6 +533,7 @@ BlobStore.create = function (config, _cb) {
archivePath: config.archivePath || './data/archive',
getSession: config.getSession,
};
var schedule = Env.schedule = Schedule();
nThen(function (w) {
var CB = Util.both(w.abort, cb);

View file

@ -307,6 +307,7 @@ const computeIndex = function (data, cb) {
const computeMetadata = function (data, cb) {
const ref = {};
const lineHandler = Meta.createLineHandler(ref, Env.Log.error);
// XXX BLOBMD use correct store
return void store.readChannelMetadata(data.channel, lineHandler, function (err) {
if (err) {
// stream errors?