Skip to content

Commit

Permalink
Running a task as a leader in the app view (bluesky-social#604)
Browse files Browse the repository at this point in the history
Interface for picking a pds leader
  • Loading branch information
devinivy authored Feb 27, 2023
1 parent bb05232 commit 5d2895f
Showing 2 changed files with 152 additions and 0 deletions.
76 changes: 76 additions & 0 deletions packages/pds/src/db/leader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { PoolClient } from 'pg'
import Database from '.'

export class Leader {
session: Session | null = null
static inProcessLocks = new Map<number, WeakSet<Database>>() // Only for sqlite in-process locking mechanism

constructor(public id: number, public db: Database) {}

async run<T>(
task: (ctx: { signal: AbortSignal }) => Promise<T>,
): Promise<RunResult<T>> {
const session = await this.lock()
if (!session) return { ran: false }
try {
const result = await task({ signal: session.abortController.signal })
return { ran: true, result }
} finally {
this.release()
}
}

private async lock(): Promise<Session | null> {
if (this.session) {
return null
}

if (this.db.cfg.dialect === 'sqlite') {
const locksForId = Leader.inProcessLocks.get(this.id) ?? new WeakSet()
if (locksForId.has(this.db)) {
return null
} else {
Leader.inProcessLocks.set(this.id, locksForId.add(this.db))
this.session = { abortController: new AbortController() }
return this.session
}
}

// Postgres implementation uses advisory locking, automatically released by ending connection.

const client = await this.db.cfg.pool.connect()
try {
const lock = await client.query(
'SELECT pg_try_advisory_lock($1) as acquired',
[this.id],
)
if (!lock.rows[0].acquired) {
client.release()
return null
}
} catch (err) {
client.release(true)
throw err
}

const abortController = new AbortController()
client.once('error', (err) => abortController.abort(err))
this.session = { abortController, client }
return this.session
}

private release() {
if (this.db.cfg.dialect === 'sqlite') {
Leader.inProcessLocks.get(this.id)?.delete(this.db)
} else {
// The flag ensures the connection is destroyed on release, not reused.
// This is required, as that is how the pg advisory lock is released.
this.session?.client?.release(true)
}
this.session = null
}
}

type Session = { abortController: AbortController; client?: PoolClient }

type RunResult<T> = { ran: false } | { ran: true; result: T }
76 changes: 76 additions & 0 deletions packages/pds/tests/db.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { once } from 'events'
import { wait } from '@atproto/common'
import { Database } from '../src'
import { Leader } from '../src/db/leader'
import { runTestServer, CloseFn } from './_util'

describe('db', () => {
@@ -98,4 +101,77 @@ describe('db', () => {
})
})
})

describe('Leader', () => {
it('allows leaders to run sequentially.', async () => {
const task = async () => {
await wait(25)
return 'complete'
}
const leader1 = new Leader(777, db)
const leader2 = new Leader(777, db)
const leader3 = new Leader(777, db)
const result1 = await leader1.run(task)
const result2 = await leader2.run(task)
const result3 = await leader3.run(task)
const result4 = await leader3.run(task)
expect([result1, result2, result3, result4]).toEqual([
{ ran: true, result: 'complete' },
{ ran: true, result: 'complete' },
{ ran: true, result: 'complete' },
{ ran: true, result: 'complete' },
])
})

it('only allows one leader at a time.', async () => {
const task = async () => {
await wait(25)
return 'complete'
}
const results = await Promise.all([
new Leader(777, db).run(task),
new Leader(777, db).run(task),
new Leader(777, db).run(task),
])
const byRan = (a, b) => Number(a.ran) - Number(b.ran)
expect(results.sort(byRan)).toEqual([
{ ran: false },
{ ran: false },
{ ran: true, result: 'complete' },
])
})

it('leaders with different ids do not conflict.', async () => {
const task = async () => {
await wait(25)
return 'complete'
}
const results = await Promise.all([
new Leader(777, db).run(task),
new Leader(778, db).run(task),
new Leader(779, db).run(task),
])
expect(results).toEqual([
{ ran: true, result: 'complete' },
{ ran: true, result: 'complete' },
{ ran: true, result: 'complete' },
])
})

it('supports abort.', async () => {
const task = async (ctx: { signal: AbortSignal }) => {
return await Promise.race([
wait(100),
once(ctx.signal, 'abort').then(() => ctx.signal.reason),
])
}
const leader = new Leader(777, db)
setTimeout(
() => leader.session?.abortController.abort(new Error('Oops!')),
25,
)
const result = await leader.run(task)
expect(result).toEqual({ ran: true, result: new Error('Oops!') })
})
})
})

0 comments on commit 5d2895f

Please sign in to comment.