Skip to content

Commit

Permalink
Atproto sync package (bluesky-social#2752)
Browse files Browse the repository at this point in the history
* first pass/port

* reworking

* authenticated commit parsing

* authenticate identity evts

* some testing

* tidy & add firehose to queue

* error handling

* fix test

* refactor sync queue + some tests

* fix race in sync queue

* rm firehose from syncqueue

* add tests for queue utils

* README

* lint readme

* filter before parsing

* pr feedback

* small fix

* changesets

* fix type

* Rework dataplane subscription (bluesky-social#2766)

* working sync package into appview subscription

* add restart method to subscription for tests

* fix another test

* tidy subscription utils/files

* remove dupe property

* tidy after merge

* fix start cursor on subscription

* tweak process full subscription logic

* fixes
  • Loading branch information
dholms authored Sep 5, 2024
1 parent 642c7ae commit b15dec2
Show file tree
Hide file tree
Showing 40 changed files with 1,681 additions and 801 deletions.
5 changes: 5 additions & 0 deletions .changeset/gorgeous-oranges-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/sync": minor
---

Introduced initial sync package for consuming firehose (com.atproto.sync.subscribeRepos)
5 changes: 5 additions & 0 deletions .changeset/silver-badgers-chew.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/repo": minor
---

Updated verifyProofs consumer method to verify Cid claims rather than record claims
1 change: 1 addition & 0 deletions packages/bsky/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"@atproto/identity": "workspace:^",
"@atproto/lexicon": "workspace:^",
"@atproto/repo": "workspace:^",
"@atproto/sync": "workspace:^",
"@atproto/syntax": "workspace:^",
"@atproto/xrpc-server": "workspace:^",
"@bufbuild/protobuf": "^1.5.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Kysely } from 'kysely'

export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema.alterTable('actor_sync').dropColumn('commitDataCid').execute()
await db.schema.alterTable('actor_sync').dropColumn('rebaseCount').execute()
await db.schema.alterTable('actor_sync').dropColumn('tooBigCount').execute()
// Migration code
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema
.alterTable('actor_sync')
.addColumn('commitDataCid', 'varchar', (col) => col.notNull())
.execute()
await db.schema
.alterTable('actor_sync')
.addColumn('rebaseCount', 'integer', (col) => col.notNull())
.execute()
await db.schema
.alterTable('actor_sync')
.addColumn('tooBigCount', 'integer', (col) => col.notNull())
.execute()
}
1 change: 1 addition & 0 deletions packages/bsky/src/data-plane/server/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ export * as _20240723T220700077Z from './20240723T220700077Z-quotes-post-aggs'
export * as _20240723T220703655Z from './20240723T220703655Z-quotes'
export * as _20240801T193939827Z from './20240801T193939827Z-post-gate'
export * as _20240808T224251220Z from './20240808T224251220Z-post-gate-flags'
export * as _20240829T211238293Z from './20240829T211238293Z-simplify-actor-sync'
3 changes: 0 additions & 3 deletions packages/bsky/src/data-plane/server/db/tables/actor-sync.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
export interface ActorSync {
did: string
commitCid: string
commitDataCid: string
repoRev: string | null
rebaseCount: number
tooBigCount: number
}

export const tableName = 'actor_sync'
Expand Down
29 changes: 4 additions & 25 deletions packages/bsky/src/data-plane/server/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
readCarWithRoot,
WriteOpAction,
verifyRepo,
Commit,
VerifiedRepo,
getAndParseRecord,
} from '@atproto/repo'
Expand Down Expand Up @@ -227,45 +226,25 @@ export class IndexingService {
)
}

