Debounce/aggregate tlsync messages (#3012)

There is very little point sending data messages more often than 60
times a second, so we buffer them before sending.

### Change Type

- [x] `internal` — Any other changes that don't affect the published
package

### Test Plan

1. Smoke test (on a retro?)

- [x] End to end tests

---------

Co-authored-by: David Sheldrick <d.j.sheldrick@gmail.com>
This commit is contained in:
Dan Groshev 2024-03-11 13:33:47 +00:00 committed by GitHub
parent b5aff00c89
commit e527d7d0d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 170 additions and 91 deletions

View file

@ -1,6 +1,5 @@
import {
chunk,
serializeMessage,
TLPersistentClientSocket,
TLPersistentClientSocketStatus,
TLSocketClientSentEvent,
@ -176,7 +175,7 @@ export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord
if (!this._ws) return
if (this.connectionStatus === 'online') {
const chunks = chunk(serializeMessage(msg))
const chunks = chunk(JSON.stringify(msg))
for (const part of chunks) {
this._ws.send(part)
}

View file

@ -31,5 +31,4 @@ export {
type TLSocketServerSentEvent,
} from './lib/protocol'
export { schema } from './lib/schema'
export { serializeMessage } from './lib/serializeMessage'
export type { PersistedRoomSnapshotForSupabase, RoomState as RoomState } from './lib/server-types'

View file

@ -1,5 +1,6 @@
import { SerializedSchema, UnknownRecord } from '@tldraw/store'
import { TLRoomSocket } from './TLSyncRoom'
import { TLSocketServerSentDataEvent } from './protocol'
export enum RoomSessionState {
AWAITING_CONNECT_MESSAGE = 'awaiting-connect-message',
@ -33,4 +34,6 @@ export type RoomSession<R extends UnknownRecord> =
socket: TLRoomSocket<R>
serializedSchema: SerializedSchema
lastInteractionTime: number
debounceTimer: ReturnType<typeof setTimeout> | null
outstandingDataMessages: TLSocketServerSentDataEvent<R>[]
}

View file

@ -2,7 +2,6 @@ import { UnknownRecord } from '@tldraw/store'
import ws from 'ws'
import { TLRoomSocket } from './TLSyncRoom'
import { TLSocketServerSentEvent } from './protocol'
import { serializeMessage } from './serializeMessage'
/** @public */
export class ServerSocketAdapter<R extends UnknownRecord> implements TLRoomSocket<R> {
@ -11,8 +10,9 @@ export class ServerSocketAdapter<R extends UnknownRecord> implements TLRoomSocke
get isOpen(): boolean {
return this.ws.readyState === 1 // ready state open
}
// see TLRoomSocket for details on why this accepts a union and not just arrays
sendMessage(msg: TLSocketServerSentEvent<R>) {
this.ws.send(serializeMessage(msg))
this.ws.send(JSON.stringify(msg))
}
close() {
this.ws.close()

View file

@ -17,6 +17,7 @@ import {
TLPushRequest,
TLSYNC_PROTOCOL_VERSION,
TLSocketClientSentEvent,
TLSocketServerSentDataEvent,
TLSocketServerSentEvent,
} from './protocol'
import './requestAnimationFrame.polyfill'
@ -350,7 +351,7 @@ export class TLSyncClient<R extends UnknownRecord, S extends Store<R> = Store<R>
this.lastServerClock = event.serverClock
}
incomingDiffBuffer: Extract<TLSocketServerSentEvent<R>, { type: 'patch' | 'push_result' }>[] = []
incomingDiffBuffer: TLSocketServerSentDataEvent<R>[] = []
/** Handle events received from the server */
private handleServerEvent = (event: TLSocketServerSentEvent<R>) => {
@ -366,11 +367,10 @@ export class TLSyncClient<R extends UnknownRecord, S extends Store<R> = Store<R>
console.error('Restarting socket')
this.socket.restart()
break
case 'patch':
case 'push_result':
case 'data':
// wait for a connect to succeed before processing more events
if (!this.isConnectedToRoom) break
this.incomingDiffBuffer.push(event)
this.incomingDiffBuffer.push(...event.data)
this.scheduleRebase()
break
case 'incompatibility_error':

View file

@ -43,6 +43,7 @@ import {
TLIncompatibilityReason,
TLSYNC_PROTOCOL_VERSION,
TLSocketClientSentEvent,
TLSocketServerSentDataEvent,
TLSocketServerSentEvent,
} from './protocol'
@ -57,6 +58,8 @@ export type TLRoomSocket<R extends UnknownRecord> = {
export const MAX_TOMBSTONES = 3000
// the number of tombstones to delete when the max is reached
export const TOMBSTONE_PRUNE_BUFFER_SIZE = 300
// the minimum time between data-related messages to the clients
export const DATA_MESSAGE_DEBOUNCE_INTERVAL = 1000 / 60
const timeSince = (time: number) => Date.now() - time
@ -380,12 +383,15 @@ export class TLSyncRoom<R extends UnknownRecord> {
}
/**
* Send a message to a particular client.
* Send a message to a particular client. Debounces data events
*
* @param client - The client to send the message to.
* @param sessionKey - The session to send the message to.
* @param message - The message to send.
*/
private sendMessage(sessionKey: string, message: TLSocketServerSentEvent<R>) {
private sendMessage(
sessionKey: string,
message: TLSocketServerSentEvent<R> | TLSocketServerSentDataEvent<R>
) {
const session = this.sessions.get(sessionKey)
if (!session) {
console.warn('Tried to send message to unknown session', message.type)
@ -396,12 +402,49 @@ export class TLSyncRoom<R extends UnknownRecord> {
return
}
if (session.socket.isOpen) {
session.socket.sendMessage(message)
if (message.type !== 'patch' && message.type !== 'push_result') {
// this is not a data message
if (message.type !== 'pong') {
// non-data messages like "connect" might still need to be ordered correctly with
// respect to data messages, so it's better to flush just in case
this._flushDataMessages(sessionKey)
}
session.socket.sendMessage(message)
} else {
if (session.debounceTimer === null) {
// this is the first message since the last flush, don't delay it
session.socket.sendMessage({ type: 'data', data: [message] })
session.debounceTimer = setTimeout(
() => this._flushDataMessages(sessionKey),
DATA_MESSAGE_DEBOUNCE_INTERVAL
)
} else {
session.outstandingDataMessages.push(message)
}
}
} else {
this.cancelSession(session.sessionKey)
}
}
// needs to accept sessionKey and not a session because the session might be dead by the time
// the timer fires
_flushDataMessages(sessionKey: string) {
const session = this.sessions.get(sessionKey)
if (!session || session.state !== RoomSessionState.CONNECTED) {
return
}
session.debounceTimer = null
if (session.outstandingDataMessages.length > 0) {
session.socket.sendMessage({ type: 'data', data: session.outstandingDataMessages })
session.outstandingDataMessages.length = 0
}
}
private removeSession(sessionKey: string) {
const session = this.sessions.get(sessionKey)
if (!session) {
@ -461,10 +504,10 @@ export class TLSyncRoom<R extends UnknownRecord> {
}
/**
* Broadcast a message to all connected clients except the clientId provided.
* Broadcast a message to all connected clients except the one with the sessionKey provided.
*
* @param message - The message to broadcast.
* @param clientId - The client to exclude.
* @param sourceSessionKey - The session to exclude.
*/
broadcastPatch({
diff,
@ -507,7 +550,8 @@ export class TLSyncRoom<R extends UnknownRecord> {
* When a client connects to the room, add them to the list of clients and then merge the history
* down into the snapshots.
*
* @param client - The client that connected to the room.
* @param sessionKey - The session of the client that connected to the room.
* @param socket - Their socket.
*/
handleNewSession = (sessionKey: string, socket: TLRoomSocket<R>) => {
const existing = this.sessions.get(sessionKey)
@ -564,10 +608,10 @@ export class TLSyncRoom<R extends UnknownRecord> {
}
/**
* When the server receives a message from the clients Currently supports connect and patches.
* Invalid messages types log a warning. Currently doesn't validate data.
* When the server receives a message from the clients Currently, supports connect and patches.
* Invalid messages types throws an error. Currently, doesn't validate data.
*
* @param client - The client that sent the message
* @param sessionKey - The session that sent the message
* @param message - The message that was sent
*/
handleMessage = async (sessionKey: string, message: TLSocketClientSentEvent<R>) => {
@ -595,7 +639,7 @@ export class TLSyncRoom<R extends UnknownRecord> {
}
}
/** If the client is out of date or we are out of date, we need to let them know */
/** If the client is out of date, or we are out of date, we need to let them know */
private rejectSession(session: RoomSession<R>, reason: TLIncompatibilityReason) {
try {
if (session.socket.isOpen) {
@ -647,6 +691,8 @@ export class TLSyncRoom<R extends UnknownRecord> {
socket: session.socket,
serializedSchema: sessionSchema,
lastInteractionTime: Date.now(),
debounceTimer: null,
outstandingDataMessages: [],
})
this.sendMessage(session.sessionKey, msg)
}
@ -1002,7 +1048,7 @@ export class TLSyncRoom<R extends UnknownRecord> {
/**
* Handle the event when a client disconnects.
*
* @param client - The client that disconnected.
* @param sessionKey - The session that disconnected.
*/
handleClose = (sessionKey: string) => {
this.cancelSession(sessionKey)

View file

@ -2,7 +2,7 @@ import { SerializedSchema, UnknownRecord } from '@tldraw/store'
import { NetworkDiff, ObjectDiff, RecordOpType } from './diff'
/** @public */
export const TLSYNC_PROTOCOL_VERSION = 4
export const TLSYNC_PROTOCOL_VERSION = 5
/** @public */
export enum TLIncompatibilityReason {
@ -27,24 +27,28 @@ export type TLSocketServerSentEvent<R extends UnknownRecord> =
type: 'incompatibility_error'
reason: TLIncompatibilityReason
}
| {
type: 'error'
error?: any
}
| {
type: 'pong'
}
| { type: 'data'; data: TLSocketServerSentDataEvent<R>[] }
/** @public */
export type TLSocketServerSentDataEvent<R extends UnknownRecord> =
| {
type: 'patch'
diff: NetworkDiff<R>
serverClock: number
}
| {
type: 'error'
error?: any
}
| {
type: 'push_result'
clientClock: number
serverClock: number
action: 'discard' | 'commit' | { rebaseWithDiff: NetworkDiff<R> }
}
| {
type: 'pong'
}
/** @public */
export type TLPushRequest<R extends UnknownRecord> =

View file

@ -1,22 +0,0 @@
import { TLSocketClientSentEvent, TLSocketServerSentEvent } from './protocol'
type Message = TLSocketServerSentEvent<any> | TLSocketClientSentEvent<any>
let _lastSentMessage: Message | null = null
let _lastSentMessageSerialized: string | null = null
/**
* Serializes a message to a string. Caches the last serialized message to optimize for cases where
* the same message is broadcast to multiple places.
*
* @public
*/
export function serializeMessage(message: Message) {
if (message === _lastSentMessage) {
return _lastSentMessageSerialized as string
} else {
_lastSentMessage = message
_lastSentMessageSerialized = JSON.stringify(message)
return _lastSentMessageSerialized
}
}

View file

@ -21,4 +21,10 @@ export class TestServer<R extends UnknownRecord, P = unknown> {
socketPair.callbacks.onStatusChange?.('online')
}
flushDebouncingMessages() {
for (const sessionKey of this.room.sessions.keys()) {
this.room._flushDataMessages(sessionKey)
}
}
}

View file

@ -1,4 +1,5 @@
import { UnknownRecord } from '@tldraw/store'
import { structuredClone } from '@tldraw/utils'
import { TLPersistentClientSocket, TLPersistentClientSocketStatus } from '../lib/TLSyncClient'
import { TLRoomSocket } from '../lib/TLSyncRoom'
import { TLSocketClientSentEvent, TLSocketServerSentEvent } from '../lib/protocol'
@ -42,7 +43,8 @@ export class TestSocketPair<R extends UnknownRecord> {
// client was closed, drop the packet
return
}
this.serverSentEventQueue.push(msg)
// cloning because callers might reuse the same message object
this.serverSentEventQueue.push(structuredClone(msg))
},
}
didReceiveFromClient?: (msg: TLSocketClientSentEvent<R>) => void = undefined
@ -65,7 +67,8 @@ export class TestSocketPair<R extends UnknownRecord> {
if (this.clientSocket.connectionStatus !== 'online') {
throw new Error('trying to send before open')
}
this.clientSentEventQueue.push(msg)
// cloning because callers might reuse the same message object
this.clientSentEventQueue.push(structuredClone(msg))
},
restart: () => {
this.disconnect()

View file

@ -175,6 +175,8 @@ function runTest(seed: number) {
peer.editor.applyOp(op)
allOk('after applyOp')
server.flushDebouncingMessages()
if (peer.socketPair.isConnected && peer.randomInt(6) === 0) {
// randomly disconnect a peer
peer.socketPair.disconnect()
@ -213,6 +215,8 @@ function runTest(seed: number) {
}
while (peers.some((p) => p.socketPair.getNeedsFlushing())) {
server.flushDebouncingMessages()
for (const peer of peers) {
if (peer.socketPair.getNeedsFlushing()) {
peer.socketPair.flushServerSentEvents()

View file

@ -195,6 +195,8 @@ class TestInstance {
}
flush() {
this.server.flushDebouncingMessages()
while (this.oldSocketPair.getNeedsFlushing() || this.newSocketPair.getNeedsFlushing()) {
this.oldSocketPair.flushClientSentEvents()
this.oldSocketPair.flushServerSentEvents()
@ -491,10 +493,15 @@ describe('when the client is too new', () => {
})
expect(v2SendMessage).toHaveBeenCalledWith({
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
type: 'data',
data: [
{
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
},
],
} satisfies TLSocketServerSentEvent<RV2>)
})
@ -529,10 +536,15 @@ describe('when the client is too new', () => {
})
expect(data.v1SendMessage).toHaveBeenCalledWith({
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
type: 'data',
data: [
{
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
},
],
} satisfies TLSocketServerSentEvent<RV2>)
expect(data.v2SendMessage).toHaveBeenCalledWith({
@ -688,10 +700,15 @@ describe('when the client is too old', () => {
})
expect(data.v2SendMessage).toHaveBeenCalledWith({
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
type: 'data',
data: [
{
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
},
],
} satisfies TLSocketServerSentEvent<RV2>)
})
@ -705,23 +722,33 @@ describe('when the client is too old', () => {
})
expect(data.v1SendMessage).toHaveBeenCalledWith({
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
type: 'data',
data: [
{
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
},
],
} satisfies TLSocketServerSentEvent<RV2>)
expect(data.v2SendMessage).toHaveBeenCalledWith({
type: 'patch',
diff: {
[data.steve.id]: [
RecordOpType.Patch,
{
name: [ValueOpType.Put, 'Jeff'],
type: 'data',
data: [
{
type: 'patch',
diff: {
[data.steve.id]: [
RecordOpType.Patch,
{
name: [ValueOpType.Put, 'Jeff'],
},
],
},
],
},
serverClock: 11,
serverClock: 11,
},
],
} satisfies TLSocketServerSentEvent<RV2>)
})
})
@ -817,23 +844,33 @@ describe('when the client is the same version', () => {
})
expect(data.v2ClientASendMessage).toHaveBeenCalledWith({
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
type: 'data',
data: [
{
type: 'push_result',
action: 'commit',
clientClock: 1,
serverClock: 11,
},
],
} satisfies TLSocketServerSentEvent<RV2>)
expect(data.v2ClientBSendMessage).toHaveBeenCalledWith({
type: 'patch',
diff: {
[data.steve.id]: [
RecordOpType.Patch,
{
name: [ValueOpType.Put, 'Jeff'],
type: 'data',
data: [
{
type: 'patch',
diff: {
[data.steve.id]: [
RecordOpType.Patch,
{
name: [ValueOpType.Put, 'Jeff'],
},
],
},
],
},
serverClock: 11,
serverClock: 11,
},
],
} satisfies TLSocketServerSentEvent<RV2>)
})
})