put sync stuff in bemo worker (#4060)

this PR puts sync stuff in the bemo worker, and sets up a temporary
dev-only page in dotcom for testing bemo stuff


### Change type

- [ ] `bugfix`
- [ ] `improvement`
- [x] `feature`
- [ ] `api`
- [ ] `other`

### Test plan

1. Create a shape...
2.

- [ ] Unit tests
- [ ] End to end tests

### Release notes

- Fixed a bug with...
This commit is contained in:
David Sheldrick 2024-07-03 15:10:54 +01:00 committed by GitHub
parent 8906bd8ffa
commit c1fe8ec99a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
83 changed files with 571 additions and 120 deletions

View file

@ -1,8 +1,7 @@
import { TLStoreSnapshot } from '@tldraw/tlschema'
import { areObjectsShallowEqual } from '@tldraw/utils'
import { useState } from 'react'
import { TLEditorSnapshot } from '../..'
import { loadSnapshot } from '../config/TLEditorSnapshot'
import { TLEditorSnapshot, loadSnapshot } from '../config/TLEditorSnapshot'
import { TLStoreOptions, createTLStore } from '../config/createTLStore'
/** @public */

View file

@ -0,0 +1,3 @@
# @tldraw/sync-react
react bindings for tldraw sync

View file

@ -0,0 +1,70 @@
{
"name": "@tldraw/sync-react",
"description": "A tiny little drawing app (multiplayer sync react bindings).",
"version": "2.0.0-alpha.11",
"private": true,
"author": {
"name": "tldraw GB Ltd.",
"email": "hello@tldraw.com"
},
"homepage": "https://tldraw.dev",
"license": "SEE LICENSE IN LICENSE.md",
"repository": {
"type": "git",
"url": "https://github.com/tldraw/tldraw"
},
"bugs": {
"url": "https://github.com/tldraw/tldraw/issues"
},
"keywords": [
"tldraw",
"drawing",
"app",
"development",
"whiteboard",
"canvas",
"infinite"
],
"/* NOTE */": "These `main` and `types` fields are rewritten by the build script. They are not the actual values we publish",
"main": "./src/index.ts",
"types": "./.tsbuild/index.d.ts",
"/* GOTCHA */": "files will include ./dist and index.d.ts by default, add any others you want to include in here",
"files": [],
"scripts": {
"test-ci": "lazy inherit",
"test": "yarn run -T jest",
"test-coverage": "lazy inherit",
"lint": "yarn run -T tsx ../../scripts/lint.ts"
},
"devDependencies": {
"typescript": "^5.3.3",
"uuid-by-string": "^4.0.0",
"uuid-readable": "^0.0.2"
},
"jest": {
"preset": "config/jest/node",
"testEnvironment": "../../../packages/utils/patchedJestJsDom.js",
"moduleNameMapper": {
"^~(.*)": "<rootDir>/src/$1"
},
"transformIgnorePatterns": [
"ignore everything. swc is fast enough to transform everything"
],
"setupFiles": [
"./setupJest.js"
]
},
"dependencies": {
"@tldraw/sync": "workspace:*",
"@tldraw/utils": "workspace:*",
"lodash.isequal": "^4.5.0",
"nanoevents": "^7.0.1",
"nanoid": "4.0.2",
"tldraw": "workspace:*",
"ws": "^8.16.0"
},
"peerDependencies": {
"react": "^18",
"react-dom": "^18"
}
}

View file

View file

@ -0,0 +1,3 @@
test('make ci pass with empty test', () => {
// empty
})

View file

@ -0,0 +1 @@
export { useRemoteSyncClient, type RemoteTLStoreWithStatus } from './useRemoteSyncClient'

View file

@ -0,0 +1,144 @@
import {
ClientWebSocketAdapter,
TLCloseEventCode,
TLIncompatibilityReason,
TLPersistentClientSocketStatus,
TLRemoteSyncError,
TLSyncClient,
schema,
} from '@tldraw/sync'
import { useEffect, useState } from 'react'
import {
Signal,
TAB_ID,
TLRecord,
TLStore,
TLStoreSnapshot,
TLStoreWithStatus,
TLUserPreferences,
computed,
createPresenceStateDerivation,
defaultUserPreferences,
getUserPreferences,
useTLStore,
useValue,
} from 'tldraw'
const MULTIPLAYER_EVENT_NAME = 'multiplayer.client'
/** @public */
export type RemoteTLStoreWithStatus = Exclude<
TLStoreWithStatus,
{ status: 'synced-local' } | { status: 'not-synced' }
>
/** @public */
export function useRemoteSyncClient(opts: UseSyncClientConfig): RemoteTLStoreWithStatus {
const [state, setState] = useState<{
readyClient?: TLSyncClient<TLRecord, TLStore>
error?: Error
} | null>(null)
const { uri, roomId = 'default', userPreferences: prefs } = opts
const store = useTLStore({ schema })
const error: NonNullable<typeof state>['error'] = state?.error ?? undefined
const track = opts.trackAnalyticsEvent
useEffect(() => {
if (error) return
const userPreferences = computed<{ id: string; color: string; name: string }>(
'userPreferences',
() => {
const user = prefs?.get() ?? getUserPreferences()
return {
id: user.id,
color: user.color ?? defaultUserPreferences.color,
name: user.name ?? defaultUserPreferences.name,
}
}
)
const socket = new ClientWebSocketAdapter(async () => {
// set sessionKey as a query param on the uri
const withParams = new URL(uri)
withParams.searchParams.set('sessionKey', TAB_ID)
withParams.searchParams.set('storeId', store.id)
return withParams.toString()
})
socket.onStatusChange((val: TLPersistentClientSocketStatus, closeCode?: number) => {
if (val === 'error' && closeCode === TLCloseEventCode.NOT_FOUND) {
track?.(MULTIPLAYER_EVENT_NAME, { name: 'room-not-found', roomId })
setState({ error: new TLRemoteSyncError(TLIncompatibilityReason.RoomNotFound) })
client.close()
socket.close()
return
}
})
let didCancel = false
const client = new TLSyncClient({
store,
socket,
didCancel: () => didCancel,
onLoad(client) {
track?.(MULTIPLAYER_EVENT_NAME, { name: 'load', roomId })
setState({ readyClient: client })
},
onLoadError(err) {
track?.(MULTIPLAYER_EVENT_NAME, { name: 'load-error', roomId })
console.error(err)
setState({ error: err })
},
onSyncError(reason) {
track?.(MULTIPLAYER_EVENT_NAME, { name: 'sync-error', roomId, reason })
setState({ error: new TLRemoteSyncError(reason) })
},
onAfterConnect() {
// if the server crashes and loses all data it can return an empty document
// when it comes back up. This is a safety check to make sure that if something like
// that happens, it won't render the app broken and require a restart. The user will
// most likely lose all their changes though since they'll have been working with pages
// that won't exist. There's certainly something we can do to make this better.
// but the likelihood of this happening is very low and maybe not worth caring about beyond this.
store.ensureStoreIsUsable()
},
presence: createPresenceStateDerivation(userPreferences)(store),
})
return () => {
didCancel = true
client.close()
socket.close()
}
}, [prefs, roomId, store, uri, error, track])
return useValue<RemoteTLStoreWithStatus>(
'remote synced store',
() => {
if (!state) return { status: 'loading' }
if (state.error) return { status: 'error', error: state.error }
if (!state.readyClient) return { status: 'loading' }
const connectionStatus = state.readyClient.socket.connectionStatus
return {
status: 'synced-remote',
connectionStatus: connectionStatus === 'error' ? 'offline' : connectionStatus,
store: state.readyClient.store,
}
},
[state]
)
}
/** @public */
export interface UseSyncClientConfig {
uri: string
roomId?: string
userPreferences?: Signal<TLUserPreferences>
snapshotForNewRoomRef?: { current: null | TLStoreSnapshot }
/* @internal */
trackAnalyticsEvent?(name: string, data: { [key: string]: any }): void
}

View file

@ -0,0 +1,20 @@
{
"extends": "../../config/tsconfig.base.json",
"include": ["src"],
"exclude": ["node_modules", "docs", ".tsbuild*"],
"compilerOptions": {
"outDir": "./.tsbuild",
"rootDir": "src"
},
"references": [
{
"path": "../sync"
},
{
"path": "../tldraw"
},
{
"path": "../utils"
}
]
}

View file

@ -0,0 +1 @@
jestResolver.js

1
packages/sync/LICENSE.md Normal file
View file

@ -0,0 +1 @@
This code is licensed under the [tldraw license](https://github.com/tldraw/tldraw/blob/main/LICENSE.md)

View file

@ -0,0 +1,4 @@
{
"$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json",
"extends": "../../config/api-extractor.json"
}

View file

@ -0,0 +1,25 @@
// HACK: `ws` has an import map mapping browser context to a dummy implementation that just
// throws an error, because it's impossible to create a websocket server in the browser
// and `ws` tries to be helpful. Unfortunately, it doesn't work well in our tests:
// we run Jest in jsdom context, because we test browser APIs, but this causes Jest
// to select the browser version of the package, which makes it impossible to run a test
// websocket server.
//
// The solution is to override `ws` exports in the Jest resolver to point to the node version
// regardless.
//
// An additional complication is that Jest seems to expect the resolver to be a CommonJS module,
// so this module is CommonJS despite the rest of the codebase being ESM.
//
// see https://jestjs.io/docs/configuration#resolver-string for docs
module.exports = function jestResolver(path, options) {
return options.defaultResolver(path, {
...options,
packageFilter: (pkg) => {
if (path === 'ws') {
pkg.exports['.']['browser'] = './index.js'
}
return pkg
},
})
}

View file

@ -1,5 +1,5 @@
{
"name": "@tldraw/tlsync",
"name": "@tldraw/sync",
"description": "A tiny little drawing app (multiplayer sync).",
"version": "2.0.0-alpha.11",
"private": true,
@ -43,6 +43,7 @@
"uuid-readable": "^0.0.2"
},
"jest": {
"resolver": "<rootDir>/jestResolver.js",
"preset": "config/jest/node",
"testEnvironment": "../../../packages/utils/patchedJestJsDom.js",
"moduleNameMapper": {

View file

@ -1,3 +1,5 @@
export { ClientWebSocketAdapter } from './lib/ClientWebSocketAdapter'
export { TLRemoteSyncError } from './lib/TLRemoteSyncError'
export { TLSocketRoom } from './lib/TLSocketRoom'
export {
TLCloseEventCode,

View file

@ -0,0 +1,200 @@
import { TLRecord } from 'tldraw'
import { ClientWebSocketAdapter, INACTIVE_MIN_DELAY } from './ClientWebSocketAdapter'
// NOTE: there is a hack in apps/dotcom/jestResolver.js to make this import work
import { WebSocketServer, WebSocket as WsWebSocket } from 'ws'
import { TLSocketClientSentEvent, getTlsyncProtocolVersion } from './protocol'
async function waitFor(predicate: () => boolean) {
let safety = 0
while (!predicate()) {
if (safety++ > 1000) {
throw new Error('waitFor predicate timed out')
}
try {
jest.runAllTimers()
jest.useRealTimers()
await new Promise((resolve) => setTimeout(resolve, 10))
} finally {
jest.useFakeTimers()
}
}
}
jest.useFakeTimers()
describe(ClientWebSocketAdapter, () => {
let adapter: ClientWebSocketAdapter
let wsServer: WebSocketServer
let connectedServerSocket: WsWebSocket
const connectMock = jest.fn<void, [socket: WsWebSocket]>((socket) => {
connectedServerSocket = socket
})
beforeEach(() => {
adapter = new ClientWebSocketAdapter(() => 'ws://localhost:2233')
wsServer = new WebSocketServer({ port: 2233 })
wsServer.on('connection', connectMock)
})
afterEach(() => {
adapter.close()
wsServer.close()
connectMock.mockClear()
})
it('should be able to be constructed', () => {
expect(adapter).toBeTruthy()
})
it('should start with connectionStatus=offline', () => {
expect(adapter.connectionStatus).toBe('offline')
})
it('should start with connectionStatus=offline', () => {
expect(adapter.connectionStatus).toBe('offline')
})
it('should respond to onopen events by setting connectionStatus=online', async () => {
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
expect(adapter.connectionStatus).toBe('online')
})
it('should respond to onerror events by setting connectionStatus=error', async () => {
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
adapter._ws?.onerror?.({} as any)
expect(adapter.connectionStatus).toBe('error')
})
it('should try to reopen the connection if there was an error', async () => {
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
expect(adapter._ws).toBeTruthy()
const prevClientSocket = adapter._ws
const prevServerSocket = connectedServerSocket
prevServerSocket.terminate()
await waitFor(() => connectedServerSocket !== prevServerSocket)
// there is a race here, the server could've opened a new socket already, but it hasn't
// transitioned to OPEN yet, thus the second waitFor
await waitFor(() => connectedServerSocket.readyState === WebSocket.OPEN)
expect(adapter._ws).not.toBe(prevClientSocket)
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
})
it('should transition to online if a retry succeeds', async () => {
adapter._ws?.onerror?.({} as any)
await waitFor(() => adapter.connectionStatus === 'online')
expect(adapter.connectionStatus).toBe('online')
})
it('should call .close on the underlying socket if .close is called before the socket opens', async () => {
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
const closeSpy = jest.spyOn(adapter._ws!, 'close')
adapter.close()
await waitFor(() => closeSpy.mock.calls.length > 0)
expect(closeSpy).toHaveBeenCalled()
})
it('should transition to offline if the server disconnects', async () => {
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
connectedServerSocket.terminate()
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
expect(adapter.connectionStatus).toBe('offline')
})
it('retries to connect if the server disconnects', async () => {
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
connectedServerSocket.terminate()
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
expect(adapter.connectionStatus).toBe('offline')
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
expect(adapter.connectionStatus).toBe('online')
connectedServerSocket.terminate()
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
expect(adapter.connectionStatus).toBe('offline')
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
expect(adapter.connectionStatus).toBe('online')
})
it('attempts to reconnect early if the tab becomes active', async () => {
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
const hiddenMock = jest.spyOn(document, 'hidden', 'get')
hiddenMock.mockReturnValue(true)
// it's necessary to close the socket, as otherwise the websocket might stay half-open
connectedServerSocket.close()
wsServer.close()
await waitFor(() => adapter._ws?.readyState !== WebSocket.OPEN)
expect(adapter._reconnectManager.intendedDelay).toBeGreaterThanOrEqual(INACTIVE_MIN_DELAY)
hiddenMock.mockReturnValue(false)
document.dispatchEvent(new Event('visibilitychange'))
expect(adapter._reconnectManager.intendedDelay).toBeLessThan(INACTIVE_MIN_DELAY)
hiddenMock.mockRestore()
})
it('supports receiving messages', async () => {
const onMessage = jest.fn()
adapter.onReceiveMessage(onMessage)
connectMock.mockImplementationOnce((ws) => {
ws.send('{ "type": "message", "data": "hello" }')
})
await waitFor(() => onMessage.mock.calls.length === 1)
expect(onMessage).toHaveBeenCalledWith({ type: 'message', data: 'hello' })
})
it('supports sending messages', async () => {
const onMessage = jest.fn()
connectMock.mockImplementationOnce((ws) => {
ws.on('message', onMessage)
})
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
const message: TLSocketClientSentEvent<TLRecord> = {
type: 'connect',
connectRequestId: 'test',
schema: { schemaVersion: 1, storeVersion: 0, recordVersions: {} },
protocolVersion: getTlsyncProtocolVersion(),
lastServerClock: 0,
}
adapter.sendMessage(message)
await waitFor(() => onMessage.mock.calls.length === 1)
expect(JSON.parse(onMessage.mock.calls[0][0].toString())).toEqual(message)
})
it('signals status changes', async () => {
const onStatusChange = jest.fn()
adapter.onStatusChange(onStatusChange)
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
expect(onStatusChange).toHaveBeenCalledWith('online')
connectedServerSocket.terminate()
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
expect(onStatusChange).toHaveBeenCalledWith('offline', 1006)
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
expect(onStatusChange).toHaveBeenCalledWith('online')
connectedServerSocket.terminate()
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
expect(onStatusChange).toHaveBeenCalledWith('offline', 1006)
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
expect(onStatusChange).toHaveBeenCalledWith('online')
adapter._ws?.onerror?.({} as any)
expect(onStatusChange).toHaveBeenCalledWith('error', undefined)
})
it('signals the correct closeCode when a room is not found', async () => {
const onStatusChange = jest.fn()
adapter.onStatusChange(onStatusChange)
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
adapter._ws!.onclose?.({ code: 4099 } as any)
expect(onStatusChange).toHaveBeenCalledWith('error', 4099)
})
it('signals status changes while restarting', async () => {
const onStatusChange = jest.fn()
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
adapter.onStatusChange(onStatusChange)
adapter.restart()
await waitFor(() => onStatusChange.mock.calls.length === 2)
expect(onStatusChange).toHaveBeenCalledWith('offline', undefined)
expect(onStatusChange).toHaveBeenCalledWith('online')
})
})

View file

@ -0,0 +1,458 @@
import { atom, Atom } from '@tldraw/state'
import { TLRecord } from '@tldraw/tlschema'
import { assert } from '@tldraw/utils'
import { chunk } from './chunk'
import { TLSocketClientSentEvent, TLSocketServerSentEvent } from './protocol'
import {
TLCloseEventCode,
TLPersistentClientSocket,
TLPersistentClientSocketStatus,
} from './TLSyncClient'
function listenTo<T extends EventTarget>(target: T, event: string, handler: () => void) {
target.addEventListener(event, handler)
return () => {
target.removeEventListener(event, handler)
}
}
function debug(...args: any[]) {
// @ts-ignore
if (typeof window !== 'undefined' && window.__tldraw_socket_debug) {
const now = new Date()
// eslint-disable-next-line no-console
console.log(
`${now.getHours()}:${now.getMinutes()}:${now.getSeconds()}.${now.getMilliseconds()}`,
...args
//, new Error().stack
)
}
}
// NOTE: ClientWebSocketAdapter requires its users to implement their own connection loss
// detection, for example by regularly pinging the server and .restart()ing
// the connection when a number of pings goes unanswered. Without this mechanism,
// we might not be able to detect the websocket connection going down in a timely manner
// (it will probably time out on outgoing data packets at some point).
//
// This is by design. Whilst the Websocket protocol specifies protocol-level pings,
// they don't seem to be surfaced in browser APIs and can't be relied on. Therefore,
// pings need to be implemented one level up, on the application API side, which for our
// codebase means whatever code that uses ClientWebSocketAdapter.
export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord> {
_ws: WebSocket | null = null
isDisposed = false
readonly _reconnectManager: ReconnectManager
// TODO: .close should be a project-wide interface with a common contract (.close()d thing
// can only be garbage collected, and can't be used anymore)
close() {
this.isDisposed = true
this._reconnectManager.close()
// WebSocket.close() is idempotent
this._ws?.close()
}
constructor(getUri: () => Promise<string> | string) {
this._reconnectManager = new ReconnectManager(this, getUri)
}
private _handleConnect() {
debug('handleConnect')
this._connectionStatus.set('online')
this.statusListeners.forEach((cb) => cb('online'))
this._reconnectManager.connected()
}
private _handleDisconnect(reason: 'closed' | 'error' | 'manual', closeCode?: number) {
debug('handleDisconnect', {
currentStatus: this.connectionStatus,
closeCode,
reason,
})
let newStatus: 'offline' | 'error'
switch (reason) {
case 'closed':
if (closeCode === TLCloseEventCode.NOT_FOUND) {
newStatus = 'error'
break
}
newStatus = 'offline'
break
case 'error':
newStatus = 'error'
break
case 'manual':
newStatus = 'offline'
break
}
if (
// it the status changed
this.connectionStatus !== newStatus &&
// ignore errors if we're already in the offline state
!(newStatus === 'error' && this.connectionStatus === 'offline')
) {
this._connectionStatus.set(newStatus)
this.statusListeners.forEach((cb) => cb(newStatus, closeCode))
}
this._reconnectManager.disconnected()
}
_setNewSocket(ws: WebSocket) {
assert(!this.isDisposed, 'Tried to set a new websocket on a disposed socket')
assert(
this._ws === null ||
this._ws.readyState === WebSocket.CLOSED ||
this._ws.readyState === WebSocket.CLOSING,
`Tried to set a new websocket in when the existing one was ${this._ws?.readyState}`
)
// NOTE: Sockets can stay for quite a while in the CLOSING state. This is because the transition
// between CLOSING and CLOSED happens either after the closing handshake, or after a
// timeout, but in either case those sockets don't need any special handling, the browser
// will close them eventually. We just "orphan" such sockets and ignore their onclose/onerror.
ws.onopen = () => {
debug('ws.onopen')
assert(
this._ws === ws,
"sockets must only be orphaned when they are CLOSING or CLOSED, so they can't open"
)
this._handleConnect()
}
ws.onclose = (event: CloseEvent) => {
debug('ws.onclose')
if (this._ws === ws) {
this._handleDisconnect('closed', event.code)
} else {
debug('ignoring onclose for an orphaned socket')
}
}
ws.onerror = () => {
debug('ws.onerror')
if (this._ws === ws) {
this._handleDisconnect('error')
} else {
debug('ignoring onerror for an orphaned socket')
}
}
ws.onmessage = (ev) => {
assert(
this._ws === ws,
"sockets must only be orphaned when they are CLOSING or CLOSED, so they can't receive messages"
)
const parsed = JSON.parse(ev.data.toString())
this.messageListeners.forEach((cb) => cb(parsed))
}
this._ws = ws
}
_closeSocket() {
if (this._ws === null) return
this._ws.close()
// explicitly orphan the socket to ignore its onclose/onerror, because onclose can be delayed
this._ws = null
this._handleDisconnect('manual')
}
// TLPersistentClientSocket stuff
_connectionStatus: Atom<TLPersistentClientSocketStatus | 'initial'> = atom(
'websocket connection status',
'initial'
)
// eslint-disable-next-line no-restricted-syntax
get connectionStatus(): TLPersistentClientSocketStatus {
const status = this._connectionStatus.get()
return status === 'initial' ? 'offline' : status
}
sendMessage(msg: TLSocketClientSentEvent<TLRecord>) {
assert(!this.isDisposed, 'Tried to send message on a disposed socket')
if (!this._ws) return
if (this.connectionStatus === 'online') {
const chunks = chunk(JSON.stringify(msg))
for (const part of chunks) {
this._ws.send(part)
}
} else {
console.warn('Tried to send message while ' + this.connectionStatus)
}
}
private messageListeners = new Set<(msg: TLSocketServerSentEvent<TLRecord>) => void>()
onReceiveMessage(cb: (val: TLSocketServerSentEvent<TLRecord>) => void) {
assert(!this.isDisposed, 'Tried to add message listener on a disposed socket')
this.messageListeners.add(cb)
return () => {
this.messageListeners.delete(cb)
}
}
private statusListeners = new Set<
(status: TLPersistentClientSocketStatus, closeCode?: number) => void
>()
onStatusChange(cb: (val: TLPersistentClientSocketStatus, closeCode?: number) => void) {
assert(!this.isDisposed, 'Tried to add status listener on a disposed socket')
this.statusListeners.add(cb)
return () => {
this.statusListeners.delete(cb)
}
}
restart() {
assert(!this.isDisposed, 'Tried to restart a disposed socket')
debug('restarting')
this._closeSocket()
this._reconnectManager.maybeReconnected()
}
}
// Those constants are exported primarily for tests
// ACTIVE_ means the tab is active, document.hidden is false
export const ACTIVE_MIN_DELAY = 500
export const ACTIVE_MAX_DELAY = 2000
// Correspondingly, here document.hidden is true. It's intended to reduce the load and battery drain
// on client devices somewhat when they aren't looking at the tab. We don't disconnect completely
// to minimise issues with reconnection/sync when the tab becomes visible again
export const INACTIVE_MIN_DELAY = 1000
export const INACTIVE_MAX_DELAY = 1000 * 60 * 5
export const DELAY_EXPONENT = 1.5
// this is a tradeoff between quickly detecting connections stuck in the CONNECTING state and
// not needlessly reconnecting if the connection is just slow to establish
export const ATTEMPT_TIMEOUT = 1000
class ReconnectManager {
private isDisposed = false
private disposables: (() => void)[] = [
() => {
if (this.reconnectTimeout) clearTimeout(this.reconnectTimeout)
if (this.recheckConnectingTimeout) clearTimeout(this.recheckConnectingTimeout)
},
]
private reconnectTimeout: ReturnType<typeof setTimeout> | null = null
private recheckConnectingTimeout: ReturnType<typeof setTimeout> | null = null
private lastAttemptStart: number | null = null
intendedDelay: number = ACTIVE_MIN_DELAY
private state: 'pendingAttempt' | 'pendingAttemptResult' | 'delay' | 'connected'
constructor(
private socketAdapter: ClientWebSocketAdapter,
private getUri: () => Promise<string> | string
) {
this.subscribeToReconnectHints()
this.disposables.push(
listenTo(window, 'offline', () => {
debug('window went offline')
// On the one hand, 'offline' event is not really reliable; on the other, the only
// alternative is to wait for pings not being delivered, which takes more than 20 seconds,
// which means we won't see the ClientWebSocketAdapter status change for more than
// 20 seconds after the tab goes offline. Our application layer must be resistent to
// connection restart anyway, so we can just try to reconnect and see if
// we're truly offline.
this.socketAdapter._closeSocket()
})
)
this.state = 'pendingAttempt'
this.intendedDelay = ACTIVE_MIN_DELAY
this.scheduleAttempt()
}
private subscribeToReconnectHints() {
this.disposables.push(
listenTo(window, 'online', () => {
debug('window went online')
this.maybeReconnected()
}),
listenTo(document, 'visibilitychange', () => {
if (!document.hidden) {
debug('document became visible')
this.maybeReconnected()
}
})
)
if (Object.prototype.hasOwnProperty.call(navigator, 'connection')) {
const connection = (navigator as any)['connection'] as EventTarget
this.disposables.push(
listenTo(connection, 'change', () => {
debug('navigator.connection change')
this.maybeReconnected()
})
)
}
}
private scheduleAttempt() {
assert(this.state === 'pendingAttempt')
debug('scheduling a connection attempt')
Promise.resolve(this.getUri()).then((uri) => {
// this can happen if the promise gets resolved too late
if (this.state !== 'pendingAttempt' || this.isDisposed) return
assert(
this.socketAdapter._ws?.readyState !== WebSocket.OPEN,
'There should be no connection attempts while already connected'
)
this.lastAttemptStart = Date.now()
this.socketAdapter._setNewSocket(new WebSocket(uri))
this.state = 'pendingAttemptResult'
})
}
private getMaxDelay() {
return document.hidden ? INACTIVE_MAX_DELAY : ACTIVE_MAX_DELAY
}
private getMinDelay() {
return document.hidden ? INACTIVE_MIN_DELAY : ACTIVE_MIN_DELAY
}
private clearReconnectTimeout() {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout)
this.reconnectTimeout = null
}
}
private clearRecheckConnectingTimeout() {
if (this.recheckConnectingTimeout) {
clearTimeout(this.recheckConnectingTimeout)
this.recheckConnectingTimeout = null
}
}
maybeReconnected() {
debug('ReconnectManager.maybeReconnected')
// It doesn't make sense to have another check scheduled if we're already checking it now.
// If we have a CONNECTING check scheduled and relevant, it'll be recreated below anyway
this.clearRecheckConnectingTimeout()
// readyState can be CONNECTING, OPEN, CLOSING, CLOSED, or null (if getUri() is still pending)
if (this.socketAdapter._ws?.readyState === WebSocket.OPEN) {
debug('ReconnectManager.maybeReconnected: already connected')
// nothing to do, we're already OK
return
}
if (this.socketAdapter._ws?.readyState === WebSocket.CONNECTING) {
debug('ReconnectManager.maybeReconnected: connecting')
// We might be waiting for a TCP connection that sent SYN out and will never get it back,
// while a new connection appeared. On the other hand, we might have just started connecting
// and will succeed in a bit. Thus, we're checking how old the attempt is and retry anew
// if it's old enough. This by itself can delay the connection a bit, but shouldn't prevent
// new connections as long as `maybeReconnected` is not looped itself
assert(
this.lastAttemptStart,
'ReadyState=CONNECTING without lastAttemptStart should be impossible'
)
const sinceLastStart = Date.now() - this.lastAttemptStart
if (sinceLastStart < ATTEMPT_TIMEOUT) {
debug('ReconnectManager.maybeReconnected: connecting, rechecking later')
this.recheckConnectingTimeout = setTimeout(
() => this.maybeReconnected(),
ATTEMPT_TIMEOUT - sinceLastStart
)
} else {
debug('ReconnectManager.maybeReconnected: connecting, but for too long, retry now')
// Last connection attempt was started a while ago, it's possible that network conditions
// changed, and it's worth retrying to connect. `disconnected` will handle reconnection
//
// NOTE: The danger here is looping in connection attemps if connections are slow.
// Make sure that `maybeReconnected` is not called in the `disconnected` codepath!
this.clearRecheckConnectingTimeout()
this.socketAdapter._closeSocket()
}
return
}
debug('ReconnectManager.maybeReconnected: closing/closed/null, retry now')
// readyState is CLOSING or CLOSED, or the websocket is null
// Restart the backoff and retry ASAP (honouring the min delay)
// this.state doesn't really matter, because disconnected() will handle any state correctly
this.intendedDelay = ACTIVE_MIN_DELAY
this.disconnected()
}
disconnected() {
debug('ReconnectManager.disconnected')
// This either means we're freshly disconnected, or the last connection attempt failed;
// either way, time to try again.
// Guard against delayed notifications and recheck synchronously
if (
this.socketAdapter._ws?.readyState !== WebSocket.OPEN &&
this.socketAdapter._ws?.readyState !== WebSocket.CONNECTING
) {
debug('ReconnectManager.disconnected: websocket is not OPEN or CONNECTING')
this.clearReconnectTimeout()
let delayLeft
if (this.state === 'connected') {
// it's the first sign that we got disconnected; the state will be updated below,
// just set the appropriate delay for now
this.intendedDelay = this.getMinDelay()
delayLeft = this.intendedDelay
} else {
delayLeft =
this.lastAttemptStart !== null
? this.lastAttemptStart + this.intendedDelay - Date.now()
: 0
}
if (delayLeft > 0) {
debug('ReconnectManager.disconnected: delaying, delayLeft', delayLeft)
// try again later
this.state = 'delay'
this.reconnectTimeout = setTimeout(() => this.disconnected(), delayLeft)
} else {
// not connected and not delayed, time to retry
this.state = 'pendingAttempt'
this.intendedDelay = Math.min(
this.getMaxDelay(),
Math.max(this.getMinDelay(), this.intendedDelay) * DELAY_EXPONENT
)
debug(
'ReconnectManager.disconnected: attempting a connection, next delay',
this.intendedDelay
)
this.scheduleAttempt()
}
}
}
connected() {
debug('ReconnectManager.connected')
// this notification could've been delayed, recheck synchronously
if (this.socketAdapter._ws?.readyState === WebSocket.OPEN) {
debug('ReconnectManager.connected: websocket is OPEN')
this.state = 'connected'
this.clearReconnectTimeout()
this.intendedDelay = ACTIVE_MIN_DELAY
}
}
close() {
this.disposables.forEach((d) => d())
this.isDisposed = true
}
}

View file

@ -0,0 +1,9 @@
import { TLIncompatibilityReason } from './protocol'
/** @public */
export class TLRemoteSyncError extends Error {
override name = 'RemoteSyncError'
constructor(public readonly reason: TLIncompatibilityReason) {
super(`remote sync error: ${reason}`)
}
}

View file

@ -36,8 +36,8 @@ import {
} from '../shared/default-shape-constants'
import { getFontDefForExport } from '../shared/defaultStyleDefs'
import { useDefaultColorTheme } from '../../..'
import { startEditingShapeWithLabel } from '../../tools/SelectTool/selectHelpers'
import { useDefaultColorTheme } from '../shared/useDefaultColorTheme'
import {
CLONE_HANDLE_MARGIN,
NOTE_CENTER_OFFSET,

View file

@ -0,0 +1,49 @@
import { promiseWithResolve } from '@tldraw/utils'
import { createPersistQueue } from './createPersistQueue'
const tick = () => Promise.resolve()
describe('createPersistQueue', () => {
it('creates a function that runs some async function when invoked', async () => {
let numExecutions = 0
const persist = createPersistQueue(async () => {
numExecutions++
}, 10)
expect(numExecutions).toBe(0)
await tick()
// nothing happens until we call the function
expect(numExecutions).toBe(0)
await persist()
expect(numExecutions).toBe(1)
await tick()
expect(numExecutions).toBe(1)
})
it('will queue up a second invocation if invoked while executing', async () => {
let numExecutions = 0
const promise = promiseWithResolve()
const persist = createPersistQueue(async () => {
await promise
numExecutions++
}, 10)
const persistPromiseA = persist()
await tick()
expect(numExecutions).toBe(0)
persist()
persist()
const persistPromiseB = persist()
await tick()
expect(numExecutions).toBe(0)
// nothing happens until we resolve the promise
promise.resolve(undefined)
await persistPromiseA
expect(numExecutions).toBe(2)
await persistPromiseB
expect(numExecutions).toBe(2)
})
})

View file

@ -0,0 +1,31 @@
// Save the room to supabase
export function createPersistQueue(persist: () => Promise<void>, timeout: number) {
let persistAgain = false
let queue: null | Promise<void> = null
// check whether the worker was woken up to persist after having gone to sleep
return async () => {
if (queue) {
persistAgain = true
return await queue
}
try {
queue = Promise.resolve(
(async () => {
do {
if (persistAgain) {
if (timeout > 0) {
await new Promise((resolve) => setTimeout(resolve, timeout))
}
persistAgain = false
}
await persist()
} while (persistAgain)
})()
)
await queue
} finally {
queue = null
}
}
}

View file

@ -1,6 +1,7 @@
/// <reference no-default-lib="true"/>
/// <reference types="@cloudflare/workers-types" />
export { createPersistQueue } from './createPersistQueue'
export { notFound } from './errors'
export { getUrlMetadata, urlMetadataQueryValidator } from './getUrlMetadata'
export {