Skip to content

Commit

Permalink
Deduping indexing (bluesky-social#354)
Browse files Browse the repository at this point in the history
* record processor

* moving records to new processor

* plugins finished

* hook up to db

* migration

* yay tests working

* Tx migration (bluesky-social#355)

* migrations in txs

* fix tx issue

* testing

* test deduping

* test assertions

* rm types on migrations

* dont do migrations in txs, fixes pg issue
  • Loading branch information
dholms authored Nov 17, 2022
1 parent 4fa2110 commit 227b856
Show file tree
Hide file tree
Showing 24 changed files with 1,545 additions and 607 deletions.
16 changes: 10 additions & 6 deletions packages/pds/src/db/database-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import * as record from './tables/record'
import * as ipldBlock from './tables/ipld-block'
import * as ipldBlockCreator from './tables/ipld-block-creator'
import * as inviteCode from './tables/invite-code'
import * as duplicateRecords from './tables/duplicate-record'
import * as notification from './tables/user-notification'
import * as assertion from './tables/assertion'
import * as profile from './records/profile'
import * as post from './records/post'
import * as vote from './records/vote'
import * as repost from './records/repost'
import * as trend from './records/trend'
import * as follow from './records/follow'
import * as profile from './tables/profile'
import * as post from './tables/post'
import * as vote from './tables/vote'
import * as repost from './tables/repost'
import * as trend from './tables/trend'
import * as follow from './tables/follow'
import * as messageQueue from './message-queue/tables/messageQueue'
import * as messageQueueCursor from './message-queue/tables/messageQueueCursor'
import * as sceneMemberCount from './message-queue/tables/sceneMemberCount'
Expand All @@ -29,6 +30,7 @@ export type DatabaseSchema = user.PartialDB &
ipldBlock.PartialDB &
ipldBlockCreator.PartialDB &
inviteCode.PartialDB &
duplicateRecords.PartialDB &
notification.PartialDB &
assertion.PartialDB &
profile.PartialDB &
Expand All @@ -41,3 +43,5 @@ export type DatabaseSchema = user.PartialDB &
messageQueueCursor.PartialDB &
sceneMemberCount.PartialDB &
sceneVotesOnPost.PartialDB

export default DatabaseSchema
9 changes: 6 additions & 3 deletions packages/pds/src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,13 @@ export class Database {
await this.db.insertInto('record').values(record).execute()

const table = this.findTableForCollection(uri.collection)
const events = await table.insert(uri, cid, obj, timestamp)
const events = await table.insertRecord(uri, cid, obj, timestamp)
this.messageQueue && (await this.messageQueue.send(this, events))

log.info({ uri }, 'indexed record')
}

async deleteRecord(uri: AtUri) {
async deleteRecord(uri: AtUri, cascading = false) {
this.assertTransaction()
log.debug({ uri }, 'deleting indexed record')
const table = this.findTableForCollection(uri.collection)
Expand All @@ -389,7 +389,10 @@ export class Database {
.where('uri', '=', uri.toString())
.execute()

const [events, _] = await Promise.all([table.delete(uri), deleteQuery])
const [events, _] = await Promise.all([
table.deleteRecord(uri, cascading),
deleteQuery,
])
this.messageQueue && (await this.messageQueue.send(this, events))

log.info({ uri }, 'deleted indexed record')
Expand Down
8 changes: 2 additions & 6 deletions packages/pds/src/db/migrations/20221021T162202001Z-init.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Kysely, sql } from 'kysely'
import { Dialect } from '..'
import { DatabaseSchema } from '../database-schema'

const userTable = 'user'
const didHandleTable = 'did_handle'
Expand All @@ -27,10 +26,7 @@ const messageQueueCursorTable = 'message_queue_cursor'
const sceneMemberCountTable = 'scene_member_count'
const sceneVotesOnPostTable = 'scene_votes_on_post'

export async function up(
db: Kysely<DatabaseSchema>,
dialect: Dialect,
): Promise<void> {
export async function up(db: Kysely<unknown>, dialect: Dialect): Promise<void> {
if (dialect === 'pg') {
try {
// Add trigram support, supporting user search.
Expand Down Expand Up @@ -292,7 +288,7 @@ export async function up(
.execute()
}

export async function down(db: Kysely<DatabaseSchema>): Promise<void> {
export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable(sceneVotesOnPostTable).execute()
await db.schema.dropTable(sceneMemberCountTable).execute()
await db.schema.dropTable(messageQueueCursorTable).execute()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { Kysely } from 'kysely'

const duplicateRecordTable = 'duplicate_record'

/**
 * Creates the `duplicate_record` table, then rebuilds `repost`, `trend`,
 * `vote`, `follow` and `assertion` with a unique constraint over their
 * creator/subject columns.
 *
 * Each table is rebuilt by creating a `<table>_temp` copy that carries the
 * constraint, copying over one row per constrained key (the row with the
 * smallest `uri`), dropping the old table and renaming the copy into place.
 * Rows that lose out to another row with the same key are not copied.
 */
export async function up(db: Kysely<any>): Promise<void> {
  // Copies the surviving rows of `table` into `<table>_temp`, then replaces
  // `table` with the temp table. The copy uses `SELECT *`, so the temp table's
  // column order must line up with the existing table's.
  const copyFirstPerKeyAndSwap = async (
    table: string,
    keyColumns: string[],
  ): Promise<void> => {
    const tempTable = `${table}_temp`
    await db
      .insertInto(tempTable)
      .expression((exp) =>
        exp
          .selectFrom(table)
          .selectAll()
          .where('uri', 'in', (qb) =>
            qb
              .selectFrom(table)
              .select(db.fn.min('uri').as('uri'))
              .groupBy(keyColumns),
          ),
      )
      .execute()
    await db.schema.dropTable(table).execute()
    await db.schema.alterTable(tempTable).renameTo(table).execute()
  }

  // Table that tracks records indexed as duplicates of another record.
  await db.schema
    .createTable(duplicateRecordTable)
    .addColumn('uri', 'varchar', (col) => col.primaryKey())
    .addColumn('cid', 'varchar', (col) => col.notNull())
    .addColumn('duplicateOf', 'varchar', (col) => col.notNull())
    .addColumn('indexedAt', 'varchar', (col) => col.notNull())
    .execute()

  // repost: unique per (creator, subject)
  await db.schema
    .createTable('repost_temp')
    .addColumn('uri', 'varchar', (col) => col.primaryKey())
    .addColumn('cid', 'varchar', (col) => col.notNull())
    .addColumn('creator', 'varchar', (col) => col.notNull())
    .addColumn('subject', 'varchar', (col) => col.notNull())
    .addColumn('subjectCid', 'varchar', (col) => col.notNull())
    .addColumn('createdAt', 'varchar', (col) => col.notNull())
    .addColumn('indexedAt', 'varchar', (col) => col.notNull())
    .addUniqueConstraint('repost_unique_subject', ['creator', 'subject'])
    .execute()
  await copyFirstPerKeyAndSwap('repost', ['creator', 'subject'])

  // trend: unique per (creator, subject)
  await db.schema
    .createTable('trend_temp')
    .addColumn('uri', 'varchar', (col) => col.primaryKey())
    .addColumn('cid', 'varchar', (col) => col.notNull())
    .addColumn('creator', 'varchar', (col) => col.notNull())
    .addColumn('subject', 'varchar', (col) => col.notNull())
    .addColumn('subjectCid', 'varchar', (col) => col.notNull())
    .addColumn('createdAt', 'varchar', (col) => col.notNull())
    .addColumn('indexedAt', 'varchar', (col) => col.notNull())
    .addUniqueConstraint('trend_unique_subject', ['creator', 'subject'])
    .execute()
  await copyFirstPerKeyAndSwap('trend', ['creator', 'subject'])

  // vote: unique per (creator, subject)
  await db.schema
    .createTable('vote_temp')
    .addColumn('uri', 'varchar', (col) => col.primaryKey())
    .addColumn('cid', 'varchar', (col) => col.notNull())
    .addColumn('creator', 'varchar', (col) => col.notNull())
    .addColumn('direction', 'varchar', (col) => col.notNull())
    .addColumn('subject', 'varchar', (col) => col.notNull())
    .addColumn('subjectCid', 'varchar', (col) => col.notNull())
    .addColumn('createdAt', 'varchar', (col) => col.notNull())
    .addColumn('indexedAt', 'varchar', (col) => col.notNull())
    .addUniqueConstraint('vote_unique_subject', ['creator', 'subject'])
    .execute()
  await copyFirstPerKeyAndSwap('vote', ['creator', 'subject'])

  // follow: unique per (creator, subjectDid)
  await db.schema
    .createTable('follow_temp')
    .addColumn('uri', 'varchar', (col) => col.primaryKey())
    .addColumn('cid', 'varchar', (col) => col.notNull())
    .addColumn('creator', 'varchar', (col) => col.notNull())
    .addColumn('subjectDid', 'varchar', (col) => col.notNull())
    .addColumn('subjectDeclarationCid', 'varchar', (col) => col.notNull())
    .addColumn('createdAt', 'varchar', (col) => col.notNull())
    .addColumn('indexedAt', 'varchar', (col) => col.notNull())
    .addUniqueConstraint('follow_unique_subject', ['creator', 'subjectDid'])
    .execute()
  await copyFirstPerKeyAndSwap('follow', ['creator', 'subjectDid'])

  // assertion: unique per (creator, subjectDid, assertion)
  await db.schema
    .createTable('assertion_temp')
    .addColumn('uri', 'varchar', (col) => col.primaryKey())
    .addColumn('cid', 'varchar', (col) => col.notNull())
    .addColumn('creator', 'varchar', (col) => col.notNull())
    .addColumn('assertion', 'varchar', (col) => col.notNull())
    .addColumn('subjectDid', 'varchar', (col) => col.notNull())
    .addColumn('subjectDeclarationCid', 'varchar', (col) => col.notNull())
    .addColumn('createdAt', 'varchar', (col) => col.notNull())
    .addColumn('indexedAt', 'varchar', (col) => col.notNull())
    .addColumn('confirmUri', 'varchar')
    .addColumn('confirmCid', 'varchar')
    .addColumn('confirmCreated', 'varchar')
    .addColumn('confirmIndexed', 'varchar')
    .addUniqueConstraint('assertion_unique_subject', [
      'creator',
      'subjectDid',
      'assertion',
    ])
    .execute()
  await copyFirstPerKeyAndSwap('assertion', [
    'creator',
    'subjectDid',
    'assertion',
  ])
}

/**
 * Reverts this migration by dropping the `duplicate_record` table.
 *
 * The unique constraints that `up` adds to the rebuilt tables, and any rows it
 * left out while rebuilding them, are not restored here.
 */
export async function down(db: Kysely<unknown>): Promise<void> {
  // Kysely schema builders only describe a statement; nothing is sent to the
  // database until `.execute()` is called. Without it this was a silent no-op.
  await db.schema.dropTable(duplicateRecordTable).execute()
}
1 change: 1 addition & 0 deletions packages/pds/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
// this with kysely's FileMigrationProvider, but it doesn't play nicely with the build process.

export * as _20221021T162202001Z from './20221021T162202001Z-init'
export * as _20221116T234458063Z from './20221116T234458063Z-duplicate-records'
122 changes: 122 additions & 0 deletions packages/pds/src/db/record-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { AtUri } from '@atproto/uri'
import * as common from '@atproto/common'
import { Kysely } from 'kysely'
import { CID } from 'multiformats/cid'
import { DatabaseSchema } from './database-schema'
import { Message } from './message-queue/messages'
import * as schemas from './schemas'
import { RecordValidator, ValidationResult } from '@atproto/lexicon'

/**
 * Collection-specific hooks consumed by `RecordProcessor`.
 *
 * `T` is the validated record shape handed to the hooks; `S` is whatever the
 * insert/delete hooks return to describe the affected row.
 */
type RecordProcessorParams<T, S> = {
/** Schema id used to build the record validator; also exposed as the processor's `collection`. */
schemaId: string
/**
 * Writes the record to its table. A `null` result is treated by the
 * processor as "nothing new was inserted", after which it looks for a duplicate.
 */
insertFn: (
db: Kysely<DatabaseSchema>,
uri: AtUri,
cid: CID,
obj: T,
timestamp?: string,
) => Promise<S | null>
/** Returns the uri of an existing record that `obj` duplicates, or `null` if there is none. */
findDuplicate: (
db: Kysely<DatabaseSchema>,
uri: AtUri,
obj: T,
) => Promise<AtUri | null>
/** Removes the record at `uri`. A `null` result makes the processor emit no events. */
deleteFn: (db: Kysely<DatabaseSchema>, uri: AtUri) => Promise<S | null>
/** Builds the messages to send for a newly inserted row. */
eventsForInsert: (obj: S) => Message[]
/**
 * Builds the messages to send for a deleted row. `replacedBy` is the row
 * inserted in its place from a former duplicate, or `null` when there is none.
 */
eventsForDelete: (prev: S, replacedBy: S | null) => Message[]
}

/**
 * Indexes records of a single collection, keeping track of duplicates.
 *
 * A record whose insert yields no new row is recorded in `duplicate_record`
 * (pointing at the record it duplicates) instead of producing events. When the
 * indexed record is later deleted, the oldest remaining duplicate is inserted
 * in its place.
 */
export class RecordProcessor<T, S> {
  collection: string
  validator: RecordValidator
  constructor(
    private db: Kysely<DatabaseSchema>,
    private params: RecordProcessorParams<T, S>,
  ) {
    this.collection = this.params.schemaId
    this.validator = schemas.records.createRecordValidator(this.params.schemaId)
  }

  /** Type guard: whether `obj` is valid for this processor's schema. */
  matchesSchema(obj: unknown): obj is T {
    return this.validator.isValid(obj)
  }

  /** Validates `obj` against this processor's schema and returns the validator's result. */
  validateSchema(obj: unknown): ValidationResult {
    return this.validator.validate(obj)
  }

  /**
   * Indexes a record.
   *
   * @returns the events for the insert, or an empty list when no new row was
   *   inserted (in which case the record is stored in `duplicate_record` if a
   *   duplicate target is found).
   * @throws Error if `obj` does not match the schema.
   */
  async insertRecord(
    uri: AtUri,
    cid: CID,
    obj: unknown,
    timestamp?: string,
  ): Promise<Message[]> {
    if (!this.matchesSchema(obj)) {
      throw new Error(`Record does not match schema: ${this.params.schemaId}`)
    }
    const inserted = await this.params.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    )
    // if this was a new record, return events
    if (inserted) {
      return this.params.eventsForInsert(inserted)
    }
    // if duplicate, insert into duplicates table with no events
    const found = await this.params.findDuplicate(this.db, uri, obj)
    if (found) {
      await this.db
        .insertInto('duplicate_record')
        .values({
          uri: uri.toString(),
          cid: cid.toString(),
          duplicateOf: found.toString(),
          indexedAt: timestamp || new Date().toISOString(),
        })
        .execute()
    }
    return []
  }

  /**
   * Removes a record from the index.
   *
   * @param uri - record to delete
   * @param cascading - when true, duplicates of the record are discarded
   *   rather than one of them being inserted in its place
   * @returns the events for the delete, or an empty list when `deleteFn`
   *   reported nothing deleted.
   */
  async deleteRecord(uri: AtUri, cascading = false): Promise<Message[]> {
    const uriStr = uri.toString()
    // Always drop this uri's own duplicate entry first. A record that was only
    // ever indexed as a duplicate has no row for deleteFn to remove, so without
    // this its entry would outlive the record and could later be re-inserted
    // when the record it duplicates is deleted.
    await this.db
      .deleteFrom('duplicate_record')
      .where('uri', '=', uriStr)
      .execute()

    const deleted = await this.params.deleteFn(this.db, uri)
    if (!deleted) return []

    if (cascading) {
      await this.db
        .deleteFrom('duplicate_record')
        .where('duplicateOf', '=', uriStr)
        .execute()
      return this.params.eventsForDelete(deleted, null)
    }

    // Oldest duplicate of the deleted record whose block content is available.
    // Columns are table-qualified so the query stays unambiguous should
    // ipld_block share column names with duplicate_record.
    const found = await this.db
      .selectFrom('duplicate_record')
      .innerJoin('ipld_block', 'ipld_block.cid', 'duplicate_record.cid')
      .where('duplicate_record.duplicateOf', '=', uriStr)
      .orderBy('duplicate_record.indexedAt', 'asc')
      .limit(1)
      .selectAll('duplicate_record')
      .select('ipld_block.content')
      .executeTakeFirst()

    if (!found) {
      return this.params.eventsForDelete(deleted, null)
    }
    const record = common.ipldBytesToRecord(found.content)
    if (!this.matchesSchema(record)) {
      return this.params.eventsForDelete(deleted, null)
    }
    const inserted = await this.params.insertFn(
      this.db,
      new AtUri(found.uri),
      CID.parse(found.cid),
      record,
      found.indexedAt,
    )
    if (inserted) {
      // The promoted record is now indexed in its own right, so it must no
      // longer be listed as a duplicate.
      await this.db
        .deleteFrom('duplicate_record')
        .where('uri', '=', found.uri)
        .execute()
    }
    return this.params.eventsForDelete(deleted, inserted)
  }
}

export default RecordProcessor
Loading

0 comments on commit 227b856

Please sign in to comment.