feat: migrate data persistence from Supabase to Postgres
Some checks are pending
Checks / Tests & checks (push) Waiting to run
Checks / Build all projects (push) Waiting to run
Deploy bemo / Deploy bemo to ${{ (github.ref == 'refs/heads/production' && 'production') || (github.ref == 'refs/heads/main' && 'staging') || 'preview' }} (push) Waiting to run
Deploy .com / Deploy dotcom to ${{ (github.ref == 'refs/heads/production' && 'production') || (github.ref == 'refs/heads/main' && 'staging') || 'preview' }} (push) Waiting to run
End to end tests / End to end tests (push) Waiting to run
Publish Canary Packages / Publish Canary Packages (push) Waiting to run
Publish VS Code Extension / Publish VS Code Extension (push) Waiting to run

Switched from Supabase to Postgres for handling database
operations related to room snapshots and drawings. This change
involves updating the imports and persistence logic in various
components to utilize Postgres instead of Supabase.

Benefits include improved performance and greater control over
database operations. Added connection and query handling for
Postgres in the utility function.

Includes:
- Updated imports and logic in TLDrawDurableObject
- Changes in getRoomSnapshot to use Postgres
- New createPostgresClient utility function for DB connection
This commit is contained in:
Kumi 2024-07-16 21:11:07 +02:00
parent 8aa4fd3352
commit 43419581be
Signed by: kumi
GPG key ID: ECBCC9082395383F
3 changed files with 423 additions and 398 deletions

View file

