Skip to content

Commit

Permalink
Bsky appview sync service (bluesky-social#2031)
Browse files Browse the repository at this point in the history
* init bsky-sync

* add bsync models and config

* rename bsky-sync to bsync

* protos and gen for bsync service

* start roughing-out bsync routes

* adjust bsync model, validation

* bsync auth, context, notify

* implement bsync scan mute ops, listen for mute op event

* setup basic bsync tests, misc fixes

* rename some files

* reorg bsync server routes

* reorg bsync server routes

* tests

* test input validation to addMuteOperation

* add db stats to bsync

* add bsync service

* redact bsync auth header from logs

* upgrade typescript to v5.3

* prettier on codegened bsync files
  • Loading branch information
devinivy authored Jan 9, 2024
1 parent e41a25f commit c2d0578
Show file tree
Hide file tree
Showing 36 changed files with 2,333 additions and 109 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
"@swc/jest": "^0.2.24",
"@types/jest": "^28.1.4",
"@types/node": "^18.0.0",
"@typescript-eslint/eslint-plugin": "^5.38.1",
"@typescript-eslint/parser": "^5.38.1",
"@typescript-eslint/eslint-plugin": "^6.14.0",
"@typescript-eslint/parser": "^6.14.0",
"babel-eslint": "^10.1.0",
"dotenv": "^16.0.3",
"esbuild": "^0.14.48",
Expand All @@ -48,7 +48,7 @@
"prettier": "^2.7.1",
"prettier-config-standard": "^5.0.0",
"ts-node": "^10.8.2",
"typescript": "^4.8.4"
"typescript": "^5.3.3"
},
"workspaces": {
"packages": [
Expand Down
15 changes: 15 additions & 0 deletions packages/bsync/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# @atproto/bsync: Synchronizing Service for the Bluesky AppView

This is an optional service that may be used to synchronize certain state between otherwise independent AppViews.

[![NPM](https://img.shields.io/npm/v/@atproto/bsync)](https://www.npmjs.com/package/@atproto/bsync)
[![Github CI Status](https://github.com/bluesky-social/atproto/actions/workflows/repo.yaml/badge.svg)](https://github.com/bluesky-social/atproto/actions/workflows/repo.yaml)

## License

This project is dual-licensed under MIT and Apache 2.0 terms:

- MIT license ([LICENSE-MIT.txt](https://github.com/bluesky-social/atproto/blob/main/LICENSE-MIT.txt) or http://opensource.org/licenses/MIT)
- Apache License, Version 2.0, ([LICENSE-APACHE.txt](https://github.com/bluesky-social/atproto/blob/main/LICENSE-APACHE.txt) or http://www.apache.org/licenses/LICENSE-2.0)

Downstream projects and end users may chose either license individually, or both together, at their discretion. The motivation for this dual-licensing is the additional software patent assurance provided by Apache 2.0.
3 changes: 3 additions & 0 deletions packages/bsync/babel.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module.exports = {
presets: [['@babel/preset-env']],
}
38 changes: 38 additions & 0 deletions packages/bsync/bin/migration-create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env ts-node

import * as fs from 'fs/promises'
import * as path from 'path'

export async function main() {
const now = new Date()
const prefix = now.toISOString().replace(/[^a-z0-9]/gi, '') // Order of migrations matches alphabetical order of their names
const name = process.argv[2]
if (!name || !name.match(/^[a-z0-9-]+$/)) {
process.exitCode = 1
return console.error(
'Must pass a migration name consisting of lowercase digits, numbers, and dashes.',
)
}
const filename = `${prefix}-${name}`
const dir = path.join(__dirname, '..', 'src', 'db', 'migrations')

await fs.writeFile(path.join(dir, `${filename}.ts`), template, { flag: 'wx' })
await fs.writeFile(
path.join(dir, 'index.ts'),
`export * as _${prefix} from './${filename}'\n`,
{ flag: 'a' },
)
}

const template = `import { Kysely } from 'kysely'
export async function up(db: Kysely<unknown>): Promise<void> {
// Migration code
}
export async function down(db: Kysely<unknown>): Promise<void> {
// Migration code
}
`

main()
13 changes: 13 additions & 0 deletions packages/bsync/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: v1
plugins:
- plugin: es
opt:
- target=ts
- import_extension=.ts

out: src/gen
- plugin: connect-es
opt:
- target=ts
- import_extension=.ts
out: src/gen
18 changes: 18 additions & 0 deletions packages/bsync/build.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
const { nodeExternalsPlugin } = require('esbuild-node-externals')

const buildShallow =
process.argv.includes('--shallow') || process.env.ATP_BUILD_SHALLOW === 'true'

require('esbuild').build({
logLevel: 'info',
entryPoints: ['src/index.ts'],
bundle: true,
sourcemap: true,
outdir: 'dist',
platform: 'node',
external: [
// Referenced in pg driver, but optional and we don't use it
'pg-native',
],
plugins: buildShallow ? [nodeExternalsPlugin()] : [],
})
6 changes: 6 additions & 0 deletions packages/bsync/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
const base = require('../../jest.config.base.js')

module.exports = {
...base,
displayName: 'Bsync',
}
52 changes: 52 additions & 0 deletions packages/bsync/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{
"name": "@atproto/bsync",
"version": "0.0.0",
"license": "MIT",
"description": "Sychronizing service for app.bsky App View (Bluesky API)",
"keywords": [
"atproto",
"bluesky"
],
"homepage": "https://atproto.com",
"repository": {
"type": "git",
"url": "https://github.com/bluesky-social/atproto",
"directory": "packages/bsync"
},
"main": "src/index.ts",
"publishConfig": {
"main": "dist/index.js",
"types": "dist/index.d.ts"
},
"scripts": {
"build": "node ./build.js",
"postbuild": "tsc --build tsconfig.build.json",
"update-main-to-dist": "node ../../update-main-to-dist.js packages/bsync",
"start": "node --enable-source-maps dist/bin.js",
"test": "../dev-infra/with-test-db.sh jest",
"test:log": "tail -50 test.log | pino-pretty",
"test:updateSnapshot": "jest --updateSnapshot",
"migration:create": "ts-node ./bin/migration-create.ts",
"buf:gen": "buf generate proto"
},
"dependencies": {
"@atproto/common": "workspace:^",
"@atproto/syntax": "workspace:^",
"@bufbuild/protobuf": "^1.5.0",
"@connectrpc/connect": "^1.1.4",
"@connectrpc/connect-express": "^1.1.4",
"@connectrpc/connect-node": "^1.1.4",
"http-terminator": "^3.2.0",
"kysely": "^0.22.0",
"pg": "^8.10.0",
"pino": "^8.15.0",
"pino-http": "^8.2.1",
"typed-emitter": "^2.1.0"
},
"devDependencies": {
"@bufbuild/buf": "^1.28.1",
"@bufbuild/protoc-gen-es": "^1.5.0",
"@connectrpc/protoc-gen-connect-es": "^1.1.4",
"@types/pg": "^8.6.6"
}
}
54 changes: 54 additions & 0 deletions packages/bsync/proto/bsync.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
syntax = "proto3";

package bsync;
option go_package = "./;bsync";

//
// Sync
//


message MuteOperation {
enum Type {
ADD = 0;
REMOVE = 1;
CLEAR = 2;
}
string id = 1;
Type type = 2;
string actor_did = 3;
string subject = 4;
}

message AddMuteOperationRequest {
MuteOperation.Type type = 1;
string actor_did = 2;
string subject = 3;
}

message AddMuteOperationResponse {
MuteOperation operation = 1;
}

message ScanMuteOperationsRequest {
string cursor = 1;
int32 limit = 2;
}

message ScanMuteOperationsResponse {
repeated MuteOperation operations = 1;
string cursor = 2;
}

// Ping
message PingRequest {}
message PingResponse {}


service Service {
// Sync
rpc AddMuteOperation(AddMuteOperationRequest) returns (AddMuteOperationResponse);
rpc ScanMuteOperations(ScanMuteOperationsRequest) returns (ScanMuteOperationsResponse);
// Ping
rpc Ping(PingRequest) returns (PingResponse);
}
25 changes: 25 additions & 0 deletions packages/bsync/src/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import {
Interceptor,
PromiseClient,
createPromiseClient,
} from '@connectrpc/connect'
import {
ConnectTransportOptions,
createConnectTransport,
} from '@connectrpc/connect-node'
import { Service } from './gen/bsync_connect'

export type BsyncClient = PromiseClient<typeof Service>

export const createClient = (opts: ConnectTransportOptions): BsyncClient => {
const transport = createConnectTransport(opts)
return createPromiseClient(Service, transport)
}

export const authWithApiKey =
(apiKey: string): Interceptor =>
(next) =>
(req) => {
req.header.set('authorization', `Bearer ${apiKey}`)
return next(req)
}
86 changes: 86 additions & 0 deletions packages/bsync/src/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import assert from 'node:assert'
import { envInt, envStr, envList } from '@atproto/common'

export const envToCfg = (env: ServerEnvironment): ServerConfig => {
const serviceCfg: ServerConfig['service'] = {
port: env.port ?? 2585,
version: env.version ?? 'unknown',
longPollTimeoutMs: env.longPollTimeoutMs ?? 10000,
}

assert(env.dbUrl, 'missing postgres url')
const dbCfg: ServerConfig['db'] = {
url: env.dbUrl,
schema: env.dbSchema,
poolSize: env.dbPoolSize,
poolMaxUses: env.dbPoolMaxUses,
poolIdleTimeoutMs: env.dbPoolIdleTimeoutMs,
}

assert(env.apiKeys.length > 0, 'missing api keys')
const authCfg: ServerConfig['auth'] = {
apiKeys: new Set(env.apiKeys),
}

return {
service: serviceCfg,
db: dbCfg,
auth: authCfg,
}
}

export type ServerConfig = {
service: ServiceConfig
db: DatabaseConfig
auth: AuthConfig
}

type ServiceConfig = {
port: number
version?: string
longPollTimeoutMs: number
}

type DatabaseConfig = {
url: string
schema?: string
poolSize?: number
poolMaxUses?: number
poolIdleTimeoutMs?: number
}

type AuthConfig = {
apiKeys: Set<string>
}

export const readEnv = (): ServerEnvironment => {
return {
// service
port: envInt('BSYNC_PORT'),
version: envStr('BSYNC_VERSION'),
longPollTimeoutMs: envInt('BSYNC_LONG_POLL_TIMEOUT_MS'),
// database
dbUrl: envStr('BSYNC_DB_POSTGRES_URL'),
dbSchema: envStr('BSYNC_DB_POSTGRES_SCHEMA'),
dbPoolSize: envInt('BSYNC_DB_POOL_SIZE'),
dbPoolMaxUses: envInt('BSYNC_DB_POOL_MAX_USES'),
dbPoolIdleTimeoutMs: envInt('BSYNC_DB_POOL_IDLE_TIMEOUT_MS'),
// secrets
apiKeys: envList('BSYNC_API_KEYS'),
}
}

export type ServerEnvironment = {
// service
port?: number
version?: string
longPollTimeoutMs?: number
// database
dbUrl?: string
dbSchema?: string
dbPoolSize?: number
dbPoolMaxUses?: number
dbPoolIdleTimeoutMs?: number
// secrets
apiKeys: string[]
}
42 changes: 42 additions & 0 deletions packages/bsync/src/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import TypedEventEmitter from 'typed-emitter'
import { ServerConfig } from './config'
import Database from './db'
import { createMuteOpChannel } from './db/schema/mute_op'
import { EventEmitter } from 'stream'

export type AppContextOptions = {
db: Database
cfg: ServerConfig
}

export class AppContext {
db: Database
cfg: ServerConfig
events: TypedEventEmitter<AppEvents>

constructor(opts: AppContextOptions) {
this.db = opts.db
this.cfg = opts.cfg
this.events = new EventEmitter() as TypedEventEmitter<AppEvents>
}

static async fromConfig(
cfg: ServerConfig,
overrides?: Partial<AppContextOptions>,
): Promise<AppContext> {
const db = new Database({
url: cfg.db.url,
schema: cfg.db.schema,
poolSize: cfg.db.poolSize,
poolMaxUses: cfg.db.poolMaxUses,
poolIdleTimeoutMs: cfg.db.poolIdleTimeoutMs,
})
return new AppContext({ db, cfg, ...overrides })
}
}

export default AppContext

export type AppEvents = {
[createMuteOpChannel]: () => void
}
Loading

0 comments on commit c2d0578

Please sign in to comment.