Update blob activity when loaded

This commit is contained in:
yflory 2023-08-29 17:50:39 +02:00
parent 31a5cbafdb
commit 27b9c9bac3
3 changed files with 93 additions and 16 deletions

View file

@ -9,6 +9,7 @@ const Logger = require("./log");
const AuthCommands = require("./http-commands");
const MFA = require("./storage/mfa");
const Sessions = require("./storage/sessions");
const BlobStore = require("./storage/blob");
const DEFAULT_QUERY_TIMEOUT = 5000;
const PID = process.pid;
@ -206,6 +207,13 @@ app.use('/blob', function (req, res, next) {
/* Head requests are used to check the size of a blob.
Clients can configure a maximum size to download automatically,
and can manually click to download blobs which exceed that limit. */
const url = req.url;
if (typeof(url) === "string" && Env.blobStore) {
const s = url.split('/');
if (s[1] && s[1].length === 2 && s[2] && s[2].length === Env.blobStore.BLOB_LENGTH) {
Env.blobStore.updateActivity(s[2], () => {});
}
}
if (req.method === 'HEAD') {
Express.static(Path.resolve(Env.paths.blob), {
setHeaders: function (res /*, path, stat */) {
@ -618,6 +626,17 @@ nThen(function (w) {
// websocket traffic to the correct port (Env.websocketPort)
wsProxy.upgrade(req, socket, head);
});
var config = require("./load-config");
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function () {},
}, w(function (err, blob) {
if (err) { return; }
Env.blobStore = blob;
}));
}).nThen(function () {
// TODO inform the parent process that this worker is ready

View file

@ -8,12 +8,14 @@ var nThen = require("nthen");
var Semaphore = require("saferphore");
var Util = require("../common-util");
const BLOB_LENGTH = 48;
var isValidSafeKey = function (safeKey) {
return typeof(safeKey) === 'string' && !/\//.test(safeKey) && safeKey.length === 44;
};
var isValidId = function (id) {
return typeof(id) === 'string' && id.length === 48 && !/[^a-f0-9]/.test(id);
return typeof(id) === 'string' && id.length === BLOB_LENGTH && !/[^a-f0-9]/.test(id);
};
// helpers
@ -31,6 +33,10 @@ var makeBlobPath = function (Env, blobId) {
return Path.join(Env.blobPath, blobId.slice(0, 2), blobId);
};
var makeActivityPath = function (Env, blobId) {
return makeBlobPath(Env, blobId) + '.activity';
};
// /blobstate/<safeKeyPrefix>/<safeKey>
var makeStagePath = function (Env, safeKey) {
return Path.join(Env.blobStagingPath, safeKey.slice(0, 2), safeKey);
@ -103,6 +109,31 @@ var makeFileStream = function (full, _cb) {
});
};
var clearActivity = function (Env, blobId, cb) {
var path = makeActivityPath(Env, blobId);
// if we fail to delete the activity file, it can still be removed later by the eviction script
Fs.unlink(path, cb);
};
var updateActivity = function (Env, blobId, cb) {
var path = makeActivityPath(Env, blobId);
var s_data = String(+new Date());
Fs.writeFile(path, s_data, cb);
};
var getActivity = function (Env, blobId, cb) {
var path = makeActivityPath(Env, blobId);
Fs.readFile(path, function (err, content) {
if (err) { return void cb(err); }
try {
var date = new Date(+content);
cb(void 0, date);
} catch (err2) {
cb(err2);
}
});
};
/********** METHODS **************/
var upload = function (Env, safeKey, content, cb) {
@ -506,6 +537,7 @@ BlobStore.create = function (config, _cb) {
Fse.writeFile(fullPath, 'PLACEHOLDER\n', w());
}).nThen(function () {
var methods = {
BLOB_LENGTH: BLOB_LENGTH,
isFileId: isValidId,
status: function (safeKey, _cb) {
// TODO check if the final destination is a file
@ -623,6 +655,23 @@ BlobStore.create = function (config, _cb) {
getUploadSize(Env, id, cb);
},
// ACTIVITY
clearActivity: function (id, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(id)) { return void cb("INVALID_ID"); }
clearActivity(Env, id, cb);
},
updateActivity: function (id, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(id)) { return void cb("INVALID_ID"); }
updateActivity(Env, id, cb);
},
getActivity: function (id, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(id)) { return void cb("INVALID_ID"); }
getActivity(Env, id, cb);
},
list: {
blobs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));

View file

@ -234,6 +234,22 @@ var factory = function () {
config.Cache.setBlobCache(id, u8, cb);
};
var headRequest = function (src, cb) {
var xhr = new XMLHttpRequest();
xhr.open("HEAD", src);
if (sendCredentials) { xhr.withCredentials = true; }
xhr.onerror = function () { return void cb("XHR_ERROR"); };
xhr.onreadystatechange = function() {
if (this.readyState === this.DONE) {
cb(null, Number(xhr.getResponseHeader("Content-Length")));
}
};
xhr.onload = function () {
if (/^4/.test('' + this.status)) { return void cb("XHR_ERROR " + this.status); }
};
xhr.send();
};
var getFileSize = function (src, _cb) {
var cb = function (e, res) {
_cb(e, res);
@ -243,25 +259,14 @@ var factory = function () {
var cacheKey = getCacheKey(src);
var check = function () {
var xhr = new XMLHttpRequest();
xhr.open("HEAD", src);
if (sendCredentials) { xhr.withCredentials = true; }
xhr.onerror = function () { return void cb("XHR_ERROR"); };
xhr.onreadystatechange = function() {
if (this.readyState === this.DONE) {
cb(null, Number(xhr.getResponseHeader("Content-Length")));
}
};
xhr.onload = function () {
if (/^4/.test('' + this.status)) { return void cb("XHR_ERROR " + this.status); }
};
xhr.send();
headRequest(src, cb);
};
if (!cacheKey) { return void check(); }
getBlobCache(cacheKey, function (err, u8) {
if (err || !u8) { return void check(); }
check(); // send the HEAD request to update the blob activity
if (err || !u8) { return; }
cb(null, 0);
});
};
@ -748,7 +753,11 @@ var factory = function () {
});
};
if (cfg.force) { dl(); return mediaObject; }
if (cfg.force) {
headRequest(src, function () {}); // Update activity
dl();
return mediaObject;
}
var maxSize = typeof(config.maxDownloadSize) === "number" ? config.maxDownloadSize
: (5 * 1024 * 1024);