Revert "Added checkpoints with the new code from ChainPad"

This reverts commit da2bfe2de9.
This commit is contained in:
ansuz 2016-05-30 10:20:08 +02:00
parent 599f5aeec8
commit 6e2e8bf21f
3 changed files with 164 additions and 264 deletions

View file

@ -87,27 +87,7 @@ dropUser = function (ctx, user) {
}; };
const getHistory = function (ctx, channelName, handler, cb) { const getHistory = function (ctx, channelName, handler, cb) {
var messageBuf = []; ctx.store.getMessages(channelName, function (msgStr) { handler(JSON.parse(msgStr)); }, cb);
ctx.store.getMessages(channelName, function (msgStr) {
messageBuf.push(JSON.parse(msgStr));
}, function () {
var startPoint;
var cpCount = 0;
var msgBuff2 = [];
for (startPoint = messageBuf.length - 1; startPoint >= 0; startPoint--) {
var msg = messageBuf[startPoint];
msgBuff2.push(msg);
if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
cpCount++;
if (cpCount >= 2) {
for (var x = msgBuff2.pop(); x; x = msgBuff2.pop()) { handler(x); }
break;
}
}
//console.log(messageBuf[startPoint]);
}
cb();
});
}; };
const randName = function () { return Crypto.randomBytes(16).toString('hex'); }; const randName = function () { return Crypto.randomBytes(16).toString('hex'); };

View file

