Move to kysely (tldraw#5140)
Switches from postgres.js to Kysely for running queries against the database. We still use postgres.js for subscribing to changes in the database.

During stress tests we noticed that postgres.js did not manage the connection pool well; Kysely performed much better.
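
For context, the deleted `getPostgres.ts` helper is replaced by a `src/postgres.ts` module whose contents are not shown in the diff below. A minimal sketch of what its two factories might look like, assuming Kysely's `PostgresDialect` backed by a `pg` pool; the connection-string env var names are assumptions for illustration only:

```ts
import { Kysely, PostgresDialect } from 'kysely'
import { Pool } from 'pg'
import postgres from 'postgres'

import { DB } from '@tldraw/dotcom-shared'
import { Environment } from './types'

// Pooled Kysely client used for ordinary queries. Durable objects create one
// instance in their constructors and reuse it, instead of opening a fresh
// postgres.js connection per query as before.
export function createPostgresConnectionPool(env: Environment, name: string): Kysely<DB> {
	return new Kysely<DB>({
		dialect: new PostgresDialect({
			pool: new Pool({
				// Env var name is an assumption, not the actual binding.
				connectionString: env.BOTCOM_POSTGRES_POOLED_CONNECTION_STRING,
				application_name: name,
				max: 1,
			}),
		}),
		log: ['error'],
	})
}

// Direct postgres.js connection, kept only because the replicator relies on
// postgres.js's logical-replication subscribe() API, which Kysely does not provide.
export function createPostgresConnection(env: Environment, { name }: { name: string }) {
	// Env var name is an assumption, not the actual binding.
	return postgres(env.BOTCOM_POSTGRES_CONNECTION_STRING, {
		max: 1,
		connection: { application_name: name },
	})
}
```

This mirrors the call sites in the diff: `TLDrawDurableObject` and `TLUserDurableObject` hold a `Kysely<DB>` built by `createPostgresConnectionPool`, while `TLPostgresReplicator` keeps a single `createPostgresConnection` for its subscription.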

### Change type

- [ ] `bugfix`
- [x] `improvement`
- [ ] `feature`
- [ ] `api`
- [ ] `other`
MitjaBezensek authored Jan 8, 2025
1 parent f5aef12 commit ae594c6
Showing 12 changed files with 379 additions and 93 deletions.
5 changes: 4 additions & 1 deletion apps/dotcom/sync-worker/package.json
@@ -15,7 +15,7 @@
"test-ci": "lazy inherit",
"test": "yarn run -T jest",
"test-coverage": "lazy inherit",
"check-bundle-size": "yarn run -T tsx ../../../internal/scripts/check-worker-bundle.ts --entry src/worker.ts --size-limit-bytes 500000",
"check-bundle-size": "yarn run -T tsx ../../../internal/scripts/check-worker-bundle.ts --entry src/worker.ts --size-limit-bytes 800000",
"reset-db": "./reset-db.sh",
"lint": "yarn run -T tsx ../../../internal/scripts/lint.ts"
},
@@ -32,12 +32,15 @@
"@tldraw/worker-shared": "workspace:*",
"itty-router": "^5.0.17",
"jose": "^5.9.6",
"kysely": "^0.27.5",
"pg": "^8.13.1",
"postgres": "patch:postgres@npm%3A3.4.5#~/.yarn/patches/postgres-npm-3.4.5-8a680ccbcd.patch",
"react": "^18.2.0",
"react-dom": "^18.2.0"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20241022.0",
"@types/pg": "^8.11.10",
"esbuild": "^0.21.5",
"lazyrepo": "0.0.0-alpha.27",
"typescript": "~5.4.2",
25 changes: 16 additions & 9 deletions apps/dotcom/sync-worker/src/TLDrawDurableObject.ts
@@ -3,6 +3,7 @@

import { SupabaseClient } from '@supabase/supabase-js'
import {
DB,
READ_ONLY_LEGACY_PREFIX,
READ_ONLY_PREFIX,
ROOM_OPEN_MODE,
@@ -24,9 +25,10 @@ import { ExecutionQueue, assert, assertExists, exhaustiveSwitchError } from '@tl
import { createSentry } from '@tldraw/worker-shared'
import { DurableObject } from 'cloudflare:workers'
import { IRequest, Router } from 'itty-router'
import { Kysely } from 'kysely'
import { AlarmScheduler } from './AlarmScheduler'
import { PERSIST_INTERVAL_MS } from './config'
import { getPostgres } from './getPostgres'
import { createPostgresConnectionPool } from './postgres'
import { getR2KeyForRoom } from './r2'
import { Analytics, DBLoadResult, Environment, TLServerEvent } from './types'
import { EventData, writeDataPoint } from './utils/analytics'
@@ -156,6 +158,8 @@ export class TLDrawDurableObject extends DurableObject {

_documentInfo: DocumentInfo | null = null

db: Kysely<DB>

constructor(
private state: DurableObjectState,
override env: Environment
@@ -181,6 +185,7 @@
this._documentInfo = existingDocumentInfo
}
})
this.db = createPostgresConnectionPool(env, 'TLDrawDurableObject')
}

readonly router = Router()
@@ -305,11 +310,11 @@
return this._fileRecordCache
}
try {
const postgres = getPostgres(this.env, { pooled: true, name: 'TLDrawDurableObject' })
const fileRecord =
await postgres`SELECT * FROM public.file WHERE ID = ${this.documentInfo.slug}`
this._fileRecordCache = fileRecord[0] as TlaFile
postgres.end()
this._fileRecordCache = await this.db
.selectFrom('file')
.where('id', '=', this.documentInfo.slug)
.selectAll()
.executeTakeFirstOrThrow()
return this._fileRecordCache
} catch (_e) {
return null
@@ -588,9 +593,11 @@

// Update the updatedAt timestamp in the database
if (this.documentInfo.isApp) {
const pg = getPostgres(this.env, { pooled: true, name: 'TLDrawDurableObject' })
await pg`UPDATE public.file SET "updatedAt" = ${new Date().getTime()} WHERE id = ${this.documentInfo.slug}`
await pg.end()
await this.db
.updateTable('file')
.set({ updatedAt: new Date().getTime() })
.where('id', '=', this.documentInfo.slug)
.execute()
}
})
} catch (e) {
4 changes: 2 additions & 2 deletions apps/dotcom/sync-worker/src/TLPostgresReplicator.ts
@@ -13,7 +13,7 @@ import postgres from 'postgres'
import type { EventHint } from 'toucan-js/node_modules/@sentry/types'
import type { TLDrawDurableObject } from './TLDrawDurableObject'
import { ZReplicationEventWithoutSequenceInfo } from './UserDataSyncer'
import { getPostgres } from './getPostgres'
import { createPostgresConnection } from './postgres'
import { Analytics, Environment, TLPostgresReplicatorEvent } from './types'
import { EventData, writeDataPoint } from './utils/analytics'
import { getUserDurableObject } from './utils/durableObjects'
@@ -228,7 +228,7 @@ export class TLPostgresReplicator extends DurableObject<Environment> {
// preserve the promise so any awaiters do eventually get resolved
// TODO: set a timeout on the promise?
promise,
db: getPostgres(this.env, { pooled: false, name: 'TLPostgresReplicator' }),
db: createPostgresConnection(this.env, { name: 'TLPostgresReplicator' }),
sequenceId: uniqueId(),
}
const subscription = await this.state.db.subscribe(
70 changes: 51 additions & 19 deletions apps/dotcom/sync-worker/src/TLUserDurableObject.ts
@@ -1,4 +1,5 @@
import {
DB,
isColumnMutable,
ROOM_PREFIX,
TlaFile,
@@ -15,9 +16,9 @@ import { assert } from '@tldraw/utils'
import { createSentry } from '@tldraw/worker-shared'
import { DurableObject } from 'cloudflare:workers'
import { IRequest, Router } from 'itty-router'
import postgres from 'postgres'
import { Kysely, sql } from 'kysely'
import type { EventHint } from 'toucan-js/node_modules/@sentry/types'
import { getPostgres } from './getPostgres'
import { createPostgresConnectionPool } from './postgres'
import { getR2KeyForRoom } from './r2'
import { type TLPostgresReplicator } from './TLPostgresReplicator'
import { Analytics, Environment, TLUserDurableObjectEvent } from './types'
@@ -29,7 +30,7 @@ import { retryOnConnectionFailure } from './utils/retryOnConnectionFailure'
import { getCurrentSerializedRoomSnapshot } from './utils/tla/getCurrentSerializedRoomSnapshot'

export class TLUserDurableObject extends DurableObject<Environment> {
private readonly db: ReturnType<typeof postgres>
private readonly db: Kysely<DB>
private readonly replicator: TLPostgresReplicator
private measure: Analytics | undefined

@@ -50,7 +51,7 @@ export class TLUserDurableObject extends DurableObject<Environment> {
this.sentry = createSentry(ctx, env)
this.replicator = getReplicator(env)

this.db = getPostgres(env, { pooled: true, name: 'TLUserDurableObject' })
this.db = createPostgresConnectionPool(env, 'TLUserDurableObject')
this.debug('created')
this.measure = env.MEASURE
}
@@ -286,22 +287,32 @@ export class TLUserDurableObject extends DurableObject<Environment> {

private async _doMutate(msg: ZClientSentMessage) {
this.assertCache()
await this.db.begin(async (sql) => {
await this.db.transaction().execute(async (tx) => {
for (const update of msg.updates) {
await this.assertValidMutation(update)
switch (update.event) {
case 'insert': {
if (update.table === 'file_state') {
const { fileId: _fileId, userId: _userId, ...rest } = update.row as any
if (Object.keys(rest).length === 0) {
await sql`insert into ${sql('public.' + update.table)} ${sql(update.row)} ON CONFLICT ("fileId", "userId") DO NOTHING`
} else {
await sql`insert into ${sql('public.' + update.table)} ${sql(update.row)} ON CONFLICT ("fileId", "userId") DO UPDATE SET ${sql(rest)}`
}
await tx
.insertInto(update.table)
.values(update.row as TlaFileState)
.onConflict((oc) => {
if (Object.keys(rest).length === 0) {
return oc.columns(['fileId', 'userId']).doNothing()
} else {
return oc.columns(['fileId', 'userId']).doUpdateSet(rest)
}
})
.execute()
break
} else {
const { id: _id, ...rest } = update.row as any
await sql`insert into ${sql('public.' + update.table)} ${sql(update.row)} ON CONFLICT ("id") DO UPDATE SET ${sql(rest)}`
await tx
.insertInto(update.table)
.values(update.row as any)
.onConflict((oc) => oc.column('id').doUpdateSet(rest))
.execute()
break
}
}
@@ -315,11 +326,16 @@
)
if (update.table === 'file_state') {
const { fileId, userId } = update.row as any
await sql`update public.file_state set ${sql(updates)} where "fileId" = ${fileId} and "userId" = ${userId}`
await tx
.updateTable('file_state')
.set(updates)
.where('fileId', '=', fileId)
.where('userId', '=', userId)
.execute()
} else {
const { id, ...rest } = update.row as any

await sql`update ${sql('public.' + update.table)} set ${sql(updates)} where id = ${id}`
await tx.updateTable(update.table).set(updates).where('id', '=', id).execute()
if (update.table === 'file') {
const currentFile = this.cache.store.getFullData()?.files.find((f) => f.id === id)
if (currentFile && currentFile.published !== rest.published) {
@@ -342,10 +358,14 @@
case 'delete':
if (update.table === 'file_state') {
const { fileId, userId } = update.row as any
await sql`delete from public.file_state where "fileId" = ${fileId} and "userId" = ${userId}`
await tx
.deleteFrom('file_state')
.where('fileId', '=', fileId)
.where('userId', '=', userId)
.execute()
} else {
const { id } = update.row as any
await sql`delete from ${sql('public.' + update.table)} where id = ${id}`
await tx.deleteFrom(update.table).where('id', '=', id).execute()
}
if (update.table === 'file') {
const { id } = update.row as TlaFile
@@ -372,11 +392,23 @@
// TODO: We should probably handle a case where the above operation succeeds but the one below fails
this.debug('mutation success', this.userId)
await this.db
.begin(async (sql) => {
const result =
await sql`insert into public.user_mutation_number ("userId", "mutationNumber") values (${this.userId}, 1) on conflict ("userId") do update set "mutationNumber" = user_mutation_number."mutationNumber" + 1 returning "mutationNumber"`
.transaction()
.execute(async (tx) => {
const result = await tx
.insertInto('user_mutation_number')
.values({
userId: this.userId!,
mutationNumber: 1,
})
.onConflict((oc) =>
oc.column('userId').doUpdateSet({
mutationNumber: sql`user_mutation_number."mutationNumber" + 1`,
})
)
.returning('mutationNumber')
.executeTakeFirstOrThrow()
this.debug('mutation number success', this.userId)
const mutationNumber = Number(result[0].mutationNumber)
const mutationNumber = Number(result.mutationNumber)
const currentMutationNumber = this.cache.mutations.at(-1)?.mutationNumber ?? 0
assert(
mutationNumber > currentMutationNumber,
52 changes: 24 additions & 28 deletions apps/dotcom/sync-worker/src/UserDataSyncer.ts
@@ -1,4 +1,5 @@
import {
DB,
OptimisticAppStore,
TlaRow,
ZEvent,
@@ -8,7 +9,7 @@
} from '@tldraw/dotcom-shared'
import { ExecutionQueue, assert, promiseWithResolve, sleep, uniqueId } from '@tldraw/utils'
import { createSentry } from '@tldraw/worker-shared'
import postgres from 'postgres'
import { Kysely } from 'kysely'
import type { EventHint } from 'toucan-js/node_modules/@sentry/types'
import { TLPostgresReplicator } from './TLPostgresReplicator'
import {
@@ -116,7 +117,7 @@ export class UserDataSyncer {
constructor(
ctx: DurableObjectState,
env: Environment,
private db: postgres.Sql,
private db: Kysely<DB>,
private userId: string,
private broadcast: (message: ZServerSentMessage) => void,
private logEvent: (event: TLUserDurableObjectEvent) => void
@@ -231,7 +232,7 @@

const promise = this.state.promise

const sql = getFetchUserDataSql(this.userId, bootId)
const userSql = getFetchUserDataSql(this.userId, bootId)
const initialData: ZStoreData = {
user: null as any,
files: [],
@@ -246,31 +247,26 @@
initialData.files = []
initialData.fileStates = []

await this.db.begin(async (db) => {
return db
.unsafe(sql, [], { prepare: false })
.simple()
.forEach((row: any) => {
assert(this.state.type === 'connecting', 'state should be connecting in boot')
switch (row.table) {
case 'user':
initialData.user = parseResultRow(userKeys, row)
break
case 'file':
initialData.files.push(parseResultRow(fileKeys, row))
break
case 'file_state':
initialData.fileStates.push(parseResultRow(fileStateKeys, row))
break
case 'user_mutation_number':
assert(
typeof row.mutationNumber === 'number',
'mutationNumber should be a number'
)
this.state.mutationNumber = row.mutationNumber
break
}
})
await this.db.transaction().execute(async (tx) => {
const result = await userSql.execute(tx)
return result.rows.forEach((row: any) => {
assert(this.state.type === 'connecting', 'state should be connecting in boot')
switch (row.table) {
case 'user':
initialData.user = parseResultRow(userKeys, row)
break
case 'file':
initialData.files.push(parseResultRow(fileKeys, row))
break
case 'file_state':
initialData.fileStates.push(parseResultRow(fileStateKeys, row))
break
case 'user_mutation_number':
assert(typeof row.mutationNumber === 'number', 'mutationNumber should be a number')
this.state.mutationNumber = row.mutationNumber
break
}
})
})
},
() => {
17 changes: 11 additions & 6 deletions apps/dotcom/sync-worker/src/getFetchEverythingSql.ts
@@ -1,4 +1,5 @@
import { ZColumn, tlaFileSchema, tlaFileStateSchema, tlaUserSchema } from '@tldraw/dotcom-shared'
import { sql } from 'kysely'
interface ColumnStuff {
name: string
type: 'string' | 'number' | 'boolean'
@@ -41,15 +42,19 @@ const fileColumns = fileKeys.map((c) => `${c.reference} as "${c.alias}"`)
const fileStateColumns = fileStateKeys.map((c) => `${c.reference} as "${c.alias}"`)

export function getFetchUserDataSql(userId: string, bootId: string) {
return `
INSERT INTO public.user_boot_id ("userId", "bootId") VALUES ('${userId}', '${bootId}') ON CONFLICT ("userId") DO UPDATE SET "bootId" = '${bootId}';
SELECT 'user' AS "table", null::bigint as "mutationNumber", ${userColumns.concat(fileNulls).concat(fileStateNulls)} FROM public.user WHERE "id" = '${userId}'
return sql`
WITH upsert AS (
INSERT INTO public.user_boot_id ("userId", "bootId")
VALUES (${userId}, ${bootId})
ON CONFLICT ("userId") DO UPDATE SET "bootId" = ${bootId}
)
SELECT 'user' AS "table", null::bigint as "mutationNumber", ${sql.raw(userColumns + ',' + fileNulls + ',' + fileStateNulls)} FROM public.user WHERE "id" = '${sql.raw(userId)}'
UNION
SELECT 'file' AS "table", null::bigint as "mutationNumber", ${userNulls.concat(fileColumns).concat(fileStateNulls)} FROM public.file WHERE "ownerId" = '${userId}' OR "shared" = true AND EXISTS(SELECT 1 FROM public.file_state WHERE "userId" = '${userId}' AND public.file_state."fileId" = public.file.id)
SELECT 'file' AS "table", null::bigint as "mutationNumber", ${sql.raw(userNulls + ',' + fileColumns + ',' + fileStateNulls)} FROM public.file WHERE "ownerId" = '${sql.raw(userId)}' OR "shared" = true AND EXISTS(SELECT 1 FROM public.file_state WHERE "userId" = '${sql.raw(userId)}' AND public.file_state."fileId" = public.file.id)
UNION
SELECT 'file_state' AS "table", null::bigint as "mutationNumber", ${userNulls.concat(fileNulls).concat(fileStateColumns)} FROM public.file_state WHERE "userId" = '${userId}'
SELECT 'file_state' AS "table", null::bigint as "mutationNumber", ${sql.raw(userNulls + ',' + fileNulls + ',' + fileStateColumns)} FROM public.file_state WHERE "userId" = '${sql.raw(userId)}'
UNION
SELECT 'user_mutation_number' as "table", "mutationNumber"::bigint, ${userNulls.concat(fileNulls).concat(fileStateNulls)} FROM public.user_mutation_number WHERE "userId" = '${userId}';
SELECT 'user_mutation_number' as "table", "mutationNumber"::bigint, ${sql.raw(userNulls + ',' + fileNulls + ',' + fileStateNulls)} FROM public.user_mutation_number WHERE "userId" = '${sql.raw(userId)}';
`
}

28 changes: 0 additions & 28 deletions apps/dotcom/sync-worker/src/getPostgres.ts

This file was deleted.