async setCommitLastSeen(
commit: Commit,
details: { commit: CID; rebase: boolean; tooBig: boolean },
) {
async setCommitLastSeen(did: string, commit: CID, rev: string) {
const { ref } = this.db.db.dynamic
await this.db.db
.insertInto('actor_sync')
.values({
did: commit.did,
commitCid: details.commit.toString(),
commitDataCid: commit.data.toString(),
repoRev: commit.rev ?? null,
rebaseCount: details.rebase ? 1 : 0,
tooBigCount: details.tooBig ? 1 : 0,
did,
commitCid: commit.toString(),
repoRev: rev ?? null,
})
.onConflict((oc) => {
const sync = (col: string) => ref(`actor_sync.${col}`)
const excluded = (col: string) => ref(`excluded.${col}`)
return oc.column('did').doUpdateSet({
commitCid: sql`${excluded('commitCid')}`,
commitDataCid: sql`${excluded('commitDataCid')}`,
repoRev: sql`${excluded('repoRev')}`,
rebaseCount: sql`${sync('rebaseCount')} + ${excluded('rebaseCount')}`,
tooBigCount: sql`${sync('tooBigCount')} + ${excluded('tooBigCount')}`,
})
})
.execute()
}

async checkCommitNeedsIndexing(commit: Commit) {
const sync = await this.db.db
.selectFrom('actor_sync')
.select('commitDataCid')
.where('did', '=', commit.did)
.executeTakeFirst()
if (!sync) return true
return sync.commitDataCid !== commit.data.toString()
}

findIndexerForCollection(collection: string) {
const indexers = Object.values(
this.records as Record<string, RecordProcessor<unknown, unknown>>,
Expand Down
104 changes: 104 additions & 0 deletions packages/bsky/src/data-plane/server/subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { Firehose, MemoryRunner } from '@atproto/sync'
import { IdResolver } from '@atproto/identity'
import { WriteOpAction } from '@atproto/repo'
import { subLogger as log } from '../../logger'
import { IndexingService } from './indexing'
import { Database } from './db'
import { BackgroundQueue } from './background'

export class RepoSubscription {
firehose: Firehose
runner: MemoryRunner
background: BackgroundQueue
indexingSvc: IndexingService

constructor(
public opts: { service: string; db: Database; idResolver: IdResolver },
) {
const { service, db, idResolver } = opts
this.background = new BackgroundQueue(db)
this.indexingSvc = new IndexingService(db, idResolver, this.background)

const { runner, firehose } = createFirehose({
idResolver,
service,
indexingSvc: this.indexingSvc,
})
this.runner = runner
this.firehose = firehose
}

start() {
this.firehose.start()
}

async restart() {
await this.destroy()
const { runner, firehose } = createFirehose({
idResolver: this.opts.idResolver,
service: this.opts.service,
indexingSvc: this.indexingSvc,
})
this.runner = runner
this.firehose = firehose
this.start()
}

async processAll() {
await this.runner.processAll()
await this.background.processAll()
}

async destroy() {
await this.firehose.destroy()
await this.runner.destroy()
await this.background.processAll()
}
}

const createFirehose = (opts: {
idResolver: IdResolver
service: string
indexingSvc: IndexingService
}) => {
const { idResolver, service, indexingSvc } = opts
const runner = new MemoryRunner({ startCursor: 0 })
const firehose = new Firehose({
idResolver,
runner,
service,
unauthenticatedHandles: true, // indexing service handles these
unauthenticatedCommits: true, // @TODO there seems to be a very rare issue where the authenticator thinks a block is missing in deletion ops
onError: (err) => log.error({ err }, 'error in subscription'),
handleEvent: async (evt) => {
if (evt.event === 'identity') {
await indexingSvc.indexHandle(evt.did, evt.time, true)
} else if (evt.event === 'account') {
if (evt.active === false && evt.status === 'deleted') {
await indexingSvc.deleteActor(evt.did)
} else {
await indexingSvc.updateActorStatus(evt.did, evt.active, evt.status)
}
} else {
const indexFn =
evt.event === 'delete'
? indexingSvc.deleteRecord(evt.uri)
: indexingSvc.indexRecord(
evt.uri,
evt.cid,
evt.record,
evt.event === 'create'
? WriteOpAction.Create
: WriteOpAction.Update,
evt.time,
)
await Promise.all([
indexFn,
indexingSvc.setCommitLastSeen(evt.did, evt.commit, evt.rev),
indexingSvc.indexHandle(evt.did, evt.time),
])
}
},
})
return { firehose, runner }
}
Loading

0 comments on commit b15dec2

Please sign in to comment.