///
///
import { SupabaseClient } from '@supabase/supabase-js'
import {
RoomSnapshot,
TLServer,
TLSyncRoom,
type DBLoadResult,
type PersistedRoomSnapshotForSupabase,
type RoomState,
} from '@tldraw/tlsync'
import { assert, assertExists } from '@tldraw/utils'
import { IRequest, Router } from 'itty-router'
import Toucan from 'toucan-js'
import { AlarmScheduler } from './AlarmScheduler'
import { PERSIST_INTERVAL_MS } from './config'
import { getR2KeyForRoom } from './r2'
import { Analytics, Environment } from './types'
import { createSupabaseClient } from './utils/createSupabaseClient'
import { throttle } from './utils/throttle'
const MAX_CONNECTIONS = 50
// increment this any time you make a change to this type
const CURRENT_DOCUMENT_INFO_VERSION = 0
type DocumentInfo = {
version: number
slug: string
}
export class TLDrawDurableObject extends TLServer {
// A unique identifier for this instance of the Durable Object
id: DurableObjectId
// For TLSyncRoom
_roomState: RoomState | undefined
// For storage
storage: DurableObjectStorage
// For persistence
supabaseClient: SupabaseClient | void
// For analytics
measure: Analytics | undefined
// For error tracking
sentryDSN: string | undefined
readonly supabaseTable: string
readonly r2: {
readonly rooms: R2Bucket
readonly versionCache: R2Bucket
}
_documentInfo: DocumentInfo | null = null
constructor(
private controller: DurableObjectState,
private env: Environment
) {
super()
this.id = controller.id
this.storage = controller.storage
this.sentryDSN = env.SENTRY_DSN
this.measure = env.MEASURE
this.supabaseClient = createSupabaseClient(env)
this.supabaseTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging'
this.r2 = {
rooms: env.ROOMS,
versionCache: env.ROOMS_HISTORY_EPHEMERAL,
}
controller.blockConcurrencyWhile(async () => {
const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null
if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) {
this._documentInfo = null
} else {
this._documentInfo = existingDocumentInfo
}
})
}
readonly router = Router()
.get(
'/r/:roomId',
(req) => this.extractDocumentInfoFromRequest(req),
(req) => this.onRequest(req)
)
.post(
'/r/:roomId/restore',
(req) => this.extractDocumentInfoFromRequest(req),
(req) => this.onRestore(req)
)
.all('*', () => new Response('Not found', { status: 404 }))
readonly scheduler = new AlarmScheduler({
storage: () => this.storage,
alarms: {
persist: async () => {
const room = this.getRoomForPersistenceKey(this.documentInfo.slug)
if (!room) return
this.persistToDatabase(room.persistenceKey)
},
},
})
// eslint-disable-next-line no-restricted-syntax
get documentInfo() {
return assertExists(this._documentInfo, 'documentInfo must be present')
}
extractDocumentInfoFromRequest = async (req: IRequest) => {
const slug = assertExists(req.params.roomId, 'roomId must be present')
if (this._documentInfo) {
assert(this._documentInfo.slug === slug, 'slug must match')
} else {
this._documentInfo = {
version: CURRENT_DOCUMENT_INFO_VERSION,
slug,
}
}
}
// Handle a request to the Durable Object.
async fetch(req: IRequest) {
const sentry = new Toucan({
dsn: this.sentryDSN,
request: req,
allowedHeaders: ['user-agent'],
allowedSearchParams: /(.*)/,
})
try {
return await this.router.handle(req).catch((err) => {
console.error(err)
sentry.captureException(err)
return new Response('Something went wrong', {
status: 500,
statusText: 'Internal Server Error',
})
})
} catch (err) {
sentry.captureException(err)
return new Response('Something went wrong', {
status: 500,
statusText: 'Internal Server Error',
})
}
}
async onRestore(req: IRequest) {
const roomId = this.documentInfo.slug
const roomKey = getR2KeyForRoom(roomId)
const timestamp = ((await req.json()) as any).timestamp
if (!timestamp) {
return new Response('Missing timestamp', { status: 400 })
}
const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`)
if (!data) {
return new Response('Version not found', { status: 400 })
}
const dataText = await data.text()
await this.r2.rooms.put(roomKey, dataText)
const roomState = this.getRoomForPersistenceKey(roomId)
if (!roomState) {
// nothing else to do because the room is not currently in use
return new Response()
}
const snapshot: RoomSnapshot = JSON.parse(dataText)
const oldRoom = roomState.room
const oldIds = oldRoom.getSnapshot().documents.map((d) => d.state.id)
const newIds = new Set(snapshot.documents.map((d) => d.state.id))
const removedIds = oldIds.filter((id) => !newIds.has(id))
const tombstones = { ...snapshot.tombstones }
removedIds.forEach((id) => {
tombstones[id] = oldRoom.clock + 1
})
newIds.forEach((id) => {
delete tombstones[id]
})
const newRoom = new TLSyncRoom(roomState.room.schema, {
clock: oldRoom.clock + 1,
documents: snapshot.documents.map((d) => ({
lastChangedClock: oldRoom.clock + 1,
state: d.state,
})),
schema: snapshot.schema,
tombstones,
})
// replace room with new one and kick out all the clients
this.setRoomState(this.documentInfo.slug, { ...roomState, room: newRoom })
oldRoom.close()
return new Response()
}
async onRequest(req: IRequest) {
// extract query params from request, should include instanceId
const url = new URL(req.url)
const params = Object.fromEntries(url.searchParams.entries())
let { sessionKey, storeId } = params
// handle legacy param names
sessionKey ??= params.instanceId
storeId ??= params.localClientId
// Don't connect if we're already at max connections
const roomState = this.getRoomForPersistenceKey(this.documentInfo.slug)
if (roomState !== undefined) {
if (roomState.room.sessions.size >= MAX_CONNECTIONS) {
return new Response('Room is full', {
status: 403,
})
}
}
// Create the websocket pair for the client
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
// Handle the connection (see TLServer)
try {
// block concurrency while initializing the room if that needs to happen
await this.controller.blockConcurrencyWhile(() =>
this.handleConnection({
socket: serverWebSocket as any,
persistenceKey: this.documentInfo.slug!,
sessionKey,
storeId,
})
)
} catch (e: any) {
console.error(e)
return new Response(e.message, { status: 500 })
}
// Accept the websocket connection
serverWebSocket.accept()
serverWebSocket.addEventListener(
'message',
throttle(() => {
this.schedulePersist()
}, 2000)
)
serverWebSocket.addEventListener('close', () => {
this.schedulePersist()
})
return new Response(null, { status: 101, webSocket: clientWebSocket })
}
logEvent(
event:
| {
type: 'client'
roomId: string
name: string
clientId: string
instanceId: string
localClientId: string
}
| {
type: 'room'
roomId: string
name: string
}
) {
switch (event.type) {
case 'room': {
this.measure?.writeDataPoint({
blobs: [event.name, event.roomId], // we would add user/connection ids here if we could
})
break
}
case 'client': {
this.measure?.writeDataPoint({
blobs: [event.name, event.roomId, event.clientId, event.instanceId], // we would add user/connection ids here if we could
indexes: [event.localClientId],
})
break
}
}
}
getRoomForPersistenceKey(_persistenceKey: string): RoomState | undefined {
return this._roomState // only one room per worker
}
setRoomState(_persistenceKey: string, roomState: RoomState): void {
this.deleteRoomState()
this._roomState = roomState
}
deleteRoomState(): void {
this._roomState = undefined
}
// Load the room's drawing data from supabase
override async loadFromDatabase(persistenceKey: string): Promise {
try {
const key = getR2KeyForRoom(persistenceKey)
// when loading, prefer to fetch documents from the bucket
const roomFromBucket = await this.r2.rooms.get(key)
if (roomFromBucket) {
return { type: 'room_found', snapshot: await roomFromBucket.json() }
}
// if we don't have a room in the bucket, try to load from supabase
if (!this.supabaseClient) return { type: 'room_not_found' }
const { data, error } = await this.supabaseClient
.from(this.supabaseTable)
.select('*')
.eq('slug', persistenceKey)
if (error) {
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
console.error('failed to retrieve document', persistenceKey, error)
return { type: 'error', error: new Error(error.message) }
}
// if it didn't find a document, data will be an empty array
if (data.length === 0) {
return { type: 'room_not_found' }
}
const roomFromSupabase = data[0] as PersistedRoomSnapshotForSupabase
return { type: 'room_found', snapshot: roomFromSupabase.drawing }
} catch (error) {
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
console.error('failed to fetch doc', persistenceKey, error)
return { type: 'error', error: error as Error }
}
}
_isPersisting = false
_lastPersistedClock: number | null = null
// Save the room to supabase
async persistToDatabase(persistenceKey: string) {
if (this._isPersisting) {
setTimeout(() => {
this.schedulePersist()
}, 5000)
return
}
try {
this._isPersisting = true
const roomState = this.getRoomForPersistenceKey(persistenceKey)
if (!roomState) {
// room was closed
return
}
const { room } = roomState
const { clock } = room
if (this._lastPersistedClock === clock) return
try {
const snapshot = JSON.stringify(room.getSnapshot())
const key = getR2KeyForRoom(persistenceKey)
await Promise.all([
this.r2.rooms.put(key, snapshot),
this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot),
])
this._lastPersistedClock = clock
} catch (error) {
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_persist_to_db' })
console.error('failed to persist document', persistenceKey, error)
throw error
}
} finally {
this._isPersisting = false
}
}
async schedulePersist() {
await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, {
overwrite: 'if-sooner',
})
}
// Will be called automatically when the alarm ticks.
async alarm() {
await this.scheduler.onAlarm()
}
}