Skip to content

Commit

Permalink
Serve materialized post and profile aggregations (bluesky-social#859)
Browse files Browse the repository at this point in the history
* Add migration for post & profile aggs to pds

* Post/profile agg db tweaks

* Post/profile aggregation queries on pds

* Add facility for performing work on db commit in pds

* Add background queue to pds

* Perform aggregations during indexing on background queue

* Tidy pds background queue, incorporate into labeler

* Fix pds onCommit, remove unused indexing dep, fix zeros for aggs

* Finish background tasks for view tests in pds

* Update aggregations on account deletion, tidy

* Update profile and post views to serve materialized aggregations
  • Loading branch information
devinivy authored Apr 21, 2023
1 parent 7679b2e commit a0f0301
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 40 deletions.
20 changes: 4 additions & 16 deletions packages/pds/src/app-view/services/actor/views.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
ProfileViewBasic,
} from '../../../lexicon/types/app/bsky/actor/defs'
import { DidHandle } from '../../../db/tables/did-handle'
import { countAll } from '../../../db/util'
import Database from '../../../db'
import { ImageUriBuilder } from '../../../image/uri'
import { LabelService } from '../label'
Expand Down Expand Up @@ -40,6 +39,7 @@ export class ActorViews {
.selectFrom('did_handle')
.where('did_handle.did', 'in', dids)
.leftJoin('profile', 'profile.creator', 'did_handle.did')
.leftJoin('profile_agg', 'profile_agg.did', 'did_handle.did')
.select([
'did_handle.did as did',
'profile.uri as profileUri',
Expand All @@ -48,21 +48,9 @@ export class ActorViews {
'profile.avatarCid as avatarCid',
'profile.bannerCid as bannerCid',
'profile.indexedAt as indexedAt',
this.db.db
.selectFrom('follow')
.whereRef('creator', '=', ref('did_handle.did'))
.select(countAll.as('count'))
.as('followsCount'),
this.db.db
.selectFrom('follow')
.whereRef('subjectDid', '=', ref('did_handle.did'))
.select(countAll.as('count'))
.as('followersCount'),
this.db.db
.selectFrom('post')
.whereRef('creator', '=', ref('did_handle.did'))
.select(countAll.as('count'))
.as('postsCount'),
'profile_agg.followsCount as followsCount',
'profile_agg.followersCount as followersCount',
'profile_agg.postsCount as postsCount',
this.db.db
.selectFrom('follow')
.where('creator', '=', viewer)
Expand Down
27 changes: 8 additions & 19 deletions packages/pds/src/app-view/services/feed/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { sql } from 'kysely'
import { cborToLexRecord } from '@atproto/repo'
import Database from '../../../db'
import { countAll, notSoftDeletedClause } from '../../../db/util'
import { notSoftDeletedClause } from '../../../db/util'
import { ImageUriBuilder } from '../../../image/uri'
import { isView as isViewImages } from '../../../lexicon/types/app/bsky/embed/images'
import { isView as isViewExternal } from '../../../lexicon/types/app/bsky/embed/external'
Expand Down Expand Up @@ -143,6 +143,7 @@ export class FeedService {
const posts = await db
.selectFrom('post')
.where('post.uri', 'in', postUris)
.leftJoin('post_agg', 'post_agg.uri', 'post.uri')
.innerJoin('ipld_block', (join) =>
join
.onRef('ipld_block.cid', '=', 'post.cid')
Expand All @@ -158,21 +159,9 @@ export class FeedService {
'post.creator as creator',
'post.indexedAt as indexedAt',
'ipld_block.content as recordBytes',
db
.selectFrom('like')
.whereRef('subject', '=', ref('post.uri'))
.select(countAll.as('count'))
.as('likeCount'),
db
.selectFrom('repost')
.whereRef('subject', '=', ref('post.uri'))
.select(countAll.as('count'))
.as('repostCount'),
db
.selectFrom('post as reply')
.whereRef('reply.replyParent', '=', ref('post.uri'))
.select(countAll.as('count'))
.as('replyCount'),
'post_agg.likeCount as likeCount',
'post_agg.repostCount as repostCount',
'post_agg.replyCount as replyCount',
db
.selectFrom('repost')
.where('creator', '=', requester)
Expand Down Expand Up @@ -347,9 +336,9 @@ export class FeedService {
author: author,
record: cborToLexRecord(post.recordBytes),
embed: embeds[uri],
replyCount: post.replyCount,
repostCount: post.repostCount,
likeCount: post.likeCount,
replyCount: post.replyCount ?? 0,
repostCount: post.repostCount ?? 0,
likeCount: post.likeCount ?? 0,
indexedAt: post.indexedAt,
viewer: {
repost: post.requesterRepost ?? undefined,
Expand Down
6 changes: 3 additions & 3 deletions packages/pds/src/app-view/services/feed/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ export type PostInfo = {
creator: string
recordBytes: Uint8Array
indexedAt: string
likeCount: number
repostCount: number
replyCount: number
likeCount: number | null
repostCount: number | null
replyCount: number | null
requesterRepost: string | null
requesterLike: string | null
}
Expand Down
63 changes: 61 additions & 2 deletions packages/pds/tests/account-deletion.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import assert from 'assert'
import { once, EventEmitter } from 'events'
import { Selectable } from 'kysely'
import Mail from 'nodemailer/lib/mailer'
import AtpAgent from '@atproto/api'
import { isThreadViewPost } from '@atproto/api/src/client/types/app/bsky/feed/defs'
import { SeedClient } from './seeds/client'
import basicSeed from './seeds/basic'
import { Database } from '../src'
Expand All @@ -25,7 +28,6 @@ import { RepoCommitHistory } from '../src/db/tables/repo-commit-history'
import { RepoCommitBlock } from '../src/db/tables/repo-commit-block'
import { Record } from '../src/db/tables/record'
import { RepoSeq } from '../src/db/tables/repo-seq'
import { Selectable } from 'kysely'

describe('account deletion', () => {
let agent: AtpAgent
Expand Down Expand Up @@ -118,12 +120,69 @@ describe('account deletion', () => {
await expect(attempt).rejects.toThrow('Invalid did or password')
})

it('deletes account with a valid token & password', async () => {
it('deletes account with a valid token & password, updating aggregations', async () => {
const postAUri = sc.posts[sc.dids.alice][1].ref.uriStr
const postBUri = sc.posts[sc.dids.dan][1].ref.uriStr
const { data: profileBefore } = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.alice },
{ headers: sc.getHeaders(sc.dids.alice) },
)
const { data: threadBeforeA } = await agent.api.app.bsky.feed.getPostThread(
{ uri: postAUri, depth: 0 },
{ headers: sc.getHeaders(sc.dids.alice) },
)
const { data: threadBeforeB } = await agent.api.app.bsky.feed.getPostThread(
{ uri: postBUri, depth: 0 },
{ headers: sc.getHeaders(sc.dids.alice) },
)

// Perform account deletion
await agent.api.com.atproto.server.deleteAccount({
token,
did: carol.did,
password: carol.password,
})

// Check aggregations: some will be decremented now that the account is deleted.
const { data: profileAfter } = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.alice },
{ headers: sc.getHeaders(sc.dids.alice) },
)
const { data: threadAfterA } = await agent.api.app.bsky.feed.getPostThread(
{ uri: postAUri, depth: 0 },
{ headers: sc.getHeaders(sc.dids.alice) },
)
const { data: threadAfterB } = await agent.api.app.bsky.feed.getPostThread(
{ uri: postBUri, depth: 0 },
{ headers: sc.getHeaders(sc.dids.alice) },
)
assert(isThreadViewPost(threadBeforeA.thread))
assert(isThreadViewPost(threadBeforeB.thread))
assert(isThreadViewPost(threadAfterA.thread))
assert(isThreadViewPost(threadAfterB.thread))
expect(profileAfter.followsCount).toEqual(profileBefore.followsCount)
expect(profileAfter.followersCount).toEqual(
(profileBefore.followersCount ?? 0) - 1,
)
expect(profileAfter.postsCount).toEqual(profileBefore.postsCount)
expect(threadAfterA.thread.post.likeCount).toEqual(
(threadBeforeA.thread.post.likeCount ?? 0) - 1,
)
expect(threadAfterA.thread.post.replyCount).toEqual(
(threadBeforeA.thread.post.replyCount ?? 0) - 1,
)
expect(threadAfterA.thread.post.repostCount).toEqual(
threadBeforeA.thread.post.repostCount,
)
expect(threadAfterB.thread.post.likeCount).toEqual(
threadBeforeB.thread.post.likeCount,
)
expect(threadAfterB.thread.post.replyCount).toEqual(
threadBeforeB.thread.post.replyCount,
)
expect(threadAfterB.thread.post.repostCount).toEqual(
(threadBeforeB.thread.post.repostCount ?? 0) - 1,
)
})

it('no longer lets the user log in', async () => {
Expand Down

0 comments on commit a0f0301

Please sign in to comment.