@ -28,8 +28,7 @@ var create = Patch.create = function (parentHash) {
return { return {
type: 'Patch', type: 'Patch',
operations: [], operations: [],
parentHash: parentHash, parentHash: parentHash
isCheckpoint: false
}; };
}; };
@ -46,13 +45,6 @@ var check = Patch.check = function (patch, docLength_opt) {
docLength_opt += Operation.lengthChange(patch.operations[i]); docLength_opt += Operation.lengthChange(patch.operations[i]);
} }
} }
if (patch.isCheckpoint) {
Common.assert(patch.operations.length === 1);
Common.assert(patch.operations[0].offset === 0);
if (typeof(docLength_opt) === 'number') {
Common.assert(!docLength_opt || patch.operations[0].toRemove === docLength_opt);
}
}
}; };
var toObj = Patch.toObj = function (patch) { var toObj = Patch.toObj = function (patch) {
@ -112,20 +104,6 @@ var addOperation = Patch.addOperation = function (patch, op) {
if (Common.PARANOIA) { check(patch); } if (Common.PARANOIA) { check(patch); }
}; };
var createCheckpoint = Patch.createCheckpoint =
function (parentContent, checkpointContent, parentContentHash_opt)
{
var op = Operation.create(0, parentContent.length, checkpointContent);
if (Common.PARANOIA && parentContentHash_opt) {
Common.assert(parentContentHash_opt === hash(parentContent));
}
parentContentHash_opt = parentContentHash_opt || hash(parentContent);
var out = create(parentContentHash_opt);
addOperation(out, op);
out.isCheckpoint = true;
return out;
};
var clone = Patch.clone = function (patch) { var clone = Patch.clone = function (patch) {
if (Common.PARANOIA) { check(patch); } if (Common.PARANOIA) { check(patch); }
var out = create(); var out = create();
@ -402,7 +380,7 @@ var PARANOIA = module.exports.PARANOIA = true;
var VALIDATE_ENTIRE_CHAIN_EACH_MSG = module.exports.VALIDATE_ENTIRE_CHAIN_EACH_MSG = false; var VALIDATE_ENTIRE_CHAIN_EACH_MSG = module.exports.VALIDATE_ENTIRE_CHAIN_EACH_MSG = false;
/* throw errors over non-compliant messages which would otherwise be treated as invalid */ /* throw errors over non-compliant messages which would otherwise be treated as invalid */
var TESTING = module.exports.TESTING = false; var TESTING = module.exports.TESTING = true;
var assert = module.exports.assert = function (expr) { var assert = module.exports.assert = function (expr) {
if (!expr) { throw new Error("Failed assertion"); } if (!expr) { throw new Error("Failed assertion"); }
@ -457,11 +435,10 @@ var REGISTER = Message.REGISTER = 0;
var REGISTER_ACK = Message.REGISTER_ACK = 1; var REGISTER_ACK = Message.REGISTER_ACK = 1;
var PATCH = Message.PATCH = 2; var PATCH = Message.PATCH = 2;
var DISCONNECT = Message.DISCONNECT = 3; var DISCONNECT = Message.DISCONNECT = 3;
var CHECKPOINT = Message.CHECKPOINT = 4;
var check = Message.check = function(msg) { var check = Message.check = function(msg) {
Common.assert(msg.type === 'Message'); Common.assert(msg.type === 'Message');
if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) { if (msg.messageType === PATCH) {
Patch.check(msg.content); Patch.check(msg.content);
Common.assert(typeof(msg.lastMsgHash) === 'string'); Common.assert(typeof(msg.lastMsgHash) === 'string');
} else { } else {
@ -482,8 +459,9 @@ var create = Message.create = function (type, content, lastMsgHash) {
var toString = Message.toString = function (msg) { var toString = Message.toString = function (msg) {
if (Common.PARANOIA) { check(msg); } if (Common.PARANOIA) { check(msg); }
if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) {
return JSON.stringify([msg.messageType, Patch.toObj(msg.content), msg.lastMsgHash]); if (msg.messageType === PATCH) {
return JSON.stringify([PATCH, Patch.toObj(msg.content), msg.lastMsgHash]);
} else { } else {
throw new Error(); throw new Error();
} }
@ -500,11 +478,43 @@ var discardBencode = function (msg, arr) {
}; };
var fromString = Message.fromString = function (str) { var fromString = Message.fromString = function (str) {
var m = JSON.parse(str); var msg = str;
if (m[0] !== CHECKPOINT && m[0] !== PATCH) { throw new Error("invalid message type " + m[0]); }
var msg = create(m[0], Patch.fromObj(m[1]), m[2]); if (str.charAt(0) === '[') {
if (m[0] === CHECKPOINT) { msg.content.isCheckpoint = true; } var m = JSON.parse(str);
return msg; return create(m[0], Patch.fromObj(m[1]), m[2]);
} else {
/* Just in case we receive messages in the old format,
we should try to parse them. We only need the content, though,
so just extract that and throw the rest away */
var last;
var parts = [];
// chop off all the bencoded components
while (msg) {
msg = discardBencode(msg, parts);
}
// grab the last component from the parts
// we don't need anything else
var contentStr = parts.slice(-1)[0];
var content = JSON.parse(contentStr);
var message;
if (content[0] === PATCH) {
message = create(userName, PATCH, Patch.fromObj(content[1]), content[2]);
} else if ([4,5].indexOf(content[0]) !== -1 /* === PING || content[0] === PONG*/) {
// it's a ping or pong, which we don't want to support anymore
message = create(userName, content[0], content[1]);
} else {
message = create(userName, content[0]);
}
// This check validates every operation in the patch.
check(message);
return message
}
}; };
var hashOf = Message.hashOf = function (msg) { var hashOf = Message.hashOf = function (msg) {
@ -540,16 +550,8 @@ var Sha = module.exports.Sha = require('./SHA256');
var ChainPad = {}; var ChainPad = {};
// hex_sha256('') // hex_sha256('')
var EMPTY_STR_HASH = module.exports.EMPTY_STR_HASH = var EMPTY_STR_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855';
'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'; var ZERO = '0000000000000000000000000000000000000000000000000000000000000000';
var ZERO = '0000000000000000000000000000000000000000000000000000000000000000';
// Default number of patches between checkpoints (patches older than this will be pruned)
// default for realtime.config.checkpointInterval
var DEFAULT_CHECKPOINT_INTERVAL = 200;
// Default number of milliseconds to wait before syncing to the server
var DEFAULT_AVERAGE_SYNC_MILLISECONDS = 300;
var enterChainPad = function (realtime, func) { var enterChainPad = function (realtime, func) {
return function () { return function () {
@ -565,9 +567,8 @@ var debug = function (realtime, msg) {
}; };
var schedule = function (realtime, func, timeout) { var schedule = function (realtime, func, timeout) {
if (realtime.aborted) { return; }
if (!timeout) { if (!timeout) {
timeout = Math.floor(Math.random() * 2 * realtime.config.avgSyncMilliseconds); timeout = Math.floor(Math.random() * 2 * realtime.avgSyncTime);
} }
var to = setTimeout(enterChainPad(realtime, function () { var to = setTimeout(enterChainPad(realtime, function () {
realtime.schedules.splice(realtime.schedules.indexOf(to), 1); realtime.schedules.splice(realtime.schedules.indexOf(to), 1);
@ -597,52 +598,12 @@ var onMessage = function (realtime, message, callback) {
} }
}; };
var sendMessage = function (realtime, msg, callback) {
var strMsg = Message.toString(msg);
onMessage(realtime, strMsg, function (err) {
if (err) {
debug(realtime, "Posting to server failed [" + err + "]");
realtime.pending = null;
} else {
var pending = realtime.pending;
realtime.pending = null;
Common.assert(pending.hash === msg.hashOf);
handleMessage(realtime, strMsg, true);
pending.callback();
}
});
msg.hashOf = msg.hashOf || Message.hashOf(msg);
var timeout = schedule(realtime, function () {
debug(realtime, "Failed to send message [" + msg.hashOf + "] to server");
sync(realtime);
}, 10000 + (Math.random() * 5000));
if (realtime.pending) { throw new Error("there is already a pending message"); }
realtime.pending = {
hash: msg.hashOf,
callback: function () {
if (realtime.initialMessage && realtime.initialMessage.hashOf === msg.hashOf) {
debug(realtime, "initial Ack received [" + msg.hashOf + "]");
realtime.initialMessage = null;
}
unschedule(realtime, timeout);
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0);
callback();
}
};
if (Common.PARANOIA) { check(realtime); }
};
var sync = function (realtime) { var sync = function (realtime) {
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
if (realtime.syncSchedule && !realtime.pending) { if (realtime.syncSchedule) {
unschedule(realtime, realtime.syncSchedule); unschedule(realtime, realtime.syncSchedule);
realtime.syncSchedule = null; realtime.syncSchedule = null;
} else { } else {
//debug(realtime, "already syncing...");
// we're currently waiting on something from the server. // we're currently waiting on something from the server.
return; return;
} }
@ -656,19 +617,6 @@ var sync = function (realtime) {
return; return;
} }
if (((parentCount(realtime, realtime.best) + 1) % realtime.config.checkpointInterval) === 0) {
var best = realtime.best;
debug(realtime, "Sending checkpoint");
var cpp = Patch.createCheckpoint(realtime.authDoc,
realtime.authDoc,
realtime.best.content.inverseOf.parentHash);
var cp = Message.create(Message.CHECKPOINT, cpp, realtime.best.hashOf);
sendMessage(realtime, cp, function () {
debug(realtime, "Checkpoint sent and accepted");
});
return;
}
var msg; var msg;
if (realtime.best === realtime.initialMessage) { if (realtime.best === realtime.initialMessage) {
msg = realtime.initialMessage; msg = realtime.initialMessage;
@ -676,16 +624,39 @@ var sync = function (realtime) {
msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf); msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf);
} }
sendMessage(realtime, msg, function () { var strMsg = Message.toString(msg);
//debug(realtime, "patch sent");
onMessage(realtime, strMsg, function (err) {
if (err) {
debug(realtime, "Posting to server failed [" + err + "]");
} else {
handleMessage(realtime, strMsg, true);
}
}); });
var hash = Message.hashOf(msg);
var timeout = schedule(realtime, function () {
debug(realtime, "Failed to send message ["+hash+"] to server");
sync(realtime);
}, 10000 + (Math.random() * 5000));
realtime.pending = {
hash: hash,
callback: function () {
if (realtime.initialMessage && realtime.initialMessage.hashOf === hash) {
debug(realtime, "initial Ack received ["+hash+"]");
realtime.initialMessage = null;
}
unschedule(realtime, timeout);
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0);
}
};
if (Common.PARANOIA) { check(realtime); }
}; };
var create = ChainPad.create = function (config) { var create = ChainPad.create = function (config) {
config = config || {}; config = config || {};
var initialState = config.initialState || ''; var initialState = config.initialState || '';
config.checkpointInterval = config.checkpointInterval || DEFAULT_CHECKPOINT_INTERVAL;
config.avgSyncMilliseconds = config.avgSyncMilliseconds || DEFAULT_AVERAGE_SYNC_MILLISECONDS;
var realtime = { var realtime = {
type: 'ChainPad', type: 'ChainPad',
@ -694,7 +665,7 @@ var create = ChainPad.create = function (config) {
config: config, config: config,
logLevel: (typeof(config.logLevel) === 'number') ? config.logLevel : 1, logLevel: typeof(config.logLevel) !== 'undefined'? config.logLevel: 1,
/** A patch representing all uncommitted work. */ /** A patch representing all uncommitted work. */
uncommitted: null, uncommitted: null,
@ -702,17 +673,18 @@ var create = ChainPad.create = function (config) {
uncommittedDocLength: initialState.length, uncommittedDocLength: initialState.length,
patchHandlers: [], patchHandlers: [],
changeHandlers: [], opHandlers: [],
messageHandlers: [], messageHandlers: [],
schedules: [], schedules: [],
aborted: false,
syncSchedule: null, syncSchedule: null,
registered: false, registered: false,
avgSyncTime: 100,
// this is only used if PARANOIA is enabled. // this is only used if PARANOIA is enabled.
userInterfaceContent: undefined, userInterfaceContent: undefined,
@ -727,6 +699,12 @@ var create = ChainPad.create = function (config) {
rootMessage: null, rootMessage: null,
userName: config.userName || 'anonymous', userName: config.userName || 'anonymous',
/**
* Set to the message which sets the initialState if applicable.
* Reset to null after the initial message has been successfully broadcasted.
*/
initialMessage: null,
}; };
if (Common.PARANOIA) { if (Common.PARANOIA) {
@ -734,10 +712,6 @@ var create = ChainPad.create = function (config) {
} }
var zeroPatch = Patch.create(EMPTY_STR_HASH); var zeroPatch = Patch.create(EMPTY_STR_HASH);
if (initialState !== '') {
var initialOp = Operation.create(0, 0, initialState);
Patch.addOperation(zeroPatch, initialOp);
}
zeroPatch.inverseOf = Patch.invert(zeroPatch, ''); zeroPatch.inverseOf = Patch.invert(zeroPatch, '');
zeroPatch.inverseOf.inverseOf = zeroPatch; zeroPatch.inverseOf.inverseOf = zeroPatch;
var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO); var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO);
@ -747,12 +721,40 @@ var create = ChainPad.create = function (config) {
(realtime.messagesByParent[zeroMsg.lastMessageHash] || []).push(zeroMsg); (realtime.messagesByParent[zeroMsg.lastMessageHash] || []).push(zeroMsg);
realtime.rootMessage = zeroMsg; realtime.rootMessage = zeroMsg;
realtime.best = zeroMsg; realtime.best = zeroMsg;
realtime.authDoc = initialState;
realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash);
if (initialState === '') {
realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash);
return realtime;
}
var initialOp = Operation.create(0, 0, initialState);
var initialStatePatch = Patch.create(zeroPatch.inverseOf.parentHash);
Patch.addOperation(initialStatePatch, initialOp);
initialStatePatch.inverseOf = Patch.invert(initialStatePatch, '');
initialStatePatch.inverseOf.inverseOf = initialStatePatch;
// flag this patch so it can be handled specially.
// Specifically, we never treat an initialStatePatch as our own,
// we let it be reverted to prevent duplication of data.
initialStatePatch.isInitialStatePatch = true;
initialStatePatch.inverseOf.isInitialStatePatch = true;
realtime.authDoc = initialState;
if (Common.PARANOIA) { if (Common.PARANOIA) {
realtime.userInterfaceContent = initialState; realtime.userInterfaceContent = initialState;
} }
initialMessage = Message.create(Message.PATCH, initialStatePatch, zeroMsg.hashOf);
initialMessage.hashOf = Message.hashOf(initialMessage);
initialMessage.parentCount = 1;
initialMessage.isFromMe = true;
realtime.messages[initialMessage.hashOf] = initialMessage;
(realtime.messagesByParent[initialMessage.lastMessageHash] || []).push(initialMessage);
realtime.best = initialMessage;
realtime.uncommitted = Patch.create(initialStatePatch.inverseOf.parentHash);
realtime.initialMessage = initialMessage;
return realtime; return realtime;
}; };
@ -801,17 +803,6 @@ var doOperation = ChainPad.doOperation = function (realtime, op) {
realtime.uncommittedDocLength += Operation.lengthChange(op); realtime.uncommittedDocLength += Operation.lengthChange(op);
}; };
var doPatch = ChainPad.doPatch = function (realtime, patch) {
if (Common.PARANOIA) {
check(realtime);
Common.assert(Patch.invert(realtime.uncommitted).parentHash === patch.parentHash);
realtime.userInterfaceContent = Patch.apply(patch, realtime.userInterfaceContent);
}
Patch.check(patch, realtime.uncommittedDocLength);
realtime.uncommitted = Patch.merge(realtime.uncommitted, patch);
realtime.uncommittedDocLength += Patch.lengthChange(patch);
};
var isAncestorOf = function (realtime, ancestor, decendent) { var isAncestorOf = function (realtime, ancestor, decendent) {
if (!decendent || !ancestor) { return false; } if (!decendent || !ancestor) { return false; }
if (ancestor === decendent) { return true; } if (ancestor === decendent) { return true; }
@ -867,34 +858,31 @@ var getBestChild = function (realtime, msg) {
return best; return best;
}; };
var pushUIPatch = function (realtime, patch) {
if (patch.operations.length) {
// push the uncommittedPatch out to the user interface.
for (var i = 0; i < realtime.patchHandlers.length; i++) {
realtime.patchHandlers[i](patch);
}
for (var i = 0; i < realtime.changeHandlers.length; i++) {
for (var j = patch.operations.length; j >= 0; j--) {
var op = patch.operations[j];
realtime.changeHandlers[i](op.offset, op.toRemove, op.toInsert);
}
}
}
};
var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) { var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) {
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
var msg = Message.fromString(msgStr); var msg = Message.fromString(msgStr);
// otherwise it's a disconnect. // These are all deprecated message types
if (msg.messageType !== Message.PATCH && msg.messageType !== Message.CHECKPOINT) { if (['REGISTER', 'PONG', 'DISCONNECT'].map(function (x) {
debug(realtime, "unrecognized message type " + msg.messageType); return Message[x];
}).indexOf(msg.messageType) !== -1) {
console.log("Deprecated message type: [%s]", msg.messageType);
return; return;
} }
// otherwise it's a disconnect.
if (msg.messageType !== Message.PATCH) {
console.error("disconnect");
return; }
msg.hashOf = Message.hashOf(msg); msg.hashOf = Message.hashOf(msg);
if (realtime.pending && realtime.pending.hash === msg.hashOf) {
realtime.pending.callback();
realtime.pending = null;
}
if (realtime.messages[msg.hashOf]) { if (realtime.messages[msg.hashOf]) {
debug(realtime, "Patch [" + msg.hashOf + "] is already known"); debug(realtime, "Patch [" + msg.hashOf + "] is already known");
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
@ -906,33 +894,10 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
realtime.messagesByParent[msg.lastMsgHash] || []).push(msg); realtime.messagesByParent[msg.lastMsgHash] || []).push(msg);
if (!isAncestorOf(realtime, realtime.rootMessage, msg)) { if (!isAncestorOf(realtime, realtime.rootMessage, msg)) {
if (realtime.rootMessage === realtime.best && msg.content.isCheckpoint) { // we'll probably find the missing parent later.
// We're starting with a trucated chain from a checkpoint, we will adopt this debug(realtime, "Patch [" + msg.hashOf + "] not connected to root");
// as the root message and go with it... if (Common.PARANOIA) { check(realtime); }
var userDoc = Patch.apply(realtime.uncommitted, realtime.authDoc); return;
Common.assert(!Common.PARANOIA || realtime.userInterfaceContent === userDoc);
var fixUserDocPatch = Patch.invert(realtime.uncommitted, realtime.authDoc);
Patch.addOperation(fixUserDocPatch,
Operation.create(0, realtime.authDoc.length, msg.content.operations[0].toInsert));
fixUserDocPatch =
Patch.simplify(fixUserDocPatch, userDoc, realtime.config.operationSimplify);
msg.parentCount = 0;
realtime.rootMessage = realtime.best = msg;
realtime.authDoc = msg.content.operations[0].toInsert;
realtime.uncommitted = Patch.create(Sha.hex_sha256(realtime.authDoc));
realtime.uncommittedDocLength = realtime.authDoc.length;
pushUIPatch(realtime, fixUserDocPatch);
if (Common.PARANOIA) { realtime.userInterfaceContent = realtime.authDoc; }
return;
} else {
// we'll probably find the missing parent later.
debug(realtime, "Patch [" + msg.hashOf + "] not connected to root");
if (Common.PARANOIA) { check(realtime); }
return;
}
} }
// of this message fills in a hole in the chain which makes another patch better, swap to the // of this message fills in a hole in the chain which makes another patch better, swap to the
@ -998,49 +963,14 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
return; return;
} }
if (patch.isCheckpoint) { var simplePatch =
// Ok, we have a checkpoint patch. Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify);
// If the chain length is not equal to checkpointInterval then this patch is invalid. if (!Patch.equals(simplePatch, patch)) {
var i = 0; debug(realtime, "patch [" + msg.hashOf + "] can be simplified");
var checkpointP; if (Common.PARANOIA) { check(realtime); }
for (var m = getParent(realtime, msg); m; m = getParent(realtime, m)) { if (Common.TESTING) { throw new Error(); }
if (m.content.isCheckpoint) { delete realtime.messages[msg.hashOf];
if (checkpointP) { return;
checkpointP = m;
break;
}
checkpointP = m;
}
}
if (checkpointP && checkpointP !== realtime.rootMessage) {
var point = parentCount(realtime, checkpointP);
if ((point % realtime.config.checkpointInterval) !== 0) {
debug(realtime, "checkpoint [" + msg.hashOf + "] at invalid point [" + point + "]");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
return;
}
// Time to prune some old messages from the chain
debug(realtime, "checkpoint [" + msg.hashOf + "]");
for (var m = getParent(realtime, checkpointP); m; m = getParent(realtime, m)) {
debug(realtime, "pruning [" + m.hashOf + "]");
delete realtime.messages[m.hashOf];
delete realtime.messagesByParent[m.hashOf];
}
realtime.rootMessage = checkpointP;
}
} else {
var simplePatch =
Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify);
if (!Patch.equals(simplePatch, patch)) {
debug(realtime, "patch [" + msg.hashOf + "] can be simplified");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
return;
}
} }
patch.inverseOf = Patch.invert(patch, authDocAtTimeOfPatch); patch.inverseOf = Patch.invert(patch, authDocAtTimeOfPatch);
@ -1082,8 +1012,19 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
Common.assert(newUserInterfaceContent === realtime.userInterfaceContent); Common.assert(newUserInterfaceContent === realtime.userInterfaceContent);
} }
pushUIPatch(realtime, uncommittedPatch); if (uncommittedPatch.operations.length) {
// push the uncommittedPatch out to the user interface.
for (var i = 0; i < realtime.patchHandlers.length; i++) {
realtime.patchHandlers[i](uncommittedPatch);
}
if (realtime.opHandlers.length) {
for (var i = uncommittedPatch.operations.length-1; i >= 0; i--) {
for (var j = 0; j < realtime.opHandlers.length; j++) {
realtime.opHandlers[j](uncommittedPatch.operations[i]);
}
}
}
}
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
}; };
@ -1120,26 +1061,13 @@ var getDepthOfState = function (content, minDepth, realtime) {
module.exports.create = function (conf) { module.exports.create = function (conf) {
var realtime = ChainPad.create(conf); var realtime = ChainPad.create(conf);
var out = { return {
onPatch: enterChainPad(realtime, function (handler) { onPatch: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function'); Common.assert(typeof(handler) === 'function');
realtime.patchHandlers.push(handler); realtime.patchHandlers.push(handler);
}), }),
patch: enterChainPad(realtime, function (patch, x, y) {
if (typeof(patch) === 'number') {
// Actually they meant to call realtime.change()
out.change(patch, x, y);
return;
}
doPatch(realtime, patch);
}),
onChange: enterChainPad(realtime, function (handler) { patch: enterChainPad(realtime, function (offset, count, chars) {
Common.assert(typeof(handler) === 'function');
realtime.changeHandlers.push(handler);
}),
change: enterChainPad(realtime, function (offset, count, chars) {
if (count === 0 && chars === '') { return; }
doOperation(realtime, Operation.create(offset, count, chars)); doOperation(realtime, Operation.create(offset, count, chars));
}), }),
@ -1147,32 +1075,26 @@ module.exports.create = function (conf) {
Common.assert(typeof(handler) === 'function'); Common.assert(typeof(handler) === 'function');
realtime.messageHandlers.push(handler); realtime.messageHandlers.push(handler);
}), }),
message: enterChainPad(realtime, function (message) { message: enterChainPad(realtime, function (message) {
handleMessage(realtime, message, false); handleMessage(realtime, message, false);
}), }),
start: enterChainPad(realtime, function () { start: enterChainPad(realtime, function () {
if (realtime.syncSchedule) { unschedule(realtime, realtime.syncSchedule); } if (realtime.syncSchedule) { unschedule(realtime, realtime.syncSchedule); }
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }); realtime.syncSchedule = schedule(realtime, function () { sync(realtime); });
}), }),
abort: enterChainPad(realtime, function () { abort: enterChainPad(realtime, function () {
realtime.aborted = true;
realtime.schedules.forEach(function (s) { clearTimeout(s) }); realtime.schedules.forEach(function (s) { clearTimeout(s) });
}), }),
sync: enterChainPad(realtime, function () {
sync: enterChainPad(realtime, function () { sync(realtime); }), sync(realtime);
}),
getAuthDoc: function () { return realtime.authDoc; }, getAuthDoc: function () { return realtime.authDoc; },
getUserDoc: function () { return Patch.apply(realtime.uncommitted, realtime.authDoc); }, getUserDoc: function () { return Patch.apply(realtime.uncommitted, realtime.authDoc); },
getDepthOfState: function (content, minDepth) { getDepthOfState: function (content, minDepth) {
return getDepthOfState(content, minDepth, realtime); return getDepthOfState(content, minDepth, realtime);
} }
}; };
return out;
}; };
}, },

View file

@ -142,21 +142,19 @@ define([
// shim between chainpad and netflux // shim between chainpad and netflux
chainpadAdapter = { chainpadAdapter = {
msgIn : function(peerId, msg) { msgIn : function(peerId, msg) {
msg = msg.replace(/^cp\|/, ''); var message = parseMessage(msg);
try { try {
var decryptedMsg = Crypto.decrypt(msg, cryptKey); var decryptedMsg = Crypto.decrypt(message, cryptKey);
messagesHistory.push(decryptedMsg); messagesHistory.push(decryptedMsg);
return decryptedMsg; return decryptedMsg;
} catch (err) { } catch (err) {
console.error(err); console.error(err);
return msg; return message;
} }
}, },
msgOut : function(msg, wc) { msgOut : function(msg, wc) {
try { try {
var cmsg = Crypto.encrypt(msg, cryptKey); return Crypto.encrypt(msg, cryptKey);
if (msg.indexOf('[4') === 0) { cmsg = 'cp|' + cmsg; }
return cmsg;
} catch (err) { } catch (err) {
console.log(msg); console.log(msg);
throw err; throw err;