squish sync data events before sending them out (#3118)

Recently (https://github.com/tldraw/tldraw/pull/3012), we started
aggregating data messages before sending them out. However, local
testing shows that we generate *many* redundant messages (see the test
file for an example of a real buffer captured during local testing with
just two users). This PR adds a function to squish those updates
together, reducing both the amount of data we need to transfer and the
load on the client, which then doesn't need to process those redundant messages.

The function is checked with [fast-check](https://fast-check.dev/), a JS
property test framework, to make sure that squished deltas result in
exactly the same state as the original ones.

### Change Type

- [x] `minor` — New feature

### Test Plan

1. Needs a group smoke test

- [x] End to end tests
This commit is contained in:
Dan Groshev 2024-04-02 09:57:58 +01:00 committed by GitHub
parent 8db84b33b2
commit b42a222c88
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 825 additions and 32 deletions

View file

@ -37,6 +37,7 @@
"lint": "yarn run -T tsx ../../scripts/lint.ts"
},
"devDependencies": {
"fast-check": "^3.16.0",
"tldraw": "workspace:*",
"typescript": "^5.3.3",
"uuid-by-string": "^4.0.0",

View file

@ -49,6 +49,43 @@ export type TLPersistentClientSocket<R extends UnknownRecord = UnknownRecord> =
const PING_INTERVAL = 5000
const MAX_TIME_TO_WAIT_FOR_SERVER_INTERACTION_BEFORE_RESETTING_CONNECTION = PING_INTERVAL * 2
/**
 * Converts a wire-format `NetworkDiff` into a `RecordsDiff` describing how the
 * diff would change the current contents of `store`. The store itself is NOT
 * modified here; callers apply the returned diff separately.
 *
 * @param diff - the network diff received from the server
 * @param store - the store whose current state the diff is resolved against
 * @returns the resulting records diff, or `null` when the diff is a no-op
 */
export function _applyNetworkDiffToStore<R extends UnknownRecord, S extends Store<R> = Store<R>>(
	diff: NetworkDiff<R>,
	store: S
): RecordsDiff<R> | null {
	const result: RecordsDiff<R> = { added: {} as any, updated: {} as any, removed: {} as any }
	type UpdatedKey = keyof typeof result.updated
	let dirty = false
	for (const [id, op] of objectMapEntries(diff)) {
		switch (op[0]) {
			case RecordOpType.Put: {
				const previous = store.get(id as RecordId<any>)
				// a put always counts as a change; an existing different record
				// becomes an update, anything else lands in `added`
				dirty = true
				if (previous && !isEqual(previous, op[1])) {
					result.updated[id as UpdatedKey] = [previous, op[1]]
				} else {
					result.added[id as UpdatedKey] = op[1]
				}
				break
			}
			case RecordOpType.Patch: {
				const previous = store.get(id as RecordId<any>)
				if (!previous) {
					// the record was removed upstream; nothing to patch
					break
				}
				dirty = true
				result.updated[id as UpdatedKey] = [previous, applyObjectDiff(previous, op[1])]
				break
			}
			case RecordOpType.Remove: {
				if (store.has(id as RecordId<any>)) {
					dirty = true
					result.removed[id as UpdatedKey] = store.get(id as RecordId<any>)
				}
				break
			}
		}
	}
	return dirty ? result : null
}
// Should connect support chunking the response to allow for large payloads?
/**
@ -495,36 +532,8 @@ export class TLSyncClient<R extends UnknownRecord, S extends Store<R> = Store<R>
*/
private applyNetworkDiff(diff: NetworkDiff<R>, runCallbacks: boolean) {
this.debug('applyNetworkDiff', diff)
const changes: RecordsDiff<R> = { added: {} as any, updated: {} as any, removed: {} as any }
type k = keyof typeof changes.updated
let hasChanges = false
for (const [id, op] of objectMapEntries(diff)) {
if (op[0] === RecordOpType.Put) {
const existing = this.store.get(id as RecordId<any>)
if (existing && !isEqual(existing, op[1])) {
hasChanges = true
changes.updated[id as k] = [existing, op[1]]
} else {
hasChanges = true
changes.added[id as k] = op[1]
}
} else if (op[0] === RecordOpType.Patch) {
const record = this.store.get(id as RecordId<any>)
if (!record) {
// the record was removed upstream
continue
}
const patched = applyObjectDiff(record, op[1])
hasChanges = true
changes.updated[id as k] = [record, patched]
} else if (op[0] === RecordOpType.Remove) {
if (this.store.has(id as RecordId<any>)) {
hasChanges = true
changes.removed[id as k] = this.store.get(id as RecordId<any>)
}
}
}
if (hasChanges) {
const changes = _applyNetworkDiffToStore(diff, this.store)
if (changes !== null) {
this.store.applyDiff(changes, runCallbacks)
}
}

View file

@ -48,6 +48,7 @@ import {
TLSocketServerSentDataEvent,
TLSocketServerSentEvent,
} from './protocol'
import { squishDataEvents } from './squish'
/** @public */
export type TLRoomSocket<R extends UnknownRecord> = {
@ -456,7 +457,10 @@ export class TLSyncRoom<R extends UnknownRecord> {
session.socket.sendMessage(message)
}
} else {
session.socket.sendMessage({ type: 'data', data: session.outstandingDataMessages })
session.socket.sendMessage({
type: 'data',
data: squishDataEvents(session.outstandingDataMessages),
})
}
session.outstandingDataMessages.length = 0
}

View file

@ -236,7 +236,11 @@ export function applyObjectDiff<T extends object>(object: T, objectDiff: ObjectD
break
}
case ValueOpType.Patch: {
if (object[key as keyof T] && typeof object[key as keyof T] === 'object') {
if (
object[key as keyof T] &&
typeof object[key as keyof T] === 'object' &&
!Array.isArray(object[key as keyof T])
) {
const diff = op[1]
const patched = applyObjectDiff(object[key as keyof T] as object, diff)
if (patched !== object[key as keyof T]) {

View file

@ -0,0 +1,191 @@
import { UnknownRecord } from '@tldraw/store'
import { exhaustiveSwitchError, objectMapEntries, structuredClone } from '@tldraw/utils'
import {
NetworkDiff,
ObjectDiff,
RecordOp,
RecordOpType,
ValueOpType,
applyObjectDiff,
} from './diff'
import { TLSocketServerSentDataEvent } from './protocol'
/**
 * Accumulator used while folding a list of data events.
 *
 * `lastPatch` holds the patch event currently being merged into (always a
 * clone, so the caller's events are never mutated); `squished` holds the
 * events that have already been finalized, in their original order.
 */
interface State<R extends UnknownRecord> {
	lastPatch: (TLSocketServerSentDataEvent<R> & { type: 'patch' }) | null
	squished: TLSocketServerSentDataEvent<R>[]
}
// `true` means a squishing step was abandoned and the caller should fall back
// to the original, unsquished event list.
type Bailed = boolean
/**
 * Merges `newPatch` into `lastPatch` in place, so that applying the merged
 * `lastPatch` once is equivalent to applying the original `lastPatch` and then
 * `newPatch`.
 *
 * @param lastPatch - the earlier patch; mutated to absorb `newPatch`
 * @param newPatch - the later patch to fold in
 * @returns `true` if the merge had to bail out (caller should fall back to the
 *   unsquished events; `lastPatch` may be partially mutated at that point, so
 *   only cloned data should be passed in), `false` on success
 */
function patchThePatch(lastPatch: ObjectDiff, newPatch: ObjectDiff): Bailed {
	for (const [newKey, newOp] of Object.entries(newPatch)) {
		switch (newOp[0]) {
			case ValueOpType.Put:
				// a put is absolute: it wins over whatever came before it
				lastPatch[newKey] = newOp
				break
			case ValueOpType.Append:
				if (lastPatch[newKey] === undefined) {
					lastPatch[newKey] = newOp
				} else {
					const lastOp = lastPatch[newKey]
					switch (lastOp[0]) {
						case ValueOpType.Put: {
							const lastValues = lastOp[1]
							const newValues = newOp[1]
							const newOffset = newOp[2]
							// only merge when the put value is an array AND the append's
							// offset lines up with its length. Clients silently discard
							// appends whose offsets don't match the array they target, so
							// merging a mismatched append here would change behavior
							// relative to applying the two patches separately — bail out
							// in that case (and when the put value isn't an array at all)
							if (Array.isArray(lastValues) && newOffset === lastValues.length) {
								lastValues.push(...newValues)
							} else {
								return true
							}
							break
						}
						case ValueOpType.Append: {
							const lastValues = lastOp[1]
							const lastOffset = lastOp[2]
							const newValues = newOp[1]
							const newOffset = newOp[2]
							// the two appends must be contiguous to be merged
							if (newOffset === lastOffset + lastValues.length) {
								lastValues.push(...newValues)
							} else {
								// something weird is going on, bail out
								return true
							}
							break
						}
						default:
							// trying to append to either a deletion or a patch, bail out
							return true
					}
				}
				break
			case ValueOpType.Patch:
				if (lastPatch[newKey] === undefined) {
					lastPatch[newKey] = newOp
				} else {
					// bail out, recursive patching is too hard
					return true
				}
				break
			case ValueOpType.Delete:
				// overwrite whatever was there previously, no point if it's going to be removed
				// todo: check if it was freshly put and don't add if it wasn't?
				lastPatch[newKey] = newOp
				break
			default:
				exhaustiveSwitchError(newOp[0])
		}
	}
	return false
}
/**
 * Folds a later patch into an earlier record-level op so that the combined op
 * is equivalent to applying the original op followed by `newPatch`.
 *
 * @param lastRecordOp - the earlier op; mutated in place where possible
 * @param newPatch - the later patch to fold in
 * @returns `true` if the merge had to bail out, `false` on success
 */
function patchTheOp<R extends UnknownRecord>(
	lastRecordOp: RecordOp<R>,
	newPatch: ObjectDiff
): Bailed {
	if (lastRecordOp[0] === RecordOpType.Put) {
		// patching a freshly added record is easy: apply the patch to the record
		lastRecordOp[1] = applyObjectDiff(lastRecordOp[1], newPatch)
		return false
	}
	if (lastRecordOp[0] === RecordOpType.Patch) {
		// two patches in a row: merge the new one into the old one
		return patchThePatch(lastRecordOp[1], newPatch)
	}
	if (lastRecordOp[0] === RecordOpType.Remove) {
		// patching a record that was removed is a no-op: just drop the patch
		return false
	}
	return exhaustiveSwitchError(lastRecordOp[0])
}
/**
 * Merges `newDiff` into `lastDiff` in place so that applying `lastDiff` once
 * is equivalent to applying the original `lastDiff` and then `newDiff`.
 *
 * @param lastDiff - the earlier diff; mutated to absorb `newDiff`
 * @param newDiff - the later diff to fold in
 * @returns `true` if the merge had to bail out, `false` on success
 */
function squishInto<R extends UnknownRecord>(
	lastDiff: NetworkDiff<R>,
	newDiff: NetworkDiff<R>
): Bailed {
	for (const [newId, newOp] of objectMapEntries(newDiff)) {
		if (newOp[0] === RecordOpType.Patch) {
			const previousOp = lastDiff[newId]
			if (previousOp === undefined) {
				// nothing to merge with: this patch stands on its own
				lastDiff[newId] = newOp
			} else if (patchTheOp(previousOp, newOp[1])) {
				// merging the two ops was too hard, give up
				return true
			}
		} else if (newOp[0] === RecordOpType.Put || newOp[0] === RecordOpType.Remove) {
			// puts and removes are absolute: they win over whatever came before
			// todo: for removes, check if it was freshly put and don't add if it wasn't?
			lastDiff[newId] = newOp
		} else {
			exhaustiveSwitchError(newOp[0])
		}
	}
	return false
}
/**
 * Collapses runs of consecutive 'patch' events into single patch events,
 * reducing the amount of redundant data sent to clients. 'push_result' events
 * act as barriers and keep their relative position. If any merge turns out to
 * be unsafe, the original list is returned unchanged. The input list is never
 * mutated.
 *
 * @param dataEvents - the buffered events, in server order
 * @returns an equivalent (possibly much shorter) list of events
 */
export function squishDataEvents<R extends UnknownRecord>(
	dataEvents: TLSocketServerSentDataEvent<R>[]
): TLSocketServerSentDataEvent<R>[] {
	// zero or one event: nothing to squish (and this is the most common case)
	if (dataEvents.length < 2) {
		return dataEvents
	}
	const state: State<R> = { lastPatch: null, squished: [] }
	for (const event of dataEvents) {
		if (event.type === 'patch') {
			if (state.lastPatch === null) {
				// start a new run; cloning makes sure we never mutate the caller's event
				state.lastPatch = structuredClone(event)
			} else {
				// this structuredClone is necessary to avoid modifying the original list of events
				// (otherwise objects can get reused on put and then modified on patch)
				if (squishInto(state.lastPatch.diff, structuredClone(event.diff))) {
					// this is unfortunate, but some patches were too hard to patch, give up
					// and return the original list
					return dataEvents
				}
				state.lastPatch.serverClock = event.serverClock
			}
		} else if (event.type === 'push_result') {
			// push results must keep their position: flush any pending patch first
			if (state.lastPatch !== null) {
				state.squished.push(state.lastPatch)
				state.lastPatch = null
			}
			state.squished.push(event)
		} else {
			exhaustiveSwitchError(event, 'type')
		}
	}
	// flush the trailing run, if any
	if (state.lastPatch !== null) {
		state.squished.push(state.lastPatch)
	}
	return state.squished
}

View file

@ -0,0 +1,574 @@
import { createRecordType, IdOf, RecordId, Store, StoreSchema, UnknownRecord } from '@tldraw/store'
import { assert, structuredClone } from '@tldraw/utils'
import fc, { Arbitrary } from 'fast-check'
import { NetworkDiff, ObjectDiff, RecordOpType, ValueOpType } from '../lib/diff'
import { TLSocketServerSentDataEvent } from '../lib/protocol'
import { squishDataEvents } from '../lib/squish'
import { _applyNetworkDiffToStore } from '../lib/TLSyncClient'
test('basic squishing', () => {
const capture = [
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 929.58203125,
h: 500.14453125,
},
],
},
],
},
serverClock: 9237,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679590],
cursor: [
'put',
{
x: 1526.07421875,
y: 565.66796875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9238,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 916.046875,
h: 494.20703125,
},
],
},
],
},
serverClock: 9239,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679599],
cursor: [
'put',
{
x: 1519.26171875,
y: 563.71875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9240,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 909.234375,
h: 492.2578125,
},
],
},
],
},
serverClock: 9241,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679608],
cursor: [
'put',
{
x: 1512.41015625,
y: 562.23046875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9242,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 902.3828125,
h: 490.76953125,
},
],
},
],
},
serverClock: 9243,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679617],
cursor: [
'put',
{
x: 1506.71484375,
y: 561.29296875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9244,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 896.6875,
h: 489.83203125,
},
],
},
],
},
serverClock: 9245,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679625],
cursor: [
'put',
{
x: 1501.734375,
y: 560.88671875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9246,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 891.70703125,
h: 489.42578125,
},
],
},
],
},
serverClock: 9247,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679633],
cursor: [
'put',
{
x: 1497.22265625,
y: 560.6875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9248,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 887.1953125,
h: 489.2265625,
},
],
},
],
},
serverClock: 9249,
},
] as const satisfies TLSocketServerSentDataEvent<UnknownRecord>[]
const squished = squishDataEvents(capture)
const manuallySquished = [
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
'patch',
{
lastActivityTimestamp: ['put', 1710188679633],
cursor: [
'put',
{
x: 1497.22265625,
y: 560.6875,
rotation: 0,
type: 'default',
},
],
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 887.1953125,
h: 489.2265625,
},
],
},
],
},
serverClock: 9249,
},
]
// see https://github.com/jestjs/jest/issues/14011 for why the second clone is needed
expect(squished).toStrictEqual(structuredClone(manuallySquished))
})
// All test records share one typeName so a single record type covers the schema.
const TEST_RECORD_TYPENAME = 'testRecord' as const
// A record with up to three optional value fields — enough structure to
// exercise put/patch/append/delete at several nesting depths.
interface TestRecord extends UnknownRecord {
	fieldA?: TestRecordValue
	fieldB?: TestRecordValue
	fieldC?: TestRecordValue
}
// Field values are strings (put/delete targets), number arrays (append
// targets), or recursively nested objects (patch targets).
type TestRecordValue =
	| string
	| number[]
	| { fieldA?: TestRecordValue; fieldB?: TestRecordValue; fieldC?: TestRecordValue }
