[dotcom] TLSyncRoom
tidy (#2712)
I went through the sync room code today while thinking about syncing. As I worked through it, I added some comments etc for my own readability. There is no change in how the code works. There are a few functions declared in the body of `handlePushRequest`. We might want to re-arrange things so that we're not re-declaring those on each push (there are a LOT of pushes, basically every mouse move from every client). I did try that but reverted that change; it feels like a proper solution would be bigger than what I wanted to do here. ### Change Type - [x] `internal` — Any other changes that don't affect the published package[^2]
This commit is contained in:
parent
47f428eb6f
commit
b379a7a47c
1 changed files with 135 additions and 81 deletions
|
@ -592,8 +592,6 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
exhaustiveSwitchError(message)
|
exhaustiveSwitchError(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return this
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** If the client is out of date or we are out of date, we need to let them know */
|
/** If the client is out of date or we are out of date, we need to let them know */
|
||||||
|
@ -740,12 +738,12 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
session: RoomSession<R>,
|
session: RoomSession<R>,
|
||||||
message: Extract<TLSocketClientSentEvent<R>, { type: 'push' }>
|
message: Extract<TLSocketClientSentEvent<R>, { type: 'push' }>
|
||||||
) {
|
) {
|
||||||
const isPresencePush = 'presence' in message
|
// We must be connected to handle push requests
|
||||||
const clientClock = message.clientClock
|
|
||||||
|
|
||||||
if (session.state !== RoomSessionState.CONNECTED) {
|
if (session.state !== RoomSessionState.CONNECTED) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update the last interaction time
|
||||||
session.lastInteractionTime = Date.now()
|
session.lastInteractionTime = Date.now()
|
||||||
|
|
||||||
// increment the clock for this push
|
// increment the clock for this push
|
||||||
|
@ -755,6 +753,7 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
// collect actual ops that resulted from the push
|
// collect actual ops that resulted from the push
|
||||||
// these will be broadcast to other users
|
// these will be broadcast to other users
|
||||||
let mergedChanges: NetworkDiff<R> | null = null
|
let mergedChanges: NetworkDiff<R> | null = null
|
||||||
|
|
||||||
const propagateOp = (id: string, op: RecordOp<R>) => {
|
const propagateOp = (id: string, op: RecordOp<R>) => {
|
||||||
if (!mergedChanges) mergedChanges = {}
|
if (!mergedChanges) mergedChanges = {}
|
||||||
mergedChanges[id] = op
|
mergedChanges[id] = op
|
||||||
|
@ -778,18 +777,24 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
: TLIncompatibilityReason.ClientTooOld
|
: TLIncompatibilityReason.ClientTooOld
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
const state = res.value
|
const { value: state } = res
|
||||||
|
|
||||||
|
// Get the existing document, if any
|
||||||
const doc = this.getDocument(id)
|
const doc = this.getDocument(id)
|
||||||
|
|
||||||
if (doc) {
|
if (doc) {
|
||||||
// if we already have a document with this id, set it to the new value
|
// If there's an existing document, replace it with the new state
|
||||||
// but propagate a diff rather than the entire value
|
// but propagate a diff rather than the entire value
|
||||||
const diff = doc.replaceState(state, this.clock)
|
const diff = doc.replaceState(state, this.clock)
|
||||||
if (!diff.ok) {
|
if (!diff.ok) {
|
||||||
return fail(TLIncompatibilityReason.InvalidRecord)
|
return fail(TLIncompatibilityReason.InvalidRecord)
|
||||||
}
|
}
|
||||||
if (diff.value) propagateOp(id, [RecordOpType.Patch, diff.value])
|
if (diff.value) {
|
||||||
|
propagateOp(id, [RecordOpType.Patch, diff.value])
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// if we don't already have a document with this id, create it and propagate the put op
|
// Otherwise, if we don't already have a document with this id
|
||||||
|
// create the document and propagate the put op
|
||||||
const result = this.addDocument(id, state, this.clock)
|
const result = this.addDocument(id, state, this.clock)
|
||||||
if (!result.ok) {
|
if (!result.ok) {
|
||||||
return fail(TLIncompatibilityReason.InvalidRecord)
|
return fail(TLIncompatibilityReason.InvalidRecord)
|
||||||
|
@ -804,62 +809,91 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
// if it was already deleted, there's no need to apply the patch
|
// if it was already deleted, there's no need to apply the patch
|
||||||
const doc = this.getDocument(id)
|
const doc = this.getDocument(id)
|
||||||
if (!doc) return Result.ok(undefined)
|
if (!doc) return Result.ok(undefined)
|
||||||
|
|
||||||
|
// Compare versions of the record
|
||||||
const theirVersion = getRecordVersion(doc.state, session.serializedSchema)
|
const theirVersion = getRecordVersion(doc.state, session.serializedSchema)
|
||||||
const ourVersion = getRecordVersion(doc.state, this.serializedSchema)
|
const ourVersion = getRecordVersion(doc.state, this.serializedSchema)
|
||||||
if (compareRecordVersions(ourVersion, theirVersion) === 1) {
|
const comparison = compareRecordVersions(ourVersion, theirVersion)
|
||||||
// if the client's version of the record is older than ours, we apply the patch to the downgraded version of the record
|
|
||||||
const downgraded = this.schema.migratePersistedRecord(
|
switch (comparison) {
|
||||||
doc.state,
|
case 0: {
|
||||||
session.serializedSchema,
|
// If the versions are compatible, apply the patch and propagate the patch op
|
||||||
'down'
|
const diff = doc.mergeDiff(patch, this.clock)
|
||||||
)
|
if (!diff.ok) {
|
||||||
if (downgraded.type === 'error') {
|
return fail(TLIncompatibilityReason.InvalidRecord)
|
||||||
return fail(TLIncompatibilityReason.ClientTooOld)
|
}
|
||||||
|
if (diff.value) {
|
||||||
|
propagateOp(id, [RecordOpType.Patch, diff.value])
|
||||||
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
const patched = applyObjectDiff(downgraded.value, patch)
|
case -1: {
|
||||||
// then upgrade the patched version and use that as the new state
|
// If the client's version of the record is newer than ours, we can't apply the patch
|
||||||
const upgraded = this.schema.migratePersistedRecord(
|
return fail(TLIncompatibilityReason.ServerTooOld)
|
||||||
patched,
|
|
||||||
session.serializedSchema,
|
|
||||||
'up'
|
|
||||||
)
|
|
||||||
if (upgraded.type === 'error') {
|
|
||||||
return fail(TLIncompatibilityReason.ClientTooOld)
|
|
||||||
}
|
}
|
||||||
const diff = doc.replaceState(upgraded.value, this.clock)
|
case 1: {
|
||||||
if (!diff.ok) {
|
// If the client's version of the record is older than ours,
|
||||||
return fail(TLIncompatibilityReason.InvalidRecord)
|
// we apply the patch to the downgraded version of the record
|
||||||
|
const downgraded = this.schema.migratePersistedRecord(
|
||||||
|
doc.state,
|
||||||
|
session.serializedSchema,
|
||||||
|
'down'
|
||||||
|
)
|
||||||
|
if (downgraded.type === 'error') {
|
||||||
|
return fail(TLIncompatibilityReason.ClientTooOld)
|
||||||
|
}
|
||||||
|
|
||||||
|
// apply the patch to the downgraded version
|
||||||
|
const patched = applyObjectDiff(downgraded.value, patch)
|
||||||
|
// then upgrade the patched version and use that as the new state
|
||||||
|
const upgraded = this.schema.migratePersistedRecord(
|
||||||
|
patched,
|
||||||
|
session.serializedSchema,
|
||||||
|
'up'
|
||||||
|
)
|
||||||
|
// If the client's version is too old, we'll hit an error
|
||||||
|
if (upgraded.type === 'error') {
|
||||||
|
return fail(TLIncompatibilityReason.ClientTooOld)
|
||||||
|
}
|
||||||
|
// replace the state with the upgraded version and propagate the patch op
|
||||||
|
const diff = doc.replaceState(upgraded.value, this.clock)
|
||||||
|
if (!diff.ok) {
|
||||||
|
return fail(TLIncompatibilityReason.InvalidRecord)
|
||||||
|
}
|
||||||
|
if (diff.value) {
|
||||||
|
propagateOp(id, [RecordOpType.Patch, diff.value])
|
||||||
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if (diff.value) propagateOp(id, [RecordOpType.Patch, diff.value])
|
|
||||||
} else if (compareRecordVersions(ourVersion, theirVersion) === -1) {
|
|
||||||
// if the client's version of the record is newer than ours, we can't apply the patch
|
|
||||||
return fail(TLIncompatibilityReason.ServerTooOld)
|
|
||||||
} else {
|
|
||||||
// otherwise apply the patch and propagate the patch op if needed
|
|
||||||
const diff = doc.mergeDiff(patch, this.clock)
|
|
||||||
if (!diff.ok) {
|
|
||||||
return fail(TLIncompatibilityReason.InvalidRecord)
|
|
||||||
}
|
|
||||||
if (diff.value) propagateOp(id, [RecordOpType.Patch, diff.value])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Result.ok(undefined)
|
return Result.ok(undefined)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isPresencePush) {
|
const { clientClock } = message
|
||||||
|
|
||||||
|
if ('presence' in message) {
|
||||||
|
// The push request was for the presence scope.
|
||||||
const id = session.presenceId
|
const id = session.presenceId
|
||||||
const [type, val] = message.presence
|
const [type, val] = message.presence
|
||||||
if (type === RecordOpType.Put) {
|
const { typeName } = this.presenceType
|
||||||
if (!addDocument(id, { ...val, id, typeName: this.presenceType.typeName }).ok) return
|
switch (type) {
|
||||||
} else {
|
case RecordOpType.Put: {
|
||||||
if (
|
// Try to put the document. If it fails, stop here.
|
||||||
!patchDocument(id, {
|
const res = addDocument(id, { ...val, id, typeName })
|
||||||
|
if (!res.ok) return
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case RecordOpType.Patch: {
|
||||||
|
// Try to patch the document. If it fails, stop here.
|
||||||
|
const res = patchDocument(id, {
|
||||||
...val,
|
...val,
|
||||||
id: [ValueOpType.Put, id],
|
id: [ValueOpType.Put, id],
|
||||||
typeName: [ValueOpType.Put, this.presenceType.typeName],
|
typeName: [ValueOpType.Put, typeName],
|
||||||
}).ok
|
})
|
||||||
)
|
if (!res.ok) return
|
||||||
return
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
this.sendMessage(session.sessionKey, {
|
this.sendMessage(session.sessionKey, {
|
||||||
type: 'push_result',
|
type: 'push_result',
|
||||||
|
@ -868,44 +902,61 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
serverClock: this.clock,
|
serverClock: this.clock,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
const diff = message.diff
|
// The push request was for the document scope.
|
||||||
for (const [id, op] of Object.entries(diff)) {
|
for (const [id, op] of Object.entries(message.diff)) {
|
||||||
if (op[0] === RecordOpType.Put) {
|
switch (op[0]) {
|
||||||
// if it's not a document record, fail
|
case RecordOpType.Put: {
|
||||||
if (!this.documentTypes.has(op[1].typeName)) {
|
// Try to add the document.
|
||||||
return fail(TLIncompatibilityReason.InvalidRecord)
|
// If we're putting a record with a type that we don't recognize, fail
|
||||||
|
if (!this.documentTypes.has(op[1].typeName)) {
|
||||||
|
return fail(TLIncompatibilityReason.InvalidRecord)
|
||||||
|
}
|
||||||
|
const res = addDocument(id, op[1])
|
||||||
|
if (!res.ok) return
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if (!addDocument(id, op[1]).ok) return
|
case RecordOpType.Patch: {
|
||||||
} else if (op[0] === RecordOpType.Remove) {
|
// Try to patch the document. If it fails, stop here.
|
||||||
// if it was already deleted, don't do anything, no need to propagate a delete op
|
const res = patchDocument(id, op[1])
|
||||||
const doc = this.getDocument(id)
|
if (!res.ok) return
|
||||||
if (!doc) continue
|
break
|
||||||
if (!this.documentTypes.has(doc.state.typeName)) {
|
}
|
||||||
return fail(TLIncompatibilityReason.InvalidOperation)
|
case RecordOpType.Remove: {
|
||||||
|
const doc = this.getDocument(id)
|
||||||
|
if (!doc) {
|
||||||
|
// If the doc was already deleted, don't do anything, no need to propagate a delete op
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the doc is not a type that we recognize, fail
|
||||||
|
if (!this.documentTypes.has(doc.state.typeName)) {
|
||||||
|
return fail(TLIncompatibilityReason.InvalidOperation)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the document and propagate the delete op
|
||||||
|
this.removeDocument(id, this.clock)
|
||||||
|
// Schedule a pruneTombstones call to happen on the next call stack
|
||||||
|
setTimeout(this.pruneTombstones, 0)
|
||||||
|
propagateOp(id, op)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
// otherwise delete the document and propagate the delete op
|
|
||||||
this.removeDocument(id, this.clock)
|
|
||||||
// schedule a pruneTombstones call to happen after we are done here
|
|
||||||
setTimeout(this.pruneTombstones, 0)
|
|
||||||
propagateOp(id, op)
|
|
||||||
} else if (op[0] === RecordOpType.Patch) {
|
|
||||||
if (!patchDocument(id, op[1]).ok) return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Let the client know what action to take based on the results of the push
|
||||||
if (!mergedChanges) {
|
if (!mergedChanges) {
|
||||||
// we applied the client's changes but they had no effect
|
// DISCARD
|
||||||
// tell them to drop the diff
|
// Applying the client's changes had no effect, so the client should drop the diff
|
||||||
this.sendMessage(session.sessionKey, {
|
this.sendMessage(session.sessionKey, {
|
||||||
type: 'push_result',
|
type: 'push_result',
|
||||||
serverClock: this.clock,
|
serverClock: this.clock,
|
||||||
clientClock,
|
clientClock,
|
||||||
action: 'discard',
|
action: 'discard',
|
||||||
})
|
})
|
||||||
} else if (isEqual(mergedChanges, diff)) {
|
} else if (isEqual(mergedChanges, message.diff)) {
|
||||||
// we applied the client's changes and they had the exact same effect
|
// COMMIT
|
||||||
// on the server as they did on the client
|
// Applying the client's changes had the exact same effect on the server as
|
||||||
// tell them to keep the diff
|
// they had on the client, so the client should keep the diff
|
||||||
this.sendMessage(session.sessionKey, {
|
this.sendMessage(session.sessionKey, {
|
||||||
type: 'push_result',
|
type: 'push_result',
|
||||||
serverClock: this.clock,
|
serverClock: this.clock,
|
||||||
|
@ -913,8 +964,10 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
action: 'commit',
|
action: 'commit',
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
// We applied the client's changes and they had a different non-empty effect
|
// REBASE
|
||||||
// on the server, so we need to tell the client to rebase with our gold standard diff
|
// Applying the client's changes had a different non-empty effect on the server,
|
||||||
|
// so the client should rebase with our gold-standard / authoritative diff.
|
||||||
|
// First we need to migrate the diff to the client's version
|
||||||
const migrateResult = this.migrateDiffForSession(session.serializedSchema, mergedChanges)
|
const migrateResult = this.migrateDiffForSession(session.serializedSchema, mergedChanges)
|
||||||
if (!migrateResult.ok) {
|
if (!migrateResult.ok) {
|
||||||
return fail(
|
return fail(
|
||||||
|
@ -923,6 +976,7 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
: TLIncompatibilityReason.ClientTooOld
|
: TLIncompatibilityReason.ClientTooOld
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
// If the migration worked, send the rebased diff to the client
|
||||||
this.sendMessage(session.sessionKey, {
|
this.sendMessage(session.sessionKey, {
|
||||||
type: 'push_result',
|
type: 'push_result',
|
||||||
serverClock: this.clock,
|
serverClock: this.clock,
|
||||||
|
@ -932,8 +986,8 @@ export class TLSyncRoom<R extends UnknownRecord> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mergedChanges) {
|
// If there are merged changes, broadcast them to all other clients
|
||||||
// let all other client know about the changes
|
if (mergedChanges !== null) {
|
||||||
this.broadcastPatch({
|
this.broadcastPatch({
|
||||||
sourceSessionKey: session.sessionKey,
|
sourceSessionKey: session.sessionKey,
|
||||||
diff: mergedChanges,
|
diff: mergedChanges,
|
||||||
|
|
Loading…
Reference in a new issue