From 43419581be9656af26f61ff3d96cb72de086d2be Mon Sep 17 00:00:00 2001 From: Kumi Date: Tue, 16 Jul 2024 21:11:07 +0200 Subject: [PATCH] feat: migrate data persistence from Supabase to Postgres Switched from Supabase to Postgres for handling database operations related to room snapshots and drawings. This change involves updating the imports and persistence logic in various components to utilize Postgres instead of Supabase. Benefits include improved performance and greater control over database operations. Added connection and query handling for Postgres in the utility function. Includes: - Updated imports and logic in TLDrawDurableObject - Changes in getRoomSnapshot to use Postgres - New createPostgresClient utility function for DB connection --- apps/dotcom-worker/src/TLDrawDurableObject.ts | 703 +++++++++--------- .../src/routes/getRoomSnapshot.ts | 83 +-- .../src/utils/createPostgresClient.ts | 35 + 3 files changed, 423 insertions(+), 398 deletions(-) create mode 100644 apps/dotcom-worker/src/utils/createPostgresClient.ts diff --git a/apps/dotcom-worker/src/TLDrawDurableObject.ts b/apps/dotcom-worker/src/TLDrawDurableObject.ts index 6e9390ef0..245ae9850 100644 --- a/apps/dotcom-worker/src/TLDrawDurableObject.ts +++ b/apps/dotcom-worker/src/TLDrawDurableObject.ts @@ -1,29 +1,28 @@ /// /// -import { SupabaseClient } from '@supabase/supabase-js' +import { Client as PostgresClient } from 'pg' import { - READ_ONLY_LEGACY_PREFIX, - READ_ONLY_PREFIX, - ROOM_OPEN_MODE, - ROOM_PREFIX, - type RoomOpenMode, + READ_ONLY_LEGACY_PREFIX, + READ_ONLY_PREFIX, + ROOM_OPEN_MODE, + ROOM_PREFIX, + type RoomOpenMode, } from '@tldraw/dotcom-shared' import { - RoomSnapshot, - TLCloseEventCode, - TLSocketRoom, - type PersistedRoomSnapshotForSupabase, + RoomSnapshot, + TLSocketRoom, + type PersistedRoomSnapshotForSupabase, } from '@tldraw/sync-core' import { TLRecord } from '@tldraw/tlschema' -import { assert, assertExists, exhaustiveSwitchError } from '@tldraw/utils' +import { assertExists, exhaustiveSwitchError } from '@tldraw/utils' import { createPersistQueue, createSentry } from '@tldraw/worker-shared' import { IRequest, Router } from 'itty-router' import { AlarmScheduler } from './AlarmScheduler' import { PERSIST_INTERVAL_MS } from './config' import { getR2KeyForRoom } from './r2' import { Analytics, DBLoadResult, Environment, TLServerEvent } from './types' -import { createSupabaseClient } from './utils/createSupabaseClient' +import { createPostgresClient } from './utils/createPostgresClient' import { getSlug } from './utils/roomOpenMode' import { throttle } from './utils/throttle' @@ -32,396 +31,388 @@ const MAX_CONNECTIONS = 50 // increment this any time you make a change to this type const CURRENT_DOCUMENT_INFO_VERSION = 0 interface DocumentInfo { - version: number - slug: string + version: number + slug: string } const ROOM_NOT_FOUND = Symbol('room_not_found') export class TLDrawDurableObject { - // A unique identifier for this instance of the Durable Object - id: DurableObjectId + // A unique identifier for this instance of the Durable Object + id: DurableObjectId - // For TLSyncRoom - _room: Promise> | null = null + // For TLSyncRoom + _room: Promise> | null = null - getRoom() { - if (!this._documentInfo) { - throw new Error('documentInfo must be present when accessing room') - } - const slug = this._documentInfo.slug - if (!this._room) { - this._room = this.loadFromDatabase(slug).then((result) => { - switch (result.type) { - case 'room_found': { - const room = new TLSocketRoom({ - initialSnapshot: result.snapshot, - onSessionRemoved: async (room, args) => { - this.logEvent({ - type: 'client', - roomId: slug, - name: 'leave', - instanceId: args.sessionKey, - localClientId: args.meta.storeId, - }) + getRoom() { + if (!this._documentInfo) { + throw new Error('documentInfo must be present when accessing room') + } + const slug = this._documentInfo.slug + if (!this._room) { + this._room = this.loadFromDatabase(slug).then((result) => { + switch (result.type) { + case 'room_found': { + const room = new TLSocketRoom({ + initialSnapshot: result.snapshot, + onSessionRemoved: async (room, args) => { + this.logEvent({ + type: 'client', + roomId: slug, + name: 'leave', + instanceId: args.sessionKey, + localClientId: args.meta.storeId, + }) - if (args.numSessionsRemaining > 0) return - if (!this._room) return - this.logEvent({ - type: 'client', - roomId: slug, - name: 'last_out', - instanceId: args.sessionKey, - localClientId: args.meta.storeId, - }) - try { - await this.persistToDatabase() - } catch (err) { - // already logged - } - // make sure nobody joined the room while we were persisting - if (room.getNumActiveSessions() > 0) return - this._room = null - this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' }) - room.close() - }, - onDataChange: () => { - this.triggerPersistSchedule() - }, - onBeforeSendMessage: ({ message, stringified }) => { - this.logEvent({ - type: 'send_message', - roomId: slug, - messageType: message.type, - messageLength: stringified.length, - }) - }, - }) - this.logEvent({ type: 'room', roomId: slug, name: 'room_start' }) - return room - } - case 'room_not_found': { - throw ROOM_NOT_FOUND - } - case 'error': { - throw result.error - } - default: { - exhaustiveSwitchError(result) - } - } - }) - } - return this._room - } + if (args.numSessionsRemaining > 0) return + if (!this._room) return + this.logEvent({ + type: 'client', + roomId: slug, + name: 'last_out', + instanceId: args.sessionKey, + localClientId: args.meta.storeId, + }) + try { + await this.persistToDatabase() + } catch (err) { + // already logged + } + // make sure nobody joined the room while we were persisting + if (room.getNumActiveSessions() > 0) return + this._room = null + this.logEvent({ type: 'room', roomId: slug, name: 'room_empty' }) + room.close() + }, + onDataChange: () => { + this.triggerPersistSchedule() + }, + onBeforeSendMessage: ({ message, stringified }) => { + this.logEvent({ + type: 'send_message', + roomId: slug, + messageType: message.type, + messageLength: stringified.length, + }) + }, + }) + this.logEvent({ type: 'room', roomId: slug, name: 'room_start' }) + return room + } + case 'room_not_found': { + throw ROOM_NOT_FOUND + } + case 'error': { + throw result.error + } + default: { + exhaustiveSwitchError(result) + } + } + }) + } + return this._room + } - // For storage - storage: DurableObjectStorage + // For storage + storage: DurableObjectStorage - // For persistence - supabaseClient: SupabaseClient | void + // For persistence + postgresClient: PostgresClient | null - // For analytics - measure: Analytics | undefined + // For analytics + measure: Analytics | undefined - // For error tracking - sentryDSN: string | undefined + // For error tracking + sentryDSN: string | undefined - readonly supabaseTable: string - readonly r2: { - readonly rooms: R2Bucket - readonly versionCache: R2Bucket - } + readonly postgresTable: string + readonly r2: { + readonly rooms: R2Bucket + readonly versionCache: R2Bucket + } - _documentInfo: DocumentInfo | null = null + _documentInfo: DocumentInfo | null = null - constructor( - private state: DurableObjectState, - private env: Environment - ) { - this.id = state.id - this.storage = state.storage - this.sentryDSN = env.SENTRY_DSN - this.measure = env.MEASURE - this.supabaseClient = createSupabaseClient(env) + constructor( + private state: DurableObjectState, + private env: Environment + ) { + this.id = state.id + this.storage = state.storage + this.sentryDSN = env.SENTRY_DSN + this.measure = env.MEASURE + this.postgresClient = createPostgresClient(env) - this.supabaseTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging' - this.r2 = { - rooms: env.ROOMS, - versionCache: env.ROOMS_HISTORY_EPHEMERAL, - } + this.postgresTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging' + this.r2 = { + rooms: env.ROOMS, + versionCache: env.ROOMS_HISTORY_EPHEMERAL, + } - state.blockConcurrencyWhile(async () => { - const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null - if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) { - this._documentInfo = null - } else { - this._documentInfo = existingDocumentInfo - } - }) - } + state.blockConcurrencyWhile(async () => { + const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null + if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) { + this._documentInfo = null + } else { + this._documentInfo = existingDocumentInfo + } + }) + } - readonly router = Router() - .get( - `/${ROOM_PREFIX}/:roomId`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), - (req) => this.onRequest(req) - ) - .get( - `/${READ_ONLY_LEGACY_PREFIX}/:roomId`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY), - (req) => this.onRequest(req) - ) - .get( - `/${READ_ONLY_PREFIX}/:roomId`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY), - (req) => this.onRequest(req) - ) - .post( - `/${ROOM_PREFIX}/:roomId/restore`, - (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), - (req) => this.onRestore(req) - ) - .all('*', () => new Response('Not found', { status: 404 })) + readonly router = Router() + .get( + `/${ROOM_PREFIX}/:roomId`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), + (req) => this.onRequest(req) + ) + .get( + `/${READ_ONLY_LEGACY_PREFIX}/:roomId`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY_LEGACY), + (req) => this.onRequest(req) + ) + .get( + `/${READ_ONLY_PREFIX}/:roomId`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_ONLY), + (req) => this.onRequest(req) + ) + .post( + `/${ROOM_PREFIX}/:roomId/restore`, + (req) => this.extractDocumentInfoFromRequest(req, ROOM_OPEN_MODE.READ_WRITE), + (req) => this.onRestore(req) + ) + .all('*', () => new Response('Not found', { status: 404 })) - readonly scheduler = new AlarmScheduler({ - storage: () => this.storage, - alarms: { - persist: async () => { - this.persistToDatabase() - }, - }, - }) + readonly scheduler = new AlarmScheduler({ + storage: () => this.storage, + alarms: { + persist: async () => { + this.persistToDatabase() + }, + }, + }) - // eslint-disable-next-line no-restricted-syntax - get documentInfo() { - return assertExists(this._documentInfo, 'documentInfo must be present') - } - extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => { - const slug = assertExists( - await getSlug(this.env, req.params.roomId, roomOpenMode), - 'roomId must be present' - ) - if (this._documentInfo) { - assert(this._documentInfo.slug === slug, 'slug must match') - } else { - this._documentInfo = { - version: CURRENT_DOCUMENT_INFO_VERSION, - slug, - } - } - } + // eslint-disable-next-line no-restricted-syntax + get documentInfo() { + return assertExists(this._documentInfo, 'documentInfo must be present') + } + extractDocumentInfoFromRequest = async (req: IRequest, roomOpenMode: RoomOpenMode) => { + const slug = assertExists( + await getSlug(this.env, req.params.roomId, roomOpenMode), + 'roomId must be present' + ) + if (this._documentInfo) { + assert(this._documentInfo.slug === slug, 'slug must match') + } else { + this._documentInfo = { + version: CURRENT_DOCUMENT_INFO_VERSION, + slug, + } + } + } - // Handle a request to the Durable Object. - async fetch(req: IRequest) { - const sentry = createSentry(this.state, this.env, req) + // Handle a request to the Durable Object. + async fetch(req: IRequest) { + const sentry = createSentry(this.state, this.env, req) - try { - return await this.router.handle(req) - } catch (err) { - console.error(err) - // eslint-disable-next-line deprecation/deprecation - sentry?.captureException(err) - return new Response('Something went wrong', { - status: 500, - statusText: 'Internal Server Error', - }) - } - } + try { + return await this.router.handle(req) + } catch (err) { + console.error(err) + // eslint-disable-next-line deprecation/deprecation + sentry?.captureException(err) + return new Response('Something went wrong', { + status: 500, + statusText: 'Internal Server Error', + }) + } + } - _isRestoring = false - async onRestore(req: IRequest) { - this._isRestoring = true - try { - const roomId = this.documentInfo.slug - const roomKey = getR2KeyForRoom(roomId) - const timestamp = ((await req.json()) as any).timestamp - if (!timestamp) { - return new Response('Missing timestamp', { status: 400 }) - } - const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`) - if (!data) { - return new Response('Version not found', { status: 400 }) - } - const dataText = await data.text() - await this.r2.rooms.put(roomKey, dataText) - const room = await this.getRoom() + _isRestoring = false + async onRestore(req: IRequest) { + this._isRestoring = true + try { + const roomId = this.documentInfo.slug + const roomKey = getR2KeyForRoom(roomId) + const timestamp = ((await req.json()) as any).timestamp + if (!timestamp) { + return new Response('Missing timestamp', { status: 400 }) + } + const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`) + if (!data) { + return new Response('Version not found', { status: 400 }) + } + const dataText = await data.text() + await this.r2.rooms.put(roomKey, dataText) + const room = await this.getRoom() - const snapshot: RoomSnapshot = JSON.parse(dataText) - room.loadSnapshot(snapshot) + const snapshot: RoomSnapshot = JSON.parse(dataText) + room.loadSnapshot(snapshot) - return new Response() - } finally { - this._isRestoring = false - } - } + return new Response() + } finally { + this._isRestoring = false + } + } - async onRequest(req: IRequest) { - // extract query params from request, should include instanceId - const url = new URL(req.url) - const params = Object.fromEntries(url.searchParams.entries()) - let { sessionKey, storeId } = params + async onRequest(req: IRequest) { + // extract query params from request, should include instanceId + const url = new URL(req.url) + const params = Object.fromEntries(url.searchParams.entries()) + let { sessionKey, storeId } = params - // handle legacy param names - sessionKey ??= params.instanceId - storeId ??= params.localClientId - const isNewSession = !this._room + // handle legacy param names + sessionKey ??= params.instanceId + storeId ??= params.localClientId + const isNewSession = !this._room - // Create the websocket pair for the client - const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair() - serverWebSocket.accept() + // Create the websocket pair for the client + const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair() + serverWebSocket.accept() - try { - const room = await this.getRoom() - // Don't connect if we're already at max connections - if (room.getNumActiveSessions() >= MAX_CONNECTIONS) { - return new Response('Room is full', { status: 403 }) - } + try { + const room = await this.getRoom() + // Don't connect if we're already at max connections + if (room.getNumActiveSessions() >= MAX_CONNECTIONS) { + return new Response('Room is full', { status: 403 }) + } - // all good - room.handleSocketConnect(sessionKey, serverWebSocket, { storeId }) - if (isNewSession) { - this.logEvent({ - type: 'client', - roomId: this.documentInfo.slug, - name: 'room_reopen', - instanceId: sessionKey, - localClientId: storeId, - }) - } - this.logEvent({ - type: 'client', - roomId: this.documentInfo.slug, - name: 'enter', - instanceId: sessionKey, - localClientId: storeId, - }) - return new Response(null, { status: 101, webSocket: clientWebSocket }) - } catch (e) { - if (e === ROOM_NOT_FOUND) { - serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found') - return new Response(null, { status: 101, webSocket: clientWebSocket }) - } - throw e - } - } + // all good + room.handleSocketConnect(sessionKey, serverWebSocket, { storeId }) + if (isNewSession) { + this.logEvent({ + type: 'client', + roomId: this.documentInfo.slug, + name: 'room_reopen', + instanceId: sessionKey, + localClientId: storeId, + }) + } + this.logEvent({ + type: 'client', + roomId: this.documentInfo.slug, + name: 'enter', + instanceId: sessionKey, + localClientId: storeId, + }) + return new Response(null, { status: 101, webSocket: clientWebSocket }) + } catch (e) { + if (e === ROOM_NOT_FOUND) { + serverWebSocket.close(TLCloseEventCode.NOT_FOUND, 'Room not found') + return new Response(null, { status: 101, webSocket: clientWebSocket }) + } + throw e + } + } - triggerPersistSchedule = throttle(() => { - this.schedulePersist() - }, 2000) + triggerPersistSchedule = throttle(() => { + this.schedulePersist() + }, 2000) - private writeEvent( - name: string, - { blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] } - ) { - this.measure?.writeDataPoint({ - blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])], - doubles, - indexes, - }) - } + private writeEvent( + name: string, + { blobs, indexes, doubles }: { blobs?: string[]; indexes?: [string]; doubles?: number[] } + ) { + this.measure?.writeDataPoint({ + blobs: [name, this.env.WORKER_NAME ?? 'development-tldraw-multiplayer', ...(blobs ?? [])], + doubles, + indexes, + }) + } - logEvent(event: TLServerEvent) { - switch (event.type) { - case 'room': { - // we would add user/connection ids here if we could - this.writeEvent(event.name, { blobs: [event.roomId] }) - break - } - case 'client': { - // we would add user/connection ids here if we could - this.writeEvent(event.name, { - blobs: [event.roomId, 'unused', event.instanceId], - indexes: [event.localClientId], - }) - break - } - case 'send_message': { - this.writeEvent(event.type, { - blobs: [event.roomId, event.messageType], - doubles: [event.messageLength], - }) - break - } - default: { - exhaustiveSwitchError(event) - } - } - } + logEvent(event: TLServerEvent) { + switch (event.type) { + case 'room': { + // we would add user/connection ids here if we could + this.writeEvent(event.name, { blobs: [event.roomId] }) + break + } + case 'client': { + // we would add user/connection ids here if we could + this.writeEvent(event.name, { + blobs: [event.roomId, 'unused', event.instanceId], + indexes: [event.localClientId], + }) + break + } + case 'send_message': { + this.writeEvent(event.type, { + blobs: [event.roomId, event.messageType], + doubles: [event.messageLength], + }) + break + } + default: { + exhaustiveSwitchError(event) + } + } + } - // Load the room's drawing data. First we check the R2 bucket, then we fallback to supabase (legacy). - async loadFromDatabase(persistenceKey: string): Promise { - try { - const key = getR2KeyForRoom(persistenceKey) - // when loading, prefer to fetch documents from the bucket - const roomFromBucket = await this.r2.rooms.get(key) - if (roomFromBucket) { - return { type: 'room_found', snapshot: await roomFromBucket.json() } - } + // Load the room's drawing data. First we check the R2 bucket, then we fallback to Postgres (legacy). + async loadFromDatabase(persistenceKey: string): Promise { + try { + const key = getR2KeyForRoom(persistenceKey) + // when loading, prefer to fetch documents from the bucket + const roomFromBucket = await this.r2.rooms.get(key) + if (roomFromBucket) { + return { type: 'room_found', snapshot: await roomFromBucket.json() } + } - // if we don't have a room in the bucket, try to load from supabase - if (!this.supabaseClient) return { type: 'room_not_found' } - const { data, error } = await this.supabaseClient - .from(this.supabaseTable) - .select('*') - .eq('slug', persistenceKey) + // if we don't have a room in the bucket, try to load from Postgres + if (!this.postgresClient) return { type: 'room_not_found' } + await this.postgresClient.connect() + const result = await this.postgresClient.query('SELECT * FROM ' + this.postgresTable + ' WHERE slug = $1', [persistenceKey]) + await this.postgresClient.end() - if (error) { - this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) + if (result.rows.length === 0) { + return { type: 'room_not_found' } + } - console.error('failed to retrieve document', persistenceKey, error) - return { type: 'error', error: new Error(error.message) } - } - // if it didn't find a document, data will be an empty array - if (data.length === 0) { - return { type: 'room_not_found' } - } + const roomFromPostgres = result.rows[0] as PersistedRoomSnapshotForSupabase + return { type: 'room_found', snapshot: roomFromPostgres.drawing } + } catch (error) { + this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) - const roomFromSupabase = data[0] as PersistedRoomSnapshotForSupabase - return { type: 'room_found', snapshot: roomFromSupabase.drawing } - } catch (error) { - this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) + console.error('failed to fetch doc', persistenceKey, error) + return { type: 'error', error: error as Error } + } + } - console.error('failed to fetch doc', persistenceKey, error) - return { type: 'error', error: error as Error } - } - } + _lastPersistedClock: number | null = null + _persistQueue = createPersistQueue(async () => { + // check whether the worker was woken up to persist after having gone to sleep + if (!this._room) return + const slug = this.documentInfo.slug + const room = await this.getRoom() + const clock = room.getCurrentDocumentClock() + if (this._lastPersistedClock === clock) return + if (this._isRestoring) return - _lastPersistedClock: number | null = null - _persistQueue = createPersistQueue(async () => { - // check whether the worker was woken up to persist after having gone to sleep - if (!this._room) return - const slug = this.documentInfo.slug - const room = await this.getRoom() - const clock = room.getCurrentDocumentClock() - if (this._lastPersistedClock === clock) return - if (this._isRestoring) return + const snapshot = JSON.stringify(room.getCurrentSnapshot()) - const snapshot = JSON.stringify(room.getCurrentSnapshot()) + const key = getR2KeyForRoom(slug) + await Promise.all([ + this.r2.rooms.put(key, snapshot), + this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot), + ]) + this._lastPersistedClock = clock + // use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop + // just in case there's any possibility of setting up a neverending queue + }, PERSIST_INTERVAL_MS / 2) - const key = getR2KeyForRoom(slug) - await Promise.all([ - this.r2.rooms.put(key, snapshot), - this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot), - ]) - this._lastPersistedClock = clock - // use a shorter timeout for this 'inner' loop than the 'outer' alarm-scheduled loop - // just in case there's any possibility of setting up a neverending queue - }, PERSIST_INTERVAL_MS / 2) + // Save the room to Postgres + async persistToDatabase() { + await this._persistQueue() + } - // Save the room to supabase - async persistToDatabase() { - await this._persistQueue() - } - - async schedulePersist() { - await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, { - overwrite: 'if-sooner', - }) + async schedulePersist() { + await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, { + overwrite: 'if-sooner', + }) } // Will be called automatically when the alarm ticks. async alarm() { await this.scheduler.onAlarm() } -} +} \ No newline at end of file diff --git a/apps/dotcom-worker/src/routes/getRoomSnapshot.ts b/apps/dotcom-worker/src/routes/getRoomSnapshot.ts index f5c6a682a..f97d13966 100644 --- a/apps/dotcom-worker/src/routes/getRoomSnapshot.ts +++ b/apps/dotcom-worker/src/routes/getRoomSnapshot.ts @@ -3,58 +3,57 @@ import { notFound } from '@tldraw/worker-shared' import { IRequest } from 'itty-router' import { getR2KeyForSnapshot } from '../r2' import { Environment } from '../types' -import { createSupabaseClient, noSupabaseSorry } from '../utils/createSupabaseClient' -import { getSnapshotsTable } from '../utils/getSnapshotsTable' -import { R2Snapshot } from './createRoomSnapshot' +import { createPostgresClient, noPostgresSorry } from '../utils/createPostgresClient' function generateReponse(roomId: string, data: RoomSnapshot) { - return new Response( - JSON.stringify({ - roomId, - records: data.documents.map((d) => d.state), - schema: data.schema, - error: false, - }), - { - headers: { 'content-type': 'application/json' }, - } - ) + return new Response( + JSON.stringify({ + roomId, + records: data.documents.map((d) => d.state), + schema: data.schema, + error: false, + }), + { + headers: { 'content-type': 'application/json' }, + } + ) } // Returns a snapshot of the room at a given point in time export async function getRoomSnapshot(request: IRequest, env: Environment): Promise { - const roomId = request.params.roomId - if (!roomId) return notFound() + const roomId = request.params.roomId + if (!roomId) return notFound() - // Get the parent slug if it exists - const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId) + // Get the parent slug if it exists + const parentSlug = await env.SNAPSHOT_SLUG_TO_PARENT_SLUG.get(roomId) - // Get the room snapshot from R2 - const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId)) + // Get the room snapshot from R2 + const snapshot = await env.ROOM_SNAPSHOTS.get(getR2KeyForSnapshot(parentSlug, roomId)) - if (snapshot) { - const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot - if (data) { - return generateReponse(roomId, data) - } - } + if (snapshot) { + const data = ((await snapshot.json()) as R2Snapshot)?.drawing as RoomSnapshot + if (data) { + return generateReponse(roomId, data) + } + } - // If we can't find the snapshot in R2 then fallback to Supabase - // Create a supabase client - const supabase = createSupabaseClient(env) - if (!supabase) return noSupabaseSorry() + // If we can't find the snapshot in R2 then fallback to Postgres + // Create a Postgres client + const postgresClient = createPostgresClient(env) + if (!postgresClient) return noPostgresSorry() - // Get the snapshot from the table - const supabaseTable = getSnapshotsTable(env) - const result = await supabase - .from(supabaseTable) - .select('drawing') - .eq('slug', roomId) - .maybeSingle() - const data = result.data?.drawing as RoomSnapshot + try { + await postgresClient.connect() + const result = await postgresClient.query('SELECT drawing FROM snapshots WHERE slug = $1 LIMIT 1', [roomId]) + await postgresClient.end() - if (!data) return notFound() + if (result.rows.length === 0) return notFound() + const data = result.rows[0].drawing as RoomSnapshot - // Send back the snapshot! - return generateReponse(roomId, data) -} + // Send back the snapshot! + return generateReponse(roomId, data) + } catch (err) { + console.error('Error querying Postgres', err) + return new Response(JSON.stringify({ error: true, message: 'Error querying Postgres' }), { status: 500 }) + } +} \ No newline at end of file diff --git a/apps/dotcom-worker/src/utils/createPostgresClient.ts b/apps/dotcom-worker/src/utils/createPostgresClient.ts new file mode 100644 index 000000000..dc94eb3f9 --- /dev/null +++ b/apps/dotcom-worker/src/utils/createPostgresClient.ts @@ -0,0 +1,35 @@ +import { Client } from 'pg' +import { Environment } from '../types' + +export function createPostgresClient(env: Environment) { + if (env.POSTGRES_HOST && env.POSTGRES_USER && env.POSTGRES_PASSWORD && env.POSTGRES_DB) { + var client = new Client({ + host: env.POSTGRES_HOST, + port: env.POSTGRES_PORT ? parseInt(env.POSTGRES_PORT) : 5432, + user: env.POSTGRES_USER, + password: env.POSTGRES_PASSWORD, + database: env.POSTGRES_DB, + }) + + client.connect() + + client.query(` + CREATE TABLE IF NOT EXISTS snapshots ( + id SERIAL PRIMARY KEY, + slug VARCHAR(255) UNIQUE NOT NULL, + drawing JSONB NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ); + `) + + return client + + } else { + console.warn('No Postgres credentials, loading from Postgres disabled') + return null + } +} + +export function noPostgresSorry() { + return new Response(JSON.stringify({ error: true, message: 'Could not create Postgres client' })) +} \ No newline at end of file