@ -1,29 +1,28 @@
/// <reference no-default-lib="true"/>
/// <reference types="@cloudflare/workers-types" />
import { SupabaseClient } from '@supabase/supabase-js'
import { Client as PostgresClient } from 'pg'
import {
READ_ONLY_LEGACY_PREFIX,
READ_ONLY_PREFIX,
ROOM_OPEN_MODE,
ROOM_PREFIX,
type RoomOpenMode,
READ_ONLY_LEGACY_PREFIX,
READ_ONLY_PREFIX,
ROOM_OPEN_MODE,
ROOM_PREFIX,
type RoomOpenMode,
} from '@tldraw/dotcom-shared'
import {
RoomSnapshot,
TLCloseEventCode,
TLSocketRoom,
type PersistedRoomSnapshotForSupabase,
RoomSnapshot,
TLSocketRoom,
type PersistedRoomSnapshotForSupabase,
} from '@tldraw/sync-core'
import { TLRecord } from '@tldraw/tlschema'
import { assert, assertExists, exhaustiveSwitchError } from '@tldraw/utils'
import { assertExists, exhaustiveSwitchError } from '@tldraw/utils'
import { createPersistQueue, createSentry } from '@tldraw/worker-shared'
import { IRequest, Router } from 'itty-router'
import { AlarmScheduler } from './AlarmScheduler'
import { PERSIST_INTERVAL_MS } from './config'
import { getR2KeyForRoom } from './r2'
import { Analytics, DBLoadResult, Environment, TLServerEvent } from './types'
import { createSupabaseClient } from './utils/createSupabaseClient'
import { createPostgresClient } from './utils/createPostgresClient'
import { getSlug } from './utils/roomOpenMode'
import { throttle } from './utils/throttle'
@ -32,392 +31,384 @@ const MAX_CONNECTIONS = 50
// increment this any time you make a change to this type
const CURRENT_DOCUMENT_INFO_VERSION = 0
interface DocumentInfo {
version: number
slug: string
version: number
slug: string
}
const ROOM_NOT_FOUND = Symbol('room_not_found')
export class TLDrawDurableObject {
// A unique identifier for this instance of the Durable Object
id: DurableObjectId
// A unique identifier for this instance of the Durable Object
id: DurableObjectId
// For TLSyncRoom
_room: Promise<TLSocketRoom<TLRecord, { storeId: string }>> | null = null
// For TLSyncRoom
_room: Promise<TLSocketRoom<TLRecord, { storeId: string }>> | null = null
getRoom() {
if (!this._documentInfo) {
throw new Error('documentInfo must be present when accessing room')
}
const slug = this._documentInfo.slug
if (!this._room) {
this._room = this.loadFromDatabase(slug).then((result) => {
switch (result.type) {
case 'room_found': {
const room = new TLSocketRoom<TLRecord, { storeId: string }>({
initialSnapshot: result.snapshot,
onSessionRemoved: async (room, args) => {
this.logEvent({
type: 'client',
roomId: slug,
name: 'leave',
instanceId: args.sessionKey,
localClientId: args.meta.storeId,
})
getRoom() {
if (!this._documentInfo) {
throw new Error('documentInfo must be present when accessing room')
}
const slug = this._documentInfo.slug
if (!this._room) {
this._room = this.loadFromDatabase(slug).then((result) => {
switch (result.type) {
case 'room_found': {
const room = new TLSocketRoom<TLRecord, { storeId: string }>({
initialSnapshot: result.snapshot,
onSessionRemoved: async (room, args) => {
this.logEvent({
type: 'client',
roomId: slug,
name: 'leave',
instanceId: args.sessionKey,
localClientId: args.meta.storeId,
})
if (args.numSessionsRemaining > 0) return
if (!this._room) return
this.logEvent({
type: 'client',
roomId: slug,
name: 'last_out',
instanceId: args.sessionKey,
localClientId: args.meta.storeId,
})
try {
await this.persistToDatabase()
} catch (err) {
// already logged
}
// make sure nobody joined the room while we were persisting
if (room.getNumActiveSessions() > 0) return
this._room = null
this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' })
room.close()
},
onDataChange: () => {
this.triggerPersistSchedule()
},
onBeforeSendMessage: ({ message, stringified }) => {
this.logEvent({
type: 'send_message',
roomId: slug,
messageType: message.type,
messageLength: stringified.length,
})
},
})
this.logEvent({ type: 'room', roomId: slug, name: 'room_start' })
return room
}
case 'room_not_found': {
throw ROOM_NOT_FOUND
}
case 'error': {
throw result.error
}
default: {
exhaustiveSwitchError(result)
}
}
})
}
return this._room
}
if (args.numSessionsRemaining > 0) return
if (!this._room) return
this.logEvent({
type: 'client',
roomId: slug,
name: 'last_out',
instanceId: args.sessionKey,
localClientId: args.meta.storeId,
})
try {
await this.persistToDatabase()
} catch (err) {
// already logged
}
// make sure nobody joined the room while we were persisting
if (room.getNumActiveSessions() > 0) return
this._room = null
this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' })
room.close()
},
onDataChange: () => {
this.triggerPersistSchedule()
},
onBeforeSendMessage: ({ message, stringified }) => {
this.logEvent({
type: 'send_message',
roomId: slug,
messageType: message.type,
messageLength: stringified.length,
})
},
})
this.logEvent({ type: 'room', roomId: slug, name: 'room_start' })
return room
}
case 'room_not_found': {
throw ROOM_NOT_FOUND
}
case 'error': {
throw result.error
}
default: {
exhaustiveSwitchError(result)
}
}
})
}
return this._room
}
// For storage
storage: DurableObjectStorage
// For storage
storage: DurableObjectStorage
// For persistence
supabaseClient: SupabaseClient | void
// For persistence
postgresClient: PostgresClient | null
// For analytics
measure: Analytics | undefined
// For analytics
measure: Analytics | undefined
// For error tracking
sentryDSN: string | undefined
// For error tracking
sentryDSN: string | undefined
readonly supabaseTable: string
readonly r2: {
readonly rooms: R2Bucket
readonly versionCache: R2Bucket
}
readonly postgresTable: string
readonly r2: {
readonly rooms: R2Bucket
readonly versionCache: R2Bucket
}
_documentInfo: DocumentInfo | null = null
_documentInfo: DocumentInfo | null = null
constructor(
private state: DurableObjectState,
private env: Environment
) {
this.id = state.id
this.storage = state.storage
this.sentryDSN = env.SENTRY_DSN
this.measure = env.MEASURE
this.supabaseClient = createSupabaseClient(env)
constructor(
private state: DurableObjectState,
private env: Environment
) {
this.id = state.id
this.storage = state.storage
this.sentryDSN = env.SENTRY_DSN
this.measure = env.MEASURE
this.postgresClient = createPostgresClient(env)
this.supabaseTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging'
this.r2 = {
rooms: env.ROOMS,
versionCache: env.ROOMS_HISTORY_EPHEMERAL,
}
this.postgresTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging'
this.r2 = {
rooms: env.ROOMS,
versionCache: env.ROOMS_HISTORY_EPHEMERAL,
}
state.blockConcurrencyWhile(async () => {
const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null
if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) {
this._documentInfo = null
} else {
this._documentInfo = existingDocumentInfo
}
})
}
state.blockConcurrencyWhile(async () => {
const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null
if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) {
this._documentInfo = null
} else {
this._documentInfo = existingDocumentInfo
}
})
}
readonly router = Router()
.get(
`/${ROOM_PREFIX}/:roomId`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
(req) => this.onRequest(req)
)
.get(
`/${READ_ONLY_LEGACY_PREFIX}/:roomId`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY),
(req) => this.onRequest(req)
)
.get(
`/${READ_ONLY_PREFIX}/:roomId`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY),
(req) => this.onRequest(req)
)
.post(
`/${ROOM_PREFIX}/:roomId/restore`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
(req) => this.onRestore(req)
)
.all('*', () => new Response('Not found', { status: 404 }))
readonly router = Router()
.get(
`/${ROOM_PREFIX}/:roomId`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
(req) => this.onRequest(req)
)
.get(
`/${READ_ONLY_LEGACY_PREFIX}/:roomId`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY),
(req) => this.onRequest(req)
)
.get(
`/${READ_ONLY_PREFIX}/:roomId`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY),
(req) => this.onRequest(req)
)
.post(
`/${ROOM_PREFIX}/:roomId/restore`,
(req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE),
(req) => this.onRestore(req)
)
.all('*', () => new Response('Not found', { status: 404 }))
readonly scheduler = new AlarmScheduler({
storage: () => this.storage,
alarms: {
persist: async () => {
this.persistToDatabase()
},
},
})
readonly scheduler = new AlarmScheduler({
storage: () => this.storage,
alarms: {
persist: async () => {
this.persistToDatabase()
},
},
})
// eslint-disable-next-line no-restricted-syntax
get documentInfo() {
return assertExists(this._documentInfo, 'documentInfo must be present')
}
extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => {
const slug = assertExists(
await getSlug(this.env, req.params.roomId, roomOpenMode),
'roomId must be present'
)
if (this._documentInfo) {
assert(this._documentInfo.slug === slug, 'slug must match')
} else {
this._documentInfo = {
version: CURRENT_DOCUMENT_INFO_VERSION,
slug,
}
}
}
// eslint-disable-next-line no-restricted-syntax
get documentInfo() {
return assertExists(this._documentInfo, 'documentInfo must be present')
}
extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => {
const slug = assertExists(
await getSlug(this.env, req.params.roomId, roomOpenMode),
'roomId must be present'
)
if (this._documentInfo) {
assert(this._documentInfo.slug === slug, 'slug must match')
} else {
this._documentInfo = {
version: CURRENT_DOCUMENT_INFO_VERSION,
slug,
}
}
}
// Handle a request to the Durable Object.
async fetch(req: IRequest) {
const sentry = createSentry(this.state, this.env, req)
// Handle a request to the Durable Object.
async fetch(req: IRequest) {
const sentry = createSentry(this.state, this.env, req)
try {
return await this.router.handle(req)
} catch (err) {
console.error(err)
// eslint-disable-next-line deprecation/deprecation
sentry?.captureException(err)
return new Response('Something went wrong', {
status: 500,
statusText: 'Internal Server Error',
})
}
}
try {
return await this.router.handle(req)
} catch (err) {
console.error(err)
// eslint-disable-next-line deprecation/deprecation
sentry?.captureException(err)
return new Response('Something went wrong', {
status: 500,
statusText: 'Internal Server Error',
})
}
}
_isRestoring = false
async onRestore(req: IRequest) {
this._isRestoring = true
try {
const roomId = this.documentInfo.slug
const roomKey = getR2KeyForRoom(roomId)
const timestamp = ((await req.json()) as any).timestamp
if (!timestamp) {
return new Response('Missing timestamp', { status: 400 })
}
const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`)
if (!data) {
return new Response('Version not found', { status: 400 })
}
const dataText = await data.text()
await this.r2.rooms.put(roomKey, dataText)
const room = await this.getRoom()
_isRestoring = false
async onRestore(req: IRequest) {
this._isRestoring = true
try {
const roomId = this.documentInfo.slug
const roomKey = getR2KeyForRoom(roomId)
const timestamp = ((await req.json()) as any).timestamp
if (!timestamp) {
return new Response('Missing timestamp', { status: 400 })
}
const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`)
if (!data) {
return new Response('Version not found', { status: 400 })
}
const dataText = await data.text()
await this.r2.rooms.put(roomKey, dataText)
const room = await this.getRoom()
const snapshot: RoomSnapshot = JSON.parse(dataText)
room.loadSnapshot(snapshot)
const snapshot: RoomSnapshot = JSON.parse(dataText)
room.loadSnapshot(snapshot)
return new Response()
} finally {
this._isRestoring = false
}
}
return new Response()
} finally {
this._isRestoring = false
}
}
async onRequest(req: IRequest) {
// extract query params from request, should include instanceId
const url = new URL(req.url)
const params = Object.fromEntries(url.searchParams.entries())
let { sessionKey, storeId } = params
async onRequest(req: IRequest) {
// extract query params from request, should include instanceId
const url = new URL(req.url)
const params = Object.fromEntries(url.searchParams.entries())
let { sessionKey, storeId } = params
// handle legacy param names
sessionKey ??= params.instanceId
storeId ??= params.localClientId
const isNewSession = !this._room
// handle legacy param names
sessionKey ??= params.instanceId
storeId ??= params.localClientId
const isNewSession = !this._room
// Create the websocket pair for the client
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
serverWebSocket.accept()
// Create the websocket pair for the client
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
serverWebSocket.accept()
try {
const room = await this.getRoom()
// Don't connect if we're already at max connections
if (room.getNumActiveSessions() >= MAX_CONNECTIONS) {
return new Response('Room is full', { status: 403 })
}
try {
const room = await this.getRoom()
// Don't connect if we're already at max connections
if (room.getNumActiveSessions() >= MAX_CONNECTIONS) {
return new Response('Room is full', { status: 403 })
}
// all good
room.handleSocketConnect(sessionKey, serverWebSocket, { storeId })
if (isNewSession) {
this.logEvent({
type: 'client',
roomId: this.documentInfo.slug,
name: 'room_reopen',
instanceId: sessionKey,
localClientId: storeId,
})
}
this.logEvent({
type: 'client',
roomId: this.documentInfo.slug,
name: 'enter',
instanceId: sessionKey,
localClientId: storeId,
})
return new Response(null, { status: 101, webSocket: clientWebSocket })
} catch (e) {
if (e === ROOM_NOT_FOUND) {
serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found')
return new Response(null, { status: 101, webSocket: clientWebSocket })
}
throw e
}
}
// all good
room.handleSocketConnect(sessionKey, serverWebSocket, { storeId })
if (isNewSession) {
this.logEvent({
type: 'client',
roomId: this.documentInfo.slug,
name: 'room_reopen',
instanceId: sessionKey,
localClientId: storeId,
})
}
this.logEvent({
type: 'client',
roomId: this.documentInfo.slug,
name: 'enter',
instanceId: sessionKey,
localClientId: storeId,
})
return new Response(null, { status: 101, webSocket: clientWebSocket })
} catch (e) {
if (e === ROOM_NOT_FOUND) {
serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found')
return new Response(null, { status: 101, webSocket: clientWebSocket })
}
throw e
}
}
triggerPersistSchedule = throttle(() => {
this.schedulePersist()
}, 2000)
triggerPersistSchedule = throttle(() => {
this.schedulePersist()
}, 2000)
private writeEvent(
name: string,
{ blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] }
) {
this.measure?.writeDataPoint({
blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])],
doubles,
indexes,
})
}
private writeEvent(
name: string,
{ blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] }
) {
this.measure?.writeDataPoint({
blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])],
doubles,
indexes,
})
}
logEvent(event: TLServerEvent) {
switch (event.type) {
case 'room': {
// we would add user/connection ids here if we could
this.writeEvent(event.name, { blobs: [event.roomId] })
break
}
case 'client': {
// we would add user/connection ids here if we could
this.writeEvent(event.name, {
blobs: [event.roomId, 'unused', event.instanceId],
indexes: [event.localClientId],
})
break
}
case 'send_message': {
this.writeEvent(event.type, {
blobs: [event.roomId, event.messageType],
doubles: [event.messageLength],
})
break
}
default: {
exhaustiveSwitchError(event)
}
}
}
logEvent(event: TLServerEvent) {
switch (event.type) {
case 'room': {
// we would add user/connection ids here if we could
this.writeEvent(event.name, { blobs: [event.roomId] })
break
}
case 'client': {
// we would add user/connection ids here if we could
this.writeEvent(event.name, {
blobs: [event.roomId, 'unused', event.instanceId],
indexes: [event.localClientId],
})
break
}
case 'send_message': {
this.writeEvent(event.type, {
blobs: [event.roomId, event.messageType],
doubles: [event.messageLength],
})
break
}
default: {
exhaustiveSwitchError(event)
}
}
}
// Load the room's drawing data. First we check the R2 bucket, then we fallback to supabase (legacy).
async loadFromDatabase(persistenceKey: string): Promise<DBLoadResult> {
try {
const key = getR2KeyForRoom(persistenceKey)
// when loading, prefer to fetch documents from the bucket
const roomFromBucket = await this.r2.rooms.get(key)
if (roomFromBucket) {
return { type: 'room_found', snapshot: await roomFromBucket.json() }
}
// Load the room's drawing data. First we check the R2 bucket, then we fallback to Postgres (legacy).
async loadFromDatabase(persistenceKey: string): Promise<DBLoadResult> {
try {
const key = getR2KeyForRoom(persistenceKey)
// when loading, prefer to fetch documents from the bucket
const roomFromBucket = await this.r2.rooms.get(key)
if (roomFromBucket) {
return { type: 'room_found', snapshot: await roomFromBucket.json() }
}
// if we don't have a room in the bucket, try to load from supabase
if (!this.supabaseClient) return { type: 'room_not_found' }
const { data, error } = await this.supabaseClient
.from(this.supabaseTable)
.select('*')
.eq('slug', persistenceKey)
// if we don't have a room in the bucket, try to load from Postgres
if (!this.postgresClient) return { type: 'room_not_found' }
await this.postgresClient.connect()
const result = await this.postgresClient.query('SELECT * FROM ' + this.postgresTable + ' WHERE slug = $1', [persistenceKey])
await this.postgresClient.end()
if (error) {
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
if (result.rows.length === 0) {
return { type: 'room_not_found' }
}
console.error('failed to retrieve document', persistenceKey, error)
return { type: 'error', error: new Error(error.message) }
}
// if it didn't find a document, data will be an empty array
if (data.length === 0) {
return { type: 'room_not_found' }
}
const roomFromPostgres = result.rows[0] as PersistedRoomSnapshotForSupabase
return { type: 'room_found', snapshot: roomFromPostgres.drawing }
} catch (error) {
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
const roomFromSupabase = data[0] as PersistedRoomSnapshotForSupabase
return { type: 'room_found', snapshot: roomFromSupabase.drawing }
} catch (error) {
this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' })
console.error('failed to fetch doc', persistenceKey, error)
return { type: 'error', error: error as Error }
}
}
console.error('failed to fetch doc', persistenceKey, error)
return { type: 'error', error: error as Error }
}
}
_lastPersistedClock: number | null = null
_persistQueue = createPersistQueue(async () => {
// check whether the worker was woken up to persist after having gone to sleep
if (!this._room) return
const slug = this.documentInfo.slug
const room = await this.getRoom()
const clock = room.getCurrentDocumentClock()
if (this._lastPersistedClock === clock) return
if (this._isRestoring) return
_lastPersistedClock: number | null = null
_persistQueue = createPersistQueue(async () => {
// check whether the worker was woken up to persist after having gone to sleep
if (!this._room) return
const slug = this.documentInfo.slug
const room = await this.getRoom()
const clock = room.getCurrentDocumentClock()
if (this._lastPersistedClock === clock) return
if (this._isRestoring) return
const snapshot = JSON.stringify(room.getCurrentSnapshot())
const snapshot = JSON.stringify(room.getCurrentSnapshot())
const key = getR2KeyForRoom(slug)
await Promise.all([
this.r2.rooms.put(key, snapshot),
this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot),
])
this._lastPersistedClock = clock
// use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop
// just in case there's any possibility of setting up a neverending queue
}, PERSIST_INTERVAL_MS / 2)
const key = getR2KeyForRoom(slug)
await Promise.all([
this.r2.rooms.put(key, snapshot),
this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot),
])
this._lastPersistedClock = clock
// use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop
// just in case there's any possibility of setting up a neverending queue
}, PERSIST_INTERVAL_MS / 2)
// Save the room to Postgres
async persistToDatabase() {
await this._persistQueue()
}
// Save the room to supabase
async persistToDatabase() {
await this._persistQueue()
}
async schedulePersist() {
await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, {
overwrite: 'if-sooner',
})
async schedulePersist() {
await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, {
overwrite: 'if-sooner',
})
}
// Will be called automatically when the alarm ticks.

