Skip to content

Commit

Permalink
Handle revalidation (bluesky-social#1474)
Browse files Browse the repository at this point in the history
* easier hanlde revalidation

* remove duplicate line

* backup handle nameservers on appview

* fix tests & add a couple
  • Loading branch information
dholms authored Aug 16, 2023
1 parent 34b8413 commit 4241ee1
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 47 deletions.
11 changes: 10 additions & 1 deletion packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface ServerConfigValues {
didPlcUrl: string
didCacheStaleTTL: number
didCacheMaxTTL: number
handleResolveNameservers?: string[]
imgUriEndpoint?: string
blobCacheLocation?: string
labelerDid: string
Expand Down Expand Up @@ -45,6 +46,9 @@ export class ServerConfig {
process.env.DID_CACHE_MAX_TTL,
DAY,
)
const handleResolveNameservers = process.env.HANDLE_RESOLVE_NAMESERVERS
? process.env.HANDLE_RESOLVE_NAMESERVERS.split(',')
: []
const imgUriEndpoint = process.env.IMG_URI_ENDPOINT
const blobCacheLocation = process.env.BLOB_CACHE_LOC
const dbPrimaryPostgresUrl =
Expand Down Expand Up @@ -90,6 +94,7 @@ export class ServerConfig {
didPlcUrl,
didCacheStaleTTL,
didCacheMaxTTL,
handleResolveNameservers,
imgUriEndpoint,
blobCacheLocation,
labelerDid,
Expand Down Expand Up @@ -159,7 +164,11 @@ export class ServerConfig {
}

get didCacheMaxTTL() {
return this.cfg.didCacheStaleTTL
return this.cfg.didCacheMaxTTL
}

get handleResolveNameservers() {
return this.cfg.handleResolveNameservers
}

get didPlcUrl() {
Expand Down
6 changes: 5 additions & 1 deletion packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ export class BskyAppView {
config.didCacheStaleTTL,
config.didCacheMaxTTL,
)
const idResolver = new IdResolver({ plcUrl: config.didPlcUrl, didCache })
const idResolver = new IdResolver({
plcUrl: config.didPlcUrl,
didCache,
backupNameservers: config.handleResolveNameservers,
})

const imgUriBuilder = new ImageUriBuilder(
config.imgUriEndpoint || `${config.publicUrl}/img`,
Expand Down
9 changes: 9 additions & 0 deletions packages/bsky/src/indexer/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface IndexerConfigValues {
didPlcUrl: string
didCacheStaleTTL: number
didCacheMaxTTL: number
handleResolveNameservers?: string[]
labelerDid: string
hiveApiKey?: string
labelerKeywords: Record<string, string>
Expand Down Expand Up @@ -54,6 +55,9 @@ export class IndexerConfig {
process.env.DID_CACHE_MAX_TTL,
DAY,
)
const handleResolveNameservers = process.env.HANDLE_RESOLVE_NAMESERVERS
? process.env.HANDLE_RESOLVE_NAMESERVERS.split(',')
: []
const labelerDid = process.env.LABELER_DID || 'did:example:labeler'
const labelerPushUrl =
overrides?.labelerPushUrl || process.env.LABELER_PUSH_URL || undefined
Expand Down Expand Up @@ -86,6 +90,7 @@ export class IndexerConfig {
didPlcUrl,
didCacheStaleTTL,
didCacheMaxTTL,
handleResolveNameservers,
labelerDid,
labelerPushUrl,
hiveApiKey,
Expand Down Expand Up @@ -139,6 +144,10 @@ export class IndexerConfig {
return this.cfg.didCacheMaxTTL
}

get handleResolveNameservers() {
return this.cfg.handleResolveNameservers
}

get labelerDid() {
return this.cfg.labelerDid
}
Expand Down
6 changes: 5 additions & 1 deletion packages/bsky/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ export class BskyIndexer {
cfg.didCacheStaleTTL,
cfg.didCacheMaxTTL,
)
const idResolver = new IdResolver({ plcUrl: cfg.didPlcUrl, didCache })
const idResolver = new IdResolver({
plcUrl: cfg.didPlcUrl,
didCache,
backupNameservers: cfg.handleResolveNameservers,
})
const backgroundQueue = new BackgroundQueue(db)
let labeler: Labeler
if (cfg.hiveApiKey) {
Expand Down
22 changes: 14 additions & 8 deletions packages/bsky/src/services/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from '@atproto/repo'
import { AtUri } from '@atproto/uri'
import { IdResolver, getPds } from '@atproto/identity'
import { DAY, chunkArray } from '@atproto/common'
import { DAY, HOUR, chunkArray } from '@atproto/common'
import { ValidationError } from '@atproto/lexicon'
import { PrimaryDatabase } from '../../db'
import * as Post from './plugins/post'
Expand All @@ -28,6 +28,7 @@ import { subLogger } from '../../logger'
import { retryHttp } from '../../util/retry'
import { Labeler } from '../../labeler'
import { BackgroundQueue } from '../../background'
import { Actor } from '../../db/tables/actor'

export class IndexingService {
records: {
Expand Down Expand Up @@ -118,13 +119,7 @@ export class IndexingService {
.where('did', '=', did)
.selectAll()
.executeTakeFirst()
const timestampAt = new Date(timestamp)
const lastIndexedAt = actor && new Date(actor.indexedAt)
const needsReindex =
force ||
!lastIndexedAt ||
timestampAt.getTime() - lastIndexedAt.getTime() > DAY
if (!needsReindex) {
if (!force && !needsHandleReindex(actor, timestamp)) {
return
}
const atpData = await this.idResolver.did.resolveAtprotoData(did, true)
Expand Down Expand Up @@ -357,3 +352,14 @@ function* walkContentsWithCids(contents: RepoContentsWithCids) {
}
}
}

const needsHandleReindex = (actor: Actor | undefined, timestamp: string) => {
if (!actor) return true
const timeDiff =
new Date(timestamp).getTime() - new Date(actor.indexedAt).getTime()
// revalidate daily
if (timeDiff > DAY) return true
// revalidate more aggressively for invalidated handles
if (actor.handle === null && timeDiff > HOUR) return true
return false
}
29 changes: 17 additions & 12 deletions packages/pds/src/api/com/atproto/admin/updateAccountHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@ export default function (server: Server, ctx: AppContext) {
throw new InvalidRequestError(`Account not found: ${did}`)
}

const seqHandleTok = await ctx.db.transaction(async (dbTxn) => {
let tok: HandleSequenceToken
try {
tok = await ctx.services.account(dbTxn).updateHandle(did, handle)
} catch (err) {
if (err instanceof UserAlreadyExistsError) {
throw new InvalidRequestError(`Handle already taken: ${handle}`)
let seqHandleTok: HandleSequenceToken
if (existingAccnt.handle === handle) {
seqHandleTok = { handle, did }
} else {
seqHandleTok = await ctx.db.transaction(async (dbTxn) => {
let tok: HandleSequenceToken
try {
tok = await ctx.services.account(dbTxn).updateHandle(did, handle)
} catch (err) {
if (err instanceof UserAlreadyExistsError) {
throw new InvalidRequestError(`Handle already taken: ${handle}`)
}
throw err
}
throw err
}
await ctx.plcClient.updateHandle(did, ctx.plcRotationKey, handle)
return tok
})
await ctx.plcClient.updateHandle(did, ctx.plcRotationKey, handle)
return tok
})
}

try {
await ctx.db.transaction(async (dbTxn) => {
Expand Down
47 changes: 27 additions & 20 deletions packages/pds/src/api/com/atproto/identity/updateHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,35 @@ export default function (server: Server, ctx: AppContext) {
})

// Pessimistic check to handle spam: also enforced by updateHandle() and the db.
const available = await ctx.services
.account(ctx.db)
.isHandleAvailable(handle)
if (!available) {
throw new InvalidRequestError(`Handle already taken: ${handle}`)
}
const handleDid = await ctx.services.account(ctx.db).getHandleDid(handle)

const seqHandleTok = await ctx.db.transaction(async (dbTxn) => {
let tok: HandleSequenceToken
try {
tok = await ctx.services
.account(dbTxn)
.updateHandle(requester, handle)
} catch (err) {
if (err instanceof UserAlreadyExistsError) {
throw new InvalidRequestError(`Handle already taken: ${handle}`)
}
throw err
let seqHandleTok: HandleSequenceToken
if (handleDid) {
if (handleDid !== requester) {
throw new InvalidRequestError(`Handle already taken: ${handle}`)
}
await ctx.plcClient.updateHandle(requester, ctx.plcRotationKey, handle)
return tok
})
seqHandleTok = { did: requester, handle: handle }
} else {
seqHandleTok = await ctx.db.transaction(async (dbTxn) => {
let tok: HandleSequenceToken
try {
tok = await ctx.services
.account(dbTxn)
.updateHandle(requester, handle)
} catch (err) {
if (err instanceof UserAlreadyExistsError) {
throw new InvalidRequestError(`Handle already taken: ${handle}`)
}
throw err
}
await ctx.plcClient.updateHandle(
requester,
ctx.plcRotationKey,
handle,
)
return tok
})
}

try {
await ctx.db.transaction(async (dbTxn) => {
Expand Down
6 changes: 3 additions & 3 deletions packages/pds/src/services/account/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ export class AccountService {
await sequencer.sequenceEvt(this.db, seqEvt)
}

async isHandleAvailable(handle: string) {
async getHandleDid(handle: string): Promise<string | null> {
// @NOTE see also condition in updateHandle()
const found = await this.db.db
.selectFrom('did_handle')
.where('handle', '=', handle)
.select('handle')
.selectAll()
.executeTakeFirst()
return !found
return found?.did ?? null
}

async updateEmail(did: string, email: string) {
Expand Down
7 changes: 7 additions & 0 deletions packages/pds/tests/handles.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ describe('handles', () => {
await expect(attempt).rejects.toThrow('Handle already taken: bob.test')
})

it('handle updates are idempotent', async () => {
await agent.api.com.atproto.identity.updateHandle(
{ handle: 'Bob.test' },
{ headers: sc.getHeaders(bob), encoding: 'application/json' },
)
})

it('if handle update fails, it does not update their did document', async () => {
const data = await idResolver.did.resolveAtprotoData(alice)
expect(data.handle).toBe(newHandle)
Expand Down
18 changes: 17 additions & 1 deletion packages/pds/tests/sync/subscribe-repos.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ describe('repo subscribe repos', () => {
it('syncs handle changes', async () => {
await sc.updateHandle(alice, 'alice2.test')
await sc.updateHandle(bob, 'bob2.test')
await sc.updateHandle(bob, 'bob2.test') // idempotent update re-sends

const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
Expand All @@ -308,11 +309,26 @@ describe('repo subscribe repos', () => {
ws.terminate()

await verifyCommitEvents(evts)
const handleEvts = getHandleEvts(evts.slice(-2))
const handleEvts = getHandleEvts(evts.slice(-3))
verifyHandleEvent(handleEvts[0], alice, 'alice2.test')
verifyHandleEvent(handleEvts[1], bob, 'bob2.test')
})

it('resends handle events on idempotent updates', async () => {
const update = sc.updateHandle(bob, 'bob2.test')

const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos`,
)

const gen = byFrame(ws)
const evts = await readTillCaughtUp(gen, update)
ws.terminate()

const handleEvts = getHandleEvts(evts.slice(-1))
verifyHandleEvent(handleEvts[0], bob, 'bob2.test')
})

it('syncs tombstones', async () => {
const baddie1 = (
await sc.createAccount('baddie1.test', {
Expand Down

0 comments on commit 4241ee1

Please sign in to comment.