diff --git a/apps/dotcom/src/utils/remote-sync/ClientWebSocketAdapter.ts b/apps/dotcom/src/utils/remote-sync/ClientWebSocketAdapter.ts index 34fdd5bfd..cd7de836d 100644 --- a/apps/dotcom/src/utils/remote-sync/ClientWebSocketAdapter.ts +++ b/apps/dotcom/src/utils/remote-sync/ClientWebSocketAdapter.ts @@ -1,6 +1,5 @@ import { chunk, - serializeMessage, TLPersistentClientSocket, TLPersistentClientSocketStatus, TLSocketClientSentEvent, @@ -176,7 +175,7 @@ export class ClientWebSocketAdapter implements TLPersistentClientSocket = socket: TLRoomSocket serializedSchema: SerializedSchema lastInteractionTime: number + debounceTimer: ReturnType | null + outstandingDataMessages: TLSocketServerSentDataEvent[] } diff --git a/packages/tlsync/src/lib/ServerSocketAdapter.ts b/packages/tlsync/src/lib/ServerSocketAdapter.ts index 9f6ed134b..c91e75223 100644 --- a/packages/tlsync/src/lib/ServerSocketAdapter.ts +++ b/packages/tlsync/src/lib/ServerSocketAdapter.ts @@ -2,7 +2,6 @@ import { UnknownRecord } from '@tldraw/store' import ws from 'ws' import { TLRoomSocket } from './TLSyncRoom' import { TLSocketServerSentEvent } from './protocol' -import { serializeMessage } from './serializeMessage' /** @public */ export class ServerSocketAdapter implements TLRoomSocket { @@ -11,8 +10,9 @@ export class ServerSocketAdapter implements TLRoomSocke get isOpen(): boolean { return this.ws.readyState === 1 // ready state open } + // see TLRoomSocket for details on why this accepts a union and not just arrays sendMessage(msg: TLSocketServerSentEvent) { - this.ws.send(serializeMessage(msg)) + this.ws.send(JSON.stringify(msg)) } close() { this.ws.close() diff --git a/packages/tlsync/src/lib/TLSyncClient.ts b/packages/tlsync/src/lib/TLSyncClient.ts index b370553b0..987042531 100644 --- a/packages/tlsync/src/lib/TLSyncClient.ts +++ b/packages/tlsync/src/lib/TLSyncClient.ts @@ -17,6 +17,7 @@ import { TLPushRequest, TLSYNC_PROTOCOL_VERSION, TLSocketClientSentEvent, + TLSocketServerSentDataEvent, TLSocketServerSentEvent, } from './protocol' import './requestAnimationFrame.polyfill' @@ -350,7 +351,7 @@ export class TLSyncClient = Store this.lastServerClock = event.serverClock } - incomingDiffBuffer: Extract, { type: 'patch' | 'push_result' }>[] = [] + incomingDiffBuffer: TLSocketServerSentDataEvent[] = [] /** Handle events received from the server */ private handleServerEvent = (event: TLSocketServerSentEvent) => { @@ -366,11 +367,10 @@ export class TLSyncClient = Store console.error('Restarting socket') this.socket.restart() break - case 'patch': - case 'push_result': + case 'data': // wait for a connect to succeed before processing more events if (!this.isConnectedToRoom) break - this.incomingDiffBuffer.push(event) + this.incomingDiffBuffer.push(...event.data) this.scheduleRebase() break case 'incompatibility_error': diff --git a/packages/tlsync/src/lib/TLSyncRoom.ts b/packages/tlsync/src/lib/TLSyncRoom.ts index f169473b9..cd0611395 100644 --- a/packages/tlsync/src/lib/TLSyncRoom.ts +++ b/packages/tlsync/src/lib/TLSyncRoom.ts @@ -43,6 +43,7 @@ import { TLIncompatibilityReason, TLSYNC_PROTOCOL_VERSION, TLSocketClientSentEvent, + TLSocketServerSentDataEvent, TLSocketServerSentEvent, } from './protocol' @@ -57,6 +58,8 @@ export type TLRoomSocket = { export const MAX_TOMBSTONES = 3000 // the number of tombstones to delete when the max is reached export const TOMBSTONE_PRUNE_BUFFER_SIZE = 300 +// the minimum time between data-related messages to the clients +export const DATA_MESSAGE_DEBOUNCE_INTERVAL = 1000 / 60 const timeSince = (time: number) => Date.now() - time @@ -380,12 +383,15 @@ export class TLSyncRoom { } /** - * Send a message to a particular client. + * Send a message to a particular client. Debounces data events * - * @param client - The client to send the message to. + * @param sessionKey - The session to send the message to. * @param message - The message to send. */ - private sendMessage(sessionKey: string, message: TLSocketServerSentEvent) { + private sendMessage( + sessionKey: string, + message: TLSocketServerSentEvent | TLSocketServerSentDataEvent + ) { const session = this.sessions.get(sessionKey) if (!session) { console.warn('Tried to send message to unknown session', message.type) @@ -396,12 +402,49 @@ export class TLSyncRoom { return } if (session.socket.isOpen) { - session.socket.sendMessage(message) + if (message.type !== 'patch' && message.type !== 'push_result') { + // this is not a data message + if (message.type !== 'pong') { + // non-data messages like "connect" might still need to be ordered correctly with + // respect to data messages, so it's better to flush just in case + this._flushDataMessages(sessionKey) + } + session.socket.sendMessage(message) + } else { + if (session.debounceTimer === null) { + // this is the first message since the last flush, don't delay it + session.socket.sendMessage({ type: 'data', data: [message] }) + + session.debounceTimer = setTimeout( + () => this._flushDataMessages(sessionKey), + DATA_MESSAGE_DEBOUNCE_INTERVAL + ) + } else { + session.outstandingDataMessages.push(message) + } + } } else { this.cancelSession(session.sessionKey) } } + // needs to accept sessionKey and not a session because the session might be dead by the time + // the timer fires + _flushDataMessages(sessionKey: string) { + const session = this.sessions.get(sessionKey) + + if (!session || session.state !== RoomSessionState.CONNECTED) { + return + } + + session.debounceTimer = null + + if (session.outstandingDataMessages.length > 0) { + session.socket.sendMessage({ type: 'data', data: session.outstandingDataMessages }) + session.outstandingDataMessages.length = 0 + } + } + private removeSession(sessionKey: string) { const session = this.sessions.get(sessionKey) if (!session) { @@ -461,10 +504,10 @@ export class TLSyncRoom { } /** - * Broadcast a message to all connected clients except the clientId provided. + * Broadcast a message to all connected clients except the one with the sessionKey provided. * * @param message - The message to broadcast. - * @param clientId - The client to exclude. + * @param sourceSessionKey - The session to exclude. */ broadcastPatch({ diff, @@ -507,7 +550,8 @@ export class TLSyncRoom { * When a client connects to the room, add them to the list of clients and then merge the history * down into the snapshots. * - * @param client - The client that connected to the room. + * @param sessionKey - The session of the client that connected to the room. + * @param socket - Their socket. */ handleNewSession = (sessionKey: string, socket: TLRoomSocket) => { const existing = this.sessions.get(sessionKey) @@ -564,10 +608,10 @@ export class TLSyncRoom { } /** - * When the server receives a message from the clients Currently supports connect and patches. - * Invalid messages types log a warning. Currently doesn't validate data. + * When the server receives a message from the clients Currently, supports connect and patches. + * Invalid messages types throws an error. Currently, doesn't validate data. * - * @param client - The client that sent the message + * @param sessionKey - The session that sent the message * @param message - The message that was sent */ handleMessage = async (sessionKey: string, message: TLSocketClientSentEvent) => { @@ -595,7 +639,7 @@ export class TLSyncRoom { } } - /** If the client is out of date or we are out of date, we need to let them know */ + /** If the client is out of date, or we are out of date, we need to let them know */ private rejectSession(session: RoomSession, reason: TLIncompatibilityReason) { try { if (session.socket.isOpen) { @@ -647,6 +691,8 @@ export class TLSyncRoom { socket: session.socket, serializedSchema: sessionSchema, lastInteractionTime: Date.now(), + debounceTimer: null, + outstandingDataMessages: [], }) this.sendMessage(session.sessionKey, msg) } @@ -1002,7 +1048,7 @@ export class TLSyncRoom { /** * Handle the event when a client disconnects. * - * @param client - The client that disconnected. + * @param sessionKey - The session that disconnected. */ handleClose = (sessionKey: string) => { this.cancelSession(sessionKey) diff --git a/packages/tlsync/src/lib/protocol.ts b/packages/tlsync/src/lib/protocol.ts index e1c7a4a5c..08f315aed 100644 --- a/packages/tlsync/src/lib/protocol.ts +++ b/packages/tlsync/src/lib/protocol.ts @@ -2,7 +2,7 @@ import { SerializedSchema, UnknownRecord } from '@tldraw/store' import { NetworkDiff, ObjectDiff, RecordOpType } from './diff' /** @public */ -export const TLSYNC_PROTOCOL_VERSION = 4 +export const TLSYNC_PROTOCOL_VERSION = 5 /** @public */ export enum TLIncompatibilityReason { @@ -27,24 +27,28 @@ export type TLSocketServerSentEvent = type: 'incompatibility_error' reason: TLIncompatibilityReason } + | { + type: 'error' + error?: any + } + | { + type: 'pong' + } + | { type: 'data'; data: TLSocketServerSentDataEvent[] } + +/** @public */ +export type TLSocketServerSentDataEvent = | { type: 'patch' diff: NetworkDiff serverClock: number } - | { - type: 'error' - error?: any - } | { type: 'push_result' clientClock: number serverClock: number action: 'discard' | 'commit' | { rebaseWithDiff: NetworkDiff } } - | { - type: 'pong' - } /** @public */ export type TLPushRequest = diff --git a/packages/tlsync/src/lib/serializeMessage.ts b/packages/tlsync/src/lib/serializeMessage.ts deleted file mode 100644 index cc3ff11bc..000000000 --- a/packages/tlsync/src/lib/serializeMessage.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { TLSocketClientSentEvent, TLSocketServerSentEvent } from './protocol' - -type Message = TLSocketServerSentEvent | TLSocketClientSentEvent - -let _lastSentMessage: Message | null = null -let _lastSentMessageSerialized: string | null = null - -/** - * Serializes a message to a string. Caches the last serialized message to optimize for cases where - * the same message is broadcast to multiple places. - * - * @public - */ -export function serializeMessage(message: Message) { - if (message === _lastSentMessage) { - return _lastSentMessageSerialized as string - } else { - _lastSentMessage = message - _lastSentMessageSerialized = JSON.stringify(message) - return _lastSentMessageSerialized - } -} diff --git a/packages/tlsync/src/test/TestServer.ts b/packages/tlsync/src/test/TestServer.ts index 66dabcffb..256894cf9 100644 --- a/packages/tlsync/src/test/TestServer.ts +++ b/packages/tlsync/src/test/TestServer.ts @@ -21,4 +21,10 @@ export class TestServer { socketPair.callbacks.onStatusChange?.('online') } + + flushDebouncingMessages() { + for (const sessionKey of this.room.sessions.keys()) { + this.room._flushDataMessages(sessionKey) + } + } } diff --git a/packages/tlsync/src/test/TestSocketPair.ts b/packages/tlsync/src/test/TestSocketPair.ts index db90039ca..1bb80919f 100644 --- a/packages/tlsync/src/test/TestSocketPair.ts +++ b/packages/tlsync/src/test/TestSocketPair.ts @@ -1,4 +1,5 @@ import { UnknownRecord } from '@tldraw/store' +import { structuredClone } from '@tldraw/utils' import { TLPersistentClientSocket, TLPersistentClientSocketStatus } from '../lib/TLSyncClient' import { TLRoomSocket } from '../lib/TLSyncRoom' import { TLSocketClientSentEvent, TLSocketServerSentEvent } from '../lib/protocol' @@ -42,7 +43,8 @@ export class TestSocketPair { // client was closed, drop the packet return } - this.serverSentEventQueue.push(msg) + // cloning because callers might reuse the same message object + this.serverSentEventQueue.push(structuredClone(msg)) }, } didReceiveFromClient?: (msg: TLSocketClientSentEvent) => void = undefined @@ -65,7 +67,8 @@ export class TestSocketPair { if (this.clientSocket.connectionStatus !== 'online') { throw new Error('trying to send before open') } - this.clientSentEventQueue.push(msg) + // cloning because callers might reuse the same message object + this.clientSentEventQueue.push(structuredClone(msg)) }, restart: () => { this.disconnect() diff --git a/packages/tlsync/src/test/syncFuzz.test.ts b/packages/tlsync/src/test/syncFuzz.test.ts index 53a7700fc..4974bc558 100644 --- a/packages/tlsync/src/test/syncFuzz.test.ts +++ b/packages/tlsync/src/test/syncFuzz.test.ts @@ -175,6 +175,8 @@ function runTest(seed: number) { peer.editor.applyOp(op) allOk('after applyOp') + server.flushDebouncingMessages() + if (peer.socketPair.isConnected && peer.randomInt(6) === 0) { // randomly disconnect a peer peer.socketPair.disconnect() @@ -213,6 +215,8 @@ function runTest(seed: number) { } while (peers.some((p) => p.socketPair.getNeedsFlushing())) { + server.flushDebouncingMessages() + for (const peer of peers) { if (peer.socketPair.getNeedsFlushing()) { peer.socketPair.flushServerSentEvents() diff --git a/packages/tlsync/src/test/upgradeDowngrade.test.ts b/packages/tlsync/src/test/upgradeDowngrade.test.ts index d6f27f778..58f8d5317 100644 --- a/packages/tlsync/src/test/upgradeDowngrade.test.ts +++ b/packages/tlsync/src/test/upgradeDowngrade.test.ts @@ -195,6 +195,8 @@ class TestInstance { } flush() { + this.server.flushDebouncingMessages() + while (this.oldSocketPair.getNeedsFlushing() || this.newSocketPair.getNeedsFlushing()) { this.oldSocketPair.flushClientSentEvents() this.oldSocketPair.flushServerSentEvents() @@ -491,10 +493,15 @@ describe('when the client is too new', () => { }) expect(v2SendMessage).toHaveBeenCalledWith({ - type: 'push_result', - action: 'commit', - clientClock: 1, - serverClock: 11, + type: 'data', + data: [ + { + type: 'push_result', + action: 'commit', + clientClock: 1, + serverClock: 11, + }, + ], } satisfies TLSocketServerSentEvent) }) @@ -529,10 +536,15 @@ describe('when the client is too new', () => { }) expect(data.v1SendMessage).toHaveBeenCalledWith({ - type: 'push_result', - action: 'commit', - clientClock: 1, - serverClock: 11, + type: 'data', + data: [ + { + type: 'push_result', + action: 'commit', + clientClock: 1, + serverClock: 11, + }, + ], } satisfies TLSocketServerSentEvent) expect(data.v2SendMessage).toHaveBeenCalledWith({ @@ -688,10 +700,15 @@ describe('when the client is too old', () => { }) expect(data.v2SendMessage).toHaveBeenCalledWith({ - type: 'push_result', - action: 'commit', - clientClock: 1, - serverClock: 11, + type: 'data', + data: [ + { + type: 'push_result', + action: 'commit', + clientClock: 1, + serverClock: 11, + }, + ], } satisfies TLSocketServerSentEvent) }) @@ -705,23 +722,33 @@ describe('when the client is too old', () => { }) expect(data.v1SendMessage).toHaveBeenCalledWith({ - type: 'push_result', - action: 'commit', - clientClock: 1, - serverClock: 11, + type: 'data', + data: [ + { + type: 'push_result', + action: 'commit', + clientClock: 1, + serverClock: 11, + }, + ], } satisfies TLSocketServerSentEvent) expect(data.v2SendMessage).toHaveBeenCalledWith({ - type: 'patch', - diff: { - [data.steve.id]: [ - RecordOpType.Patch, - { - name: [ValueOpType.Put, 'Jeff'], + type: 'data', + data: [ + { + type: 'patch', + diff: { + [data.steve.id]: [ + RecordOpType.Patch, + { + name: [ValueOpType.Put, 'Jeff'], + }, + ], }, - ], - }, - serverClock: 11, + serverClock: 11, + }, + ], } satisfies TLSocketServerSentEvent) }) }) @@ -817,23 +844,33 @@ describe('when the client is the same version', () => { }) expect(data.v2ClientASendMessage).toHaveBeenCalledWith({ - type: 'push_result', - action: 'commit', - clientClock: 1, - serverClock: 11, + type: 'data', + data: [ + { + type: 'push_result', + action: 'commit', + clientClock: 1, + serverClock: 11, + }, + ], } satisfies TLSocketServerSentEvent) expect(data.v2ClientBSendMessage).toHaveBeenCalledWith({ - type: 'patch', - diff: { - [data.steve.id]: [ - RecordOpType.Patch, - { - name: [ValueOpType.Put, 'Jeff'], + type: 'data', + data: [ + { + type: 'patch', + diff: { + [data.steve.id]: [ + RecordOpType.Patch, + { + name: [ValueOpType.Put, 'Jeff'], + }, + ], }, - ], - }, - serverClock: 11, + serverClock: 11, + }, + ], } satisfies TLSocketServerSentEvent) }) })