const TestRecord = createRecordType<TestRecord>(TEST_RECORD_TYPENAME, {
	validator: {
		// pass-through validator: generated records are valid by construction
		validate(value) {
			return value as TestRecord
		},
	},
	scope: 'document',
})
/**
 * fast-check model for the squisher. The model records every network diff the
 * commands generate and, after each command, checks that replaying the
 * squished event stream against a fresh store produces exactly the same state
 * as replaying the original stream.
 */
class Model {
	// every diff generated so far, in order
	diffs: NetworkDiff<TestRecord>[] = []
	// ids of records currently believed to be live (commands index into this)
	idMap: IdOf<TestRecord>[]
	private readonly initialStoreData: Record<IdOf<TestRecord>, TestRecord>
	constructor(public initialStoreContent: TestRecord[]) {
		this.idMap = initialStoreContent.map((r) => r.id)
		this.initialStoreData = Object.fromEntries(initialStoreContent.map((r) => [r.id, r]))
	}
	// wraps an arbitrary command index into the range of live records
	trueIdx(idx: number) {
		return idx % this.idMap.length
	}
	getId(idx: number) {
		return this.idMap[this.trueIdx(idx)]
	}
	// a store freshly seeded with the initial content, before any diffs
	private getFreshStore(): Store<TestRecord> {
		return new Store({
			initialData: this.initialStoreData,
			schema: StoreSchema.create<TestRecord>({ testRecord: TestRecord }),
			props: {},
		})
	}
	// replays a list of diffs against a fresh store and returns the result
	private getStoreWithDiffs(diffs: NetworkDiff<TestRecord>[]) {
		const store = this.getFreshStore()
		for (const diff of diffs) {
			const changes = _applyNetworkDiffToStore(diff, store)
			if (changes !== null) {
				store.applyDiff(changes, false)
			}
		}
		return store
	}
	// the property under test: squished events and original events must yield
	// identical store state
	runTest() {
		const dataEvents = this.diffs.map((diff, idx) => ({
			type: 'patch' as const,
			diff,
			serverClock: idx,
		}))
		const squishedDiffs = squishDataEvents(dataEvents).map((e) => {
			assert(e.type === 'patch')
			return e.diff
		})
		const baseStore = this.getStoreWithDiffs(this.diffs)
		const squishedStore = this.getStoreWithDiffs(squishedDiffs)
		// see https://github.com/jestjs/jest/issues/14011 for the explanation for that structuredClone
		expect(squishedStore.serialize()).toEqual(structuredClone(baseStore.serialize()))
	}
	// offsets are a MAJOR pain because they depend on the entire history of diffs so far, and
	// the store silently discards append patches if their offsets don't match, so they need
	// to be correct to exercise the squisher
	// NOTE: returns a fixed-up clone; the input diff itself is not mutated
	fixOffsets(recordId: IdOf<TestRecord>, fullDiff: ObjectDiff) {
		const fixed = structuredClone(fullDiff)
		const store = this.getStoreWithDiffs(this.diffs)
		const record = store.get(recordId)
		if (record === undefined) {
			return fixed
		}
		// walk the diff alongside the current record, rewriting each append's
		// offset to the length of the array it will actually land on
		const fixer = (obj: any, diff: ObjectDiff) => {
			for (const [k, v] of Object.entries(diff)) {
				if (v[0] === ValueOpType.Append && Array.isArray(obj[k])) {
					v[2] = obj[k].length
				} else if (v[0] === ValueOpType.Patch && typeof obj[k] === 'object') {
					fixer(obj[k], v[1])
				}
			}
		}
		fixer(record, fixed)
		return fixed
	}
}
// The commands don't drive a separate system-under-test: the Model checks
// itself in runTest(), so the "real" side is just a placeholder value.
type Real = 'whatever'
// Command: add (or overwrite) a record via a Put diff.
class RecordPut implements fc.Command<Model, Real> {
	constructor(readonly record: TestRecord) {}
	// a put is always applicable, regardless of model state
	check(_m: Readonly<Model>) {
		return true
	}
	run(m: Model): void {
		const { id } = this.record
		m.diffs.push({ [id]: [RecordOpType.Put, this.record] })
		m.idMap.push(id)
		m.runTest()
	}
	toString = () => `Put(${JSON.stringify(this.record)})`
}
// Command: remove the record at (wrapped) index `idx` via a Remove diff.
class RecordRemove implements fc.Command<Model, Real> {
	constructor(readonly idx: number) {}
	// only applicable while at least one record is live
	check(m: Readonly<Model>) {
		return m.idMap.length > 0
	}
	run(m: Model) {
		const id = m.getId(this.idx)
		m.diffs.push({ [id]: [RecordOpType.Remove] })
		m.idMap.splice(m.trueIdx(this.idx), 1)
		m.runTest()
	}
	toString = () => `Remove(#${this.idx})`
}
// Command: patch the record at (wrapped) index `idx`, fixing up append offsets
// against the current model state first (see Model.fixOffsets).
class RecordPatch implements fc.Command<Model, Real> {
	constructor(
		readonly idx: number,
		readonly patch: ObjectDiff
	) {}
	// only applicable while at least one record is live
	check(m: Readonly<Model>) {
		return m.idMap.length > 0
	}
	run(m: Model) {
		const id = m.getId(this.idx)
		m.diffs.push({ [id]: [RecordOpType.Patch, m.fixOffsets(id, this.patch)] })
		m.runTest()
	}
	toString = () => `Patch(#${this.idx}, ${JSON.stringify(this.patch)})`
}
// Recursive arbitrary for field values: strings, integer arrays, or nested
// objects (fieldA is required so nested objects are never empty).
const { TestRecordValueArb }: { TestRecordValueArb: Arbitrary<TestRecordValue> } = fc.letrec(
	(tie) => ({
		TestRecordValueArb: fc.oneof(
			fc.string(),
			fc.array(fc.integer()),
			fc.record(
				{
					fieldA: tie('TestRecordValueArb'),
					fieldB: tie('TestRecordValueArb'),
					fieldC: tie('TestRecordValueArb'),
				},
				{ requiredKeys: ['fieldA'] }
			)
		),
	})
)
// One of the three field names a diff op can target.
const TestRecordKeyArb = fc.oneof(
	fc.constant('fieldA' as const),
	fc.constant('fieldB' as const),
	fc.constant('fieldC' as const)
)
// A full test record; only three ids exist so puts frequently collide.
const TestRecordArb = fc.record(
	{
		id: fc.oneof(fc.constant('idA'), fc.constant('idB'), fc.constant('idC')) as Arbitrary<
			RecordId<TestRecord>
		>,
		typeName: fc.constant(TEST_RECORD_TYPENAME),
		fieldA: TestRecordValueArb,
		fieldB: TestRecordValueArb,
		fieldC: TestRecordValueArb,
	},
	{ requiredKeys: ['id', 'typeName'] }
)
// Recursive arbitrary for object-level diffs: 1-3 keys, each a put, append,
// nested patch, or delete.
const { ObjectDiffArb }: { ObjectDiffArb: Arbitrary<ObjectDiff> } = fc.letrec((tie) => ({
	ObjectDiffArb: fc.dictionary(
		TestRecordKeyArb,
		fc.oneof(
			fc.tuple(fc.constant(ValueOpType.Put), TestRecordValueArb),
			// The offset is -1 because it depends on the length of the array *in the current state*,
			// so it can't be generated here. Instead, it's patched up in the command
			fc.tuple(fc.constant(ValueOpType.Append), fc.array(fc.integer()), fc.constant(-1)),
			fc.tuple(fc.constant(ValueOpType.Patch), tie('ObjectDiffArb')),
			fc.tuple(fc.constant(ValueOpType.Delete))
		),
		{ minKeys: 1, maxKeys: 3 }
	),
}))
// The full command vocabulary for model runs.
const allCommands = [
	TestRecordArb.map((r) => new RecordPut(r)),
	fc.nat(10).map((idx) => new RecordRemove(idx)),
	fc.tuple(fc.nat(), ObjectDiffArb).map(([idx, diff]) => new RecordPatch(idx, diff)),
]
// Initial store contents: up to three records with unique ids.
const initialStoreContentArb: Arbitrary<TestRecord[]> = fc.uniqueArray(TestRecordArb, {
	selector: (r) => r.id,
	maxLength: 3,
})
// Property test: for any initial store content and command sequence, the
// squished event stream must produce the same final store state as the
// original one (checked after every command by Model.runTest).
test('fast-checking squish', () => {
	// If you see this test failing, to reproduce it you need both seed and path in fc.assert,
	// and replayPath in fc.commands. See the next test for an example
	fc.assert(
		fc.property(
			initialStoreContentArb,
			fc.commands(allCommands, {}),
			(initialStoreContent, commands) => {
				fc.modelRun(() => ({ model: new Model(initialStoreContent), real: 'whatever' }), commands)
			}
		),
		{ verbose: 1, numRuns: 1_000 }
	)
})
// Regression test for a squisher bug found by the property test above
// (applying a patch to an array), pinned via seed/path/replayPath so it
// replays the exact failing command sequence deterministically.
test('problem: applying a patch to an array', () => {
	fc.assert(
		fc.property(
			initialStoreContentArb,
			fc.commands(allCommands, { replayPath: 'CDJ:F' }),
			(initialStoreContent, commands) => {
				fc.modelRun(() => ({ model: new Model(initialStoreContent), real: 'whatever' }), commands)
			}
		),
		{ seed: -1883357795, path: '7653:1:2:2:4:3:3:3:3', endOnFailure: true }
	)
})

View file

@ -7662,6 +7662,7 @@ __metadata:
"@tldraw/store": "workspace:*"
"@tldraw/tlschema": "workspace:*"
"@tldraw/utils": "workspace:*"
fast-check: "npm:^3.16.0"
lodash.isequal: "npm:^4.5.0"
nanoevents: "npm:^7.0.1"
nanoid: "npm:4.0.2"
@ -13814,6 +13815,15 @@ __metadata:
languageName: node
linkType: hard
"fast-check@npm:^3.16.0":
version: 3.16.0
resolution: "fast-check@npm:3.16.0"
dependencies:
pure-rand: "npm:^6.0.0"
checksum: 4a14945b885ef2d75c3252a067a4cfa2440a2c0da18341d514be3803fafb616b0ec68806071f29e1267a85c7a9e4a5e192ae5e592727d8d2e66389f946be472c
languageName: node
linkType: hard
"fast-deep-equal@npm:^3.1.1, fast-deep-equal@npm:^3.1.3":
version: 3.1.3
resolution: "fast-deep-equal@npm:3.1.3"