Skip to content

Commit

Permalink
Setup PLC server with postgres and migrations (bluesky-social#249)
Browse files Browse the repository at this point in the history
* Setup PLC with postgres

* Setup PLC with kysely migrations

* Add note to PLC migrations file
  • Loading branch information
devinivy authored Oct 21, 2022
1 parent 960a0fc commit 000507e
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 83 deletions.
3 changes: 2 additions & 1 deletion packages/dev-env/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ export class DevEnvServer {
break
}
case ServerType.DidPlaceholder: {
const db = await plc.Database.memory().createTables()
const db = plc.Database.memory()
await db.migrateToLatestOrThrow()
this.inst = await onServerReady(plc.server(db, this.port))
break
}
Expand Down
2 changes: 1 addition & 1 deletion packages/did-resolver/tests/resolver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('resolver', () => {
})

const plcDB = DidPlcDb.memory()
await plcDB.createTables()
await plcDB.migrateToLatestOrThrow()
const plcPort = await getPort()
const plcServer = await runPlcServer(plcDB, plcPort)

Expand Down
38 changes: 38 additions & 0 deletions packages/plc/bin/migration-create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env ts-node

import * as fs from 'fs/promises'
import * as path from 'path'

export async function main() {
const now = new Date()
const prefix = now.toISOString().replace(/[^a-z0-9]/gi, '') // Order of migrations matches alphabetical order of their names
const name = process.argv[2]
if (!name || !name.match(/^[a-z0-9-]+$/)) {
process.exitCode = 1
return console.error(
'Must pass a migration name consisting of lowercase digits, numbers, and dashes.',
)
}
const filename = `${prefix}-${name}`
const dir = path.join(__dirname, '..', 'src', 'server', 'migrations')

await fs.writeFile(path.join(dir, `${filename}.ts`), template, { flag: 'wx' })
await fs.writeFile(
path.join(dir, 'index.ts'),
`export * as _${prefix} from './${filename}'\n`,
{ flag: 'a' },
)
}

const template = `import { Kysely } from 'kysely'
export async function up(db: Kysely<unknown>): Promise<void> {
// Migration code
}
export async function down(db: Kysely<unknown>): Promise<void> {
// Migration code
}
`

main()
5 changes: 3 additions & 2 deletions packages/plc/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ require('esbuild')
platform: 'node',
assetNames: 'src/static',
external: [
'./node_modules/better-sqlite3/*',
'../../node_modules/better-sqlite3/*',
'better-sqlite3',
'../../node_modules/level/*',
'../../node_modules/classic-level/*',
// Referenced in pg driver, but optional and we don't use it
'pg-native',
],
})
.catch(() => process.exit(1))
6 changes: 5 additions & 1 deletion packages/plc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"scripts": {
"start": "node dist/bin.js",
"test": "jest",
"test:pg": "../pg/with-test-db.sh jest",
"test:log": "cat test.log | pino-pretty",
"prettier": "prettier --check src/",
"prettier:fix": "prettier --write src/",
Expand All @@ -15,7 +16,8 @@
"verify:fix": "yarn prettier:fix && yarn lint:fix",
"build": "node ./build.js",
"postbuild": "tsc --build tsconfig.build.json",
"low": "node dist/scripts/low_pid.js"
"low": "node dist/scripts/low_pid.js",
"migration:create": "ts-node ./bin/migration-create.ts"
},
"dependencies": {
"@atproto/common": "*",
Expand All @@ -29,12 +31,14 @@
"express": "^4.17.2",
"express-async-errors": "^3.1.1",
"kysely": "^0.22.0",
"pg": "^8.8.0",
"pino": "^8.6.1",
"pino-http": "^8.2.1",
"uint8arrays": "3.0.0",
"zod": "^3.14.2"
},
"devDependencies": {
"@types/pg": "^8.6.5",
"eslint-plugin-prettier": "^4.2.1",
"get-port": "^6.1.2"
}
Expand Down
14 changes: 10 additions & 4 deletions packages/plc/src/bin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ const run = async () => {
dotenv.config()
}

