Better websocket reconnection handling (#2960)
Right now it's fairly easy to end up in a situation where a tab that has come back online takes a while to recognise that the connection can be re-established. This PR cleans up the reconnection logic, re-enables the tests, and makes sure we get back online as robustly as possible. ### Change Type - [x] `patch` — Bug fix ### Test Plan 1. Check that reconnection works as expected - [x] End to end tests --------- Co-authored-by: David Sheldrick <d.j.sheldrick@gmail.com>
This commit is contained in:
parent
f0f133fdd2
commit
c3e8628680
7 changed files with 415 additions and 167 deletions
1
apps/dotcom/.eslintignore
Normal file
1
apps/dotcom/.eslintignore
Normal file
|
@ -0,0 +1 @@
|
||||||
|
jestResolver.js
|
25
apps/dotcom/jestResolver.js
Normal file
25
apps/dotcom/jestResolver.js
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
// HACK: `ws` has an import map mapping browser context to a dummy implementation that just
|
||||||
|
// throws an error, because it's impossible to create a websocket server in the browser
|
||||||
|
// and `ws` tries to be helpful. Unfortunately, it doesn't work well in our tests:
|
||||||
|
// we run Jest in jsdom context, because we test browser APIs, but this causes Jest
|
||||||
|
// to select the browser version of the package, which makes it impossible to run a test
|
||||||
|
// websocket server.
|
||||||
|
//
|
||||||
|
// The solution is to override `ws` exports in the Jest resolver to point to the node version
|
||||||
|
// regardless.
|
||||||
|
//
|
||||||
|
// An additional complication is that Jest seems to expect the resolver to be a CommonJS module,
|
||||||
|
// so this module is CommonJS despite the rest of the codebase being ESM.
|
||||||
|
//
|
||||||
|
// see https://jestjs.io/docs/configuration#resolver-string for docs
|
||||||
|
module.exports = function jestResolver(path, options) {
|
||||||
|
return options.defaultResolver(path, {
|
||||||
|
...options,
|
||||||
|
packageFilter: (pkg) => {
|
||||||
|
if (path === 'ws') {
|
||||||
|
pkg.exports['.']['browser'] = './index.js'
|
||||||
|
}
|
||||||
|
return pkg
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
|
@ -24,6 +24,7 @@
|
||||||
"@sentry/react": "^7.77.0",
|
"@sentry/react": "^7.77.0",
|
||||||
"@tldraw/assets": "workspace:*",
|
"@tldraw/assets": "workspace:*",
|
||||||
"@tldraw/tlsync": "workspace:*",
|
"@tldraw/tlsync": "workspace:*",
|
||||||
|
"@tldraw/utils": "workspace:*",
|
||||||
"@vercel/analytics": "^1.1.1",
|
"@vercel/analytics": "^1.1.1",
|
||||||
"browser-fs-access": "^0.33.0",
|
"browser-fs-access": "^0.33.0",
|
||||||
"idb": "^7.1.1",
|
"idb": "^7.1.1",
|
||||||
|
@ -52,6 +53,7 @@
|
||||||
"ws": "^8.16.0"
|
"ws": "^8.16.0"
|
||||||
},
|
},
|
||||||
"jest": {
|
"jest": {
|
||||||
|
"resolver": "<rootDir>/jestResolver.js",
|
||||||
"preset": "config/jest/node",
|
"preset": "config/jest/node",
|
||||||
"roots": [
|
"roots": [
|
||||||
"<rootDir>"
|
"<rootDir>"
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
import { TLSYNC_PROTOCOL_VERSION } from '@tldraw/tlsync'
|
import { TLSocketClientSentEvent, TLSYNC_PROTOCOL_VERSION } from '@tldraw/tlsync'
|
||||||
import * as ws from 'ws'
|
import { TLRecord } from 'tldraw'
|
||||||
import { ClientWebSocketAdapter } from './ClientWebSocketAdapter'
|
import { ClientWebSocketAdapter, INACTIVE_MIN_DELAY } from './ClientWebSocketAdapter'
|
||||||
|
// NOTE: there is a hack in apps/dotcom/jestResolver.js to make this import work
|
||||||
|
import { WebSocketServer, WebSocket as WsWebSocket } from 'ws'
|
||||||
|
|
||||||
async function waitFor(predicate: () => boolean) {
|
async function waitFor(predicate: () => boolean) {
|
||||||
let safety = 0
|
let safety = 0
|
||||||
|
@ -20,19 +22,16 @@ async function waitFor(predicate: () => boolean) {
|
||||||
|
|
||||||
jest.useFakeTimers()
|
jest.useFakeTimers()
|
||||||
|
|
||||||
// TODO: unskip this test. It accidentally got disabled a long time ago when we moved this file into
|
describe(ClientWebSocketAdapter, () => {
|
||||||
// the dotcom folder which didn't have testing set up at the time. We need to spend some time fixing
|
|
||||||
// it before it can be re-enabled.
|
|
||||||
describe.skip(ClientWebSocketAdapter, () => {
|
|
||||||
let adapter: ClientWebSocketAdapter
|
let adapter: ClientWebSocketAdapter
|
||||||
let wsServer: ws.Server
|
let wsServer: WebSocketServer
|
||||||
let connectedWs: ws.WebSocket
|
let connectedServerSocket: WsWebSocket
|
||||||
const connectMock = jest.fn<void, [socket: ws.WebSocket]>((socket) => {
|
const connectMock = jest.fn<void, [socket: WsWebSocket]>((socket) => {
|
||||||
connectedWs = socket
|
connectedServerSocket = socket
|
||||||
})
|
})
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
adapter = new ClientWebSocketAdapter(() => 'ws://localhost:2233')
|
adapter = new ClientWebSocketAdapter(() => 'ws://localhost:2233')
|
||||||
wsServer = new ws.Server({ port: 2233 })
|
wsServer = new WebSocketServer({ port: 2233 })
|
||||||
wsServer.on('connection', connectMock)
|
wsServer.on('connection', connectMock)
|
||||||
})
|
})
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
|
@ -59,12 +58,15 @@ describe.skip(ClientWebSocketAdapter, () => {
|
||||||
adapter._ws?.onerror?.({} as any)
|
adapter._ws?.onerror?.({} as any)
|
||||||
expect(adapter.connectionStatus).toBe('error')
|
expect(adapter.connectionStatus).toBe('error')
|
||||||
})
|
})
|
||||||
it('should try to reopen the connection if there was an error', () => {
|
it('should try to reopen the connection if there was an error', async () => {
|
||||||
const prevWes = adapter._ws
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
adapter._ws?.onerror?.({} as any)
|
expect(adapter._ws).toBeTruthy()
|
||||||
jest.advanceTimersByTime(1000)
|
const prevClientSocket = adapter._ws
|
||||||
expect(adapter._ws).not.toBe(prevWes)
|
const prevServerSocket = connectedServerSocket
|
||||||
expect(adapter._ws?.readyState).toBe(WebSocket.CONNECTING)
|
prevServerSocket.terminate()
|
||||||
|
await waitFor(() => connectedServerSocket !== prevServerSocket)
|
||||||
|
expect(adapter._ws).not.toBe(prevClientSocket)
|
||||||
|
expect(adapter._ws?.readyState).toBe(WebSocket.OPEN)
|
||||||
})
|
})
|
||||||
it('should transition to online if a retry succeeds', async () => {
|
it('should transition to online if a retry succeeds', async () => {
|
||||||
adapter._ws?.onerror?.({} as any)
|
adapter._ws?.onerror?.({} as any)
|
||||||
|
@ -72,6 +74,7 @@ describe.skip(ClientWebSocketAdapter, () => {
|
||||||
expect(adapter.connectionStatus).toBe('online')
|
expect(adapter.connectionStatus).toBe('online')
|
||||||
})
|
})
|
||||||
it('should call .close on the underlying socket if .close is called before the socket opens', async () => {
|
it('should call .close on the underlying socket if .close is called before the socket opens', async () => {
|
||||||
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
const closeSpy = jest.spyOn(adapter._ws!, 'close')
|
const closeSpy = jest.spyOn(adapter._ws!, 'close')
|
||||||
adapter.close()
|
adapter.close()
|
||||||
await waitFor(() => closeSpy.mock.calls.length > 0)
|
await waitFor(() => closeSpy.mock.calls.length > 0)
|
||||||
|
@ -79,39 +82,37 @@ describe.skip(ClientWebSocketAdapter, () => {
|
||||||
})
|
})
|
||||||
it('should transition to offline if the server disconnects', async () => {
|
it('should transition to offline if the server disconnects', async () => {
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
connectedWs.terminate()
|
connectedServerSocket.terminate()
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
||||||
expect(adapter.connectionStatus).toBe('offline')
|
expect(adapter.connectionStatus).toBe('offline')
|
||||||
})
|
})
|
||||||
it('retries to connect if the server disconnects', async () => {
|
it('retries to connect if the server disconnects', async () => {
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
connectedWs.terminate()
|
connectedServerSocket.terminate()
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
||||||
expect(adapter.connectionStatus).toBe('offline')
|
expect(adapter.connectionStatus).toBe('offline')
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
expect(adapter.connectionStatus).toBe('online')
|
expect(adapter.connectionStatus).toBe('online')
|
||||||
connectedWs.terminate()
|
connectedServerSocket.terminate()
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
||||||
expect(adapter.connectionStatus).toBe('offline')
|
expect(adapter.connectionStatus).toBe('offline')
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
expect(adapter.connectionStatus).toBe('online')
|
expect(adapter.connectionStatus).toBe('online')
|
||||||
})
|
})
|
||||||
|
|
||||||
it('closes the socket if the window goes offline and attempts to reconnect', async () => {
|
it('attempts to reconnect early if the tab becomes active', async () => {
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
|
||||||
const closeSpy = jest.spyOn(adapter._ws!, 'close')
|
|
||||||
window.dispatchEvent(new Event('offline'))
|
|
||||||
expect(closeSpy).toHaveBeenCalled()
|
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
|
||||||
})
|
|
||||||
|
|
||||||
it('attempts to reconnect early if the window comes back online', async () => {
|
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
|
const hiddenMock = jest.spyOn(document, 'hidden', 'get')
|
||||||
|
hiddenMock.mockReturnValue(true)
|
||||||
|
// it's necessary to close the socket, as otherwise the websocket might stay half-open
|
||||||
|
connectedServerSocket.close()
|
||||||
wsServer.close()
|
wsServer.close()
|
||||||
window.dispatchEvent(new Event('offline'))
|
await waitFor(() => adapter._ws?.readyState !== WebSocket.OPEN)
|
||||||
adapter._reconnectTimeout.intervalLength = 50000
|
expect(adapter._reconnectManager.intendedDelay).toBeGreaterThanOrEqual(INACTIVE_MIN_DELAY)
|
||||||
window.dispatchEvent(new Event('online'))
|
hiddenMock.mockReturnValue(false)
|
||||||
expect(adapter._reconnectTimeout.intervalLength).toBeLessThan(1000)
|
document.dispatchEvent(new Event('visibilitychange'))
|
||||||
|
expect(adapter._reconnectManager.intendedDelay).toBeLessThan(INACTIVE_MIN_DELAY)
|
||||||
|
hiddenMock.mockRestore()
|
||||||
})
|
})
|
||||||
|
|
||||||
it('supports receiving messages', async () => {
|
it('supports receiving messages', async () => {
|
||||||
|
@ -125,8 +126,7 @@ describe.skip(ClientWebSocketAdapter, () => {
|
||||||
expect(onMessage).toHaveBeenCalledWith({ type: 'message', data: 'hello' })
|
expect(onMessage).toHaveBeenCalledWith({ type: 'message', data: 'hello' })
|
||||||
})
|
})
|
||||||
|
|
||||||
// TODO: this is failing on github actions, investigate
|
it('supports sending messages', async () => {
|
||||||
it.skip('supports sending messages', async () => {
|
|
||||||
const onMessage = jest.fn()
|
const onMessage = jest.fn()
|
||||||
connectMock.mockImplementationOnce((ws) => {
|
connectMock.mockImplementationOnce((ws) => {
|
||||||
ws.on('message', onMessage)
|
ws.on('message', onMessage)
|
||||||
|
@ -134,19 +134,19 @@ describe.skip(ClientWebSocketAdapter, () => {
|
||||||
|
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
|
|
||||||
adapter.sendMessage({
|
const message: TLSocketClientSentEvent<TLRecord> = {
|
||||||
type: 'connect',
|
type: 'connect',
|
||||||
connectRequestId: 'test',
|
connectRequestId: 'test',
|
||||||
schema: { schemaVersion: 0, storeVersion: 0, recordVersions: {} },
|
schema: { schemaVersion: 0, storeVersion: 0, recordVersions: {} },
|
||||||
protocolVersion: TLSYNC_PROTOCOL_VERSION,
|
protocolVersion: TLSYNC_PROTOCOL_VERSION,
|
||||||
lastServerClock: 0,
|
lastServerClock: 0,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
adapter.sendMessage(message)
|
||||||
|
|
||||||
await waitFor(() => onMessage.mock.calls.length === 1)
|
await waitFor(() => onMessage.mock.calls.length === 1)
|
||||||
|
|
||||||
expect(onMessage.mock.calls[0][0].toString()).toBe(
|
expect(JSON.parse(onMessage.mock.calls[0][0].toString())).toEqual(message)
|
||||||
'{"type":"connect","instanceId":"test","lastServerClock":0}'
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
it('signals status changes', async () => {
|
it('signals status changes', async () => {
|
||||||
|
@ -154,12 +154,12 @@ describe.skip(ClientWebSocketAdapter, () => {
|
||||||
adapter.onStatusChange(onStatusChange)
|
adapter.onStatusChange(onStatusChange)
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
expect(onStatusChange).toHaveBeenCalledWith('online')
|
expect(onStatusChange).toHaveBeenCalledWith('online')
|
||||||
connectedWs.terminate()
|
connectedServerSocket.terminate()
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
||||||
expect(onStatusChange).toHaveBeenCalledWith('offline')
|
expect(onStatusChange).toHaveBeenCalledWith('offline')
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
expect(onStatusChange).toHaveBeenCalledWith('online')
|
expect(onStatusChange).toHaveBeenCalledWith('online')
|
||||||
connectedWs.terminate()
|
connectedServerSocket.terminate()
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.CLOSED)
|
||||||
expect(onStatusChange).toHaveBeenCalledWith('offline')
|
expect(onStatusChange).toHaveBeenCalledWith('offline')
|
||||||
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
await waitFor(() => adapter._ws?.readyState === WebSocket.OPEN)
|
||||||
|
|
|
@ -6,137 +6,160 @@ import {
|
||||||
TLSocketClientSentEvent,
|
TLSocketClientSentEvent,
|
||||||
TLSocketServerSentEvent,
|
TLSocketServerSentEvent,
|
||||||
} from '@tldraw/tlsync'
|
} from '@tldraw/tlsync'
|
||||||
|
import { assert } from '@tldraw/utils'
|
||||||
import { atom, Atom, TLRecord } from 'tldraw'
|
import { atom, Atom, TLRecord } from 'tldraw'
|
||||||
|
|
||||||
function windowListen(...args: Parameters<typeof window.addEventListener>) {
|
function listenTo<T extends EventTarget>(target: T, event: string, handler: () => void) {
|
||||||
window.addEventListener(...args)
|
target.addEventListener(event, handler)
|
||||||
return () => {
|
return () => {
|
||||||
window.removeEventListener(...args)
|
target.removeEventListener(event, handler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function debug(...args: any[]) {
|
function debug(...args: any[]) {
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
if (typeof window !== 'undefined' && window.__tldraw_socket_debug) {
|
if (typeof window !== 'undefined' && window.__tldraw_socket_debug) {
|
||||||
|
const now = new Date()
|
||||||
// eslint-disable-next-line no-console
|
// eslint-disable-next-line no-console
|
||||||
console.log(...args, new Error().stack)
|
console.log(
|
||||||
|
`${now.getHours()}:${now.getMinutes()}:${now.getSeconds()}.${now.getMilliseconds()}`,
|
||||||
|
...args
|
||||||
|
//, new Error().stack
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE: ClientWebSocketAdapter requires its users to implement their own connection loss
|
||||||
|
// detection, for example by regularly pinging the server and .restart()ing
|
||||||
|
// the connection when a number of pings goes unanswered. Without this mechanism,
|
||||||
|
// we might not be able to detect the websocket connection going down in a timely manner
|
||||||
|
// (it will probably time out on outgoing data packets at some point).
|
||||||
|
//
|
||||||
|
// This is by design. Whilst the Websocket protocol specifies protocol-level pings,
|
||||||
|
// they don't seem to be surfaced in browser APIs and can't be relied on. Therefore,
|
||||||
|
// pings need to be implemented one level up, on the application API side, which for our
|
||||||
|
// codebase means whatever code that uses ClientWebSocketAdapter.
|
||||||
|
|
||||||
export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord> {
|
export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord> {
|
||||||
_ws: WebSocket | null = null
|
_ws: WebSocket | null = null
|
||||||
|
|
||||||
wasManuallyClosed = false
|
isDisposed = false
|
||||||
|
|
||||||
disposables: (() => void)[] = []
|
readonly _reconnectManager: ReconnectManager
|
||||||
|
|
||||||
|
// TODO: .close should be a project-wide interface with a common contract (.close()d thing
|
||||||
|
// can only be garbage collected, and can't be used anymore)
|
||||||
close() {
|
close() {
|
||||||
this.wasManuallyClosed = true
|
this.isDisposed = true
|
||||||
this.disposables.forEach((d) => d())
|
this._reconnectManager.close()
|
||||||
this._reconnectTimeout.clear()
|
// WebSocket.close() is idempotent
|
||||||
if (this._ws?.readyState === WebSocket.OPEN) {
|
|
||||||
debug('close d')
|
|
||||||
this._ws.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
constructor(private getUri: () => Promise<string> | string) {
|
|
||||||
this.disposables.push(
|
|
||||||
windowListen('online', () => {
|
|
||||||
debug('window online')
|
|
||||||
if (this.connectionStatus !== 'online') {
|
|
||||||
this._reconnectTimeout.clear()
|
|
||||||
this._attemptReconnect()
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
windowListen('offline', () => {
|
|
||||||
debug('window offline')
|
|
||||||
if (this.connectionStatus === 'online') {
|
|
||||||
this._ws?.close()
|
this._ws?.close()
|
||||||
this._ws?.onclose?.(null as any)
|
|
||||||
}
|
}
|
||||||
}),
|
|
||||||
windowListen('pointermove', () => {
|
constructor(getUri: () => Promise<string> | string) {
|
||||||
// if the pointer moves while we are offline, we should try to reconnect more
|
this._reconnectManager = new ReconnectManager(this, getUri)
|
||||||
// often than every 5 mins!
|
|
||||||
if (this.connectionStatus !== 'online') {
|
|
||||||
this._reconnectTimeout.userInteractionOccurred()
|
|
||||||
}
|
}
|
||||||
}),
|
|
||||||
windowListen('keydown', () => {
|
private _handleConnect() {
|
||||||
// if the user pressed a key while we are offline, we should try to reconnect more
|
debug('handleConnect')
|
||||||
// often than every 5 mins!
|
|
||||||
if (this.connectionStatus !== 'online') {
|
this._connectionStatus.set('online')
|
||||||
this._reconnectTimeout.userInteractionOccurred()
|
this.statusListeners.forEach((cb) => cb('online'))
|
||||||
|
|
||||||
|
this._reconnectManager.connected()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private _handleDisconnect(reason: 'closed' | 'error' | 'manual') {
|
||||||
|
debug('handleDisconnect', {
|
||||||
|
currentStatus: this.connectionStatus,
|
||||||
|
reason,
|
||||||
})
|
})
|
||||||
)
|
|
||||||
this._reconnectTimeout.run()
|
let newStatus: 'offline' | 'error'
|
||||||
|
switch (reason) {
|
||||||
|
case 'closed':
|
||||||
|
newStatus = 'offline'
|
||||||
|
break
|
||||||
|
case 'error':
|
||||||
|
newStatus = 'error'
|
||||||
|
break
|
||||||
|
case 'manual':
|
||||||
|
newStatus = 'offline'
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
private handleDisconnect(status: Exclude<TLPersistentClientSocketStatus, 'online'>) {
|
|
||||||
debug('handleDisconnect', status, this.connectionStatus)
|
|
||||||
if (
|
if (
|
||||||
// if the status is the same as before, don't do anything
|
// if the status changed
|
||||||
this.connectionStatus === status ||
|
this.connectionStatus !== newStatus &&
|
||||||
// if we receive an error we only care about it while we're in the initial state
|
// ignore errors if we're already in the offline state
|
||||||
(status === 'error' && this.connectionStatus === 'offline')
|
!(newStatus === 'error' && this.connectionStatus === 'offline')
|
||||||
) {
|
) {
|
||||||
this._attemptReconnect()
|
this._connectionStatus.set(newStatus)
|
||||||
return
|
this.statusListeners.forEach((cb) => cb(newStatus))
|
||||||
}
|
|
||||||
this._connectionStatus.set(status)
|
|
||||||
this.statusListeners.forEach((cb) => cb(status))
|
|
||||||
this._reconnectTimeout.clear()
|
|
||||||
this._attemptReconnect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private configureSocket() {
|
this._reconnectManager.disconnected()
|
||||||
const ws = this._ws
|
}
|
||||||
if (!ws) return
|
|
||||||
|
_setNewSocket(ws: WebSocket) {
|
||||||
|
assert(!this.isDisposed, 'Tried to set a new websocket on a disposed socket')
|
||||||
|
assert(
|
||||||
|
this._ws === null ||
|
||||||
|
this._ws.readyState === WebSocket.CLOSED ||
|
||||||
|
this._ws.readyState === WebSocket.CLOSING,
|
||||||
|
`Tried to set a new websocket in when the existing one was ${this._ws?.readyState}`
|
||||||
|
)
|
||||||
|
// NOTE: Sockets can stay for quite a while in the CLOSING state. This is because the transition
|
||||||
|
// between CLOSING and CLOSED happens either after the closing handshake, or after a
|
||||||
|
// timeout, but in either case those sockets don't need any special handling, the browser
|
||||||
|
// will close them eventually. We just "orphan" such sockets and ignore their onclose/onerror.
|
||||||
ws.onopen = () => {
|
ws.onopen = () => {
|
||||||
debug('ws.onopen')
|
debug('ws.onopen')
|
||||||
// ws might be opened multiple times so need to check that it wasn't already supplanted
|
assert(
|
||||||
if (this._ws !== ws || this.wasManuallyClosed) {
|
this._ws === ws,
|
||||||
if (ws.readyState === WebSocket.OPEN) {
|
"sockets must only be orphaned when they are CLOSING or CLOSED, so they can't open"
|
||||||
debug('close a')
|
)
|
||||||
ws.close()
|
this._handleConnect()
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
this._connectionStatus.set('online')
|
|
||||||
this.statusListeners.forEach((cb) => cb(this.connectionStatus))
|
|
||||||
this._reconnectTimeout.clear()
|
|
||||||
}
|
}
|
||||||
ws.onclose = () => {
|
ws.onclose = () => {
|
||||||
debug('ws.onclose')
|
debug('ws.onclose')
|
||||||
this.handleDisconnect('offline')
|
if (this._ws === ws) {
|
||||||
|
this._handleDisconnect('closed')
|
||||||
|
} else {
|
||||||
|
debug('ignoring onclose for an orphaned socket')
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ws.onerror = () => {
|
ws.onerror = () => {
|
||||||
debug('ws.onerror')
|
debug('ws.onerror')
|
||||||
this.handleDisconnect('error')
|
if (this._ws === ws) {
|
||||||
|
this._handleDisconnect('error')
|
||||||
|
} else {
|
||||||
|
debug('ignoring onerror for an orphaned socket')
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ws.onmessage = (ev) => {
|
ws.onmessage = (ev) => {
|
||||||
|
assert(
|
||||||
|
this._ws === ws,
|
||||||
|
"sockets must only be orphaned when they are CLOSING or CLOSED, so they can't receive messages"
|
||||||
|
)
|
||||||
const parsed = JSON.parse(ev.data.toString())
|
const parsed = JSON.parse(ev.data.toString())
|
||||||
this.messageListeners.forEach((cb) => cb(parsed))
|
this.messageListeners.forEach((cb) => cb(parsed))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this._ws = ws
|
||||||
}
|
}
|
||||||
|
|
||||||
readonly _reconnectTimeout = new ExponentialBackoffTimeout(async () => {
|
_closeSocket() {
|
||||||
debug('close b')
|
if (this._ws === null) return
|
||||||
this._ws?.close()
|
|
||||||
this._ws = new WebSocket(await this.getUri())
|
|
||||||
this.configureSocket()
|
|
||||||
})
|
|
||||||
|
|
||||||
_attemptReconnect() {
|
this._ws.close()
|
||||||
debug('_attemptReconnect', this.wasManuallyClosed)
|
// explicitly orphan the socket to ignore its onclose/onerror, because onclose can be delayed
|
||||||
if (this.wasManuallyClosed) {
|
this._ws = null
|
||||||
return
|
this._handleDisconnect('manual')
|
||||||
}
|
|
||||||
this._reconnectTimeout.run()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TLPersistentClientSocket stuff
|
||||||
|
|
||||||
_connectionStatus: Atom<TLPersistentClientSocketStatus | 'initial'> = atom(
|
_connectionStatus: Atom<TLPersistentClientSocketStatus | 'initial'> = atom(
|
||||||
'websocket connection status',
|
'websocket connection status',
|
||||||
'initial'
|
'initial'
|
||||||
|
@ -149,6 +172,8 @@ export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord
|
||||||
}
|
}
|
||||||
|
|
||||||
sendMessage(msg: TLSocketClientSentEvent<TLRecord>) {
|
sendMessage(msg: TLSocketClientSentEvent<TLRecord>) {
|
||||||
|
assert(!this.isDisposed, 'Tried to send message on a disposed socket')
|
||||||
|
|
||||||
if (!this._ws) return
|
if (!this._ws) return
|
||||||
if (this.connectionStatus === 'online') {
|
if (this.connectionStatus === 'online') {
|
||||||
const chunks = chunk(serializeMessage(msg))
|
const chunks = chunk(serializeMessage(msg))
|
||||||
|
@ -162,6 +187,8 @@ export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord
|
||||||
|
|
||||||
private messageListeners = new Set<(msg: TLSocketServerSentEvent<TLRecord>) => void>()
|
private messageListeners = new Set<(msg: TLSocketServerSentEvent<TLRecord>) => void>()
|
||||||
onReceiveMessage(cb: (val: TLSocketServerSentEvent<TLRecord>) => void) {
|
onReceiveMessage(cb: (val: TLSocketServerSentEvent<TLRecord>) => void) {
|
||||||
|
assert(!this.isDisposed, 'Tried to add message listener on a disposed socket')
|
||||||
|
|
||||||
this.messageListeners.add(cb)
|
this.messageListeners.add(cb)
|
||||||
return () => {
|
return () => {
|
||||||
this.messageListeners.delete(cb)
|
this.messageListeners.delete(cb)
|
||||||
|
@ -170,6 +197,8 @@ export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord
|
||||||
|
|
||||||
private statusListeners = new Set<(status: TLPersistentClientSocketStatus) => void>()
|
private statusListeners = new Set<(status: TLPersistentClientSocketStatus) => void>()
|
||||||
onStatusChange(cb: (val: TLPersistentClientSocketStatus) => void) {
|
onStatusChange(cb: (val: TLPersistentClientSocketStatus) => void) {
|
||||||
|
assert(!this.isDisposed, 'Tried to add status listener on a disposed socket')
|
||||||
|
|
||||||
this.statusListeners.add(cb)
|
this.statusListeners.add(cb)
|
||||||
return () => {
|
return () => {
|
||||||
this.statusListeners.delete(cb)
|
this.statusListeners.delete(cb)
|
||||||
|
@ -177,59 +206,246 @@ export class ClientWebSocketAdapter implements TLPersistentClientSocket<TLRecord
|
||||||
}
|
}
|
||||||
|
|
||||||
restart() {
|
restart() {
|
||||||
debug('close c')
|
assert(!this.isDisposed, 'Tried to restart a disposed socket')
|
||||||
this.close()
|
debug('restarting')
|
||||||
this.wasManuallyClosed = false
|
|
||||||
this._reconnectTimeout.clear()
|
this._closeSocket()
|
||||||
this._reconnectTimeout.runNow()
|
this._reconnectManager.maybeReconnected()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ExponentialBackoffTimeout {
|
// These constants are exported primarily for tests
|
||||||
private timeout: NodeJS.Timeout | null = null
|
// ACTIVE_ means the tab is active, document.hidden is false
|
||||||
private nextScheduledRunTimestamp = 0
|
export const ACTIVE_MIN_DELAY = 500
|
||||||
intervalLength: number
|
export const ACTIVE_MAX_DELAY = 2000
|
||||||
|
// Correspondingly, here document.hidden is true. It's intended to reduce the load and battery drain
|
||||||
|
// on client devices somewhat when they aren't looking at the tab. We don't disconnect completely
|
||||||
|
// to minimise issues with reconnection/sync when the tab becomes visible again
|
||||||
|
export const INACTIVE_MIN_DELAY = 1000
|
||||||
|
export const INACTIVE_MAX_DELAY = 1000 * 60 * 5
|
||||||
|
export const DELAY_EXPONENT = 1.5
|
||||||
|
// this is a tradeoff between quickly detecting connections stuck in the CONNECTING state and
|
||||||
|
// not needlessly reconnecting if the connection is just slow to establish
|
||||||
|
export const ATTEMPT_TIMEOUT = 1000
|
||||||
|
|
||||||
|
class ReconnectManager {
|
||||||
|
private isDisposed = false
|
||||||
|
private disposables: (() => void)[] = [
|
||||||
|
() => {
|
||||||
|
if (this.reconnectTimeout) clearTimeout(this.reconnectTimeout)
|
||||||
|
if (this.recheckConnectingTimeout) clearTimeout(this.recheckConnectingTimeout)
|
||||||
|
},
|
||||||
|
]
|
||||||
|
private reconnectTimeout: ReturnType<typeof setTimeout> | null = null
|
||||||
|
private recheckConnectingTimeout: ReturnType<typeof setTimeout> | null = null
|
||||||
|
|
||||||
|
private lastAttemptStart: number | null = null
|
||||||
|
intendedDelay: number = ACTIVE_MIN_DELAY
|
||||||
|
private state: 'pendingAttempt' | 'pendingAttemptResult' | 'delay' | 'connected'
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private cb: () => Promise<void>,
|
private socketAdapter: ClientWebSocketAdapter,
|
||||||
// five mins
|
private getUri: () => Promise<string> | string
|
||||||
private readonly maxIdleIntervalLength: number = 1000 * 60 * 5,
|
|
||||||
// five seconds
|
|
||||||
private readonly maxInteractiveIntervalLength: number = 1000,
|
|
||||||
private startIntervalLength: number = 500
|
|
||||||
) {
|
) {
|
||||||
this.intervalLength = startIntervalLength
|
this.subscribeToReconnectHints()
|
||||||
|
|
||||||
|
this.disposables.push(
|
||||||
|
listenTo(window, 'offline', () => {
|
||||||
|
debug('window went offline')
|
||||||
|
// On the one hand, 'offline' event is not really reliable; on the other, the only
|
||||||
|
// alternative is to wait for pings not being delivered, which takes more than 20 seconds,
|
||||||
|
// which means we won't see the ClientWebSocketAdapter status change for more than
|
||||||
|
// 20 seconds after the tab goes offline. Our application layer must be resilient to
|
||||||
|
// connection restart anyway, so we can just try to reconnect and see if
|
||||||
|
// we're truly offline.
|
||||||
|
this.socketAdapter._closeSocket()
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
this.state = 'pendingAttempt'
|
||||||
|
this.intendedDelay = ACTIVE_MIN_DELAY
|
||||||
|
this.scheduleAttempt()
|
||||||
}
|
}
|
||||||
|
|
||||||
runNow() {
|
private subscribeToReconnectHints() {
|
||||||
this.cb()
|
this.disposables.push(
|
||||||
|
listenTo(window, 'online', () => {
|
||||||
|
debug('window went online')
|
||||||
|
this.maybeReconnected()
|
||||||
|
}),
|
||||||
|
listenTo(document, 'visibilitychange', () => {
|
||||||
|
if (!document.hidden) {
|
||||||
|
debug('document became visible')
|
||||||
|
this.maybeReconnected()
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
run() {
|
if (Object.prototype.hasOwnProperty.call(navigator, 'connection')) {
|
||||||
if (this.timeout) return
|
const connection = (navigator as any)['connection'] as EventTarget
|
||||||
this.timeout = setTimeout(() => {
|
this.disposables.push(
|
||||||
this.cb()
|
listenTo(connection, 'change', () => {
|
||||||
this.intervalLength = Math.min(this.intervalLength * 2, this.maxIdleIntervalLength)
|
debug('navigator.connection change')
|
||||||
if (this.timeout) {
|
this.maybeReconnected()
|
||||||
clearTimeout(this.timeout)
|
})
|
||||||
this.timeout = null
|
)
|
||||||
}
|
|
||||||
}, this.intervalLength)
|
|
||||||
this.nextScheduledRunTimestamp = Date.now() + this.intervalLength
|
|
||||||
}
|
|
||||||
|
|
||||||
clear() {
|
|
||||||
this.intervalLength = this.startIntervalLength
|
|
||||||
if (this.timeout) {
|
|
||||||
clearTimeout(this.timeout)
|
|
||||||
this.timeout = null
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
userInteractionOccurred() {
|
/**
 * Resolves the connection URI via `getUri()` (which may be sync or async) and, once it is
 * available, opens a new WebSocket and hands it to the socket adapter.
 *
 * Must only be called while `state` is `'pendingAttempt'`. On success the state moves to
 * `'pendingAttemptResult'`. If `getUri()` rejects, the failure is treated like a failed
 * connection attempt and handed to `disconnected()`, so a retry is scheduled after the
 * current `intendedDelay` instead of the manager staying in `'pendingAttempt'` with an
 * unhandled rejection.
 */
private scheduleAttempt() {
	assert(this.state === 'pendingAttempt')
	debug('scheduling a connection attempt')
	Promise.resolve(this.getUri()).then(
		(uri) => {
			// this can happen if the promise gets resolved too late
			if (this.state !== 'pendingAttempt' || this.isDisposed) return
			assert(
				this.socketAdapter._ws?.readyState !== WebSocket.OPEN,
				'There should be no connection attempts while already connected'
			)

			this.lastAttemptStart = Date.now()
			this.socketAdapter._setNewSocket(new WebSocket(uri))
			this.state = 'pendingAttemptResult'
		},
		(error: unknown) => {
			// Passed as the second argument of `then` (not a trailing `.catch`) so that only
			// getUri() failures land here; assertion failures above keep surfacing as before.
			debug('scheduling a connection attempt: getUri() failed', error)
			// same staleness guard as the success path
			if (this.state !== 'pendingAttempt' || this.isDisposed) return
			// Count this as an attempt that just failed, so `disconnected()` waits for
			// `intendedDelay` before trying again.
			this.lastAttemptStart = Date.now()
			this.disconnected()
		}
	)
}
|
||||||
|
|
||||||
|
/** Returns the upper bound for the reconnect delay, chosen by whether the document is hidden. */
private getMaxDelay() {
	if (document.hidden) {
		return INACTIVE_MAX_DELAY
	}
	return ACTIVE_MAX_DELAY
}
|
||||||
|
|
||||||
|
/** Returns the lower bound for the reconnect delay, chosen by whether the document is hidden. */
private getMinDelay() {
	if (document.hidden) {
		return INACTIVE_MIN_DELAY
	}
	return ACTIVE_MIN_DELAY
}
|
||||||
|
|
||||||
|
/** Cancels the pending reconnect timer, if one is set, and resets the handle to `null`. */
private clearReconnectTimeout() {
	const pending = this.reconnectTimeout
	if (!pending) return

	clearTimeout(pending)
	this.reconnectTimeout = null
}
|
||||||
|
|
||||||
|
/** Cancels the pending "recheck a CONNECTING socket" timer, if one is set, and resets the handle to `null`. */
private clearRecheckConnectingTimeout() {
	const pending = this.recheckConnectingTimeout
	if (!pending) return

	clearTimeout(pending)
	this.recheckConnectingTimeout = null
}
|
||||||
|
|
||||||
|
/**
 * Reacts to a hint that connectivity may be back. What happens depends on the current
 * socket's `readyState`:
 *
 * - OPEN: nothing to do.
 * - CONNECTING: if the attempt is younger than `ATTEMPT_TIMEOUT`, re-run this check once it
 *   reaches that age; otherwise close the socket so a fresh attempt can be made.
 * - anything else (CLOSING, CLOSED, or no socket yet because getUri() is still pending):
 *   reset the backoff and go through `disconnected()`.
 */
maybeReconnected() {
	debug('ReconnectManager.maybeReconnected')
	// We are checking right now, so an already-scheduled recheck is pointless. If one is
	// still needed for a CONNECTING socket, it gets recreated below.
	this.clearRecheckConnectingTimeout()

	switch (this.socketAdapter._ws?.readyState) {
		case WebSocket.OPEN: {
			debug('ReconnectManager.maybeReconnected: already connected')
			// already OK
			return
		}

		case WebSocket.CONNECTING: {
			debug('ReconnectManager.maybeReconnected: connecting')
			// The in-flight attempt may be stuck on a TCP connection whose SYN will never be
			// answered while a new network has appeared, or it may simply have started a
			// moment ago and be about to succeed. The age of the attempt decides which one we
			// assume. This can delay the connection a little, but cannot prevent it as long as
			// `maybeReconnected` does not loop on itself.
			assert(
				this.lastAttemptStart,
				'ReadyState=CONNECTING without lastAttemptStart should be impossible'
			)
			const attemptAge = Date.now() - this.lastAttemptStart

			if (attemptAge < ATTEMPT_TIMEOUT) {
				debug('ReconnectManager.maybeReconnected: connecting, rechecking later')
				const untilTimeout = ATTEMPT_TIMEOUT - attemptAge
				this.recheckConnectingTimeout = setTimeout(() => this.maybeReconnected(), untilTimeout)
				return
			}

			debug('ReconnectManager.maybeReconnected: connecting, but for too long, retry now')
			// The attempt is old enough that network conditions may have changed, so retrying
			// is worthwhile. `disconnected` will handle the reconnection.
			//
			// NOTE: the danger here is looping in connection attempts when connections are
			// slow. Make sure `maybeReconnected` is never called from the `disconnected`
			// codepath!
			this.clearRecheckConnectingTimeout()
			this.socketAdapter._closeSocket()
			return
		}

		default: {
			debug('ReconnectManager.maybeReconnected: closing/closed/null, retry now')
			// readyState is CLOSING or CLOSED, or there is no websocket at all.
			// Restart the backoff and retry ASAP (honouring the min delay). `this.state` does
			// not matter here because disconnected() handles any state correctly.
			this.intendedDelay = ACTIVE_MIN_DELAY
			this.disconnected()
		}
	}
}
|
||||||
|
|
||||||
|
/**
 * Handles "the socket is not usable": either we were just disconnected or the last
 * connection attempt failed. Depending on how much of `intendedDelay` is still left, this
 * either arms a timer that calls `disconnected()` again, or starts a new attempt right away
 * and grows the delay for the next one.
 */
disconnected() {
	debug('ReconnectManager.disconnected')

	// The notification may have arrived late, so look at the socket synchronously and bail
	// out if it is usable (or about to be).
	const readyState = this.socketAdapter._ws?.readyState
	if (readyState === WebSocket.OPEN || readyState === WebSocket.CONNECTING) return

	debug('ReconnectManager.disconnected: websocket is not OPEN or CONNECTING')
	this.clearReconnectTimeout()

	let delayLeft: number
	if (this.state === 'connected') {
		// First sign of a disconnect: the state itself is updated further down, here we only
		// pick the starting delay.
		this.intendedDelay = this.getMinDelay()
		delayLeft = this.intendedDelay
	} else if (this.lastAttemptStart !== null) {
		// time remaining until `intendedDelay` has passed since the last attempt started
		delayLeft = this.lastAttemptStart + this.intendedDelay - Date.now()
	} else {
		delayLeft = 0
	}

	if (delayLeft > 0) {
		debug('ReconnectManager.disconnected: delaying, delayLeft', delayLeft)
		// too early for another attempt, come back when the delay is over
		this.state = 'delay'
		this.reconnectTimeout = setTimeout(() => this.disconnected(), delayLeft)
		return
	}

	// not connected and nothing left to wait for: retry now
	this.state = 'pendingAttempt'

	// Grow the delay for the next attempt, clamped between the current min and max.
	const ceiling = this.getMaxDelay()
	const grown = Math.max(this.getMinDelay(), this.intendedDelay) * DELAY_EXPONENT
	this.intendedDelay = Math.min(ceiling, grown)
	debug('ReconnectManager.disconnected: attempting a connection, next delay', this.intendedDelay)
	this.scheduleAttempt()
}
|
||||||
|
|
||||||
|
/**
 * Handles "the socket opened". If the socket really is OPEN, marks the manager as connected,
 * cancels any pending reconnect timer and resets the backoff delay to `ACTIVE_MIN_DELAY`.
 */
connected() {
	debug('ReconnectManager.connected')

	// The notification may have been delayed, so verify the socket state synchronously.
	const isOpen = this.socketAdapter._ws?.readyState === WebSocket.OPEN
	if (!isOpen) return

	debug('ReconnectManager.connected: websocket is OPEN')
	this.state = 'connected'
	this.clearReconnectTimeout()
	this.intendedDelay = ACTIVE_MIN_DELAY
}
|
||||||
|
|
||||||
|
/**
 * Shuts the manager down: invokes every function in `this.disposables`, then marks the
 * instance as disposed so that late-resolving connection attempts are ignored.
 */
close() {
	for (const dispose of this.disposables) {
		dispose()
	}
	this.isDisposed = true
}
|
||||||
|
}
|
||||||
|
|
|
@ -34,6 +34,9 @@
|
||||||
{
|
{
|
||||||
"path": "../../packages/tlsync"
|
"path": "../../packages/tlsync"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"path": "../../packages/utils"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"path": "../../packages/validate"
|
"path": "../../packages/validate"
|
||||||
}
|
}
|
||||||
|
|
|
@ -11623,6 +11623,7 @@ __metadata:
|
||||||
"@sentry/react": "npm:^7.77.0"
|
"@sentry/react": "npm:^7.77.0"
|
||||||
"@tldraw/assets": "workspace:*"
|
"@tldraw/assets": "workspace:*"
|
||||||
"@tldraw/tlsync": "workspace:*"
|
"@tldraw/tlsync": "workspace:*"
|
||||||
|
"@tldraw/utils": "workspace:*"
|
||||||
"@tldraw/validate": "workspace:*"
|
"@tldraw/validate": "workspace:*"
|
||||||
"@types/qrcode": "npm:^1.5.0"
|
"@types/qrcode": "npm:^1.5.0"
|
||||||
"@types/react": "npm:^18.2.47"
|
"@types/react": "npm:^18.2.47"
|
||||||
|
|
Loading…
Reference in a new issue