diff --git a/apps/bemo-worker/src/BemoDO.ts b/apps/bemo-worker/src/BemoDO.ts index 7aa3bfc15..e29c9e2cb 100644 --- a/apps/bemo-worker/src/BemoDO.ts +++ b/apps/bemo-worker/src/BemoDO.ts @@ -1,3 +1,4 @@ +import { AnalyticsEngineDataset } from '@cloudflare/workers-types' import { RoomSnapshot, TLCloseEventCode, TLSocketRoom } from '@tldraw/sync-core' import { TLRecord } from '@tldraw/tlschema' import { throttle } from '@tldraw/utils' @@ -13,16 +14,32 @@ const connectRequestQuery = T.object({ storeId: T.string.optional(), }) +interface AnalyticsEvent { + type: 'connect' | 'send_message' | 'receive_message' + origin: string + sessionKey: string + slug: string +} + export class BemoDO extends DurableObject { r2: R2Bucket _slug: string | null = null + analytics: AnalyticsEngineDataset + + writeEvent({ type, origin, sessionKey, slug }: AnalyticsEvent) { + this.analytics.writeDataPoint({ + blobs: [type, origin, slug, sessionKey], + }) + } + constructor( public state: DurableObjectState, env: Environment ) { super(state, env) this.r2 = env.BEMO_BUCKET + this.analytics = env.BEMO_ANALYTICS state.blockConcurrencyWhile(async () => { this._slug = ((await this.state.storage.get('slug')) ?? null) as string | null @@ -68,6 +85,8 @@ export class BemoDO extends DurableObject { const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair() serverWebSocket.accept() + const origin = req.headers.get('origin') ?? 'unknown' + try { const room = await this.getRoom() // Don't connect if we're already at max connections @@ -81,7 +100,13 @@ export class BemoDO extends DurableObject { } // all good - room.handleSocketConnect(sessionKey, serverWebSocket) + room.handleSocketConnect(sessionKey, serverWebSocket, { origin }) + this.writeEvent({ + type: 'connect', + origin, + sessionKey, + slug: this.getSlug(), + }) return new Response(null, { status: 101, webSocket: clientWebSocket }) } catch (e) { if (e === ROOM_NOT_FOUND) { @@ -93,7 +118,7 @@ export class BemoDO extends DurableObject { } // For TLSyncRoom - _room: Promise> | null = null + _room: Promise> | null = null getSlug() { if (!this._slug) { @@ -106,9 +131,25 @@ export class BemoDO extends DurableObject { const slug = this.getSlug() if (!this._room) { this._room = this.loadFromDatabase(slug).then((result) => { - return new TLSocketRoom({ + return new TLSocketRoom({ schema: makePermissiveSchema(), initialSnapshot: result.type === 'room_found' ? result.snapshot : undefined, + onAfterReceiveMessage: ({ sessionId, meta }) => { + this.writeEvent({ + type: 'receive_message', + origin: meta.origin, + sessionKey: sessionId, + slug, + }) + }, + onBeforeSendMessage: ({ sessionId, meta }) => { + this.writeEvent({ + type: 'send_message', + origin: meta.origin, + sessionKey: sessionId, + slug, + }) + }, onSessionRemoved: async (room, args) => { if (args.numSessionsRemaining > 0) return if (!this._room) return diff --git a/apps/bemo-worker/src/types.ts b/apps/bemo-worker/src/types.ts index f6650d3ee..f382d7d0c 100644 --- a/apps/bemo-worker/src/types.ts +++ b/apps/bemo-worker/src/types.ts @@ -1,3 +1,4 @@ +import { AnalyticsEngineDataset } from '@cloudflare/workers-types' import { BemoDO } from './BemoDO' export interface Environment { @@ -11,4 +12,6 @@ export interface Environment { SENTRY_DSN: string | undefined IS_LOCAL: string | undefined WORKER_NAME: string | undefined + + BEMO_ANALYTICS: AnalyticsEngineDataset } diff --git a/apps/bemo-worker/wrangler.toml b/apps/bemo-worker/wrangler.toml index b7ab477dd..d9beb7008 100644 --- a/apps/bemo-worker/wrangler.toml +++ b/apps/bemo-worker/wrangler.toml @@ -84,3 +84,26 @@ binding = "CF_VERSION_METADATA" [env.production.version_metadata] binding = "CF_VERSION_METADATA" + + +#################### Analytics engine #################### +# analytics engine has the same configuration in all environments: +[[analytics_engine_datasets]] +binding = "BEMO_ANALYTICS" +dataset = "BEMO_ANALYTICS" + +[[env.dev.analytics_engine_datasets]] +binding = "BEMO_ANALYTICS" +dataset = "BEMO_ANALYTICS_DEV" + +[[env.preview.analytics_engine_datasets]] +binding = "BEMO_ANALYTICS" +dataset = "BEMO_ANALYTICS_PREVIEW" + +[[env.staging.analytics_engine_datasets]] +binding = "BEMO_ANALYTICS" +dataset = "BEMO_ANALYTICS_STAGING" + +[[env.production.analytics_engine_datasets]] +binding = "BEMO_ANALYTICS" +dataset = "BEMO_ANALYTICS" \ No newline at end of file diff --git a/packages/sync-core/src/lib/TLSocketRoom.ts b/packages/sync-core/src/lib/TLSocketRoom.ts index 86c64f074..4b8a0d509 100644 --- a/packages/sync-core/src/lib/TLSocketRoom.ts +++ b/packages/sync-core/src/lib/TLSocketRoom.ts @@ -36,6 +36,13 @@ export class TLSocketRoom { sessionId: string message: TLSocketServerSentEvent stringified: string + meta: SessionMeta + }) => void + onAfterReceiveMessage?: (args: { + sessionId: string + message: TLSocketServerSentEvent + stringified: string + meta: SessionMeta }) => void onDataChange?: () => void } @@ -90,6 +97,7 @@ export class TLSocketRoom { sessionId, message, stringified, + meta: this.room.sessions.get(sessionId)?.meta as SessionMeta, }) : undefined, }), @@ -114,6 +122,19 @@ export class TLSocketRoom { typeof message === 'string' ? message : new TextDecoder().decode(message) const res = assembler.handleMessage(messageString) if (res?.data) { + // need to do this first in case the session gets removed as a result of handling the message + if (this.opts.onAfterReceiveMessage) { + const session = this.room.sessions.get(sessionId) + if (session) { + this.opts.onAfterReceiveMessage({ + sessionId, + message: res.data as any, + stringified: messageString, + meta: session.meta, + }) + } + } + this.room.handleMessage(sessionId, res.data as any) } if (res?.error) {