Skip to content

Commit

Permalink
Appview v1 generating and ingesting mute ops w/ bsync (bluesky-social…
Browse files Browse the repository at this point in the history
…#2067)

* build bsync protos in appview, standardize per-package proto gen directory

* configure appview with bsync, allow mute endpoints to use bsync

* import fixes in bsync

* configure appview ingester with bsync, ingest mute ops into db

* test bsync mutes roundtrip w/ appview, setup bsync in dev-env

* build

* ensure to propagate errors when bsyncOnlyMutes is on
  • Loading branch information
devinivy authored Jan 22, 2024
1 parent f069c67 commit d108310
Show file tree
Hide file tree
Showing 32 changed files with 1,274 additions and 32 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-push-bsky-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
push:
branches:
- main
- appview-v1-sync-mutes
env:
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
Expand Down
12 changes: 12 additions & 0 deletions packages/bsky/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: v1
plugins:
- plugin: es
opt:
- target=ts
- import_extension=.ts
out: src/proto
- plugin: connect-es
opt:
- target=ts
- import_extension=.ts
out: src/proto
9 changes: 8 additions & 1 deletion packages/bsky/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"test": "../dev-infra/with-test-redis-and-db.sh jest",
"test:log": "tail -50 test.log | pino-pretty",
"test:updateSnapshot": "jest --updateSnapshot",
"migration:create": "ts-node ./bin/migration-create.ts"
"migration:create": "ts-node ./bin/migration-create.ts",
"buf:gen": "buf generate ../bsync/proto"
},
"dependencies": {
"@atproto/api": "workspace:^",
Expand All @@ -39,6 +40,9 @@
"@atproto/lexicon": "workspace:^",
"@atproto/repo": "workspace:^",
"@atproto/xrpc-server": "workspace:^",
"@bufbuild/protobuf": "^1.5.0",
"@connectrpc/connect": "^1.1.4",
"@connectrpc/connect-node": "^1.1.4",
"@did-plc/lib": "^0.0.1",
"@isaacs/ttlcache": "^1.4.1",
"compression": "^1.7.4",
Expand All @@ -65,6 +69,9 @@
"@atproto/lex-cli": "workspace:^",
"@atproto/pds": "workspace:^",
"@atproto/xrpc": "workspace:^",
"@bufbuild/buf": "^1.28.1",
"@bufbuild/protoc-gen-es": "^1.5.0",
"@connectrpc/protoc-gen-connect-es": "^1.1.4",
"@did-plc/server": "^0.0.1",
"@types/cors": "^2.8.12",
"@types/express": "^4.17.13",
Expand Down
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/muteActor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import assert from 'node:assert'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.muteActor({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { actor } = input.body
const requester = auth.credentials.iss
const db = ctx.db.getPrimary()
Expand All @@ -18,10 +21,34 @@ export default function (server: Server, ctx: AppContext) {
throw new InvalidRequestError('Cannot mute oneself')
}

await ctx.services.graph(db).muteActor({
subjectDid,
mutedByDid: requester,
})
const muteActor = async () => {
await ctx.services.graph(db).muteActor({
subjectDid,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.ADD,
actorDid: requester,
subject: subjectDid,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await muteActor()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/muteActorList.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import assert from 'node:assert'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import * as lex from '../../../../lexicon/lexicons'
import AppContext from '../../../../context'
import { AtUri } from '@atproto/syntax'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.muteActorList({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { list } = input.body
const requester = auth.credentials.iss

Expand All @@ -19,10 +22,34 @@ export default function (server: Server, ctx: AppContext) {
throw new InvalidRequestError(`Invalid collection: expected: ${collId}`)
}

await ctx.services.graph(db).muteActorList({
list,
mutedByDid: requester,
})
const muteActorList = async () => {
await ctx.services.graph(db).muteActorList({
list,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.ADD,
actorDid: requester,
subject: list,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await muteActorList()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/unmuteActor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import assert from 'node:assert'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.unmuteActor({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { actor } = input.body
const requester = auth.credentials.iss
const db = ctx.db.getPrimary()
Expand All @@ -18,10 +21,34 @@ export default function (server: Server, ctx: AppContext) {
throw new InvalidRequestError('Cannot mute oneself')
}

await ctx.services.graph(db).unmuteActor({
subjectDid,
mutedByDid: requester,
})
const unmuteActor = async () => {
await ctx.services.graph(db).unmuteActor({
subjectDid,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.REMOVE,
actorDid: requester,
subject: subjectDid,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await unmuteActor()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
37 changes: 32 additions & 5 deletions packages/bsky/src/api/app/bsky/graph/unmuteActorList.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,45 @@
import assert from 'node:assert'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { MuteOperation_Type } from '../../../../proto/bsync_pb'
import { BsyncClient } from '../../../../bsync'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.graph.unmuteActorList({
auth: ctx.authVerifier.standard,
handler: async ({ auth, input }) => {
handler: async ({ req, auth, input }) => {
const { list } = input.body
const requester = auth.credentials.iss
const db = ctx.db.getPrimary()

await ctx.services.graph(db).unmuteActorList({
list,
mutedByDid: requester,
})
const unmuteActorList = async () => {
await ctx.services.graph(db).unmuteActorList({
list,
mutedByDid: requester,
})
}

const addBsyncMuteOp = async (bsyncClient: BsyncClient) => {
await bsyncClient.addMuteOperation({
type: MuteOperation_Type.REMOVE,
actorDid: requester,
subject: list,
})
}

if (ctx.cfg.bsyncOnlyMutes) {
assert(ctx.bsyncClient)
await addBsyncMuteOp(ctx.bsyncClient)
} else {
await unmuteActorList()
if (ctx.bsyncClient) {
try {
await addBsyncMuteOp(ctx.bsyncClient)
} catch (err) {
req.log.warn(err, 'failed to sync mute op to bsync')
}
}
}
},
})
}
41 changes: 41 additions & 0 deletions packages/bsky/src/bsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Service } from './proto/bsync_connect'
import {
Code,
ConnectError,
PromiseClient,
createPromiseClient,
Interceptor,
} from '@connectrpc/connect'
import {
createConnectTransport,
ConnectTransportOptions,
} from '@connectrpc/connect-node'

export type BsyncClient = PromiseClient<typeof Service>

export const createBsyncClient = (
opts: ConnectTransportOptions,
): BsyncClient => {
const transport = createConnectTransport(opts)
return createPromiseClient(Service, transport)
}

export { Code }

export const isBsyncError = (
err: unknown,
code?: Code,
): err is ConnectError => {
if (err instanceof ConnectError) {
return !code || err.code === code
}
return false
}

export const authWithApiKey =
(apiKey: string): Interceptor =>
(next) =>
(req) => {
req.header.set('authorization', `Bearer ${apiKey}`)
return next(req)
}
37 changes: 37 additions & 0 deletions packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ export interface ServerConfigValues {
imgUriEndpoint?: string
blobCacheLocation?: string
searchEndpoint?: string
bsyncUrl?: string
bsyncApiKey?: string
bsyncHttpVersion?: '1.1' | '2'
bsyncIgnoreBadTls?: boolean
bsyncOnlyMutes?: boolean
adminPassword: string
moderatorPassword: string
triagePassword: string
Expand Down Expand Up @@ -88,6 +93,13 @@ export class ServerConfig {
const imgUriEndpoint = process.env.IMG_URI_ENDPOINT
const blobCacheLocation = process.env.BLOB_CACHE_LOC
const searchEndpoint = process.env.SEARCH_ENDPOINT
const bsyncUrl = process.env.BSKY_BSYNC_URL || undefined
const bsyncApiKey = process.env.BSKY_BSYNC_API_KEY || undefined
const bsyncHttpVersion = process.env.BSKY_BSYNC_HTTP_VERSION || '2'
const bsyncIgnoreBadTls = process.env.BSKY_BSYNC_IGNORE_BAD_TLS === 'true'
const bsyncOnlyMutes = process.env.BSKY_BSYNC_ONLY_MUTES === 'true'
assert(!bsyncOnlyMutes || bsyncUrl, 'bsync-only mutes requires a bsync url')
assert(bsyncHttpVersion === '1.1' || bsyncHttpVersion === '2')
const dbPrimaryPostgresUrl =
overrides?.dbPrimaryPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL
let dbReplicaPostgresUrls = overrides?.dbReplicaPostgresUrls
Expand Down Expand Up @@ -152,6 +164,11 @@ export class ServerConfig {
imgUriEndpoint,
blobCacheLocation,
searchEndpoint,
bsyncUrl,
bsyncApiKey,
bsyncHttpVersion,
bsyncIgnoreBadTls,
bsyncOnlyMutes,
adminPassword,
moderatorPassword,
triagePassword,
Expand Down Expand Up @@ -268,6 +285,26 @@ export class ServerConfig {
return this.cfg.searchEndpoint
}

get bsyncUrl() {
return this.cfg.bsyncUrl
}

get bsyncApiKey() {
return this.cfg.bsyncApiKey
}

get bsyncOnlyMutes() {
return this.cfg.bsyncOnlyMutes
}

get bsyncHttpVersion() {
return this.cfg.bsyncHttpVersion
}

get bsyncIgnoreBadTls() {
return this.cfg.bsyncIgnoreBadTls
}

get adminPassword() {
return this.cfg.adminPassword
}
Expand Down
Loading

0 comments on commit d108310

Please sign in to comment.