Revert "feat: migrate data persistence from Supabase to Postgres"
Some checks failed
Checks / Tests & checks (push) Has been cancelled
Checks / Build all projects (push) Has been cancelled
Deploy bemo / Deploy bemo to ${{ (github.ref == 'refs/heads/production' && 'production') || (github.ref == 'refs/heads/main' && 'staging') || 'preview' }} (push) Has been cancelled
Deploy .com / Deploy dotcom to ${{ (github.ref == 'refs/heads/production' && 'production') || (github.ref == 'refs/heads/main' && 'staging') || 'preview' }} (push) Has been cancelled
End to end tests / End to end tests (push) Has been cancelled
Publish Canary Packages / Publish Canary Packages (push) Has been cancelled
Publish VS Code Extension / Publish VS Code Extension (push) Has been cancelled
Some checks failed
Checks / Tests & checks (push) Has been cancelled
Checks / Build all projects (push) Has been cancelled
Deploy bemo / Deploy bemo to ${{ (github.ref == 'refs/heads/production' && 'production') || (github.ref == 'refs/heads/main' && 'staging') || 'preview' }} (push) Has been cancelled
Deploy .com / Deploy dotcom to ${{ (github.ref == 'refs/heads/production' && 'production') || (github.ref == 'refs/heads/main' && 'staging') || 'preview' }} (push) Has been cancelled
End to end tests / End to end tests (push) Has been cancelled
Publish Canary Packages / Publish Canary Packages (push) Has been cancelled
Publish VS Code Extension / Publish VS Code Extension (push) Has been cancelled
This reverts commit 43419581be
.
This commit is contained in:
parent
69a88c8981
commit
c0a9644296
3 changed files with 398 additions and 423 deletions
|
@ -1,28 +1,29 @@
|
||||||
/// <reference no-default-lib="true"/>
|
/// <reference no-default-lib="true"/>
|
||||||
/// <reference types="@cloudflare/workers-types" />
|
/// <reference types="@cloudflare/workers-types" />
|
||||||
|
|
||||||
import { Client as PostgresClient } from 'pg'
|
import { SupabaseClient } from '@supabase/supabase-js'
|
||||||
import {
|
import {
|
||||||
READ_ONLY_LEGACY_PREFIX,
|
READ_ONLY_LEGACY_PREFIX,
|
||||||
READ_ONLY_PREFIX,
|
READ_ONLY_PREFIX,
|
||||||
ROOM_OPEN_MODE,
|
ROOM_OPEN_MODE,
|
||||||
ROOM_PREFIX,
|
ROOM_PREFIX,
|
||||||
type RoomOpenMode,
|
type RoomOpenMode,
|
||||||
} from '@tldraw/dotcom-shared'
|
} from '@tldraw/dotcom-shared'
|
||||||
import {
|
import {
|
||||||
RoomSnapshot,
|
RoomSnapshot,
|
||||||
TLSocketRoom,
|
TLCloseEventCode,
|
||||||
type PersistedRoomSnapshotForSupabase,
|
TLSocketRoom,
|
||||||
|
type PersistedRoomSnapshotForSupabase,
|
||||||
} from '@tldraw/sync-core'
|
} from '@tldraw/sync-core'
|
||||||
import { TLRecord } from '@tldraw/tlschema'
|
import { TLRecord } from '@tldraw/tlschema'
|
||||||
import { assertExists, exhaustiveSwitchError } from '@tldraw/utils'
|
import { assert, assertExists, exhaustiveSwitchError } from '@tldraw/utils'
|
||||||
import { createPersistQueue, createSentry } from '@tldraw/worker-shared'
|
import { createPersistQueue, createSentry } from '@tldraw/worker-shared'
|
||||||
import { IRequest, Router } from 'itty-router'
|
import { IRequest, Router } from 'itty-router'
|
||||||
import { AlarmScheduler } from './AlarmScheduler'
|
import { AlarmScheduler } from './AlarmScheduler'
|
||||||
import { PERSIST_INTERVAL_MS } from './config'
|
import { PERSIST_INTERVAL_MS } from './config'
|
||||||
import { getR2KeyForRoom } from './r2'
|
import { getR2KeyForRoom } from './r2'
|
||||||
import { Analytics, DBLoadResult, Environment, TLServerEvent } from './types'
|
import { Analytics, DBLoadResult, Environment, TLServerEvent } from './types'
|
||||||
import { createPostgresClient } from './utils/createPostgresClient'
|
import { createSupabaseClient } from './utils/createSupabaseClient'
|
||||||
import { getSlug } from './utils/roomOpenMode'
|
import { getSlug } from './utils/roomOpenMode'
|
||||||
import { throttle } from './utils/throttle'
|
import { throttle } from './utils/throttle'
|
||||||
|
|
||||||
|
@ -31,384 +32,392 @@ const MAX_CONNECTIONS = 50
|
||||||
// increment this any time you make a change to this type
|
// increment this any time you make a change to this type
|
||||||
const CURRENT_DOCUMENT_INFO_VERSION = 0
|
const CURRENT_DOCUMENT_INFO_VERSION = 0
|
||||||
interface DocumentInfo {
|
interface DocumentInfo {
|
||||||
version: number
|
version: number
|
||||||
slug: string
|
slug: string
|
||||||
}
|
}
|
||||||
|
|
||||||
const ROOM_NOT_FOUND = Symbol('room_not_found')
|
const ROOM_NOT_FOUND = Symbol('room_not_found')
|
||||||
|
|
||||||
export class TLDrawDurableObject {
|
export class TLDrawDurableObject {
|
||||||
// A unique identifier for this instance of the Durable Object
|
// A unique identifier for this instance of the Durable Object
|
||||||
id: DurableObjectId
|
id: DurableObjectId
|
||||||
|
|
||||||
// For TLSyncRoom
|
// For TLSyncRoom
|
||||||
_room: Promise<TLSocketRoom<TLRecord, { storeId: string }>> | null = null
|
_room: Promise<TLSocketRoom<TLRecord, { storeId: string }>> | null = null
|
||||||
|
|
||||||
getRoom() {
|
getRoom() {
|
||||||
if (!this._documentInfo) {
|
if (!this._documentInfo) {
|
||||||
throw new Error('documentInfo must be present when accessing room')
|
throw new Error('documentInfo must be present when accessing room')
|
||||||
}
|
}
|
||||||
const slug = this._documentInfo.slug
|
const slug = this._documentInfo.slug
|
||||||
if (!this._room) {
|
if (!this._room) {
|
||||||
this._room = this.loadFromDatabase(slug).then((result) => {
|
this._room = this.loadFromDatabase(slug).then((result) => {
|
||||||
switch (result.type) {
|
switch (result.type) {
|
||||||
case 'room_found': {
|
case 'room_found': {
|
||||||
const room = new TLSocketRoom<TLRecord, { storeId: string }>({
|
const room = new TLSocketRoom<TLRecord, { storeId: string }>({
|
||||||
initialSnapshot: result.snapshot,
|
initialSnapshot: result.snapshot,
|
||||||
onSessionRemoved: async (room, args) => {
|
onSessionRemoved: async (room, args) => {
|
||||||
this.logEvent({
|
this.logEvent({
|
||||||
type: 'client',
|
type: 'client',
|
||||||
roomId: slug,
|
roomId: slug,
|
||||||
name: 'leave',
|
name: 'leave',
|
||||||
instanceId: args.sessionKey,
|
instanceId: args.sessionKey,
|
||||||
localClientId: args.meta.storeId,
|
localClientId: args.meta.storeId,
|
||||||
})
|
})
|
||||||
|
|
||||||
if (args.numSessionsRemaining > 0) return
|
if (args.numSessionsRemaining > 0) return
|
||||||
if (!this._room) return
|
if (!this._room) return
|
||||||
this.logEvent({
|
this.logEvent({
|
||||||
type: 'client',
|
type: 'client',
|
||||||
roomId: slug,
|
roomId: slug,
|
||||||
name: 'last_out',
|
name: 'last_out',
|
||||||
instanceId: args.sessionKey,
|
instanceId: args.sessionKey,
|
||||||
localClientId: args.meta.storeId,
|
localClientId: args.meta.storeId,
|
||||||
})
|
})
|
||||||
try {
|
try {
|
||||||
await this.persistToDatabase()
|
await this.persistToDatabase()
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// already logged
|
// already logged
|
||||||
}
|
}
|
||||||
// make sure nobody joined the room while we were persisting
|
// make sure nobody joined the room while we were persisting
|
||||||
if (room.getNumActiveSessions() > 0) return
|
if (room.getNumActiveSessions() > 0) return
|
||||||
this._room = null
|
this._room = null
|
||||||
this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' })
|
this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' })
|
||||||
room.close()
|
room.close()
|
||||||
},
|
},
|
||||||
onDataChange: () => {
|
onDataChange: () => {
|
||||||
this.triggerPersistSchedule()
|
this.triggerPersistSchedule()
|
||||||
},
|
},
|
||||||
onBeforeSendMessage: ({ message, stringified }) => {
|
onBeforeSendMessage: ({ message, stringified }) => {
|
||||||
this.logEvent({
|
this.logEvent({
|
||||||
type: 'send_message',
|
type: 'send_message',
|
||||||
roomId: slug,
|
roomId: slug,
|
||||||
messageType: message.type,
|
messageType: message.type,
|
||||||
messageLength: stringified.length,
|
messageLength: stringified.length,
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
this.logEvent({ type: 'room', roomId: slug, name: 'room_start' })
|
this.logEvent({ type: 'room', roomId: slug, name: 'room_start' })
|
||||||
return room
|
return room
|
||||||
}
|
}
|
||||||
case 'room_not_found': {
|
case 'room_not_found': {
|
||||||
throw ROOM_NOT_FOUND
|
throw ROOM_NOT_FOUND
|
||||||
}
|
}
|
||||||
case 'error': {
|
case 'error': {
|
||||||
throw result.error
|
throw result.error
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
exhaustiveSwitchError(result)
|
exhaustiveSwitchError(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return this._room
|
return this._room
|
||||||
}
|
}
|
||||||
|
|
||||||
// For storage
|
// For storage
|
||||||
storage: DurableObjectStorage
|
storage: DurableObjectStorage
|
||||||
|
|
||||||
// For persistence
|
// For persistence
|
||||||
postgresClient: PostgresClient | null
|
supabaseClient: SupabaseClient | void
|
||||||
|
|
||||||
// For analytics
|
// For analytics
|
||||||
measure: Analytics | undefined
|
measure: Analytics | undefined
|
||||||
|
|
||||||
// For error tracking
|
// For error tracking
|
||||||
sentryDSN: string | undefined
|
sentryDSN: string | undefined
|
||||||
|
|
||||||
readonly postgresTable: string
|
readonly supabaseTable: string
|
||||||
readonly r2: {
|
readonly r2: {
|
||||||
readonly rooms: R2Bucket
|
readonly rooms: R2Bucket
|
||||||
readonly versionCache: R2Bucket
|
readonly versionCache: R2Bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
_documentInfo: DocumentInfo | null = null
|
_documentInfo: DocumentInfo | null = null
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private state: DurableObjectState,
|
private state: DurableObjectState,
|
||||||
private env: Environment
|
private env: Environment
|
||||||
) {
|
) {
|
||||||
this.id = state.id
|
this.id = state.id
|
||||||
this.storage = state.storage
|
this.storage = state.storage
|
||||||
this.sentryDSN = env.SENTRY_DSN
|
this.sentryDSN = env.SENTRY_DSN
|
||||||
this.measure = env.MEASURE
|
this.measure = env.MEASURE
|
||||||
this.postgresClient = createPostgresClient(env)
|
this.supabaseClient = createSupabaseClient(env)
|
||||||
|
|
||||||
this.postgresTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging'
|
this.supabaseTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging'
|
||||||
this.r2 = {
|
this.r2 = {
|
||||||
rooms: env.ROOMS,
|
rooms: env.ROOMS,
|
||||||
versionCache: env.ROOMS_HISTORY_EPHEMERAL,
|
versionCache: env.ROOMS_HISTORY_EPHEMERAL,
|
||||||
}
|
}
|
||||||
|
|
||||||
state.blockConcurrencyWhile(async () => {
|
state.blockConcurrencyWhile(async () => {
|
||||||
const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null
|
const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null
|
||||||
if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) {
|
if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) {
|
||||||
this._documentInfo = null
|
this._documentInfo = null
|
||||||
} else {
|
} else {
|
||||||
this._documentInfo = existingDocumentInfo
|
this._documentInfo = existingDocumentInfo
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
readonly router = Router()
|
readonly router = Router()
|
||||||
.get(
|
.get(
|
||||||
`/${ROOM_PREFIX}/:roomId`,
|
`/${ROOM_PREFIX}/:roomId`,
|
||||||
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
|
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
|
||||||
(req) => this.onRequest(req)
|
(req) => this.onRequest(req)
|
||||||
)
|
)
|
||||||
.get(
|
.get(
|
||||||
`/${READ_ONLY_LEGACY_PREFIX}/:roomId`,
|
`/${READ_ONLY_LEGACY_PREFIX}/:roomId`,
|
||||||
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY),
|
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY),
|
||||||
(req) => this.onRequest(req)
|
(req) => this.onRequest(req)
|
||||||
)
|
)
|
||||||
.get(
|
.get(
|
||||||
`/${READ_ONLY_PREFIX}/:roomId`,
|
`/${READ_ONLY_PREFIX}/:roomId`,
|
||||||
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY),
|
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY),
|
||||||
(req) => this.onRequest(req)
|
(req) => this.onRequest(req)
|
||||||
)
|
)
|
||||||
.post(
|
.post(
|
||||||
`/${ROOM_PREFIX}/:roomId/restore`,
|
`/${ROOM_PREFIX}/:roomId/restore`,
|
||||||
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
|
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
|
||||||
(req) => this.onRestore(req)
|
(req) => this.onRestore(req)
|
||||||
)
|
)
|
||||||
.all('*', () => new Response('Not found', { status: 404 }))
|
.all('*', () => new Response('Not found', { status: 404 }))
|
||||||
|
|
||||||
readonly scheduler = new AlarmScheduler({
|
readonly scheduler = new AlarmScheduler({
|
||||||
storage: () => this.storage,
|
storage: () => this.storage,
|
||||||
alarms: {
|
alarms: {
|
||||||
persist: async () => {
|
persist: async () => {
|
||||||
this.persistToDatabase()
|
this.persistToDatabase()
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
// eslint-disable-next-line no-restricted-syntax
|
// eslint-disable-next-line no-restricted-syntax
|
||||||
get documentInfo() {
|
get documentInfo() {
|
||||||
return assertExists(this._documentInfo, 'documentInfo must be present')
|
return assertExists(this._documentInfo, 'documentInfo must be present')
|
||||||
}
|
}
|
||||||
extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => {
|
extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => {
|
||||||
const slug = assertExists(
|
const slug = assertExists(
|
||||||
await getSlug(this.env, req.params.roomId, roomOpenMode),
|
await getSlug(this.env, req.params.roomId, roomOpenMode),
|
||||||
'roomId must be present'
|
'roomId must be present'
|
||||||
)
|
)
|
||||||
if (this._documentInfo) {
|
if (this._documentInfo) {
|
||||||
assert(this._documentInfo.slug === slug, 'slug must match')
|
assert(this._documentInfo.slug === slug, 'slug must match')
|
||||||
} else {
|
} else {
|
||||||
this._documentInfo = {
|
this._documentInfo = {
|
||||||
version: CURRENT_DOCUMENT_INFO_VERSION,
|
version: CURRENT_DOCUMENT_INFO_VERSION,
|
||||||
slug,
|
slug,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle a request to the Durable Object.
|
// Handle a request to the Durable Object.
|
||||||
async fetch(req: IRequest) {
|
async fetch(req: IRequest) {
|
||||||
const sentry = createSentry(this.state, this.env, req)
|
const sentry = createSentry(this.state, this.env, req)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await this.router.handle(req)
|
return await this.router.handle(req)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(err)
|
console.error(err)
|
||||||
// eslint-disable-next-line deprecation/deprecation
|
// eslint-disable-next-line deprecation/deprecation
|
||||||
sentry?.captureException(err)
|
sentry?.captureException(err)
|
||||||
return new Response('Something went wrong', {
|
return new Response('Something went wrong', {
|
||||||
status: 500,
|
status: 500,
|
||||||
statusText: 'Internal Server Error',
|
statusText: 'Internal Server Error',
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_isRestoring = false
|
_isRestoring = false
|
||||||
async onRestore(req: IRequest) {
|
async onRestore(req: IRequest) {
|
||||||
this._isRestoring = true
|
this._isRestoring = true
|
||||||
try {
|
try {
|
||||||
const roomId = this.documentInfo.slug
|
const roomId = this.documentInfo.slug
|
||||||
const roomKey = getR2KeyForRoom(roomId)
|
const roomKey = getR2KeyForRoom(roomId)
|
||||||
const timestamp = ((await req.json()) as any).timestamp
|
const timestamp = ((await req.json()) as any).timestamp
|
||||||
if (!timestamp) {
|
if (!timestamp) {
|
||||||
return new Response('Missing timestamp', { status: 400 })
|
return new Response('Missing timestamp', { status: 400 })
|
||||||
}
|
}
|
||||||
const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`)
|
const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`)
|
||||||
if (!data) {
|
if (!data) {
|
||||||
return new Response('Version not found', { status: 400 })
|
return new Response('Version not found', { status: 400 })
|
||||||
}
|
}
|
||||||
const dataText = await data.text()
|
const dataText = await data.text()
|
||||||
await this.r2.rooms.put(roomKey, dataText)
|
await this.r2.rooms.put(roomKey, dataText)
|
||||||
const room = await this.getRoom()
|
const room = await this.getRoom()
|
||||||
|
|
||||||
const snapshot: RoomSnapshot = JSON.parse(dataText)
|
const snapshot: RoomSnapshot = JSON.parse(dataText)
|
||||||
room.loadSnapshot(snapshot)
|
room.loadSnapshot(snapshot)
|
||||||
|
|
||||||
return new Response()
|
return new Response()
|
||||||
} finally {
|
} finally {
|
||||||
this._isRestoring = false
|
this._isRestoring = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async onRequest(req: IRequest) {
|
async onRequest(req: IRequest) {
|
||||||
// extract query params from request, should include instanceId
|
// extract query params from request, should include instanceId
|
||||||
const url = new URL(req.url)
|
const url = new URL(req.url)
|
||||||
const params = Object.fromEntries(url.searchParams.entries())
|
const params = Object.fromEntries(url.searchParams.entries())
|
||||||
let { sessionKey, storeId } = params
|
let { sessionKey, storeId } = params
|
||||||
|
|
||||||
// handle legacy param names
|
// handle legacy param names
|
||||||
sessionKey ??= params.instanceId
|
sessionKey ??= params.instanceId
|
||||||
storeId ??= params.localClientId
|
storeId ??= params.localClientId
|
||||||
const isNewSession = !this._room
|
const isNewSession = !this._room
|
||||||
|
|
||||||
// Create the websocket pair for the client
|
// Create the websocket pair for the client
|
||||||
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
|
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
|
||||||
serverWebSocket.accept()
|
serverWebSocket.accept()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const room = await this.getRoom()
|
const room = await this.getRoom()
|
||||||
// Don't connect if we're already at max connections
|
// Don't connect if we're already at max connections
|
||||||
if (room.getNumActiveSessions() >= MAX_CONNECTIONS) {
|
if (room.getNumActiveSessions() >= MAX_CONNECTIONS) {
|
||||||
return new Response('Room is full', { status: 403 })
|
return new Response('Room is full', { status: 403 })
|
||||||
}
|
}
|
||||||
|
|
||||||
// all good
|
// all good
|
||||||
room.handleSocketConnect(sessionKey, serverWebSocket, { storeId })
|
room.handleSocketConnect(sessionKey, serverWebSocket, { storeId })
|
||||||
if (isNewSession) {
|
if (isNewSession) {
|
||||||
this.logEvent({
|
this.logEvent({
|
||||||
type: 'client',
|
type: 'client',
|
||||||
roomId: this.documentInfo.slug,
|
roomId: this.documentInfo.slug,
|
||||||
name: 'room_reopen',
|
name: 'room_reopen',
|
||||||
instanceId: sessionKey,
|
instanceId: sessionKey,
|
||||||
localClientId: storeId,
|
localClientId: storeId,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
this.logEvent({
|
this.logEvent({
|
||||||
type: 'client',
|
type: 'client',
|
||||||
roomId: this.documentInfo.slug,
|
roomId: this.documentInfo.slug,
|
||||||
name: 'enter',
|
name: 'enter',
|
||||||
instanceId: sessionKey,
|
instanceId: sessionKey,
|
||||||
localClientId: storeId,
|
localClientId: storeId,
|
||||||
})
|
})
|
||||||
return new Response(null, { status: 101, webSocket: clientWebSocket })
|
return new Response(null, { status: 101, webSocket: clientWebSocket })
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e === ROOM_NOT_FOUND) {
|
if (e === ROOM_NOT_FOUND) {
|
||||||
serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found')
|
serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found')
|
||||||
return new Response(null, { status: 101, webSocket: clientWebSocket })
|
return new Response(null, { status: 101, webSocket: clientWebSocket })
|
||||||
}
|
}
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
triggerPersistSchedule = throttle(() => {
|
triggerPersistSchedule = throttle(() => {
|
||||||
this.schedulePersist()
|
this.schedulePersist()
|
||||||
}, 2000)
|
}, 2000)
|
||||||
|
|
||||||
private writeEvent(
|
private writeEvent(
|
||||||
name: string,
|
name: string,
|
||||||
{ blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] }
|
{ blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] }
|
||||||
) {
|
) {
|
||||||
this.measure?.writeDataPoint({
|
this.measure?.writeDataPoint({
|
||||||
blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])],
|
blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])],
|
||||||
doubles,
|
doubles,
|
||||||
indexes,
|
indexes,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
logEvent(event: TLServerEvent) {
|
logEvent(event: TLServerEvent) {
|
||||||
switch (event.type) {
|
switch (event.type) {
|
||||||
case 'room': {
|
case 'room': {
|
||||||
// we would add user/connection ids here if we could
|
// we would add user/connection ids here if we could
|
||||||
this.writeEvent(event.name, { blobs: [event.roomId] })
|
this.writeEvent(event.name, { blobs: [event.roomId] })
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case 'client': {
|
case 'client': {
|
||||||
// we would add user/connection ids here if we could
|
// we would add user/connection ids here if we could
|
||||||
this.writeEvent(event.name, {
|
this.writeEvent(event.name, {
|
||||||
blobs: [event.roomId, 'unused', event.instanceId],
|
blobs: [event.roomId, 'unused', event.instanceId],
|
||||||
indexes: [event.localClientId],
|
indexes: [event.localClientId],
|
||||||
})
|
})
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case 'send_message': {
|
case 'send_message': {
|
||||||
this.writeEvent(event.type, {
|
this.writeEvent(event.type, {
|
||||||
blobs: [event.roomId, event.messageType],
|
blobs: [event.roomId, event.messageType],
|
||||||
doubles: [event.messageLength],
|
doubles: [event.messageLength],
|
||||||
})
|
})
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
exhaustiveSwitchError(event)
|
exhaustiveSwitchError(event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the room's drawing data. First we check the R2 bucket, then we fallback to Postgres (legacy).
|
// Load the room's drawing data. First we check the R2 bucket, then we fallback to supabase (legacy).
|
||||||
async loadFromDatabase(persistenceKey: string): Promise<DBLoadResult> {
|
async loadFromDatabase(persistenceKey: string): Promise<DBLoadResult> {
|
||||||
try {
|
try {
|
||||||
const key = getR2KeyForRoom(persistenceKey)
|
const key = getR2KeyForRoom(persistenceKey)
|
||||||
// when loading, prefer to fetch documents from the bucket
|
// when loading, prefer to fetch documents from the bucket
|
||||||
const roomFromBucket = await this.r2.rooms.get(key)
|
const roomFromBucket = await this.r2.rooms.get(key)
|
||||||
if (roomFromBucket) {
|
if (roomFromBucket) {
|
||||||
return { type: 'room_found', snapshot: await roomFromBucket.json() }
|
return { type: 'room_found', snapshot: await roomFromBucket.json() }
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we don't have a room in the bucket, try to load from Postgres
|
// if we don't have a room in the bucket, try to load from supabase
|
||||||
if (!this.postgresClient) return { type: 'room_not_found' }
|
if (!this.supabaseClient) return { type: 'room_not_found' }
|
||||||
await this.postgresClient.connect()
|
const { data, error } = await this.supabaseClient
|
||||||
const result = await this.postgresClient.query('SELECT * FROM ' + this.postgresTable + ' WHERE slug = $1', [persistenceKey])
|
.from(this.supabaseTable)
|
||||||
await this.postgresClient.end()
|
.select('*')
|
||||||
|
.eq('slug', persistenceKey)
|
||||||
|
|
||||||
if (result.rows.length === 0) {
|
if (error) {
|
||||||
return { type: 'room_not_found' }
|
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
|
||||||
}
|
|
||||||
|
|
||||||
const roomFromPostgres = result.rows[0] as PersistedRoomSnapshotForSupabase
|
console.error('failed to retrieve document', persistenceKey, error)
|
||||||
return { type: 'room_found', snapshot: roomFromPostgres.drawing }
|
return { type: 'error', error: new Error(error.message) }
|
||||||
} catch (error) {
|
}
|
||||||
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
|
// if it didn't find a document, data will be an empty array
|
||||||
|
if (data.length === 0) {
|
||||||
|
return { type: 'room_not_found' }
|
||||||
|
}
|
||||||
|
|
||||||
console.error('failed to fetch doc', persistenceKey, error)
|
const roomFromSupabase = data[0] as PersistedRoomSnapshotForSupabase
|
||||||
return { type: 'error', error: error as Error }
|
return { type: 'room_found', snapshot: roomFromSupabase.drawing }
|
||||||
}
|
} catch (error) {
|
||||||
}
|
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
|
||||||
|
|
||||||
_lastPersistedClock: number | null = null
|
console.error('failed to fetch doc', persistenceKey, error)
|
||||||
_persistQueue = createPersistQueue(async () => {
|
return { type: 'error', error: error as Error }
|
||||||
// check whether the worker was woken up to persist after having gone to sleep
|
}
|
||||||
if (!this._room) return
|
}
|
||||||
const slug = this.documentInfo.slug
|
|
||||||
const room = await this.getRoom()
|
|
||||||
const clock = room.getCurrentDocumentClock()
|
|
||||||
if (this._lastPersistedClock === clock) return
|
|
||||||
if (this._isRestoring) return
|
|
||||||
|
|
||||||
const snapshot = JSON.stringify(room.getCurrentSnapshot())
|
_lastPersistedClock: number | null = null
|
||||||
|
_persistQueue = createPersistQueue(async () => {
|
||||||
|
// check whether the worker was woken up to persist after having gone to sleep
|
||||||
|
if (!this._room) return
|
||||||
|
const slug = this.documentInfo.slug
|
||||||
|
const room = await this.getRoom()
|
||||||
|
const clock = room.getCurrentDocumentClock()
|
||||||
|
if (this._lastPersistedClock === clock) return
|
||||||
|
if (this._isRestoring) return
|
||||||
|
|
||||||
const key = getR2KeyForRoom(slug)
|
const snapshot = JSON.stringify(room.getCurrentSnapshot())
|
||||||
await Promise.all([
|
|
||||||
this.r2.rooms.put(key, snapshot),
|
|
||||||
this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot),
|
|
||||||
])
|
|
||||||
this._lastPersistedClock = clock
|
|
||||||
// use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop
|
|
||||||
// just in case there's any possibility of setting up a neverending queue
|
|
||||||
}, PERSIST_INTERVAL_MS / 2)
|
|
||||||
|
|
||||||
// Save the room to Postgres
|
const key = getR2KeyForRoom(slug)
|
||||||
async persistToDatabase() {
|
await Promise.all([
|
||||||
await this._persistQueue()
|
this.r2.rooms.put(key, snapshot),
|
||||||
}
|
this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot),
|
||||||
|
])
|
||||||
|
this._lastPersistedClock = clock
|
||||||
|
// use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop
|
||||||
|
// just in case there's any possibility of setting up a neverending queue
|
||||||
|
}, PERSIST_INTERVAL_MS / 2)
|
||||||
|
|
||||||
async schedulePersist() {
|
// Save the room to supabase
|
||||||
await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, {
|
async persistToDatabase() {
|
||||||
overwrite: 'if-sooner',
|
await this._persistQueue()
|
||||||
})
|
}
|
||||||
|
|
||||||
|
async schedulePersist() {
|
||||||
|
await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, {
|
||||||
|
overwrite: 'if-sooner',
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Will be called automatically when the alarm ticks.
|
// Will be called automatically when the alarm ticks.
|
||||||
|
|
|
@ -3,57 +3,58 @@ import { notFound } from '@tldraw/worker-shared'
|
||||||
import { IRequest } from 'itty-router'
|
import { IRequest } from 'itty-router'
|
||||||
import { getR2KeyForSnapshot } from '../r2'
|
import { getR2KeyForSnapshot } from '../r2'
|
||||||
import { Environment } from '../types'
|
import { Environment } from '../types'
|
||||||
import { createPostgresClient, noPostgresSorry } from '../utils/createPostgresClient'
|
import { createSupabaseClient, noSupabaseSorry } from '../utils/createSupabaseClient'
|
||||||
|
import { getSnapshotsTable } from '../utils/getSnapshotsTable'
|
||||||
|
import { R2Snapshot } from './createRoomSnapshot'
|
||||||
|
|
||||||
function generateReponse(roomId: string, data: RoomSnapshot) {
|
function generateReponse(roomId: string, data: RoomSnapshot) {
|
||||||
return new Response(
|
return new Response(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
roomId,
|
roomId,
|
||||||
records: data.documents.map((d) => d.state),
|
records: data.documents.map((d) => d.state),
|
||||||
schema: data.schema,
|
schema: data.schema,
|
||||||
error: false,
|
error: false,
|
||||||
}),
|
}),
|
||||||
{
|
{
|
||||||
headers: { 'content-type': 'application/json' },
|
headers: { 'content-type': 'application/json' },
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a snapshot of the room at a given point in time
|
// Returns a snapshot of the room at a given point in time
|
||||||
export async function getRoomSnapshot(request: IRequest, env: Environment): Promise<Response> {
|
export async function getRoomSnapshot(request: IRequest, env: Environment): Promise<Response> {
|
||||||
const roomId = request.params.roomId
|
const roomId = request.params.roomId
|
||||||
if (!roomId) return notFound()
|
if (!roomId) return notFound()
|
||||||
|
|
||||||
// Get the parent slug if it exists
|
// Get the parent slug if it exists
|
||||||
const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId)
|
const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId)
|
||||||
|
|
||||||
// Get the room snapshot from R2
|
// Get the room snapshot from R2
|
||||||
const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId))
|
const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId))
|
||||||
|
|
||||||
if (snapshot) {
|
if (snapshot) {
|
||||||
const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot
|
const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot
|
||||||
if (data) {
|
if (data) {
|
||||||
return generateReponse(roomId, data)
|
return generateReponse(roomId, data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we can't find the snapshot in R2 then fallback to Postgres
|
// If we can't find the snapshot in R2 then fallback to Supabase
|
||||||
// Create a Postgres client
|
// Create a supabase client
|
||||||
const postgresClient = createPostgresClient(env)
|
const supabase = createSupabaseClient(env)
|
||||||
if (!postgresClient) return noPostgresSorry()
|
if (!supabase) return noSupabaseSorry()
|
||||||
|
|
||||||
try {
|
// Get the snapshot from the table
|
||||||
await postgresClient.connect()
|
const supabaseTable = getSnapshotsTable(env)
|
||||||
const result = await postgresClient.query('SELECT drawing FROM snapshots WHERE slug = $1 LIMIT 1', [roomId])
|
const result = await supabase
|
||||||
await postgresClient.end()
|
.from(supabaseTable)
|
||||||
|
.select('drawing')
|
||||||
|
.eq('slug', roomId)
|
||||||
|
.maybeSingle()
|
||||||
|
const data = result.data?.drawing as RoomSnapshot
|
||||||
|
|
||||||
if (result.rows.length === 0) return notFound()
|
if (!data) return notFound()
|
||||||
const data = result.rows[0].drawing as RoomSnapshot
|
|
||||||
|
|
||||||
// Send back the snapshot!
|
// Send back the snapshot!
|
||||||
return generateReponse(roomId, data)
|
return generateReponse(roomId, data)
|
||||||
} catch (err) {
|
|
||||||
console.error('Error querying Postgres', err)
|
|
||||||
return new Response(JSON.stringify({ error: true, message: 'Error querying Postgres' }), { status: 500 })
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -1,35 +0,0 @@
|
||||||
import { Client } from 'pg'
|
|
||||||
import { Environment } from '../types'
|
|
||||||
|
|
||||||
export function createPostgresClient(env: Environment) {
|
|
||||||
if (env.POSTGRES_HOST && env.POSTGRES_USER && env.POSTGRES_PASSWORD && env.POSTGRES_DB) {
|
|
||||||
var client = new Client({
|
|
||||||
host: env.POSTGRES_HOST,
|
|
||||||
port: env.POSTGRES_PORT ? parseInt(env.POSTGRES_PORT) : 5432,
|
|
||||||
user: env.POSTGRES_USER,
|
|
||||||
password: env.POSTGRES_PASSWORD,
|
|
||||||
database: env.POSTGRES_DB,
|
|
||||||
})
|
|
||||||
|
|
||||||
client.connect()
|
|
||||||
|
|
||||||
client.query(`
|
|
||||||
CREATE TABLE IF NOT EXISTS snapshots (
|
|
||||||
id SERIAL PRIMARY KEY,
|
|
||||||
slug VARCHAR(255) UNIQUE NOT NULL,
|
|
||||||
drawing JSONB NOT NULL,
|
|
||||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
|
|
||||||
);
|
|
||||||
`)
|
|
||||||
|
|
||||||
return client
|
|
||||||
|
|
||||||
} else {
|
|
||||||
console.warn('No Postgres credentials, loading from Postgres disabled')
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function noPostgresSorry() {
|
|
||||||
return new Response(JSON.stringify({ error: true, message: 'Could not create Postgres client' }))
|
|
||||||
}
|
|
Loading…
Reference in a new issue