Compare commits
1 commit
main
...
blob-metad
Author | SHA1 | Date | |
---|---|---|---|
|
9be21c21c4 |
4 changed files with 57 additions and 2 deletions
|
@ -10,7 +10,8 @@ Data.getMetadataRaw = function (Env, channel /* channelName */, _cb) {
|
|||
const cb = Util.once(Util.mkAsync(_cb));
|
||||
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
|
||||
if (channel.length !== HK.STANDARD_CHANNEL_LENGTH &&
|
||||
channel.length !== HK.ADMIN_CHANNEL_LENGTH) { return cb("INVALID_CHAN_LENGTH"); }
|
||||
channel.length !== HK.ADMIN_CHANNEL_LENGTH &&
|
||||
channel.length !== HK.BLOB_ID_LENGTH) { return cb("INVALID_CHAN_LENGTH"); }
|
||||
|
||||
// return synthetic metadata for admin broadcast channels as a safety net
|
||||
// in case anybody manages to write metadata
|
||||
|
@ -76,6 +77,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
|
|||
|
||||
var channel = data.channel;
|
||||
var command = data.command;
|
||||
// XXX BLOBMD allow blobs
|
||||
if (!channel || !Core.isValidId(channel)) { return void cb ('INVALID_CHAN'); }
|
||||
if (!command || typeof (command) !== 'string') { return void cb('INVALID_COMMAND'); }
|
||||
if (Meta.commands.indexOf(command) === -1) { return void cb('UNSUPPORTED_COMMAND'); }
|
||||
|
@ -137,6 +139,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
|
|||
cb(void 0, metadata);
|
||||
return void next();
|
||||
}
|
||||
// XXX BLOBMD use correct store for blobs
|
||||
Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) {
|
||||
if (e) {
|
||||
cb(e);
|
||||
|
@ -152,6 +155,7 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
|
|||
|
||||
// update the cached metadata
|
||||
metadata_cache[channel] = metadata;
|
||||
Env.checkCache(channel); // XXX ???
|
||||
|
||||
// it's easy to check if the channel is restricted
|
||||
const isRestricted = metadata.restricted;
|
||||
|
|
|
@ -40,6 +40,8 @@ const ADMIN_CHANNEL_LENGTH = HK.ADMIN_CHANNEL_LENGTH = 33;
|
|||
// with a 34 character id
|
||||
const EPHEMERAL_CHANNEL_LENGTH = HK.EPHEMERAL_CHANNEL_LENGTH = 34;
|
||||
|
||||
const BLOB_ID_LENGTH = HK.BLOB_ID_LENGTH = 48;
|
||||
|
||||
// Temporary channels are archived X ms after everyone has left them
|
||||
const TEMPORARY_CHANNEL_LIFETIME = 30 * 1000;
|
||||
|
||||
|
|
|
@ -7,6 +7,11 @@ var BlobStore = module.exports;
|
|||
var nThen = require("nthen");
|
||||
var Semaphore = require("saferphore");
|
||||
var Util = require("../common-util");
|
||||
var Meta = require("../metadata");
|
||||
|
||||
const BatchRead = require("../batch-read");
|
||||
const readFileBin = require("../stream-file").readFileBin;
|
||||
const Schedule = require("../schedule");
|
||||
|
||||
var isValidSafeKey = function (safeKey) {
|
||||
return typeof(safeKey) === 'string' && !/\//.test(safeKey) && safeKey.length === 44;
|
||||
|
@ -26,11 +31,16 @@ var prependArchive = function (Env, path) {
|
|||
return Path.join(Env.archivePath, 'blob', relativePathToBlob);
|
||||
};
|
||||
|
||||
// /blob/<safeKeyPrefix>/<safeKey>/<blobPrefix>/<blobId>
|
||||
// /blob/<blobPrefix>/<blobId>
|
||||
var makeBlobPath = function (Env, blobId) {
|
||||
return Path.join(Env.blobPath, blobId.slice(0, 2), blobId);
|
||||
};
|
||||
|
||||
// /blob/<blobPrefix>/<blobId>.metadata.ndjson
|
||||
var mkMetadataPath = function (env, channelId) {
|
||||
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.metadata.ndjson';
|
||||
};
|
||||
|
||||
// /blobstate/<safeKeyPrefix>/<safeKey>
|
||||
var makeStagePath = function (Env, safeKey) {
|
||||
return Path.join(Env.blobStagingPath, safeKey.slice(0, 2), safeKey);
|
||||
|
@ -355,6 +365,43 @@ var restoreProof = function (Env, safeKey, blobId, cb) {
|
|||
Fse.move(archivePath, proofPath, cb);
|
||||
};
|
||||
|
||||
var getDedicatedMetadata = function (env, blobId, handler, _cb) {
|
||||
var metadataPath = mkMetadataPath(env, blobId);
|
||||
var stream = Fs.createReadStream(metadataPath, {start: 0});
|
||||
|
||||
const collector = createIdleStreamCollector(stream);
|
||||
var cb = Util.both(_cb, collector);
|
||||
|
||||
readFileBin(stream, function (msgObj, readMore) {
|
||||
collector.keepAlive();
|
||||
var line = msgObj.buff.toString('utf8');
|
||||
try {
|
||||
var parsed = JSON.parse(line);
|
||||
handler(null, parsed);
|
||||
} catch (err) {
|
||||
handler(err, line);
|
||||
}
|
||||
readMore();
|
||||
}, function (err) {
|
||||
// ENOENT => there is no metadata log
|
||||
if (!err || err.code === 'ENOENT') { return void cb(); }
|
||||
// otherwise stream errors?
|
||||
cb(err);
|
||||
});
|
||||
};
|
||||
/* readMetadata
|
||||
Load the log of metadata amendments.
|
||||
*/
|
||||
var readMetadata = function (Env, blobId, handler, cb) {
|
||||
getDedicatedMetadata(env, channelId, handler, function (err) {
|
||||
if (err) {
|
||||
// stream errors?
|
||||
return void cb(err);
|
||||
}
|
||||
cb();
|
||||
});
|
||||
};
|
||||
|
||||
var makeWalker = function (n, handleChild, done) {
|
||||
if (!n || typeof(n) !== 'number' || n < 2) { n = 2; }
|
||||
|
||||
|
@ -486,6 +533,7 @@ BlobStore.create = function (config, _cb) {
|
|||
archivePath: config.archivePath || './data/archive',
|
||||
getSession: config.getSession,
|
||||
};
|
||||
var schedule = Env.schedule = Schedule();
|
||||
|
||||
nThen(function (w) {
|
||||
var CB = Util.both(w.abort, cb);
|
||||
|
|
|
@ -307,6 +307,7 @@ const computeIndex = function (data, cb) {
|
|||
const computeMetadata = function (data, cb) {
|
||||
const ref = {};
|
||||
const lineHandler = Meta.createLineHandler(ref, Env.Log.error);
|
||||
// XXX BLOBMD use correct store
|
||||
return void store.readChannelMetadata(data.channel, lineHandler, function (err) {
|
||||
if (err) {
|
||||
// stream errors?
|
||||
|
|
Loading…
Reference in a new issue