View file

@ -3,58 +3,57 @@ import { notFound } from '@tldraw/worker-shared'
import { IRequest } from 'itty-router'
import { getR2KeyForSnapshot } from '../r2'
import { Environment } from '../types'
import { createSupabaseClient, noSupabaseSorry } from '../utils/createSupabaseClient'
import { getSnapshotsTable } from '../utils/getSnapshotsTable'
import { R2Snapshot } from './createRoomSnapshot'
import { createPostgresClient, noPostgresSorry } from '../utils/createPostgresClient'
function generateReponse(roomId: string, data: RoomSnapshot) {
return new Response(
JSON.stringify({
roomId,
records: data.documents.map((d) => d.state),
schema: data.schema,
error: false,
}),
{
headers: { 'content-type': 'application/json' },
}
)
return new Response(
JSON.stringify({
roomId,
records: data.documents.map((d) => d.state),
schema: data.schema,
error: false,
}),
{
headers: { 'content-type': 'application/json' },
}
)
}
// Returns a snapshot of the room at a given point in time
export async function getRoomSnapshot(request: IRequest, env: Environment): Promise<Response> {
const roomId = request.params.roomId
if (!roomId) return notFound()
const roomId = request.params.roomId
if (!roomId) return notFound()
// Get the parent slug if it exists
const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId)
// Get the parent slug if it exists
const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId)
// Get the room snapshot from R2
const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId))
// Get the room snapshot from R2
const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId))
if (snapshot) {
const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot
if (data) {
return generateReponse(roomId, data)
}
}
if (snapshot) {
const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot
if (data) {
return generateReponse(roomId, data)
}
}
// If we can't find the snapshot in R2 then fallback to Supabase
// Create a supabase client
const supabase = createSupabaseClient(env)
if (!supabase) return noSupabaseSorry()
// If we can't find the snapshot in R2 then fallback to Postgres
// Create a Postgres client
const postgresClient = createPostgresClient(env)
if (!postgresClient) return noPostgresSorry()
// Get the snapshot from the table
const supabaseTable = getSnapshotsTable(env)
const result = await supabase
.from(supabaseTable)
.select('drawing')
.eq('slug', roomId)
.maybeSingle()
const data = result.data?.drawing as RoomSnapshot
try {
await postgresClient.connect()
const result = await postgresClient.query('SELECT drawing FROM snapshots WHERE slug = $1 LIMIT 1', [roomId])
await postgresClient.end()
if (!data) return notFound()
if (result.rows.length === 0) return notFound()
const data = result.rows[0].drawing as RoomSnapshot
// Send back the snapshot!
return generateReponse(roomId, data)
// Send back the snapshot!
return generateReponse(roomId, data)
} catch (err) {
console.error('Error querying Postgres', err)
return new Response(JSON.stringify({ error: true, message: 'Error querying Postgres' }), { status: 500 })
}
}

View file

@ -0,0 +1,35 @@
import { Client } from 'pg'
import { Environment } from '../types'
export function createPostgresClient(env: Environment) {
if (env.POSTGRES_HOST && env.POSTGRES_USER && env.POSTGRES_PASSWORD && env.POSTGRES_DB) {
var client = new Client({
host: env.POSTGRES_HOST,
port: env.POSTGRES_PORT ? parseInt(env.POSTGRES_PORT) : 5432,
user: env.POSTGRES_USER,
password: env.POSTGRES_PASSWORD,
database: env.POSTGRES_DB,
})
client.connect()
client.query(`
CREATE TABLE IF NOT EXISTS snapshots (
id SERIAL PRIMARY KEY,
slug VARCHAR(255) UNIQUE NOT NULL,
drawing JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
`)
return client
} else {
console.warn('No Postgres credentials, loading from Postgres disabled')
return null
}
}
export function noPostgresSorry() {
return new Response(JSON.stringify({ error: true, message: 'Could not create Postgres client' }))
}