Run PDS using postgres (bluesky-social#233)
* Setup dockerization of postgres db

* Fix typo

* Setup postgres dialect config in pds, fix migrations for pg

* Convert home and author feed to work on pg

* Convert pds follow views to work on pg

* Convert pds liked-by and notifications views to work on pg

* Unify bigint handling between sqlite and pg

* Convert pds account, crud, and profile, reposts, thread views to work on pg

* Ensure a more complete reset of containers/volumes in with-test-db.sh, set libpq env vars

* Add readme for dockerized postgres

* Fix build issues
devinivy authored Oct 12, 2022
1 parent 45ddf23 commit 0b6bf87
Showing 38 changed files with 646 additions and 281 deletions.
7 changes: 3 additions & 4 deletions packages/dev-env/build.js
@@ -8,12 +8,11 @@ require('esbuild')
outdir: 'dist',
platform: 'node',
external: [
// @TODO May be able to get rid of the mapbox externals
'../server/node_modules/@mapbox/node-pre-gyp/*',
'../server/node_modules/better-sqlite3/*',
'../plc/node_modules/@mapbox/node-pre-gyp/*',
'../plc/node_modules/better-sqlite3/*',
'../server/node_modules/better-sqlite3/*',
'../../node_modules/classic-level/*',
// Referenced in pg driver, but optional and we don't use it
'pg-native',
],
plugins: [
copy({
65 changes: 65 additions & 0 deletions packages/pg/README.md
@@ -0,0 +1,65 @@
# pg

Helpers for working with postgres

## Usage

### `with-test-db.sh`

This script lets you run any command with a fresh, single-use postgres database available. When the script starts, it brings up a Dockerized postgres container; when the script completes, that container is removed.

The environment variable `DB_POSTGRES_URL` will be set to a connection string that can be used to connect to the database. The [`PG*` environment variables](https://www.postgresql.org/docs/current/libpq-envars.html) recognized by libpq (and therefore by clients such as `psql`) are also set.

**Example**

```
$ ./with-test-db.sh psql -c 'select 1;'
[+] Running 1/1
⠿ Container pg-db_test-1 Healthy 1.8s
?column?
----------
1
(1 row)
[+] Running 1/1
⠿ Container pg-db_test-1 Stopped 0.1s
Going to remove pg-db_test-1
[+] Running 1/0
⠿ Container pg-db_test-1 Removed
```
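
The wrapped command can be anything that reads `DB_POSTGRES_URL`, not just `psql`. Below is a minimal sketch using the `pg` driver; the file name and query are illustrative only and not part of this change.

```
// check-db.ts (hypothetical): run as `./with-test-db.sh ts-node check-db.ts`
import { Client } from 'pg'

async function main() {
  // with-test-db.sh sets DB_POSTGRES_URL before running the wrapped command
  const client = new Client({ connectionString: process.env.DB_POSTGRES_URL })
  await client.connect()
  const res = await client.query('select 1 as one')
  console.log(res.rows) // [ { one: 1 } ]
  await client.end()
}

main()
```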

### `docker-compose.yaml`

The Docker Compose file can be used to run containerized postgres either for single use (as done by `with-test-db.sh`) or for longer-term use. These are set up as separate services named `db_test` and `db`, respectively. In both cases the database is available on the host machine's `localhost`, and the credentials are:

- Username: pg
- Password: password

However, each service uses a different port, documented below, to avoid conflicts.

#### `db_test` service for single use

The single-use `db_test` service does not have any persistent storage. When the container is removed, data in the database disappears with it.

This service runs on port `5433`.

```
$ docker compose up db_test # start container
$ docker compose stop db_test # stop container
$ docker compose rm db_test # remove container
```
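
With the `db_test` container up, you can connect from the host on port `5433`, for example (using the same default `postgres` database that `with-test-db.sh` targets):

```
$ psql postgresql://pg:password@localhost:5433/postgres -c 'select 1;'
```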

#### `db` service for persistent use

The `db` service has persistent storage on the host machine managed by Docker under a volume named `pg_adx_db`. When the container is removed, data in the database will remain on the host machine. In order to start fresh, you would need to remove the volume.

This service runs on port `5432`.

```
$ docker compose up db -d # start container
$ docker compose stop db # stop container
$ docker compose rm db # remove container
$ docker volume rm pg_adx_db # remove volume
```
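
For development against the persistent `db` service, application code can point at port `5432` with the credentials above. A minimal sketch with the `pg` driver follows; the `postgres` database name is an assumption (the same default used by `with-test-db.sh`).

```
import { Pool } from 'pg'

// Credentials and port match the `db` service in docker-compose.yaml;
// the `postgres` database name is assumed, not defined by this change.
const pool = new Pool({
  connectionString: 'postgresql://pg:password@localhost:5432/postgres',
})

async function main() {
  const { rows } = await pool.query('select now() as now')
  console.log(rows[0].now)
  await pool.end()
}

main()
```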
27 changes: 27 additions & 0 deletions packages/pg/docker-compose.yaml
@@ -0,0 +1,27 @@
version: '3.8'
services:
# An ephemerally-stored postgres database for single-use test runs
db_test: &db_test
image: postgres:14.4-alpine
environment:
- POSTGRES_USER=pg
- POSTGRES_PASSWORD=password
ports:
- '5433:5432'
# Healthcheck ensures db is queryable when `docker-compose up --wait` completes
healthcheck:
test: 'pg_isready -U pg'
interval: 500ms
timeout: 10s
retries: 20
# A persistently-stored postgres database
db:
<<: *db_test
ports:
- '5432:5432'
healthcheck:
disable: true
volumes:
- adx_db:/var/lib/postgresql/data
volumes:
adx_db:
26 changes: 26 additions & 0 deletions packages/pg/with-test-db.sh
@@ -0,0 +1,26 @@
#!/usr/bin/env sh
# @TODO handle sigint for cleanup

# Example usage:
# ./with-test-db.sh psql postgresql://pg:password@localhost:5433/postgres -c 'select 1;'

dir=$(dirname $0)
compose_file="$dir/docker-compose.yaml"

docker compose -f $compose_file up --wait --force-recreate db_test
echo # newline

# Based on creds in compose.yaml
export PGPORT=5433
export PGHOST=localhost
export PGUSER=pg
export PGPASSWORD=password
export PGDATABASE=postgres
export DB_POSTGRES_URL="postgresql://pg:password@localhost:5433/postgres"
"$@"
code=$?

echo # newline
docker compose -f $compose_file rm -f --stop --volumes db_test

exit $code
4 changes: 2 additions & 2 deletions packages/server/build.js
@@ -8,10 +8,10 @@ require('esbuild')
outdir: 'dist',
platform: 'node',
external: [
// @TODO May be able to get rid of the mapbox externals
'../plc/node_modules/@mapbox/node-pre-gyp/*',
'../../node_modules/level/*',
'../../node_modules/classic-level/*',
// Referenced in pg driver, but optional and we don't use it
'pg-native',
],
plugins: [
copy({
3 changes: 3 additions & 0 deletions packages/server/package.json
@@ -8,6 +8,7 @@
"build": "node ./build.js",
"start": "node dist/bin.js",
"test": "jest",
"test:pg": "../pg/with-test-db.sh jest",
"test:log": "cat test.log | pino-pretty",
"prettier": "prettier --check src/",
"prettier:fix": "prettier --write src/",
@@ -36,6 +37,7 @@
"level": "^8.0.0",
"nodemailer": "^6.8.0",
"nodemailer-html-to-text": "^3.2.0",
"pg": "^8.8.0",
"pino-http": "^8.2.1",
"uint8arrays": "3.0.0",
"zod": "^3.14.2"
@@ -46,6 +48,7 @@
"@types/express": "^4.17.13",
"@types/jsonwebtoken": "^8.5.9",
"@types/nodemailer": "^6.4.6",
"@types/pg": "^8.6.5",
"axios": "^0.26.1",
"esbuild-plugin-copy": "^1.3.0",
"get-port": "^6.1.2",
114 changes: 63 additions & 51 deletions packages/server/src/api/app/bsky/getAuthorFeed.ts
@@ -1,108 +1,120 @@
import { sql } from 'kysely'
import { AuthRequiredError } from '@adxp/xrpc-server'
import { Server } from '../../../lexicon'
import * as GetAuthorFeed from '../../../lexicon/types/app/bsky/getAuthorFeed'
import * as locals from '../../../locals'
import { queryResultToFeedItem } from './util'
import {
countClause,
isNotRepostClause,
paginate,
postOrRepostIndexedAtClause,
} from '../../../db/util'
import { rowToFeedItem } from './util/feed'
import { countAll, paginate } from '../../../db/util'

export default function (server: Server) {
server.app.bsky.getAuthorFeed(
async (params: GetAuthorFeed.QueryParams, _input, req, res) => {
const { auth, db } = locals.get(res)
const { author, limit, before } = params
const { ref } = db.db.dynamic

const { auth, db } = locals.get(res)
const requester = auth.getUserDid(req)
if (!requester) {
throw new AuthRequiredError()
}

const { ref } = db.db.dynamic
const authorIsDid = author.startsWith('did:')
const userLookupCol = author.startsWith('did:')
? 'user.did'
: 'user.username'
const userQb = db.db
.selectFrom('user')
.selectAll()
.where(userLookupCol, '=', author)

// @TODO break this query up, share parts with home feed and post thread
let builder = db.db
.selectFrom('app_bsky_post as post')
// Determine result set of posts and reposts
.leftJoin('app_bsky_repost as repost', 'repost.subject', 'post.uri')
.leftJoin('user as originator', (join) =>
join
.onRef('originator.did', '=', 'post.creator')
.orOnRef('originator.did', '=', 'repost.creator'),
const postsQb = db.db
.selectFrom('app_bsky_post')
.whereExists(
userQb.whereRef('user.did', '=', ref('app_bsky_post.creator')),
)
.if(authorIsDid, (qb) => qb.where('originator.did', '=', author))
.if(!authorIsDid, (qb) => qb.where('originator.username', '=', author))
// Select data for presentation into FeedItem
.leftJoin('user as author', 'author.did', 'post.creator')
.select([
sql<'post' | 'repost'>`${'post'}`.as('type'),
'uri as postUri',
'creator as originatorDid',
'indexedAt as cursor',
])

const repostsQb = db.db
.selectFrom('app_bsky_repost')
.whereExists(
userQb.whereRef('user.did', '=', ref('app_bsky_repost.creator')),
)
.select([
sql<'post' | 'repost'>`${'repost'}`.as('type'),
'subject as postUri',
'creator as originatorDid',
'indexedAt as cursor',
])

let postsAndRepostsQb = db.db
.selectFrom(postsQb.union(repostsQb).as('posts_and_reposts'))
.innerJoin('app_bsky_post as post', 'post.uri', 'postUri')
.innerJoin('record', 'record.uri', 'postUri')
.innerJoin('user as author', 'author.did', 'post.creator')
.leftJoin(
'app_bsky_profile as author_profile',
'author_profile.creator',
'author.did',
)
.leftJoin('user as reposted_by', 'reposted_by.did', 'repost.creator')
.innerJoin('user as originator', 'originator.did', 'originatorDid')
.leftJoin(
'app_bsky_profile as reposted_by_profile',
'reposted_by_profile.creator',
'reposted_by.did',
'app_bsky_profile as originator_profile',
'originator_profile.creator',
'originatorDid',
)
.leftJoin('record', 'record.uri', 'post.uri')
.select([
'post.uri as uri',
'record.raw as rawRecord',
'type',
'postUri',
'cursor',
'record.raw as recordRaw',
'record.indexedAt as indexedAt',
'author.did as authorDid',
'author.username as authorName',
'author_profile.displayName as authorDisplayName',
'reposted_by.did as repostedByDid',
'reposted_by.username as repostedByName',
'reposted_by_profile.displayName as repostedByDisplayName',
isNotRepostClause.as('isNotRepost'),
postOrRepostIndexedAtClause.as('cursor'),
'originator.did as originatorDid',
'originator.username as originatorName',
'originator_profile.displayName as originatorDisplayName',
db.db
.selectFrom('app_bsky_like')
.whereRef('subject', '=', ref('post.uri'))
.select(countClause.as('count'))
.whereRef('subject', '=', ref('postUri'))
.select(countAll.as('count'))
.as('likeCount'),
db.db
.selectFrom('app_bsky_repost')
.whereRef('subject', '=', ref('post.uri'))
.select(countClause.as('count'))
.whereRef('subject', '=', ref('postUri'))
.select(countAll.as('count'))
.as('repostCount'),
db.db
.selectFrom('app_bsky_post')
.whereRef('replyParent', '=', ref('post.uri'))
.select(countClause.as('count'))
.whereRef('replyParent', '=', ref('postUri'))
.select(countAll.as('count'))
.as('replyCount'),
db.db
.selectFrom('app_bsky_repost')
.where('creator', '=', requester)
.whereRef('subject', '=', ref('post.uri'))
.whereRef('subject', '=', ref('postUri'))
.select('uri')
.as('requesterRepost'),
db.db
.selectFrom('app_bsky_like')
.where('creator', '=', requester)
.whereRef('subject', '=', ref('post.uri'))
.whereRef('subject', '=', ref('postUri'))
.select('uri')
.as('requesterLike'),
])
// Grouping by post then originator preserves one row for each
// post or repost. Reposts of a given post only vary by originator.
.groupBy(['post.uri', 'originator.did'])

// Apply pagination
builder = paginate(builder, {
postsAndRepostsQb = paginate(postsAndRepostsQb, {
limit,
before,
by: postOrRepostIndexedAtClause,
by: ref('cursor'),
})

const queryRes = await builder.execute()
const feed: GetAuthorFeed.FeedItem[] = queryRes.map(queryResultToFeedItem)
const queryRes = await postsAndRepostsQb.execute()
const feed = queryRes.map(rowToFeedItem)

return { encoding: 'application/json', body: { feed } }
},