Skip to content

Commit

Permalink
Rebuild repo script (bluesky-social#2528)
Browse files Browse the repository at this point in the history
* wip

* first pass on script

* move script

* handle sequencing & account manager table

* runner

* fix while loop

* script framework

* build branch

* add prompt

* reorder script

* patch script

* move readline

* don't build branch

* tidy promise
  • Loading branch information
dholms authored May 28, 2024
1 parent d983063 commit ca0ca08
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/pds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export { createServer as createLexiconServer } from './lexicon'
export * as sequencer from './sequencer'
export { type PreparedWrite } from './repo'
export * as repoPrepare from './repo/prepare'
export { scripts } from './scripts'

export class PDS {
public ctx: AppContext
Expand Down
5 changes: 5 additions & 0 deletions packages/pds/src/scripts/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { rebuildRepo } from './rebuild-repo'

/**
 * Registry of maintenance scripts exported by this package, keyed by script
 * name. Each value is the function that implements the script.
 */
export const scripts = {
// Rebuilds a single repo; see ./rebuild-repo for the implementation.
'rebuild-repo': rebuildRepo,
}
143 changes: 143 additions & 0 deletions packages/pds/src/scripts/rebuild-repo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import readline from 'node:readline/promises'
import { CID } from 'multiformats/cid'
import {
BlockMap,
CidSet,
MST,
MemoryBlockstore,
formatDataKey,
signCommit,
} from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
import { TID } from '@atproto/common'
import { ActorStoreTransactor } from '../actor-store'
import AppContext from '../context'

/**
 * Rebuilds the repo of a single account from the rows of its `record` table.
 *
 * Inside one actor-store transaction this:
 *  1. lists every record and every currently stored block CID,
 *  2. builds a fresh MST (backed by an in-memory blockstore) from the records,
 *  3. collects the MST nodes whose CIDs are not already stored,
 *  4. computes the stored CIDs that the new MST does not reference,
 *  5. signs a new version-3 commit with `prev: null` pointing at the MST root,
 *  6. prints a summary and asks for confirmation,
 *  7. deletes the unreferenced blocks, stores the new ones and updates the root.
 *
 * After the transaction it updates the account manager's repo root and
 * sequences the commit.
 *
 * @param ctx - Application context providing the actor store, account manager
 *   and sequencer.
 * @param args - Script arguments; `args[0]` must be the DID to rebuild.
 * @throws Error 'Expected DID as argument' when `args[0]` is missing or does
 *   not start with `did:`.
 * @throws Error 'Aborted' when the confirmation prompt is declined.
 */
export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
  const [did] = args
  if (!did || !did.startsWith('did:')) {
    throw new Error('Expected DID as argument')
  }

  const blockstore = new MemoryBlockstore()
  const rev = TID.nextStr()

  const commit = await ctx.actorStore.transact(did, async (actorTxn) => {
    const [recordList, currentCids] = await Promise.all([
      listAllRecords(actorTxn),
      listExistingBlocks(actorTxn),
    ])

    // Re-create the tree from scratch, one record at a time.
    let tree = await MST.create(blockstore)
    for (const { path, cid } of recordList) {
      tree = await tree.add(path, cid)
    }

    // Only tree nodes that are not stored yet need to be written.
    const blocksToAdd = new BlockMap()
    for await (const entry of tree.walk()) {
      if (!entry.isTree()) continue
      const nodeCid = await entry.getPointer()
      if (currentCids.has(nodeCid)) continue
      const { cid, bytes } = await entry.serialize()
      blocksToAdd.set(cid, bytes)
    }

    // Everything stored today that the rebuilt tree does not reference.
    const treeCids = await tree.allCids()
    const staleCids = new CidSet(currentCids.toList()).subtractSet(treeCids)

    const signedCommit = await signCommit(
      {
        did,
        version: 3,
        rev,
        prev: null,
        data: await tree.getPointer(),
      },
      actorTxn.repo.signingKey,
    )
    const commitCid = await blocksToAdd.add(signedCommit)

    console.log('Record count: ', recordList.length)
    console.log('Existing blocks: ', currentCids.toList().length)
    console.log('Deleting blocks:', staleCids.toList().length)
    console.log('Adding blocks: ', blocksToAdd.size)

    // Throwing here leaves the transaction callback before any write is made.
    const confirmed = await promptContinue()
    if (!confirmed) {
      throw new Error('Aborted')
    }

    const storage = actorTxn.repo.storage
    await storage.deleteMany(staleCids.toList())
    await storage.putMany(blocksToAdd, rev)
    await storage.updateRoot(commitCid, rev)

    return {
      cid: commitCid,
      rev,
      since: null,
      prev: null,
      newBlocks: blocksToAdd,
      removedCids: staleCids,
    }
  })

  await ctx.accountManager.updateRepoRoot(did, commit.cid, rev)
  await ctx.sequencer.sequenceCommit(did, commit, [])
}

/**
 * Collects the CID of every row in the `repo_block` table.
 *
 * Rows are read in pages of 1000, ordered by `cid` ascending, using the last
 * `cid` of each page as the exclusive lower bound of the next query. Paging
 * stops at the first empty page.
 *
 * @param store - Actor store transactor whose database is queried.
 * @returns A set containing every parsed block CID.
 */
const listExistingBlocks = async (
  store: ActorStoreTransactor,
): Promise<CidSet> => {
  const found = new CidSet()
  let afterCid = ''
  for (;;) {
    const page = await store.db.db
      .selectFrom('repo_block')
      .select('cid')
      .where('cid', '>', afterCid)
      .orderBy('cid', 'asc')
      .limit(1000)
      .execute()
    const last = page.at(-1)
    if (last === undefined) {
      // Empty page: nothing left to read.
      break
    }
    for (const { cid } of page) {
      found.add(CID.parse(cid))
    }
    afterCid = last.cid
  }
  return found
}

/**
 * Reads every row of the `record` table and describes it for MST insertion.
 *
 * Rows are read in pages of 1000, ordered by `uri` ascending, using the last
 * `uri` of each page as the exclusive lower bound of the next query. Paging
 * stops at the first empty page.
 *
 * @param store - Actor store transactor whose database is queried.
 * @returns One descriptor per record, in ascending `uri` order, carrying the
 *   original URI, its data key (from collection and rkey) and its parsed CID.
 */
const listAllRecords = async (
  store: ActorStoreTransactor,
): Promise<RecordDescript[]> => {
  const collected: RecordDescript[] = []
  let afterUri = ''
  for (;;) {
    const page = await store.db.db
      .selectFrom('record')
      .select(['uri', 'cid'])
      .where('uri', '>', afterUri)
      .orderBy('uri', 'asc')
      .limit(1000)
      .execute()
    const last = page.at(-1)
    if (last === undefined) {
      // Empty page: nothing left to read.
      break
    }
    for (const { uri, cid } of page) {
      const atUri = new AtUri(uri)
      const path = formatDataKey(atUri.collection, atUri.rkey)
      collected.push({ uri, path, cid: CID.parse(cid) })
    }
    afterUri = last.uri
  }
  return collected
}

/**
 * Asks the operator on the terminal whether the script should proceed.
 *
 * The prompt offers "y/n", so only an explicit affirmative answer ("y" or
 * "yes", case-insensitive, surrounding whitespace ignored) confirms. Any other
 * input, including a bare Enter, is treated as a refusal so that an accidental
 * key press cannot trigger the destructive rewrite that follows.
 *
 * @returns `true` when the operator confirmed, `false` otherwise.
 */
const promptContinue = async (): Promise<boolean> => {
  const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
  })
  try {
    const answer = await rl.question('Continue? y/n ')
    const normalized = answer.trim().toLowerCase()
    return normalized === 'y' || normalized === 'yes'
  } finally {
    // Always release stdin/stdout; an open interface keeps listening on stdin.
    rl.close()
  }
}

