Skip to content

Commit

Permalink
Fix leaky database transactions (bluesky-social#890)
Browse files Browse the repository at this point in the history
* fix leaky txs

* tidy

* end tx in finally
  • Loading branch information
dholms authored Apr 25, 2023
1 parent a8fa9b2 commit cae6779
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 20 deletions.
47 changes: 41 additions & 6 deletions packages/bsky/src/db/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import assert from 'assert'
import { Kysely, PostgresDialect, Migrator } from 'kysely'
import {
Kysely,
PostgresDialect,
Migrator,
KyselyPlugin,
PluginTransformQueryArgs,
PluginTransformResultArgs,
RootOperationNode,
QueryResult,
UnknownRow,
} from 'kysely'
import { Pool as PgPool, types as pgTypes } from 'pg'
import DatabaseSchema, { DatabaseSchemaType } from './database-schema'
import * as migrations from './migrations'
Expand Down Expand Up @@ -46,11 +56,15 @@ export class Database {
}

async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
const res = await this.db.transaction().execute(async (txn) => {
const dbTxn = new Database(txn, this.cfg)
const txRes = await fn(dbTxn)
return txRes
})
const leakyTxPlugin = new LeakyTxPlugin()
const res = await this.db
.withPlugin(leakyTxPlugin)
.transaction()
.execute(async (txn) => {
const dbTxn = new Database(txn, this.cfg)
const txRes = await fn(dbTxn).finally(() => leakyTxPlugin.endTx())
return txRes
})
return res
}

Expand Down Expand Up @@ -115,3 +129,24 @@ type PgOptions = {
pool?: PgPool
schema?: string
}

class LeakyTxPlugin implements KyselyPlugin {
private txOver: boolean

endTx() {
this.txOver = true
}

transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
if (this.txOver) {
throw new Error('tx already failed')
}
return args.node
}

async transformResult(
args: PluginTransformResultArgs,
): Promise<QueryResult<UnknownRow>> {
return args.result
}
}
28 changes: 28 additions & 0 deletions packages/bsky/tests/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,34 @@ describe('db', () => {
expect(() => dbTxn.assertTransaction()).not.toThrow()
})
})

it('does not allow leaky transactions', async () => {
let leakedTx: Database | undefined

const tx = db.transaction(async (dbTxn) => {
leakedTx = dbTxn
await dbTxn.db
.insertInto('actor')
.values({ handle: 'a', did: 'a', indexedAt: 'bad-date' })
.execute()
throw new Error('test tx failed')
})
await expect(tx).rejects.toThrow('test tx failed')

const attempt = leakedTx?.db
.insertInto('actor')
.values({ handle: 'b', did: 'b', indexedAt: 'bad-date' })
.execute()
await expect(attempt).rejects.toThrow('tx already failed')

const res = await db.db
.selectFrom('actor')
.selectAll()
.where('did', 'in', ['a', 'b'])
.execute()

expect(res.length).toBe(0)
})
})

describe('Leader', () => {
Expand Down
55 changes: 46 additions & 9 deletions packages/pds/src/db/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import assert from 'assert'
import { Kysely, SqliteDialect, PostgresDialect, Migrator, sql } from 'kysely'
import {
Kysely,
SqliteDialect,
PostgresDialect,
Migrator,
sql,
KyselyPlugin,
PluginTransformQueryArgs,
PluginTransformResultArgs,
RootOperationNode,
QueryResult,
UnknownRow,
} from 'kysely'
import SqliteDB from 'better-sqlite3'
import { Pool as PgPool, Client as PgClient, types as pgTypes } from 'pg'
import EventEmitter from 'events'
Expand Down Expand Up @@ -161,15 +173,19 @@ export class Database {

async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
let txMsgs: ChannelMsg[] = []
const [dbTxn, res] = await this.db.transaction().execute(async (txn) => {
const dbTxn = new Database(txn, this.cfg, this.channels)
const txRes = await fn(dbTxn)
txMsgs = dbTxn.txChannelMsgs
return [dbTxn, txRes]
})
dbTxn.txEvt.emit('commit')
const leakyTxPlugin = new LeakyTxPlugin()
const { dbTxn, txRes } = await this.db
.withPlugin(leakyTxPlugin)
.transaction()
.execute(async (txn) => {
const dbTxn = new Database(txn, this.cfg, this.channels)
const txRes = await fn(dbTxn).finally(() => leakyTxPlugin.endTx())
txMsgs = dbTxn.txChannelMsgs
return { txRes, dbTxn }
})
dbTxn?.txEvt.emit('commit')
txMsgs.forEach((msg) => this.sendChannelMsg(msg))
return res
return txRes
}

get schema(): string | undefined {
Expand Down Expand Up @@ -276,3 +292,24 @@ type ChannelMsg = 'repo_seq'
type Channels = {
repo_seq: ChannelEmitter
}

class LeakyTxPlugin implements KyselyPlugin {
private txOver: boolean

endTx() {
this.txOver = true
}

transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
if (this.txOver) {
throw new Error('tx already failed')
}
return args.node
}

async transformResult(
args: PluginTransformResultArgs,
): Promise<QueryResult<UnknownRow>> {
return args.result
}
}
2 changes: 1 addition & 1 deletion packages/pds/src/services/repo/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export class RepoService {
throw err
}
await wait(timeout)
await this.processWrites(toWrite, times - 1, timeout)
return this.processWrites(toWrite, times - 1, timeout)
} else {
throw err
}
Expand Down
5 changes: 1 addition & 4 deletions packages/pds/src/sql-repo-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export class SqlRepoStorage extends RepoStorage {
.select('block.cid'),
)
.execute()
await this.updateHead(rebase.commit, null)
await this.updateHead(rebase.commit, rebase.rebased)
}

async indexCommits(commits: CommitData[]): Promise<void> {
Expand Down Expand Up @@ -237,9 +237,6 @@ export class SqlRepoStorage extends RepoStorage {
root: cid.toString(),
indexedAt: this.getTimestamp(),
})
.onConflict((oc) =>
oc.column('did').doUpdateSet({ root: cid.toString() }),
)
.execute()
} else {
const res = await this.db.db
Expand Down
28 changes: 28 additions & 0 deletions packages/pds/tests/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,34 @@ describe('db', () => {
expect(() => dbTxn.assertTransaction()).not.toThrow()
})
})

it('does not allow leaky transactions', async () => {
let leakedTx: Database | undefined

const tx = db.transaction(async (dbTxn) => {
leakedTx = dbTxn
await dbTxn.db
.insertInto('repo_root')
.values({ root: 'a', did: 'a', indexedAt: 'bad-date' })
.execute()
throw new Error('test tx failed')
})
await expect(tx).rejects.toThrow('test tx failed')

const attempt = leakedTx?.db
.insertInto('repo_root')
.values({ root: 'b', did: 'b', indexedAt: 'bad-date' })
.execute()
await expect(attempt).rejects.toThrow('tx already failed')

const res = await db.db
.selectFrom('repo_root')
.selectAll()
.where('did', 'in', ['a', 'b'])
.execute()

expect(res.length).toBe(0)
})
})

describe('Leader', () => {
Expand Down

0 comments on commit cae6779

Please sign in to comment.