[bemo] add analytics to bemo worker (#4146)

Sets up worker analytics. No further grafana setup needed, just need to
start querying once this is live.

### Change type

- [ ] `bugfix`
- [ ] `improvement`
- [x] `feature`
- [ ] `api`
- [ ] `other`

### Test plan

1. Create a shape...
2.

- [ ] Unit tests
- [ ] End to end tests

### Release notes

- Fixed a bug with…
This commit is contained in:
David Sheldrick 2024-07-11 17:00:39 +01:00 committed by GitHub
parent 70a26862c4
commit 9229d2e3c7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 91 additions and 3 deletions

View file

@ -1,3 +1,4 @@
import { AnalyticsEngineDataset } from '@cloudflare/workers-types'
import { RoomSnapshot, TLCloseEventCode, TLSocketRoom } from '@tldraw/sync-core'
import { TLRecord } from '@tldraw/tlschema'
import { throttle } from '@tldraw/utils'
@ -13,16 +14,32 @@ const connectRequestQuery = T.object({
storeId: T.string.optional(),
})
interface AnalyticsEvent {
type: 'connect' | 'send_message' | 'receive_message'
origin: string
sessionKey: string
slug: string
}
export class BemoDO extends DurableObject<Environment> {
r2: R2Bucket
_slug: string | null = null
analytics: AnalyticsEngineDataset
writeEvent({ type, origin, sessionKey, slug }: AnalyticsEvent) {
this.analytics.writeDataPoint({
blobs: [type, origin, slug, sessionKey],
})
}
constructor(
public state: DurableObjectState,
env: Environment
) {
super(state, env)
this.r2 = env.BEMO_BUCKET
this.analytics = env.BEMO_ANALYTICS
state.blockConcurrencyWhile(async () => {
this._slug = ((await this.state.storage.get('slug')) ?? null) as string | null
@ -68,6 +85,8 @@ export class BemoDO extends DurableObject<Environment> {
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair()
serverWebSocket.accept()
const origin = req.headers.get('origin') ?? 'unknown'
try {
const room = await this.getRoom()
// Don't connect if we're already at max connections
@ -81,7 +100,13 @@ export class BemoDO extends DurableObject<Environment> {
}
// all good
room.handleSocketConnect(sessionKey, serverWebSocket)
room.handleSocketConnect(sessionKey, serverWebSocket, { origin })
this.writeEvent({
type: 'connect',
origin,
sessionKey,
slug: this.getSlug(),
})
return new Response(null, { status: 101, webSocket: clientWebSocket })
} catch (e) {
if (e === ROOM_NOT_FOUND) {
@ -93,7 +118,7 @@ export class BemoDO extends DurableObject<Environment> {
}
// For TLSyncRoom
_room: Promise<TLSocketRoom<TLRecord, void>> | null = null
_room: Promise<TLSocketRoom<TLRecord, { origin: string }>> | null = null
getSlug() {
if (!this._slug) {
@ -106,9 +131,25 @@ export class BemoDO extends DurableObject<Environment> {
const slug = this.getSlug()
if (!this._room) {
this._room = this.loadFromDatabase(slug).then((result) => {
return new TLSocketRoom<TLRecord, void>({
return new TLSocketRoom<TLRecord, { origin: string }>({
schema: makePermissiveSchema(),
initialSnapshot: result.type === 'room_found' ? result.snapshot : undefined,
onAfterReceiveMessage: ({ sessionId, meta }) => {
this.writeEvent({
type: 'receive_message',
origin: meta.origin,
sessionKey: sessionId,
slug,
})
},
onBeforeSendMessage: ({ sessionId, meta }) => {
this.writeEvent({
type: 'send_message',
origin: meta.origin,
sessionKey: sessionId,
slug,
})
},
onSessionRemoved: async (room, args) => {
if (args.numSessionsRemaining > 0) return
if (!this._room) return

View file

@ -1,3 +1,4 @@
import { AnalyticsEngineDataset } from '@cloudflare/workers-types'
import { BemoDO } from './BemoDO'
export interface Environment {
@ -11,4 +12,6 @@ export interface Environment {
SENTRY_DSN: string | undefined
IS_LOCAL: string | undefined
WORKER_NAME: string | undefined
BEMO_ANALYTICS: AnalyticsEngineDataset
}

View file

@ -84,3 +84,26 @@ binding = "CF_VERSION_METADATA"
[env.production.version_metadata]
binding = "CF_VERSION_METADATA"
#################### Analytics engine ####################
# analytics engine has the same configuration in all environments:
[[analytics_engine_datasets]]
binding = "BEMO_ANALYTICS"
dataset = "BEMO_ANALYTICS"
[[env.dev.analytics_engine_datasets]]
binding = "BEMO_ANALYTICS"
dataset = "BEMO_ANALYTICS_DEV"
[[env.preview.analytics_engine_datasets]]
binding = "BEMO_ANALYTICS"
dataset = "BEMO_ANALYTICS_PREVIEW"
[[env.staging.analytics_engine_datasets]]
binding = "BEMO_ANALYTICS"
dataset = "BEMO_ANALYTICS_STAGING"
[[env.production.analytics_engine_datasets]]
binding = "BEMO_ANALYTICS"
dataset = "BEMO_ANALYTICS"

View file

@ -36,6 +36,13 @@ export class TLSocketRoom<R extends UnknownRecord, SessionMeta> {
sessionId: string
message: TLSocketServerSentEvent<R>
stringified: string
meta: SessionMeta
}) => void
onAfterReceiveMessage?: (args: {
sessionId: string
message: TLSocketServerSentEvent<R>
stringified: string
meta: SessionMeta
}) => void
onDataChange?: () => void
}
@ -90,6 +97,7 @@ export class TLSocketRoom<R extends UnknownRecord, SessionMeta> {
sessionId,
message,
stringified,
meta: this.room.sessions.get(sessionId)?.meta as SessionMeta,
})
: undefined,
}),
@ -114,6 +122,19 @@ export class TLSocketRoom<R extends UnknownRecord, SessionMeta> {
typeof message === 'string' ? message : new TextDecoder().decode(message)
const res = assembler.handleMessage(messageString)
if (res?.data) {
// need to do this first in case the session gets removed as a result of handling the message
if (this.opts.onAfterReceiveMessage) {
const session = this.room.sessions.get(sessionId)
if (session) {
this.opts.onAfterReceiveMessage({
sessionId,
message: res.data as any,
stringified: messageString,
meta: session.meta,
})
}
}
this.room.handleMessage(sessionId, res.data as any)
}
if (res?.error) {