From c0a96442965274e63f6b8d0ecfde2e8cb3cfdbac Mon Sep 17 00:00:00 2001 From: Kumi Date: Wed, 17 Jul 2024 09:11:54 +0200 Subject: [PATCH] Revert "feat: migrate data persistence from Supabase to Postgres" This reverts commit 43419581be9656af26f61ff3d96cb72de086d2be. --- apps/dotcom-worker/src/TLDrawDurableObject.ts | 703 +++++++++--------- .../src/routes/getRoomSnapshot.ts | 83 ++- .../src/utils/createPostgresClient.ts | 35 - 3 files changed, 398 insertions(+), 423 deletions(-) delete mode 100644 apps/dotcom-worker/src/utils/createPostgresClient.ts diff --git a/apps/dotcom-worker/src/TLDrawDurableObject.ts b/apps/dotcom-worker/src/TLDrawDurableObject.ts index 245ae9850..6e9390ef0 100644 --- a/apps/dotcom-worker/src/TLDrawDurableObject.ts +++ b/apps/dotcom-worker/src/TLDrawDurableObject.ts @@ -1,28 +1,29 @@ /// /// -import { Client as PostgresClient } from 'pg' +import { SupabaseClient } from '@supabase/supabase-js' import { - READ_ONLY_LEGACY_PREFIX, - READ_ONLY_PREFIX, - ROOM_OPEN_MODE, - ROOM_PREFIX, - type RoomOpenMode, + READ_ONLY_LEGACY_PREFIX, + READ_ONLY_PREFIX, + ROOM_OPEN_MODE, + ROOM_PREFIX, + type RoomOpenMode, } from '@tldraw/dotcom-shared' import { - RoomSnapshot, - TLSocketRoom, - type PersistedRoomSnapshotForSupabase, + RoomSnapshot, + TLCloseEventCode, + TLSocketRoom, + type PersistedRoomSnapshotForSupabase, } from '@tldraw/sync-core' import { TLRecord } from '@tldraw/tlschema' -import { assertExists, exhaustiveSwitchError } from '@tldraw/utils' +import { assert, assertExists, exhaustiveSwitchError } from '@tldraw/utils' import { createPersistQueue, createSentry } from '@tldraw/worker-shared' import { IRequest, Router } from 'itty-router' import { AlarmScheduler } from './AlarmScheduler' import { PERSIST_INTERVAL_MS } from './config' import { getR2KeyForRoom } from './r2' import { Analytics, DBLoadResult, Environment, TLServerEvent } from './types' -import { createPostgresClient } from './utils/createPostgresClient' +import { createSupabaseClient } from './utils/createSupabaseClient' import { getSlug } from './utils/roomOpenMode' import { throttle } from './utils/throttle' @@ -31,388 +32,396 @@ const MAX_CONNECTIONS = 50 // increment this any time you make a change to this type const CURRENT_DOCUMENT_INFO_VERSION = 0 interface DocumentInfo { - version: number - slug: string + version: number + slug: string } const ROOM_NOT_FOUND = Symbol('room_not_found') export class TLDrawDurableObject { - // A unique identifier for this instance of the Durable Object - id: DurableObjectId + // A unique identifier for this instance of the Durable Object + id: DurableObjectId - // For TLSyncRoom - _room: Promise> | null = null + // For TLSyncRoom + _room: Promise> | null = null - getRoom() { - if (!this._documentInfo) { - throw new Error('documentInfo must be present when accessing room') - } - const slug = this._documentInfo.slug - if (!this._room) { - this._room = this.loadFromDatabase(slug).then((result) => { - switch (result.type) { - case 'room_found': { - const room = new TLSocketRoom({ - initialSnapshot: result.snapshot, - onSessionRemoved: async (room, args) => { - this.logEvent({ - type: 'client', - roomId: slug, - name: 'leave', - instanceId: args.sessionKey, - localClientId: args.meta.storeId, - }) + getRoom() { + if (!this._documentInfo) { + throw new Error('documentInfo must be present when accessing room') + } + const slug = this._documentInfo.slug + if (!this._room) { + this._room = this.loadFromDatabase(slug).then((result) => { + switch (result.type) { + case 'room_found': { + const room = new TLSocketRoom({ + initialSnapshot: result.snapshot, + onSessionRemoved: async (room, args) => { + this.logEvent({ + type: 'client', + roomId: slug, + name: 'leave', + instanceId: args.sessionKey, + localClientId: args.meta.storeId, + }) - if (args.numSessionsRemaining > 0) return - if (!this._room) return - this.logEvent({ - type: 'client', - roomId: slug, - name: 'last_out', - instanceId: args.sessionKey, - localClientId: args.meta.storeId, - }) - try { - await this.persistToDatabase() - } catch (err) { - // already logged - } - // make sure nobody joined the room while we were persisting - if (room.getNumActiveSessions() > 0) return - this._room = null - this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' }) - room.close() - }, - onDataChange: () => { - this.triggerPersistSchedule() - }, - onBeforeSendMessage: ({ message, stringified }) => { - this.logEvent({ - type: 'send_message', - roomId: slug, - messageType: message.type, - messageLength: stringified.length, - }) - }, - }) - this.logEvent({ type: 'room', roomId: slug, name: 'room_start' }) - return room - } - case 'room_not_found': { - throw ROOM_NOT_FOUND - } - case 'error': { - throw result.error - } - default: { - exhaustiveSwitchError(result) - } - } - }) - } - return this._room - } + if (args.numSessionsRemaining > 0) return + if (!this._room) return + this.logEvent({ + type: 'client', + roomId: slug, + name: 'last_out', + instanceId: args.sessionKey, + localClientId: args.meta.storeId, + }) + try { + await this.persistToDatabase() + } catch (err) { + // already logged + } + // make sure nobody joined the room while we were persisting + if (room.getNumActiveSessions() > 0) return + this._room = null + this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' }) + room.close() + }, + onDataChange: () => { + this.triggerPersistSchedule() + }, + onBeforeSendMessage: ({ message, stringified }) => { + this.logEvent({ + type: 'send_message', + roomId: slug, + messageType: message.type, + messageLength: stringified.length, + }) + }, + }) + this.logEvent({ type: 'room', roomId: slug, name: 'room_start' }) + return room + } + case 'room_not_found': { + throw ROOM_NOT_FOUND + } + case 'error': { + throw result.error + } + default: { + exhaustiveSwitchError(result) + } + } + }) + } + return this._room + } - // For storage - storage: DurableObjectStorage + // For storage + storage: DurableObjectStorage - // For persistence - postgresClient: PostgresClient | null + // For persistence + supabaseClient: SupabaseClient | void - // For analytics - measure: Analytics | undefined + // For analytics + measure: Analytics | undefined - // For error tracking - sentryDSN: string | undefined + // For error tracking + sentryDSN: string | undefined - readonly postgresTable: string - readonly r2: { - readonly rooms: R2Bucket - readonly versionCache: R2Bucket - } + readonly supabaseTable: string + readonly r2: { + readonly rooms: R2Bucket + readonly versionCache: R2Bucket + } - _documentInfo: DocumentInfo | null = null + _documentInfo: DocumentInfo | null = null - constructor( - private state: DurableObjectState, - private env: Environment - ) { - this.id = state.id - this.storage = state.storage - this.sentryDSN = env.SENTRY_DSN - this.measure = env.MEASURE - this.postgresClient = createPostgresClient(env) + constructor( + private state: DurableObjectState, + private env: Environment + ) { + this.id = state.id + this.storage = state.storage + this.sentryDSN = env.SENTRY_DSN + this.measure = env.MEASURE + this.supabaseClient = createSupabaseClient(env) - this.postgresTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging' - this.r2 = { - rooms: env.ROOMS, - versionCache: env.ROOMS_HISTORY_EPHEMERAL, - } + this.supabaseTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging' + this.r2 = { + rooms: env.ROOMS, + versionCache: env.ROOMS_HISTORY_EPHEMERAL, + } - state.blockConcurrencyWhile(async () => { - const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null - if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) { - this._documentInfo = null - } else { - this._documentInfo = existingDocumentInfo - } - }) - } + state.blockConcurrencyWhile(async () => { + const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null + if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) { + this._documentInfo = null + } else { + this._documentInfo = existingDocumentInfo + } + }) + } - readonly router = Router() - .get( - `/${ROOM_PREFIX}/:roomId`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), - (req) => this.onRequest(req) - ) - .get( - `/${READ_ONLY_LEGACY_PREFIX}/:roomId`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY), - (req) => this.onRequest(req) - ) - .get( - `/${READ_ONLY_PREFIX}/:roomId`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY), - (req) => this.onRequest(req) - ) - .post( - `/${ROOM_PREFIX}/:roomId/restore`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), - (req) => this.onRestore(req) - ) - .all('*', () => new Response('Not found', { status: 404 })) + readonly router = Router() + .get( + `/${ROOM_PREFIX}/:roomId`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), + (req) => this.onRequest(req) + ) + .get( + `/${READ_ONLY_LEGACY_PREFIX}/:roomId`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY), + (req) => this.onRequest(req) + ) + .get( + `/${READ_ONLY_PREFIX}/:roomId`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY), + (req) => this.onRequest(req) + ) + .post( + `/${ROOM_PREFIX}/:roomId/restore`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), + (req) => this.onRestore(req) + ) + .all('*', () => new Response('Not found', { status: 404 })) - readonly scheduler = new AlarmScheduler({ - storage: () => this.storage, - alarms: { - persist: async () => { - this.persistToDatabase() - }, - }, - }) + readonly scheduler = new AlarmScheduler({ + storage: () => this.storage, + alarms: { + persist: async () => { + this.persistToDatabase() + }, + }, + }) - // eslint-disable-next-line no-restricted-syntax - get documentInfo() { - return assertExists(this._documentInfo, 'documentInfo must be present') - } - extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => { - const slug = assertExists( - await getSlug(this.env, req.params.roomId, roomOpenMode), - 'roomId must be present' - ) - if (this._documentInfo) { - assert(this._documentInfo.slug === slug, 'slug must match') - } else { - this._documentInfo = { - version: CURRENT_DOCUMENT_INFO_VERSION, - slug, - } - } - } + // eslint-disable-next-line no-restricted-syntax + get documentInfo() { + return assertExists(this._documentInfo, 'documentInfo must be present') + } + extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => { + const slug = assertExists( + await getSlug(this.env, req.params.roomId, roomOpenMode), + 'roomId must be present' + ) + if (this._documentInfo) { + assert(this._documentInfo.slug === slug, 'slug must match') + } else { + this._documentInfo = { + version: CURRENT_DOCUMENT_INFO_VERSION, + slug, + } + } + } - // Handle a request to the Durable Object. - async fetch(req: IRequest) { - const sentry = createSentry(this.state, this.env, req) + // Handle a request to the Durable Object. + async fetch(req: IRequest) { + const sentry = createSentry(this.state, this.env, req) - try { - return await this.router.handle(req) - } catch (err) { - console.error(err) - // eslint-disable-next-line deprecation/deprecation - sentry?.captureException(err) - return new Response('Something went wrong', { - status: 500, - statusText: 'Internal Server Error', - }) - } - } + try { + return await this.router.handle(req) + } catch (err) { + console.error(err) + // eslint-disable-next-line deprecation/deprecation + sentry?.captureException(err) + return new Response('Something went wrong', { + status: 500, + statusText: 'Internal Server Error', + }) + } + } - _isRestoring = false - async onRestore(req: IRequest) { - this._isRestoring = true - try { - const roomId = this.documentInfo.slug - const roomKey = getR2KeyForRoom(roomId) - const timestamp = ((await req.json()) as any).timestamp - if (!timestamp) { - return new Response('Missing timestamp', { status: 400 }) - } - const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`) - if (!data) { - return new Response('Version not found', { status: 400 }) - } - const dataText = await data.text() - await this.r2.rooms.put(roomKey, dataText) - const room = await this.getRoom() + _isRestoring = false + async onRestore(req: IRequest) { + this._isRestoring = true + try { + const roomId = this.documentInfo.slug + const roomKey = getR2KeyForRoom(roomId) + const timestamp = ((await req.json()) as any).timestamp + if (!timestamp) { + return new Response('Missing timestamp', { status: 400 }) + } + const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`) + if (!data) { + return new Response('Version not found', { status: 400 }) + } + const dataText = await data.text() + await this.r2.rooms.put(roomKey, dataText) + const room = await this.getRoom() - const snapshot: RoomSnapshot = JSON.parse(dataText) - room.loadSnapshot(snapshot) + const snapshot: RoomSnapshot = JSON.parse(dataText) + room.loadSnapshot(snapshot) - return new Response() - } finally { - this._isRestoring = false - } - } + return new Response() + } finally { + this._isRestoring = false + } + } - async onRequest(req: IRequest) { - // extract query params from request, should include instanceId - const url = new URL(req.url) - const params = Object.fromEntries(url.searchParams.entries()) - let { sessionKey, storeId } = params + async onRequest(req: IRequest) { + // extract query params from request, should include instanceId + const url = new URL(req.url) + const params = Object.fromEntries(url.searchParams.entries()) + let { sessionKey, storeId } = params - // handle legacy param names - sessionKey ??= params.instanceId - storeId ??= params.localClientId - const isNewSession = !this._room + // handle legacy param names + sessionKey ??= params.instanceId + storeId ??= params.localClientId + const isNewSession = !this._room - // Create the websocket pair for the client - const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair() - serverWebSocket.accept() + // Create the websocket pair for the client + const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair() + serverWebSocket.accept() - try { - const room = await this.getRoom() - // Don't connect if we're already at max connections - if (room.getNumActiveSessions() >= MAX_CONNECTIONS) { - return new Response('Room is full', { status: 403 }) - } + try { + const room = await this.getRoom() + // Don't connect if we're already at max connections + if (room.getNumActiveSessions() >= MAX_CONNECTIONS) { + return new Response('Room is full', { status: 403 }) + } - // all good - room.handleSocketConnect(sessionKey, serverWebSocket, { storeId }) - if (isNewSession) { - this.logEvent({ - type: 'client', - roomId: this.documentInfo.slug, - name: 'room_reopen', - instanceId: sessionKey, - localClientId: storeId, - }) - } - this.logEvent({ - type: 'client', - roomId: this.documentInfo.slug, - name: 'enter', - instanceId: sessionKey, - localClientId: storeId, - }) - return new Response(null, { status: 101, webSocket: clientWebSocket }) - } catch (e) { - if (e === ROOM_NOT_FOUND) { - serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found') - return new Response(null, { status: 101, webSocket: clientWebSocket }) - } - throw e - } - } + // all good + room.handleSocketConnect(sessionKey, serverWebSocket, { storeId }) + if (isNewSession) { + this.logEvent({ + type: 'client', + roomId: this.documentInfo.slug, + name: 'room_reopen', + instanceId: sessionKey, + localClientId: storeId, + }) + } + this.logEvent({ + type: 'client', + roomId: this.documentInfo.slug, + name: 'enter', + instanceId: sessionKey, + localClientId: storeId, + }) + return new Response(null, { status: 101, webSocket: clientWebSocket }) + } catch (e) { + if (e === ROOM_NOT_FOUND) { + serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found') + return new Response(null, { status: 101, webSocket: clientWebSocket }) + } + throw e + } + } - triggerPersistSchedule = throttle(() => { - this.schedulePersist() - }, 2000) + triggerPersistSchedule = throttle(() => { + this.schedulePersist() + }, 2000) - private writeEvent( - name: string, - { blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] } - ) { - this.measure?.writeDataPoint({ - blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])], - doubles, - indexes, - }) - } + private writeEvent( + name: string, + { blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] } + ) { + this.measure?.writeDataPoint({ + blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])], + doubles, + indexes, + }) + } - logEvent(event: TLServerEvent) { - switch (event.type) { - case 'room': { - // we would add user/connection ids here if we could - this.writeEvent(event.name, { blobs: [event.roomId] }) - break - } - case 'client': { - // we would add user/connection ids here if we could - this.writeEvent(event.name, { - blobs: [event.roomId, 'unused', event.instanceId], - indexes: [event.localClientId], - }) - break - } - case 'send_message': { - this.writeEvent(event.type, { - blobs: [event.roomId, event.messageType], - doubles: [event.messageLength], - }) - break - } - default: { - exhaustiveSwitchError(event) - } - } - } + logEvent(event: TLServerEvent) { + switch (event.type) { + case 'room': { + // we would add user/connection ids here if we could + this.writeEvent(event.name, { blobs: [event.roomId] }) + break + } + case 'client': { + // we would add user/connection ids here if we could + this.writeEvent(event.name, { + blobs: [event.roomId, 'unused', event.instanceId], + indexes: [event.localClientId], + }) + break + } + case 'send_message': { + this.writeEvent(event.type, { + blobs: [event.roomId, event.messageType], + doubles: [event.messageLength], + }) + break + } + default: { + exhaustiveSwitchError(event) + } + } + } - // Load the room's drawing data. First we check the R2 bucket, then we fallback to Postgres (legacy). - async loadFromDatabase(persistenceKey: string): Promise { - try { - const key = getR2KeyForRoom(persistenceKey) - // when loading, prefer to fetch documents from the bucket - const roomFromBucket = await this.r2.rooms.get(key) - if (roomFromBucket) { - return { type: 'room_found', snapshot: await roomFromBucket.json() } - } + // Load the room's drawing data. First we check the R2 bucket, then we fallback to supabase (legacy). + async loadFromDatabase(persistenceKey: string): Promise { + try { + const key = getR2KeyForRoom(persistenceKey) + // when loading, prefer to fetch documents from the bucket + const roomFromBucket = await this.r2.rooms.get(key) + if (roomFromBucket) { + return { type: 'room_found', snapshot: await roomFromBucket.json() } + } - // if we don't have a room in the bucket, try to load from Postgres - if (!this.postgresClient) return { type: 'room_not_found' } - await this.postgresClient.connect() - const result = await this.postgresClient.query('SELECT * FROM ' + this.postgresTable + ' WHERE slug = $1', [persistenceKey]) - await this.postgresClient.end() + // if we don't have a room in the bucket, try to load from supabase + if (!this.supabaseClient) return { type: 'room_not_found' } + const { data, error } = await this.supabaseClient + .from(this.supabaseTable) + .select('*') + .eq('slug', persistenceKey) - if (result.rows.length === 0) { - return { type: 'room_not_found' } - } + if (error) { + this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) - const roomFromPostgres = result.rows[0] as PersistedRoomSnapshotForSupabase - return { type: 'room_found', snapshot: roomFromPostgres.drawing } - } catch (error) { - this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) + console.error('failed to retrieve document', persistenceKey, error) + return { type: 'error', error: new Error(error.message) } + } + // if it didn't find a document, data will be an empty array + if (data.length === 0) { + return { type: 'room_not_found' } + } - console.error('failed to fetch doc', persistenceKey, error) - return { type: 'error', error: error as Error } - } - } + const roomFromSupabase = data[0] as PersistedRoomSnapshotForSupabase + return { type: 'room_found', snapshot: roomFromSupabase.drawing } + } catch (error) { + this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) - _lastPersistedClock: number | null = null - _persistQueue = createPersistQueue(async () => { - // check whether the worker was woken up to persist after having gone to sleep - if (!this._room) return - const slug = this.documentInfo.slug - const room = await this.getRoom() - const clock = room.getCurrentDocumentClock() - if (this._lastPersistedClock === clock) return - if (this._isRestoring) return + console.error('failed to fetch doc', persistenceKey, error) + return { type: 'error', error: error as Error } + } + } - const snapshot = JSON.stringify(room.getCurrentSnapshot()) + _lastPersistedClock: number | null = null + _persistQueue = createPersistQueue(async () => { + // check whether the worker was woken up to persist after having gone to sleep + if (!this._room) return + const slug = this.documentInfo.slug + const room = await this.getRoom() + const clock = room.getCurrentDocumentClock() + if (this._lastPersistedClock === clock) return + if (this._isRestoring) return - const key = getR2KeyForRoom(slug) - await Promise.all([ - this.r2.rooms.put(key, snapshot), - this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot), - ]) - this._lastPersistedClock = clock - // use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop - // just in case there's any possibility of setting up a neverending queue - }, PERSIST_INTERVAL_MS / 2) + const snapshot = JSON.stringify(room.getCurrentSnapshot()) - // Save the room to Postgres - async persistToDatabase() { - await this._persistQueue() - } + const key = getR2KeyForRoom(slug) + await Promise.all([ + this.r2.rooms.put(key, snapshot), + this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot), + ]) + this._lastPersistedClock = clock + // use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop + // just in case there's any possibility of setting up a neverending queue + }, PERSIST_INTERVAL_MS / 2) - async schedulePersist() { - await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, { - overwrite: 'if-sooner', - }) + // Save the room to supabase + async persistToDatabase() { + await this._persistQueue() + } + + async schedulePersist() { + await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, { + overwrite: 'if-sooner', + }) } // Will be called automatically when the alarm ticks. async alarm() { await this.scheduler.onAlarm() } -} \ No newline at end of file +} diff --git a/apps/dotcom-worker/src/routes/getRoomSnapshot.ts b/apps/dotcom-worker/src/routes/getRoomSnapshot.ts index f97d13966..f5c6a682a 100644 --- a/apps/dotcom-worker/src/routes/getRoomSnapshot.ts +++ b/apps/dotcom-worker/src/routes/getRoomSnapshot.ts @@ -3,57 +3,58 @@ import { notFound } from '@tldraw/worker-shared' import { IRequest } from 'itty-router' import { getR2KeyForSnapshot } from '../r2' import { Environment } from '../types' -import { createPostgresClient, noPostgresSorry } from '../utils/createPostgresClient' +import { createSupabaseClient, noSupabaseSorry } from '../utils/createSupabaseClient' +import { getSnapshotsTable } from '../utils/getSnapshotsTable' +import { R2Snapshot } from './createRoomSnapshot' function generateReponse(roomId: string, data: RoomSnapshot) { - return new Response( - JSON.stringify({ - roomId, - records: data.documents.map((d) => d.state), - schema: data.schema, - error: false, - }), - { - headers: { 'content-type': 'application/json' }, - } - ) + return new Response( + JSON.stringify({ + roomId, + records: data.documents.map((d) => d.state), + schema: data.schema, + error: false, + }), + { + headers: { 'content-type': 'application/json' }, + } + ) } // Returns a snapshot of the room at a given point in time export async function getRoomSnapshot(request: IRequest, env: Environment): Promise { - const roomId = request.params.roomId - if (!roomId) return notFound() + const roomId = request.params.roomId + if (!roomId) return notFound() - // Get the parent slug if it exists - const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId) + // Get the parent slug if it exists + const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId) - // Get the room snapshot from R2 - const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId)) + // Get the room snapshot from R2 + const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId)) - if (snapshot) { - const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot - if (data) { - return generateReponse(roomId, data) - } - } + if (snapshot) { + const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot + if (data) { + return generateReponse(roomId, data) + } + } - // If we can't find the snapshot in R2 then fallback to Postgres - // Create a Postgres client - const postgresClient = createPostgresClient(env) - if (!postgresClient) return noPostgresSorry() + // If we can't find the snapshot in R2 then fallback to Supabase + // Create a supabase client + const supabase = createSupabaseClient(env) + if (!supabase) return noSupabaseSorry() - try { - await postgresClient.connect() - const result = await postgresClient.query('SELECT drawing FROM snapshots WHERE slug = $1 LIMIT 1', [roomId]) - await postgresClient.end() + // Get the snapshot from the table + const supabaseTable = getSnapshotsTable(env) + const result = await supabase + .from(supabaseTable) + .select('drawing') + .eq('slug', roomId) + .maybeSingle() + const data = result.data?.drawing as RoomSnapshot - if (result.rows.length === 0) return notFound() - const data = result.rows[0].drawing as RoomSnapshot + if (!data) return notFound() - // Send back the snapshot! - return generateReponse(roomId, data) - } catch (err) { - console.error('Error querying Postgres', err) - return new Response(JSON.stringify({ error: true, message: 'Error querying Postgres' }), { status: 500 }) - } -} \ No newline at end of file + // Send back the snapshot! + return generateReponse(roomId, data) +} diff --git a/apps/dotcom-worker/src/utils/createPostgresClient.ts b/apps/dotcom-worker/src/utils/createPostgresClient.ts deleted file mode 100644 index dc94eb3f9..000000000 --- a/apps/dotcom-worker/src/utils/createPostgresClient.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { Client } from 'pg' -import { Environment } from '../types' - -export function createPostgresClient(env: Environment) { - if (env.POSTGRES_HOST && env.POSTGRES_USER && env.POSTGRES_PASSWORD && env.POSTGRES_DB) { - var client = new Client({ - host: env.POSTGRES_HOST, - port: env.POSTGRES_PORT ? parseInt(env.POSTGRES_PORT) : 5432, - user: env.POSTGRES_USER, - password: env.POSTGRES_PASSWORD, - database: env.POSTGRES_DB, - }) - - client.connect() - - client.query(` - CREATE TABLE IF NOT EXISTS snapshots ( - id SERIAL PRIMARY KEY, - slug VARCHAR(255) UNIQUE NOT NULL, - drawing JSONB NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP - ); - `) - - return client - - } else { - console.warn('No Postgres credentials, loading from Postgres disabled') - return null - } -} - -export function noPostgresSorry() { - return new Response(JSON.stringify({ error: true, message: 'Could not create Postgres client' })) -} \ No newline at end of file