Skip to content

Commit

Permalink
PDS transactions (bluesky-social#245)
Browse files Browse the repository at this point in the history
* Run getProfile in transaction, setup/test transaction helpers

* Test tidy

* Transactionalize invite code usage in acct creation, add transaction assertions

* Transactionalize account creation

* crud txns

* test txns for account

* cleanup

* lock table

* fixed up crud operations

* fixed updateProfile route

* test data races

* cleanup

* Transactional invite code use, username/email creation (bluesky-social#253)

* Remove table lock for invite code use, surface username race

* Add and use db constraint for unique account username/email

* Update packages/server/src/db/index.ts

Co-authored-by: devin ivy <[email protected]>

* Update packages/server/src/db/records/badgeAccept.ts

Co-authored-by: devin ivy <[email protected]>

* Update packages/server/src/db/util.ts

Co-authored-by: devin ivy <[email protected]>

* pr fixups

* remove old todos

* test for updateProfile races

* logging on repo update errors

* give feedback on misuse of forUpdate

Co-authored-by: dholms <[email protected]>
  • Loading branch information
devinivy and dholms authored Oct 24, 2022
1 parent a8368d1 commit e3c4efd
Show file tree
Hide file tree
Showing 24 changed files with 765 additions and 361 deletions.
10 changes: 1 addition & 9 deletions packages/repo/src/blockstore/ipld-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,13 @@ import { BlockReader } from '@ipld/car/api'
import CidSet from '../cid-set'
import { CarReader } from '@ipld/car/reader'

type AllowedIpldRecordVal = string | number | CID | CID[] | Uint8Array | null

export type AllowedIpldVal =
| AllowedIpldRecordVal
| Record<string, AllowedIpldRecordVal>

export abstract class IpldStore {
abstract has(cid: CID): Promise<boolean>
abstract getBytes(cid: CID): Promise<Uint8Array>
abstract putBytes(cid: CID, bytes: Uint8Array): Promise<void>
abstract destroy(): Promise<void>

async put(
value: Record<string, AllowedIpldVal> | AllowedIpldVal,
): Promise<CID> {
async put(value: unknown): Promise<CID> {
const block = await Block.encode({
value,
codec: blockCodec,
Expand Down
1 change: 1 addition & 0 deletions packages/repo/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './blockstore'
export * from './repo'
export * from './structure'
export * from './mst'
export * from './types'
export * from './verify'
Expand Down
2 changes: 1 addition & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"postbuild": "tsc --build tsconfig.build.json",
"start": "node dist/bin.js",
"test": "jest",
"test:pg": "../pg/with-test-db.sh jest",
"test:pg": "../pg/with-test-db.sh yarn test",
"test:log": "cat test.log | pino-pretty",
"test:updateSnapshot": "jest --updateSnapshot",
"prettier": "prettier --check src/",
Expand Down
202 changes: 112 additions & 90 deletions packages/server/src/api/app/bsky/updateProfile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,117 +3,139 @@ import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import * as locals from '../../../locals'
import * as schema from '../../../lexicon/schemas'
import { AtUri } from '@atproto/uri'
import { RepoStructure } from '@atproto/repo'
import { CID } from 'multiformats/cid'
import * as Profile from '../../../lexicon/types/app/bsky/profile'

const profileNsid = schema.ids.AppBskyProfile

export default function (server: Server) {
server.app.bsky.updateProfile(async (_params, input, req, res) => {
const { auth, db } = locals.get(res)
const { auth, db, blockstore, logger } = locals.get(res)

const requester = auth.getUserDid(req)
if (!requester) {
throw new AuthRequiredError()
}

const repo = await locals.loadRepo(res, requester)
if (!repo) {
throw new InvalidRequestError(
`${requester} is not a registered repo on this server`,
)
}

const current = await repo.getCollection(profileNsid).getRecord('self')
if (!db.records.profile.matchesSchema(current)) {
// @TODO need a way to get a profile out of a broken state
throw new InvalidRequestError('could not parse current profile')
}

const updated = {
...current,
displayName: input.body.displayName || current.displayName,
description: input.body.description || current.description,
pinnedBadges: input.body.pinnedBadges || current.pinnedBadges,
}
if (!db.records.profile.matchesSchema(updated)) {
throw new InvalidRequestError(
'requested updates do not produce a valid profile doc',
)
}

const authStore = await locals.getAuthstore(res, requester)
const uri = new AtUri(`${requester}/${profileNsid}/self`)

const currBadges = await db.db
.selectFrom('app_bsky_profile_badge')
.selectAll()
.where('profileUri', '=', uri.toString())
.execute()

const updatedBadges = updated.pinnedBadges || []
const toDelete = currBadges
.filter(
(row) => !updatedBadges.some((badge) => badge.uri === row.badgeUri),
)
.map((row) => row.badgeUri)
const toAdd = updatedBadges
.filter((badge) => !currBadges.some((row) => badge.uri === row.badgeUri))
.map((badge) => ({
profileUri: uri.toString(),
badgeUri: badge.uri,
badgeCid: badge.cid,
}))

const newCid = await repo
.getCollection(profileNsid)
.updateRecord('self', updated)

const recordQuery = db.db
.updateTable('record')
.set({
raw: JSON.stringify(updated),
cid: newCid.toString(),
indexedAt: new Date().toISOString(),
})
.where('uri', '=', uri.toString())
.execute()

const profileQuery = db.db
.updateTable('app_bsky_profile')
.set({
cid: newCid.toString(),
displayName: updated.displayName,
description: updated.description,
indexedAt: new Date().toISOString(),
})
.where('uri', '=', uri.toString())
.execute()

const delBadgesQuery = toDelete.length
? db.db
.deleteFrom('app_bsky_profile_badge')
const { profileCid, updated } = await db.transaction(
async (txnDb): Promise<{ profileCid: CID; updated: Profile.Record }> => {
const currRoot = await txnDb.getRepoRoot(requester, true)
if (!currRoot) {
throw new InvalidRequestError(
`${requester} is not a registered repo on this server`,
)
}
const repo = await RepoStructure.load(blockstore, currRoot)
const current = await repo.getRecord(profileNsid, 'self')
if (!db.records.profile.matchesSchema(current)) {
// @TODO need a way to get a profile out of a broken state
throw new InvalidRequestError('could not parse current profile')
}

const updated = {
...current,
displayName: input.body.displayName || current.displayName,
description: input.body.description || current.description,
pinnedBadges: input.body.pinnedBadges || current.pinnedBadges,
}
if (!db.records.profile.matchesSchema(updated)) {
throw new InvalidRequestError(
'requested updates do not produce a valid profile doc',
)
}

const currBadges = await txnDb.db
.selectFrom('app_bsky_profile_badge')
.selectAll()
.where('profileUri', '=', uri.toString())
.where('badgeUri', 'in', toDelete)
.execute()
: null

const addBadgesQuery = db.db
.insertInto('app_bsky_profile_badge')
.values(toAdd)
.execute()
const updatedBadges = updated.pinnedBadges || []
const toDelete = currBadges
.filter(
(row) => !updatedBadges.some((badge) => badge.uri === row.badgeUri),
)
.map((row) => row.badgeUri)
const toAdd = updatedBadges
.filter(
(badge) => !currBadges.some((row) => badge.uri === row.badgeUri),
)
.map((badge) => ({
profileUri: uri.toString(),
badgeUri: badge.uri,
badgeCid: badge.cid,
}))

const profileCid = await repo.blockstore.put(updated)

// Update profile record
await txnDb.db
.updateTable('record')
.set({
raw: JSON.stringify(updated),
cid: profileCid.toString(),
indexedAt: new Date().toISOString(),
})
.where('uri', '=', uri.toString())
.execute()

// Update profile app index
await txnDb.db
.updateTable('app_bsky_profile')
.set({
cid: profileCid.toString(),
displayName: updated.displayName,
description: updated.description,
indexedAt: new Date().toISOString(),
})
.where('uri', '=', uri.toString())
.execute()

// @TODO transactionalize
await Promise.all([
recordQuery,
profileQuery,
delBadgesQuery,
addBadgesQuery,
db.updateRepoRoot(requester, repo.cid),
])
// Remove old badges
if (toDelete.length > 0) {
await txnDb.db
.deleteFrom('app_bsky_profile_badge')
.where('profileUri', '=', uri.toString())
.where('badgeUri', 'in', toDelete)
.execute()
}

// Add new badges
if (toAdd.length > 0) {
await txnDb.db
.insertInto('app_bsky_profile_badge')
.values(toAdd)
.execute()
}

await repo
.stageUpdate({
action: 'update',
collection: profileNsid,
rkey: 'self',
cid: profileCid,
})
.createCommit(authStore, async (prev, curr) => {
const success = await txnDb.updateRepoRoot(requester, curr, prev)
if (!success) {
logger.error({ did: requester, curr, prev }, 'repo update failed')
throw new Error('Could not update repo root')
}
return null
})

return { profileCid, updated }
},
)

return {
encoding: 'application/json',
body: {
uri: uri.toString(),
cid: newCid.toString(),
cid: profileCid.toString(),
record: updated,
},
}
Expand Down
Loading

0 comments on commit e3c4efd

Please sign in to comment.