let db: Database
const dbLoc = process.env.DATABASE_LOC
if (dbLoc) {
db = await Database.sqlite(dbLoc)
const dbPostgresUrl = process.env.DB_POSTGRES_URL

let db: Database
if (dbPostgresUrl) {
db = Database.postgres({ url: dbPostgresUrl })
} else if (dbLoc) {
db = Database.sqlite(dbLoc)
} else {
db = await Database.memory()
db = Database.memory()
}

await db.migrateToLatestOrThrow()

const envPort = parseInt(process.env.PORT || '')
const port = isNaN(envPort) ? 2582 : envPort

Expand Down
197 changes: 128 additions & 69 deletions packages/plc/src/server/db.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,72 @@
import { Kysely, SqliteDialect } from 'kysely'
import {
Kysely,
KyselyConfig,
Migrator,
PostgresDialect,
SqliteDialect,
} from 'kysely'
import SqliteDB from 'better-sqlite3'
import { Pool as PgPool, types as pgTypes } from 'pg'
import { CID } from 'multiformats/cid'
import { cidForData } from '@atproto/common'
import * as document from '../lib/document'
import * as t from '../lib/types'
import { ServerError } from './error'

interface OperationsTable {
did: string
operation: string
cid: string
nullified: 0 | 1
createdAt: string
}

interface DatabaseSchema {
operations: OperationsTable
}
import * as migrations from './migrations'

export class Database {
constructor(public db: Kysely<DatabaseSchema>) {}
migrator: Migrator
constructor(
public db: KyselyWithDialect<DatabaseSchema>,
public schema?: string,
) {
this.migrator = new Migrator({
db,
migrationTableSchema: schema,
provider: {
async getMigrations() {
return migrations
},
},
})
}

static sqlite(location: string): Database {
const db = new Kysely<DatabaseSchema>({
const db = new KyselyWithDialect<DatabaseSchema>('sqlite', {
dialect: new SqliteDialect({
database: new SqliteDB(location),
}),
})
return new Database(db)
}

static postgres(opts: { url: string; schema?: string }): Database {
const { url, schema } = opts
const pool = new PgPool({ connectionString: url })

// Select count(*) and other pg bigints as js integer
pgTypes.setTypeParser(pgTypes.builtins.INT8, (n) => parseInt(n, 10))

// Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema)
if (schema !== undefined) {
if (!/^[a-z_]+$/i.test(schema)) {
throw new Error(
`Postgres schema must only contain [A-Za-z_]: ${schema}`,
)
}
pool.on('connect', (client) =>
// Shared objects such as extensions will go in the public schema
client.query(`SET search_path TO "${schema}",public`),
)
}

const db = new KyselyWithDialect<DatabaseSchema>('pg', {
dialect: new PostgresDialect({ pool }),
})

return new Database(db, schema)
}

static memory(): Database {
return Database.sqlite(':memory:')
}
Expand All @@ -38,21 +75,18 @@ export class Database {
await this.db.destroy()
}

async createTables(): Promise<this> {
await this.db.schema
.createTable('operations')
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('operation', 'text', (col) => col.notNull())
.addColumn('cid', 'varchar', (col) => col.notNull())
.addColumn('nullified', 'int2', (col) => col.defaultTo(0))
.addColumn('createdAt', 'varchar', (col) => col.notNull())
.addPrimaryKeyConstraint('primary_key', ['did', 'cid'])
.execute()
return this
}

async dropTables(): Promise<void> {
await this.db.schema.dropTable('operations').execute()
async migrateToLatestOrThrow() {
if (this.schema !== undefined) {
await this.db.schema.createSchema(this.schema).ifNotExists().execute()
}
const { error, results } = await this.migrator.migrateToLatest()
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}

async validateAndAddOp(did: string, proposed: t.Operation): Promise<void> {
Expand All @@ -65,48 +99,51 @@ export class Database {
)
const cid = await cidForData(proposed)

await this.db.transaction().execute(async (tx) => {
await tx
.insertInto('operations')
.values({
did,
operation: JSON.stringify(proposed),
cid: cid.toString(),
nullified: 0,
createdAt: new Date().toISOString(),
})
.execute()

if (nullified.length > 0) {
const nullfiedStrs = nullified.map((cid) => cid.toString())
await this.db
.transaction()
.setIsolationLevel('serializable')
.execute(async (tx) => {
await tx
.updateTable('operations')
.set({ nullified: 1 })
.where('did', '=', did)
.where('cid', 'in', nullfiedStrs)
.insertInto('operations')
.values({
did,
operation: JSON.stringify(proposed),
cid: cid.toString(),
nullified: 0,
createdAt: new Date().toISOString(),
})
.execute()
}

// verify that the 2nd to last tx matches the proposed prev
// otherwise rollback to prevent forks in history
const mostRecent = await tx
.selectFrom('operations')
.select('cid')
.where('did', '=', did)
.where('nullified', '=', 0)
.orderBy('createdAt', 'desc')
.limit(2)
.execute()
const isMatch =
(prev === null && !mostRecent[1]) ||
(prev && prev.equals(CID.parse(mostRecent[1].cid)))
if (!isMatch) {
throw new ServerError(
409,
`Proposed prev does not match the most recent operation: ${mostRecent?.toString()}`,
)
}
})
if (nullified.length > 0) {
const nullfiedStrs = nullified.map((cid) => cid.toString())
await tx
.updateTable('operations')
.set({ nullified: 1 })
.where('did', '=', did)
.where('cid', 'in', nullfiedStrs)
.execute()
}

// verify that the 2nd to last tx matches the proposed prev
// otherwise rollback to prevent forks in history
const mostRecent = await tx
.selectFrom('operations')
.select('cid')
.where('did', '=', did)
.where('nullified', '=', 0)
.orderBy('createdAt', 'desc')
.limit(2)
.execute()
const isMatch =
(prev === null && !mostRecent[1]) ||
(prev && prev.equals(CID.parse(mostRecent[1].cid)))
if (!isMatch) {
throw new ServerError(
409,
`Proposed prev does not match the most recent operation: ${mostRecent?.toString()}`,
)
}
})
}

async mostRecentCid(did: string, notIncluded: CID[]): Promise<CID | null> {
Expand Down Expand Up @@ -148,3 +185,25 @@ export class Database {
}

export default Database

export type Dialect = 'pg' | 'sqlite'

// By placing the dialect on the kysely instance itself,
// you can utilize this information inside migrations.
export class KyselyWithDialect<DB> extends Kysely<DB> {
constructor(public dialect: Dialect, config: KyselyConfig) {
super(config)
}
}

interface OperationsTable {
did: string
operation: string
cid: string
nullified: 0 | 1
createdAt: string
}

interface DatabaseSchema {
operations: OperationsTable
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Kysely } from 'kysely'

export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable('operations')
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('operation', 'text', (col) => col.notNull())
.addColumn('cid', 'varchar', (col) => col.notNull())
.addColumn('nullified', 'int2', (col) => col.defaultTo(0))
.addColumn('createdAt', 'varchar', (col) => col.notNull())
.addPrimaryKeyConstraint('primary_key', ['did', 'cid'])
.execute()
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable('operations').execute()
}
5 changes: 5 additions & 0 deletions packages/plc/src/server/migrations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// NOTE this file can be edited by hand, but it is also appended to by the migrations:create command.
// It's important that every migration is exported from here with the proper name. We'd simplify
// this with kysely's FileMigrationProvider, but it doesn't play nicely with the build process.

export * as _20221020T204908820Z from './20221020T204908820Z-operations-init'
Loading

0 comments on commit 000507e

Please sign in to comment.