Replace the chainpad server by the netflux server

This commit is contained in:
Yann Flory 2016-03-07 17:35:31 +01:00
parent 60c3aceb0d
commit 09a06a8bc5
3 changed files with 191 additions and 3 deletions

186
NetFluxWebsocketServer.js Normal file
View file

@ -0,0 +1,186 @@
;(function () { 'use strict';
let Crypto = require('crypto');
let WebSocket = require('ws');
let LAG_MAX_BEFORE_DISCONNECT = 30000;
let LAG_MAX_BEFORE_PING = 15000;
let HISTORY_KEEPER_ID = "_HISTORY_KEEPER_";
let dropUser;
let now = function () { return (new Date()).getTime(); };
let sendMsg = function (ctx, user, msg) {
try {
console.log('<' + JSON.stringify(msg));
user.socket.send(JSON.stringify(msg));
} catch (e) {
console.log(e.stack);
dropUser(ctx, user);
}
};
let sendChannelMessage = function (ctx, channel, msgStruct) {
msgStruct.unshift(0);
channel.forEach(function (user) { sendMsg(ctx, user, msgStruct); });
if (msgStruct[2] === 'MSG') {
ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { });
}
};
dropUser = function (ctx, user) {
if (user.socket.readyState !== WebSocket.CLOSING
&& user.socket.readyState !== WebSocket.CLOSED)
{
try {
user.socket.close();
} catch (e) {
console.log("Failed to disconnect ["+user.id+"], attempting to terminate");
try {
user.socket.terminate();
} catch (ee) {
console.log("Failed to terminate ["+user.id+"] *shrug*");
}
}
}
delete ctx.users[user.id];
Object.keys(ctx.channels).forEach(function (chanName) {
let chan = ctx.channels[chanName];
let idx = chan.indexOf(user);
if (idx < 0) { return; }
console.log("Removing ["+user.id+"] from channel ["+chanName+"]");
chan.splice(idx, 1);
if (chan.length === 0) {
console.log("Removing empty channel ["+chanName+"]");
delete ctx.channels[chanName];
} else {
sendChannelMessage(ctx, chan, [user.id, 'LEAVE', chanName, 'Quit: [ dropUser() ]']);
}
});
};
let getHistory = function (ctx, channelName, handler) {
ctx.store.getMessages(channelName, function (msgStr) { handler(JSON.parse(msgStr)); });
};
let randName = function () { return Crypto.randomBytes(16).toString('hex'); };
let handleMessage = function (ctx, user, msg) {
let json = JSON.parse(msg);
let seq = json.shift();
let cmd = json[0];
let obj = json[1];
user.timeOfLastMessage = now();
user.pingOutstanding = false;
if (cmd === 'JOIN') {
/*if (obj && obj.length !== 32) {
sendMsg(ctx, user, [seq, 'ERROR', 'ENOENT', obj]);
return;
}*/
let chanName = obj || randName();
let chan = ctx.channels[chanName] = ctx.channels[chanName] || [];
chan.id = chanName;
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'JOIN', chanName]);
chan.forEach(function (u) { sendMsg(ctx, user, [0, u.id, 'JOIN', chanName]); });
chan.push(user);
sendChannelMessage(ctx, chan, [user.id, 'JOIN', chanName]);
return;
}
if (cmd === 'MSG') {
if (obj === HISTORY_KEEPER_ID) {
let parsed;
try { parsed = JSON.parse(json[2]); } catch (err) { return; }
if (parsed[0] === 'GET_HISTORY') {
console.log('getHistory ' + parsed[1]);
getHistory(ctx, parsed[1], function (msg) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
}
return;
}
if (obj && !ctx.channels[obj] && !ctx.users[obj]) {
sendMsg(ctx, user, [seq, 'ERROR', 'ENOENT', obj]);
return;
}
let target;
json.unshift(user.id);
if ((target = ctx.channels[obj])) {
sendChannelMessage(ctx, target, json);
return;
}
if ((target = ctx.users[obj])) {
json.unshift(0);
sendMsg(ctx, target, json);
return;
}
}
if (cmd === 'LEAVE') {
let err;
let chan;
let idx;
if (!obj) { err = 'EINVAL'; }
if (!err && !(chan = ctx.channels[obj])) { err = 'ENOENT'; }
if (!err && (idx = chan.indexOf(user)) === -1) { err = 'NOT_IN_CHAN'; }
if (err) {
sendMsg(ctx, user, [seq, 'ERROR', err]);
return;
}
json.unshift(user.id);
sendChannelMessage(ctx, chan, [user.id, 'LEAVE', chan.id]);
chan.splice(idx, 1);
}
if (cmd === 'PING') {
sendMsg(ctx, user, [seq, 'PONG', obj]);
return;
}
};
let run = module.exports.run = function (storage, socketServer) {
let ctx = {
users: {},
channels: {},
store: storage
};
setInterval(function () {
Object.keys(ctx.users).forEach(function (userId) {
let u = ctx.users[userId];
if (now() - u.timeOfLastMessage > LAG_MAX_BEFORE_DISCONNECT) {
dropUser(ctx, u);
} else if (!u.pingOutstanding && now() - u.timeOfLastMessage > LAG_MAX_BEFORE_PING) {
sendMsg(ctx, u, [0, 'PING', now()]);
u.pingOutstanding = true;
}
});
}, 5000);
socketServer.on('connection', function(socket) {
let conn = socket.upgradeReq.connection;
let user = {
addr: conn.remoteAddress + '|' + conn.remotePort,
socket: socket,
id: randName(),
timeOfLastMessage: now(),
pingOutstanding: false
};
ctx.users[user.id] = user;
sendMsg(ctx, user, [0, 'IDENT', user.id]);
socket.on('message', function(message) {
console.log('>'+message);
try {
handleMessage(ctx, user, message);
} catch (e) {
console.log(e.stack);
dropUser(ctx, user);
}
});
socket.on('close', function (evt) {
for (let userId in ctx.users) {
if (ctx.users[userId].socket === socket) {
dropUser(ctx, ctx.users[userId]);
}
}
});
});
};
}());

View file

@ -7,6 +7,7 @@ var Https = require('https');
var Fs = require('fs');
var WebSocketServer = require('ws').Server;
var ChainPadSrv = require('./ChainPadSrv');
var NetfluxSrv = require('./NetFluxWebsocketServer');
var config = require('./config');
config.websocketPort = config.websocketPort || config.httpPort;
@ -79,5 +80,6 @@ if (config.websocketPort !== config.httpPort) {
var wsSrv = new WebSocketServer(wsConfig);
Storage.create(config, function (store) {
console.log('DB connected');
ChainPadSrv.create(wsSrv, store);
// ChainPadSrv.create(wsSrv, store);
NetfluxSrv.run(store, wsSrv);
});

View file

@ -113,8 +113,8 @@ define([
};
var options = {
// signaling: websocketUrl,
signaling: 'ws://localhost:9000',
signaling: websocketUrl,
// signaling: 'ws://localhost:9000',
topology: 'StarTopologyService',
protocol: 'WebSocketProtocolService',
connector: 'WebSocketService',