/// /// import { SupabaseClient } from '@supabase/supabase-js' import { RoomSnapshot, TLServer, TLSyncRoom, type DBLoadResult, type PersistedRoomSnapshotForSupabase, type RoomState, } from '@tldraw/tlsync' import { assert, assertExists } from '@tldraw/utils' import { IRequest, Router } from 'itty-router' import Toucan from 'toucan-js' import { AlarmScheduler } from './AlarmScheduler' import { PERSIST_INTERVAL_MS } from './config' import { getR2KeyForRoom } from './r2' import { Analytics, Environment } from './types' import { createSupabaseClient } from './utils/createSupabaseClient' import { throttle } from './utils/throttle' const MAX_CONNECTIONS = 50 // increment this any time you make a change to this type const CURRENT_DOCUMENT_INFO_VERSION = 0 type DocumentInfo = { version: number slug: string } export class TLDrawDurableObject extends TLServer { // A unique identifier for this instance of the Durable Object id: DurableObjectId // For TLSyncRoom _roomState: RoomState | undefined // For storage storage: DurableObjectStorage // For persistence supabaseClient: SupabaseClient | void // For analytics measure: Analytics | undefined // For error tracking sentryDSN: string | undefined readonly supabaseTable: string readonly r2: { readonly rooms: R2Bucket readonly versionCache: R2Bucket } _documentInfo: DocumentInfo | null = null constructor( private controller: DurableObjectState, private env: Environment ) { super() this.id = controller.id this.storage = controller.storage this.sentryDSN = env.SENTRY_DSN this.measure = env.MEASURE this.supabaseClient = createSupabaseClient(env) this.supabaseTable = env.TLDRAW_ENV === 'production' ? 'drawings' : 'drawings_staging' this.r2 = { rooms: env.ROOMS, versionCache: env.ROOMS_HISTORY_EPHEMERAL, } controller.blockConcurrencyWhile(async () => { const existingDocumentInfo = (await this.storage.get('documentInfo')) as DocumentInfo | null if (existingDocumentInfo?.version !== CURRENT_DOCUMENT_INFO_VERSION) { this._documentInfo = null } else { this._documentInfo = existingDocumentInfo } }) } readonly router = Router() .get( '/r/:roomId', (req) => this.extractDocumentInfoFromRequest(req), (req) => this.onRequest(req) ) .post( '/r/:roomId/restore', (req) => this.extractDocumentInfoFromRequest(req), (req) => this.onRestore(req) ) .all('*', () => new Response('Not found', { status: 404 })) readonly scheduler = new AlarmScheduler({ storage: () => this.storage, alarms: { persist: async () => { const room = this.getRoomForPersistenceKey(this.documentInfo.slug) if (!room) return this.persistToDatabase(room.persistenceKey) }, }, }) // eslint-disable-next-line no-restricted-syntax get documentInfo() { return assertExists(this._documentInfo, 'documentInfo must be present') } extractDocumentInfoFromRequest = async (req: IRequest) => { const slug = assertExists(req.params.roomId, 'roomId must be present') if (this._documentInfo) { assert(this._documentInfo.slug === slug, 'slug must match') } else { this._documentInfo = { version: CURRENT_DOCUMENT_INFO_VERSION, slug, } } } // Handle a request to the Durable Object. async fetch(req: IRequest) { const sentry = new Toucan({ dsn: this.sentryDSN, request: req, allowedHeaders: ['user-agent'], allowedSearchParams: /(.*)/, }) try { return await this.router.handle(req).catch((err) => { console.error(err) sentry.captureException(err) return new Response('Something went wrong', { status: 500, statusText: 'Internal Server Error', }) }) } catch (err) { sentry.captureException(err) return new Response('Something went wrong', { status: 500, statusText: 'Internal Server Error', }) } } async onRestore(req: IRequest) { const roomId = this.documentInfo.slug const roomKey = getR2KeyForRoom(roomId) const timestamp = ((await req.json()) as any).timestamp if (!timestamp) { return new Response('Missing timestamp', { status: 400 }) } const data = await this.r2.versionCache.get(`${roomKey}/${timestamp}`) if (!data) { return new Response('Version not found', { status: 400 }) } const dataText = await data.text() await this.r2.rooms.put(roomKey, dataText) const roomState = this.getRoomForPersistenceKey(roomId) if (!roomState) { // nothing else to do because the room is not currently in use return new Response() } const snapshot: RoomSnapshot = JSON.parse(dataText) const oldRoom = roomState.room const oldIds = oldRoom.getSnapshot().documents.map((d) => d.state.id) const newIds = new Set(snapshot.documents.map((d) => d.state.id)) const removedIds = oldIds.filter((id) => !newIds.has(id)) const tombstones = { ...snapshot.tombstones } removedIds.forEach((id) => { tombstones[id] = oldRoom.clock + 1 }) newIds.forEach((id) => { delete tombstones[id] }) const newRoom = new TLSyncRoom(roomState.room.schema, { clock: oldRoom.clock + 1, documents: snapshot.documents.map((d) => ({ lastChangedClock: oldRoom.clock + 1, state: d.state, })), schema: snapshot.schema, tombstones, }) // replace room with new one and kick out all the clients this.setRoomState(this.documentInfo.slug, { ...roomState, room: newRoom }) oldRoom.close() return new Response() } async onRequest(req: IRequest) { // extract query params from request, should include instanceId const url = new URL(req.url) const params = Object.fromEntries(url.searchParams.entries()) let { sessionKey, storeId } = params // handle legacy param names sessionKey ??= params.instanceId storeId ??= params.localClientId // Don't connect if we're already at max connections const roomState = this.getRoomForPersistenceKey(this.documentInfo.slug) if (roomState !== undefined) { if (roomState.room.sessions.size >= MAX_CONNECTIONS) { return new Response('Room is full', { status: 403, }) } } // Create the websocket pair for the client const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair() // Handle the connection (see TLServer) try { // block concurrency while initializing the room if that needs to happen await this.controller.blockConcurrencyWhile(() => this.handleConnection({ socket: serverWebSocket as any, persistenceKey: this.documentInfo.slug!, sessionKey, storeId, }) ) } catch (e: any) { console.error(e) return new Response(e.message, { status: 500 }) } // Accept the websocket connection serverWebSocket.accept() serverWebSocket.addEventListener( 'message', throttle(() => { this.schedulePersist() }, 2000) ) serverWebSocket.addEventListener('close', () => { this.schedulePersist() }) return new Response(null, { status: 101, webSocket: clientWebSocket }) } logEvent( event: | { type: 'client' roomId: string name: string clientId: string instanceId: string localClientId: string } | { type: 'room' roomId: string name: string } ) { switch (event.type) { case 'room': { this.measure?.writeDataPoint({ blobs: [event.name, event.roomId], // we would add user/connection ids here if we could }) break } case 'client': { this.measure?.writeDataPoint({ blobs: [event.name, event.roomId, event.clientId, event.instanceId], // we would add user/connection ids here if we could indexes: [event.localClientId], }) break } } } getRoomForPersistenceKey(_persistenceKey: string): RoomState | undefined { return this._roomState // only one room per worker } setRoomState(_persistenceKey: string, roomState: RoomState): void { this.deleteRoomState() this._roomState = roomState } deleteRoomState(): void { this._roomState = undefined } // Load the room's drawing data from supabase override async loadFromDatabase(persistenceKey: string): Promise { try { const key = getR2KeyForRoom(persistenceKey) // when loading, prefer to fetch documents from the bucket const roomFromBucket = await this.r2.rooms.get(key) if (roomFromBucket) { return { type: 'room_found', snapshot: await roomFromBucket.json() } } // if we don't have a room in the bucket, try to load from supabase if (!this.supabaseClient) return { type: 'room_not_found' } const { data, error } = await this.supabaseClient .from(this.supabaseTable) .select('*') .eq('slug', persistenceKey) if (error) { this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) console.error('failed to retrieve document', persistenceKey, error) return { type: 'error', error: new Error(error.message) } } // if it didn't find a document, data will be an empty array if (data.length === 0) { return { type: 'room_not_found' } } const roomFromSupabase = data[0] as PersistedRoomSnapshotForSupabase return { type: 'room_found', snapshot: roomFromSupabase.drawing } } catch (error) { this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_load_from_db' }) console.error('failed to fetch doc', persistenceKey, error) return { type: 'error', error: error as Error } } } _isPersisting = false _lastPersistedClock: number | null = null // Save the room to supabase async persistToDatabase(persistenceKey: string) { if (this._isPersisting) { setTimeout(() => { this.schedulePersist() }, 5000) return } try { this._isPersisting = true const roomState = this.getRoomForPersistenceKey(persistenceKey) if (!roomState) { // room was closed return } const { room } = roomState const { clock } = room if (this._lastPersistedClock === clock) return try { const snapshot = JSON.stringify(room.getSnapshot()) const key = getR2KeyForRoom(persistenceKey) await Promise.all([ this.r2.rooms.put(key, snapshot), this.r2.versionCache.put(key + `/` + new Date().toISOString(), snapshot), ]) this._lastPersistedClock = clock } catch (error) { this.logEvent({ type: 'room', roomId: persistenceKey, name: 'failed_persist_to_db' }) console.error('failed to persist document', persistenceKey, error) throw error } } finally { this._isPersisting = false } } async schedulePersist() { await this.scheduler.scheduleAlarmAfter('persist', PERSIST_INTERVAL_MS, { overwrite: 'if-sooner', }) } // Will be called automatically when the alarm ticks. async alarm() { await this.scheduler.onAlarm() } }