/** Description of one row of the `record` table, as produced by listAllRecords. */
type RecordDescript = {
// The record's URI exactly as stored in the `uri` column.
uri: string
// Data key built with formatDataKey from the URI's collection and rkey.
path: string
// CID parsed from the row's `cid` column.
cid: CID
}
26 changes: 26 additions & 0 deletions services/pds/run-script.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* eslint-env node */

'use strict'
const {
envToCfg,
envToSecrets,
readEnv,
AppContext,
scripts,
} = require('@atproto/pds')

/**
 * Entry point for running a PDS maintenance script.
 *
 * Builds the application context from the environment, looks up the script
 * named by the first command-line argument in the `scripts` registry, and runs
 * it with the remaining arguments.
 *
 * @throws Error when no script with the given name is registered.
 */
const main = async () => {
  const env = readEnv()
  const cfg = envToCfg(env)
  const secrets = envToSecrets(env)
  const ctx = await AppContext.fromConfig(cfg, secrets)
  const scriptName = process.argv[2]
  // Only accept names registered directly on `scripts`; a plain property read
  // would also resolve inherited members such as `toString` or `constructor`.
  const isRegistered =
    scriptName !== undefined &&
    Object.prototype.hasOwnProperty.call(scripts, scriptName)
  const script = isRegistered ? scripts[scriptName] : undefined
  if (typeof script !== 'function') {
    throw new Error(`could not find script: ${scriptName}`)
  }
  await script(ctx, process.argv.slice(3))
  console.log('DONE')
}

// Report failures explicitly and exit non-zero instead of leaving the
// rejection unhandled.
main().catch((err) => {
  console.error(err)
  process.exit(1)
})

0 comments on commit ca0ca08

Please sign in to comment.