Skip to content

Commit

Permalink
Bsky log commit error (bluesky-social#1275)
Browse files Browse the repository at this point in the history
* don't bail on bad record index

* add build

* temporarily disable check, full reindex on rabase

* don't bail on bad record index during rebase, track last commit on rebase

* log bsky repo subscription stats

* add running and waiting count to repo sub stats

* re-enable fast path for happy rebases

* only hold onto seq in cursor consecutivelist, don't hold onto whole completed messages
  • Loading branch information
devinivy authored Jul 5, 2023
1 parent 4f7fd8b commit bb2848e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-push-bsky-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
push:
branches:
- main
- bsky-log-commit-error
env:
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
Expand Down
17 changes: 16 additions & 1 deletion packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { IdResolver } from '@atproto/identity'
import API, { health, blobResolver } from './api'
import Database from './db'
import * as error from './error'
import { dbLogger, loggerMiddleware } from './logger'
import { dbLogger, loggerMiddleware, subLogger } from './logger'
import { ServerConfig } from './config'
import { createServer } from './lexicon'
import { ImageUriBuilder } from './image/uri'
Expand Down Expand Up @@ -41,6 +41,7 @@ export class BskyAppView {
public server?: http.Server
private terminator?: HttpTerminator
private dbStatsInterval: NodeJS.Timer
private subStatsInterval: NodeJS.Timer

constructor(opts: {
ctx: AppContext
Expand Down Expand Up @@ -188,6 +189,19 @@ export class BskyAppView {
'background queue stats',
)
}, 10000)
if (this.sub) {
this.subStatsInterval = setInterval(() => {
subLogger.info(
{
seq: this.sub?.lastSeq,
cursor: this.sub?.lastCursor,
runningCount: this.sub?.repoQueue.main.pending,
waitingCount: this.sub?.repoQueue.main.size,
},
'repo subscription stats',
)
}, 500)
}
const server = this.app.listen(this.ctx.cfg.port)
this.server = server
this.terminator = createHttpTerminator({ server })
Expand All @@ -201,6 +215,7 @@ export class BskyAppView {
async destroy(): Promise<void> {
await this.ctx.didCache.destroy()
await this.sub?.destroy()
clearInterval(this.subStatsInterval)
await this.terminator?.terminate()
await this.ctx.backgroundQueue.destroy()
await this.ctx.db.close()
Expand Down
5 changes: 4 additions & 1 deletion packages/bsky/src/services/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ export class IndexingService {
'skipping indexing of invalid record',
)
} else {
throw err
subLogger.error(
{ err, did, commit, uri: uri.toString(), cid: cid.toString() },
'skipping indexing due to error processing record',
)
}
}
})
Expand Down
42 changes: 32 additions & 10 deletions packages/bsky/src/subscription/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ export class RepoSubscription {
leader = new Leader(this.subLockId, this.ctx.db)
repoQueue: PartitionedQueue
cursorQueue = new LatestQueue()
consecutive = new ConsecutiveList<ProcessableMessage>()
consecutive = new ConsecutiveList<number>()
destroyed = false
lastSeq: number | undefined
lastCursor: number | undefined

constructor(
public ctx: AppContext,
Expand Down Expand Up @@ -56,7 +58,8 @@ export class RepoSubscription {
)
continue
}
const item = this.consecutive.push(details.message)
this.lastSeq = details.seq
const item = this.consecutive.push(details.seq)
this.repoQueue
.add(details.repo, () => this.handleMessage(details.message))
.catch((err) => {
Expand Down Expand Up @@ -113,7 +116,7 @@ export class RepoSubscription {
this.destroyed = false
this.repoQueue = new PartitionedQueue({ concurrency: this.concurrency })
this.cursorQueue = new LatestQueue()
this.consecutive = new ConsecutiveList<ProcessableMessage>()
this.consecutive = new ConsecutiveList<number>()
await this.run()
}

Expand All @@ -138,14 +141,19 @@ export class RepoSubscription {
const indexRecords = async () => {
const { root, rootCid, ops } = await getOps(msg)
if (msg.tooBig) {
return await indexingService.indexRepo(msg.repo, rootCid.toString())
await indexingService.indexRepo(msg.repo, rootCid.toString())
await indexingService.setCommitLastSeen(root, msg)
return
}
if (msg.rebase) {
const needsReindex = await indexingService.checkCommitNeedsIndexing(
root,
)
if (!needsReindex) return
return await indexingService.indexRepo(msg.repo, rootCid.toString())
if (needsReindex) {
await indexingService.indexRepo(msg.repo, rootCid.toString())
}
await indexingService.setCommitLastSeen(root, msg)
return
}
for (const op of ops) {
if (op.action === WriteOpAction.Delete) {
Expand All @@ -171,7 +179,16 @@ export class RepoSubscription {
'skipping indexing of invalid record',
)
} else {
throw err
subLogger.error(
{
err,
did: msg.repo,
commit: msg.commit.toString(),
uri: op.uri.toString(),
cid: op.cid.toString(),
},
'skipping indexing due to error processing record',
)
}
}
}
Expand All @@ -195,10 +212,10 @@ export class RepoSubscription {
await services.indexing(db).tombstoneActor(msg.did)
}

private async handleCursor(msg: ProcessableMessage) {
private async handleCursor(seq: number) {
const { db } = this.ctx
await db.transaction(async (tx) => {
await this.setState(tx, { cursor: msg.seq })
await this.setState(tx, { cursor: seq })
})
}

Expand All @@ -209,7 +226,9 @@ export class RepoSubscription {
.where('service', '=', this.service)
.where('method', '=', METHOD)
.executeTakeFirst()
return sub ? (JSON.parse(sub.state) as State) : { cursor: 0 }
const state = sub ? (JSON.parse(sub.state) as State) : { cursor: 0 }
this.lastCursor = state.cursor
return state
}

async resetState(): Promise<void> {
Expand All @@ -222,6 +241,9 @@ export class RepoSubscription {

private async setState(tx: Database, state: State): Promise<void> {
tx.assertTransaction()
tx.onCommit(() => {
this.lastCursor = state.cursor
})
const res = await tx.db
.updateTable('subscription')
.where('service', '=', this.service)
Expand Down

0 comments on commit bb2848e

Please sign in to comment.