Skip to content

Commit

Permalink
Firehose Identity event (bluesky-social#2208)
Browse files Browse the repository at this point in the history
* add new identity event

* add note for tombstone

* send identity evts

* add emission of identity evts

* emit handle event on activate account & fix subscribeRepos

* add time to evt

* update indexer for tests

* rm logs
  • Loading branch information
dholms authored Feb 21, 2024
1 parent 30b05a7 commit 1a12c7e
Show file tree
Hide file tree
Showing 23 changed files with 306 additions and 41 deletions.
25 changes: 21 additions & 4 deletions lexicons/com/atproto/sync/subscribeRepos.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
"message": {
"schema": {
"type": "union",
"refs": ["#commit", "#handle", "#migrate", "#tombstone", "#info"]
"refs": [
"#commit",
"#identity",
"#handle",
"#migrate",
"#tombstone",
"#info"
]
}
},
"errors": [
Expand Down Expand Up @@ -104,9 +111,19 @@
}
}
},
"identity": {
"type": "object",
"description": "Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache.",
"required": ["seq", "did", "time"],
"properties": {
"seq": { "type": "integer" },
"did": { "type": "string", "format": "did" },
"time": { "type": "string", "format": "datetime" }
}
},
"handle": {
"type": "object",
"description": "Represents an update of the account's handle, or transition to/from invalid state.",
"description": "Represents an update of the account's handle, or transition to/from invalid state. NOTE: Will be deprecated in favor of #identity.",
"required": ["seq", "did", "handle", "time"],
"properties": {
"seq": { "type": "integer" },
Expand All @@ -117,7 +134,7 @@
},
"migrate": {
"type": "object",
"description": "Represents an account moving from one PDS instance to another. NOTE: not implemented; full account migration may introduce a new message instead.",
"description": "Represents an account moving from one PDS instance to another. NOTE: not implemented; account migration uses #identity instead",
"required": ["seq", "did", "migrateTo", "time"],
"nullable": ["migrateTo"],
"properties": {
Expand All @@ -129,7 +146,7 @@
},
"tombstone": {
"type": "object",
"description": "Indicates that an account has been deleted.",
"description": "Indicates that an account has been deleted. NOTE: may be deprecated in favor of #identity or a future #account event",
"required": ["seq", "did", "time"],
"properties": {
"seq": { "type": "integer" },
Expand Down
30 changes: 26 additions & 4 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4579,6 +4579,7 @@ export const schemaDict = {
type: 'union',
refs: [
'lex:com.atproto.sync.subscribeRepos#commit',
'lex:com.atproto.sync.subscribeRepos#identity',
'lex:com.atproto.sync.subscribeRepos#handle',
'lex:com.atproto.sync.subscribeRepos#migrate',
'lex:com.atproto.sync.subscribeRepos#tombstone',
Expand Down Expand Up @@ -4685,10 +4686,29 @@ export const schemaDict = {
},
},
},
identity: {
type: 'object',
description:
"Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache.",
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
handle: {
type: 'object',
description:
"Represents an update of the account's handle, or transition to/from invalid state.",
"Represents an update of the account's handle, or transition to/from invalid state. NOTE: Will be deprecated in favor of #identity.",
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
Expand All @@ -4711,7 +4731,7 @@ export const schemaDict = {
migrate: {
type: 'object',
description:
'Represents an account moving from one PDS instance to another. NOTE: not implemented; full account migration may introduce a new message instead.',
'Represents an account moving from one PDS instance to another. NOTE: not implemented; account migration uses #identity instead',
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
Expand All @@ -4733,7 +4753,8 @@ export const schemaDict = {
},
tombstone: {
type: 'object',
description: 'Indicates that an account has been deleted.',
description:
'Indicates that an account has been deleted. NOTE: may be deprecated in favor of #identity or a future #account event',
required: ['seq', 'did', 'time'],
properties: {
seq: {
Expand Down Expand Up @@ -7087,7 +7108,8 @@ export const schemaDict = {
},
tags: {
type: 'array',
description: 'Additional non-inline tags describing this post.',
description:
'Additional hashtags, in addition to any included in post text and facets.',
maxLength: 8,
items: {
type: 'string',
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/client/types/app/bsky/feed/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface Record {
labels?:
| ComAtprotoLabelDefs.SelfLabels
| { $type: string; [k: string]: unknown }
/** Additional non-inline tags describing this post. */
/** Additional hashtags, in addition to any included in post text and facets. */
tags?: string[]
/** Client-declared timestamp when this post was originally created. */
createdAt: string
Expand Down
26 changes: 23 additions & 3 deletions packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,27 @@ export function validateCommit(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#commit', v)
}

/** Represents an update of the account's handle, or transition to/from invalid state. */
/** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */
export interface Identity {
seq: number
did: string
time: string
[k: string]: unknown
}

export function isIdentity(v: unknown): v is Identity {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeRepos#identity'
)
}

export function validateIdentity(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#identity', v)
}

/** Represents an update of the account's handle, or transition to/from invalid state. NOTE: Will be deprecated in favor of #identity. */
export interface Handle {
seq: number
did: string
Expand All @@ -67,7 +87,7 @@ export function validateHandle(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#handle', v)
}

/** Represents an account moving from one PDS instance to another. NOTE: not implemented; full account migration may introduce a new message instead. */
/** Represents an account moving from one PDS instance to another. NOTE: not implemented; account migration uses #identity instead */
export interface Migrate {
seq: number
did: string
Expand All @@ -88,7 +108,7 @@ export function validateMigrate(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#migrate', v)
}

/** Indicates that an account has been deleted. */
/** Indicates that an account has been deleted. NOTE: may be deprecated in favor of #identity or a future #account event */
export interface Tombstone {
seq: number
did: string
Expand Down
6 changes: 6 additions & 0 deletions packages/bsky/src/indexer/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ export class IndexerSubscription {
await this.handleCommit(msg)
} else if (message.isHandle(msg)) {
await this.handleUpdateHandle(msg)
} else if (message.isIdentity(msg)) {
await this.handleIdentityEvt(msg)
} else if (message.isTombstone(msg)) {
await this.handleTombstone(msg)
} else if (message.isMigrate(msg)) {
Expand Down Expand Up @@ -244,6 +246,10 @@ export class IndexerSubscription {
await this.indexingSvc.indexHandle(msg.did, msg.time, true)
}

private async handleIdentityEvt(msg: message.Identity) {
await this.indexingSvc.indexHandle(msg.did, msg.time, true)
}

private async handleTombstone(msg: message.Tombstone) {
await this.indexingSvc.tombstoneActor(msg.did)
}
Expand Down
2 changes: 2 additions & 0 deletions packages/bsky/src/ingester/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ function getMessageDetails(msg: Message):
return { seq: msg.seq, repo: msg.repo, message: msg }
} else if (message.isHandle(msg)) {
return { seq: msg.seq, repo: msg.did, message: msg }
} else if (message.isIdentity(msg)) {
return { seq: msg.seq, repo: msg.did, message: msg }
} else if (message.isMigrate(msg)) {
return { seq: msg.seq, repo: msg.did, message: msg }
} else if (message.isTombstone(msg)) {
Expand Down
30 changes: 26 additions & 4 deletions packages/bsky/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4579,6 +4579,7 @@ export const schemaDict = {
type: 'union',
refs: [
'lex:com.atproto.sync.subscribeRepos#commit',
'lex:com.atproto.sync.subscribeRepos#identity',
'lex:com.atproto.sync.subscribeRepos#handle',
'lex:com.atproto.sync.subscribeRepos#migrate',
'lex:com.atproto.sync.subscribeRepos#tombstone',
Expand Down Expand Up @@ -4685,10 +4686,29 @@ export const schemaDict = {
},
},
},
identity: {
type: 'object',
description:
"Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache.",
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
handle: {
type: 'object',
description:
"Represents an update of the account's handle, or transition to/from invalid state.",
"Represents an update of the account's handle, or transition to/from invalid state. NOTE: Will be deprecated in favor of #identity.",
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
Expand All @@ -4711,7 +4731,7 @@ export const schemaDict = {
migrate: {
type: 'object',
description:
'Represents an account moving from one PDS instance to another. NOTE: not implemented; full account migration may introduce a new message instead.',
'Represents an account moving from one PDS instance to another. NOTE: not implemented; account migration uses #identity instead',
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
Expand All @@ -4733,7 +4753,8 @@ export const schemaDict = {
},
tombstone: {
type: 'object',
description: 'Indicates that an account has been deleted.',
description:
'Indicates that an account has been deleted. NOTE: may be deprecated in favor of #identity or a future #account event',
required: ['seq', 'did', 'time'],
properties: {
seq: {
Expand Down Expand Up @@ -7087,7 +7108,8 @@ export const schemaDict = {
},
tags: {
type: 'array',
description: 'Additional non-inline tags describing this post.',
description:
'Additional hashtags, in addition to any included in post text and facets.',
maxLength: 8,
items: {
type: 'string',
Expand Down
2 changes: 1 addition & 1 deletion packages/bsky/src/lexicon/types/app/bsky/feed/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface Record {
labels?:
| ComAtprotoLabelDefs.SelfLabels
| { $type: string; [k: string]: unknown }
/** Additional non-inline tags describing this post. */
/** Additional hashtags, in addition to any included in post text and facets. */
tags?: string[]
/** Client-declared timestamp when this post was originally created. */
createdAt: string
Expand Down
27 changes: 24 additions & 3 deletions packages/bsky/src/lexicon/types/com/atproto/sync/subscribeRepos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface QueryParams {

export type OutputSchema =
| Commit
| Identity
| Handle
| Migrate
| Tombstone
Expand Down Expand Up @@ -71,7 +72,27 @@ export function validateCommit(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#commit', v)
}

/** Represents an update of the account's handle, or transition to/from invalid state. */
/** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */
export interface Identity {
seq: number
did: string
time: string
[k: string]: unknown
}

export function isIdentity(v: unknown): v is Identity {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeRepos#identity'
)
}

export function validateIdentity(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#identity', v)
}

/** Represents an update of the account's handle, or transition to/from invalid state. NOTE: Will be deprecated in favor of #identity. */
export interface Handle {
seq: number
did: string
Expand All @@ -92,7 +113,7 @@ export function validateHandle(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#handle', v)
}

/** Represents an account moving from one PDS instance to another. NOTE: not implemented; full account migration may introduce a new message instead. */
/** Represents an account moving from one PDS instance to another. NOTE: not implemented; account migration uses #identity instead */
export interface Migrate {
seq: number
did: string
Expand All @@ -113,7 +134,7 @@ export function validateMigrate(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeRepos#migrate', v)
}

/** Indicates that an account has been deleted. */
/** Indicates that an account has been deleted. NOTE: may be deprecated in favor of #identity or a future #account event */
export interface Tombstone {
seq: number
did: string
Expand Down
Loading

0 comments on commit 1a12c7e

Please sign in to comment.