Skip to content

Commit

Permalink
Account event (bluesky-social#2269)
Browse files Browse the repository at this point in the history
* subscribe repos lex: new #account event type

* lex: getAccountStatus endpoint

* lex: add account status errors to sync methods

* tweak type of token union

* fix getAccountStatus parameter name

* codegen

* Account -> Repo

* re-codegen

* update errors in sync methods

* add getRepoStatus route

* add account events to sequencer

* emit account evts

* fix test + small bugfixin

* handle evt on bsky side

* codegen

* loggable message

* schema tweaks

* build errors & tidy

* tidy account deactivation tests

* more subscribe repos tests

* identity evt tests + tidy

* return optional did doc on identity evts

* update identity evt

* update impl for identity evt handles

* add description to handle field

* status on listRepos & notate deprecated firehose events

* return status on listRepos

* pr feedback

* tests

* fix account deletion test

* fix more tests

* fix type error in repo.test

---------

Co-authored-by: bryan newbold <[email protected]>
  • Loading branch information
dholms and bnewbold authored May 30, 2024
1 parent ee0e635 commit 5643fa4
Show file tree
Hide file tree
Showing 40 changed files with 653 additions and 172 deletions.
8 changes: 8 additions & 0 deletions packages/bsky/src/data-plane/server/subscription/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ export class RepoSubscription {
await this.handleUpdateHandle(msg)
} else if (message.isIdentity(msg)) {
await this.handleIdentityEvt(msg)
} else if (message.isAccount(msg)) {
await this.handleAccountEvt(msg)
} else if (message.isTombstone(msg)) {
await this.handleTombstone(msg)
} else if (message.isMigrate(msg)) {
Expand Down Expand Up @@ -197,6 +199,10 @@ export class RepoSubscription {
await this.indexingSvc.indexHandle(msg.did, msg.time, true)
}

private async handleAccountEvt(_msg: message.Account) {
// no-op for now
}

private async handleTombstone(msg: message.Tombstone) {
await this.indexingSvc.tombstoneActor(msg.did)
}
Expand Down Expand Up @@ -269,6 +275,8 @@ function getMessageDetails(msg: Message):
return { seq: msg.seq, repo: msg.did, message: msg }
} else if (message.isIdentity(msg)) {
return { seq: msg.seq, repo: msg.did, message: msg }
} else if (message.isAccount(msg)) {
return { seq: msg.seq, repo: msg.did, message: msg }
} else if (message.isMigrate(msg)) {
return { seq: msg.seq, repo: msg.did, message: msg }
} else if (message.isTombstone(msg)) {
Expand Down
2 changes: 2 additions & 0 deletions packages/bsky/src/data-plane/server/subscription/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ export function loggableMessage(msg: RepoMessage) {
return msg
} else if (message.isIdentity(msg)) {
return msg
} else if (message.isAccount(msg)) {
return msg
} else if (message.isMigrate(msg)) {
return msg
} else if (message.isTombstone(msg)) {
Expand Down
2 changes: 1 addition & 1 deletion packages/bsky/tests/data-plane/subscription/repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ describe('sync', () => {
const evt = cborDecode(seqEvt.event) as sequencer.CommitEvt
evt.blocks = new Uint8Array() // bad blocks
seqEvt.event = cborEncode(evt)
await network.pds.ctx.sequencer.sequenceEvt(seqEvt)
return await network.pds.ctx.sequencer.sequenceEvt(seqEvt)
}
// create account and index the initial commit event
await sc.createAccount('jack', {
Expand Down
8 changes: 8 additions & 0 deletions packages/pds/src/account-manager/helpers/account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ export type AvailabilityFlags = {
includeDeactivated?: boolean
}

export enum AccountStatus {
Active = 'active',
Takendown = 'takendown',
Suspended = 'suspended',
Deleted = 'deleted',
Deactivated = 'deactivated',
}

const selectAccountQB = (db: AccountDb, flags?: AvailabilityFlags) => {
const { includeTakenDown = false, includeDeactivated = false } = flags ?? {}
const { ref } = db.db.dynamic
Expand Down
25 changes: 19 additions & 6 deletions packages/pds/src/account-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { CID } from 'multiformats/cid'
import { AccountDb, EmailTokenPurpose, getDb, getMigrator } from './db'
import * as scrypt from './helpers/scrypt'
import * as account from './helpers/account'
import { AccountStatus } from './helpers/account'
import { ActorAccount } from './helpers/account'
import * as repo from './helpers/repo'
import * as auth from './helpers/auth'
Expand All @@ -13,6 +14,8 @@ import * as emailToken from './helpers/email-token'
import { AuthScope } from '../auth-verifier'
import { StatusAttr } from '../lexicon/types/com/atproto/admin/defs'

export { AccountStatus } from './helpers/account'

export class AccountManager {
db: AccountDb

Expand Down Expand Up @@ -51,12 +54,6 @@ export class AccountManager {
return account.getAccountByEmail(this.db, email, flags)
}

// Repo exists and is not taken-down
async isRepoAvailable(did: string) {
const got = await this.getAccount(did)
return !!got
}

async isAccountActivated(did: string): Promise<boolean> {
const account = await this.getAccount(did, { includeDeactivated: true })
if (!account) return false
Expand All @@ -71,6 +68,22 @@ export class AccountManager {
return got?.did ?? null
}

async getAccountStatus(handleOrDid: string): Promise<AccountStatus> {
const got = await this.getAccount(handleOrDid, {
includeDeactivated: true,
includeTakenDown: true,
})
if (!got) {
return AccountStatus.Deleted
} else if (got.takedownRef) {
return AccountStatus.Takendown
} else if (got.deactivatedAt) {
return AccountStatus.Deactivated
} else {
return AccountStatus.Active
}
}

async createAccount(opts: {
did: string
handle: string
Expand Down
9 changes: 7 additions & 2 deletions packages/pds/src/api/com/atproto/admin/deleteAccount.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { AccountStatus } from '../../../../account-manager'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.admin.deleteAccount({
Expand All @@ -8,8 +9,12 @@ export default function (server: Server, ctx: AppContext) {
const { did } = input.body
await ctx.actorStore.destroy(did)
await ctx.accountManager.deleteAccount(did)
await ctx.sequencer.sequenceTombstone(did)
await ctx.sequencer.deleteAllForUser(did)
const tombstoneSeq = await ctx.sequencer.sequenceTombstone(did)
const accountSeq = await ctx.sequencer.sequenceAccountEvt(
did,
AccountStatus.Deleted,
)
await ctx.sequencer.deleteAllForUser(did, [accountSeq, tombstoneSeq])
},
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export default function (server: Server, ctx: AppContext) {

try {
await ctx.sequencer.sequenceHandleUpdate(did, handle)
await ctx.sequencer.sequenceIdentityEvt(did)
await ctx.sequencer.sequenceIdentityEvt(did, handle)
} catch (err) {
httpLogger.error(
{ err, did, handle },
Expand Down
2 changes: 2 additions & 0 deletions packages/pds/src/api/com/atproto/admin/updateSubjectStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export default function (server: Server, ctx: AppContext) {
if (takedown) {
if (isRepoRef(subject)) {
await ctx.accountManager.takedownAccount(subject.did, takedown)
const status = await ctx.accountManager.getAccountStatus(subject.did)
await ctx.sequencer.sequenceAccountEvt(subject.did, status)
} else if (isStrongRef(subject)) {
const uri = new AtUri(subject.uri)
await ctx.actorStore.transact(uri.hostname, (store) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export default function (server: Server, ctx: AppContext) {

await ctx.plcClient.sendOperation(requester, op)
await ctx.sequencer.sequenceIdentityEvt(requester)

try {
await ctx.idResolver.did.resolve(requester, true)
} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion packages/pds/src/api/com/atproto/identity/updateHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export default function (server: Server, ctx: AppContext) {

try {
await ctx.sequencer.sequenceHandleUpdate(requester, handle)
await ctx.sequencer.sequenceIdentityEvt(requester)
await ctx.sequencer.sequenceIdentityEvt(requester, handle)
} catch (err) {
httpLogger.error(
{ err, did: requester, handle },
Expand Down
6 changes: 2 additions & 4 deletions packages/pds/src/api/com/atproto/repo/describeRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ import * as id from '@atproto/identity'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { INVALID_HANDLE } from '@atproto/syntax'
import { assertRepoAvailability } from '../sync/util'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.repo.describeRepo(async ({ params }) => {
const { repo } = params

const account = await ctx.accountManager.getAccount(repo)
if (account === null) {
throw new InvalidRequestError(`Could not find user: ${repo}`)
}
const account = await assertRepoAvailability(ctx, repo, false)

let didDoc
try {
Expand Down
3 changes: 2 additions & 1 deletion packages/pds/src/api/com/atproto/server/activateAccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ export default function (server: Server, ctx: AppContext) {
})

// @NOTE: we're over-emitting for now for backwards compatibility, can reduce this in the future
await ctx.sequencer.sequenceIdentityEvt(requester)
const status = await ctx.accountManager.getAccountStatus(requester)
await ctx.sequencer.sequenceAccountEvt(requester, status)
await ctx.sequencer.sequenceHandleUpdate(
requester,
account.handle ?? INVALID_HANDLE,
Expand Down
9 changes: 6 additions & 3 deletions packages/pds/src/api/com/atproto/server/createAccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import {
import { Server } from '../../../../lexicon'
import { InputSchema as CreateAccountInput } from '../../../../lexicon/types/com/atproto/server/createAccount'
import AppContext from '../../../../context'
import { didDocForSession } from './util'
import { safeResolveDidDoc } from './util'
import { AccountStatus } from '../../../../account-manager'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.server.createAccount({
Expand Down Expand Up @@ -56,6 +57,8 @@ export default function (server: Server, ctx: AppContext) {
}
}

didDoc = await safeResolveDidDoc(ctx, did, true)

creds = await ctx.accountManager.createAccount({
did,
handle,
Expand All @@ -68,11 +71,11 @@ export default function (server: Server, ctx: AppContext) {
})

if (!deactivated) {
await ctx.sequencer.sequenceIdentityEvt(did, handle)
await ctx.sequencer.sequenceAccountEvt(did, AccountStatus.Active)
await ctx.sequencer.sequenceCommit(did, commit, [])
await ctx.sequencer.sequenceIdentityEvt(did)
}
await ctx.accountManager.updateRepoRoot(did, commit.cid, commit.rev)
didDoc = await didDocForSession(ctx, did, true)
await ctx.actorStore.clearReservedKeypair(signingKey.did(), did)
} catch (err) {
// this will only be reached if the actor store _did not_ exist before
Expand Down
2 changes: 2 additions & 0 deletions packages/pds/src/api/com/atproto/server/deactivateAccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export default function (server: Server, ctx: AppContext) {
requester,
input.body.deleteAfter ?? null,
)
const status = await ctx.accountManager.getAccountStatus(requester)
await ctx.sequencer.sequenceAccountEvt(requester, status)
},
})
}
10 changes: 7 additions & 3 deletions packages/pds/src/api/com/atproto/server/deleteAccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { authPassthru } from '../../../proxy'
import { AccountStatus } from '../../../../account-manager'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.server.deleteAccount({
Expand Down Expand Up @@ -44,9 +45,12 @@ export default function (server: Server, ctx: AppContext) {
)
await ctx.actorStore.destroy(did)
await ctx.accountManager.deleteAccount(did)
await ctx.sequencer.sequenceIdentityEvt(did)
await ctx.sequencer.sequenceTombstone(did)
await ctx.sequencer.deleteAllForUser(did)
const accountSeq = await ctx.sequencer.sequenceAccountEvt(
did,
AccountStatus.Deleted,
)
const tombstoneSeq = await ctx.sequencer.sequenceTombstone(did)
await ctx.sequencer.deleteAllForUser(did, [accountSeq, tombstoneSeq])
},
})
}
17 changes: 12 additions & 5 deletions packages/pds/src/api/com/atproto/server/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as crypto from '@atproto/crypto'
import { DidDocument } from '@atproto/identity'
import { ServerConfig } from '../../../../config'
import AppContext from '../../../../context'
import { dbLogger } from '../../../../logger'
import { httpLogger } from '../../../../logger'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { getPdsEndpoint, getSigningDidKey } from '@atproto/common'

Expand All @@ -28,21 +28,28 @@ export const getRandomToken = () => {
return token.slice(0, 5) + '-' + token.slice(5, 10)
}

// @TODO once supporting multiple pdses, validate pds in did doc based on allow-list.
export const didDocForSession = async (
export const safeResolveDidDoc = async (
ctx: AppContext,
did: string,
forceRefresh?: boolean,
): Promise<DidDocument | undefined> => {
if (!ctx.cfg.identity.enableDidDocWithSession) return
try {
const didDoc = await ctx.idResolver.did.resolve(did, forceRefresh)
return didDoc ?? undefined
} catch (err) {
dbLogger.warn({ err, did }, 'failed to resolve did doc')
httpLogger.warn({ err, did }, 'failed to resolve did doc')
}
}

export const didDocForSession = async (
ctx: AppContext,
did: string,
forceRefresh?: boolean,
): Promise<DidDocument | undefined> => {
if (!ctx.cfg.identity.enableDidDocWithSession) return
return safeResolveDidDoc(ctx, did, forceRefresh)
}

export const isValidDidDocForService = async (
ctx: AppContext,
did: string,
Expand Down
14 changes: 6 additions & 8 deletions packages/pds/src/api/com/atproto/sync/deprecated/getCheckout.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../../lexicon'
import AppContext from '../../../../../context'
import { getCarStream } from '../getRepo'
import { assertRepoAvailability } from '../util'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getCheckout({
auth: ctx.authVerifier.optionalAccessOrAdminToken,
handler: async ({ params, auth }) => {
const { did } = params
// takedown check for anyone other than an admin or the user
if (!ctx.authVerifier.isUserOrAdmin(auth, did)) {
const available = await ctx.accountManager.isRepoAvailable(did)
if (!available) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
}
await assertRepoAvailability(
ctx,
did,
ctx.authVerifier.isUserOrAdmin(auth, did),
)

const carStream = await getCarStream(ctx, did)

Expand Down
17 changes: 7 additions & 10 deletions packages/pds/src/api/com/atproto/sync/deprecated/getHead.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../../lexicon'
import AppContext from '../../../../../context'
import { assertRepoAvailability } from '../util'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getHead({
auth: ctx.authVerifier.optionalAccessOrAdminToken,
handler: async ({ params, auth }) => {
const { did } = params
// takedown check for anyone other than an admin or the user
if (!ctx.authVerifier.isUserOrAdmin(auth, did)) {
const available = await ctx.accountManager.isRepoAvailable(did)
if (!available) {
throw new InvalidRequestError(
`Could not find root for DID: ${did}`,
'HeadNotFound',
)
}
}
await assertRepoAvailability(
ctx,
did,
ctx.authVerifier.isUserOrAdmin(auth, did),
)

const root = await ctx.actorStore.read(did, (store) =>
store.repo.storage.getRoot(),
)
Expand Down
14 changes: 8 additions & 6 deletions packages/pds/src/api/com/atproto/sync/getBlob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { BlobNotFoundError } from '@atproto/repo'
import { assertRepoAvailability } from './util'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getBlob({
auth: ctx.authVerifier.optionalAccessOrAdminToken,
handler: async ({ params, res, auth }) => {
if (!ctx.authVerifier.isUserOrAdmin(auth, params.did)) {
const available = await ctx.accountManager.isRepoAvailable(params.did)
if (!available) {
throw new InvalidRequestError('Blob not found')
}
}
const { did } = params
await assertRepoAvailability(
ctx,
did,
ctx.authVerifier.isUserOrAdmin(auth, did),
)

const cid = CID.parse(params.cid)
const found = await ctx.actorStore.read(params.did, async (store) => {
try {
Expand Down
Loading

0 comments on commit 5643fa4

Please sign in to comment.