Skip to content

Commit

Permalink
Materialize post and profile aggregations on write (bluesky-social#858)
Browse files Browse the repository at this point in the history
* Add migration for post & profile aggs to pds

* Post/profile agg db tweaks

* Post/profile aggregation queries on pds

* Add facility for performing work on db commit in pds

* Add background queue to pds

* Perform aggregations during indexing on background queue

* Tidy pds background queue, incorporate into labeler

* Fix pds onCommit, remove unused indexing dep, fix zeros for aggs

* Finish background tasks for view tests in pds

* Update aggregations on account deletion, tidy
  • Loading branch information
devinivy authored Apr 21, 2023
1 parent a5016eb commit 587b243
Show file tree
Hide file tree
Showing 34 changed files with 610 additions and 52 deletions.
4 changes: 4 additions & 0 deletions packages/pds/src/app-view/db/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as duplicateRecords from './tables/duplicate-record'
import * as profile from './tables/profile'
import * as profileAgg from './tables/profile-agg'
import * as post from './tables/post'
import * as postAgg from './tables/post-agg'
import * as postEmbed from './tables/post-embed'
import * as postHierarchy from './tables/post-hierarchy'
import * as repost from './tables/repost'
Expand All @@ -12,7 +14,9 @@ import * as subscription from './tables/subscription'
// @NOTE app-view also shares did-handle, record, and repo-root tables w/ main pds
export type DatabaseSchemaType = duplicateRecords.PartialDB &
profile.PartialDB &
profileAgg.PartialDB &
post.PartialDB &
postAgg.PartialDB &
postEmbed.PartialDB &
postHierarchy.PartialDB &
repost.PartialDB &
Expand Down
14 changes: 14 additions & 0 deletions packages/pds/src/app-view/db/tables/post-agg.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Generated } from 'kysely'

// Name of the table holding the materialized per-post counters.
export const tableName = 'post_agg'

/**
 * Row shape of the `post_agg` table: one row of counters per post uri.
 *
 * The counters are typed with Kysely's `Generated`, so they may be omitted
 * when inserting a row (writers in this codebase upsert one counter at a time).
 * NOTE(review): the column defaults are presumably 0 — confirm against the migration.
 */
export interface PostAgg {
// Uri of the post the counters belong to; used as the upsert conflict target.
uri: string
// Number of like records whose subject is this post.
likeCount: Generated<number>
// Number of posts whose reply parent is this post.
replyCount: Generated<number>
// Number of repost records whose subject is this post.
repostCount: Generated<number>
}

// Schema fragment contributed by this table, intersected into the app-view database type.
export type PartialDB = {
[tableName]: PostAgg
}
14 changes: 14 additions & 0 deletions packages/pds/src/app-view/db/tables/profile-agg.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Generated } from 'kysely'

// Name of the table holding the materialized per-actor counters.
export const tableName = 'profile_agg'

/**
 * Row shape of the `profile_agg` table: one row of counters per actor did.
 *
 * The counters are typed with Kysely's `Generated`, so they may be omitted
 * when inserting a row (writers in this codebase upsert one counter at a time).
 * NOTE(review): the column defaults are presumably 0 — confirm against the migration.
 */
export interface ProfileAgg {
// Did of the actor the counters belong to; used as the upsert conflict target.
did: string
// Number of follow records whose subject is this actor.
followersCount: Generated<number>
// Number of follow records created by this actor.
followsCount: Generated<number>
// NOTE(review): presumably the number of posts created by this actor — the writer is not visible here.
postsCount: Generated<number>
}

// Schema fragment contributed by this table, intersected into the app-view database type.
export type PartialDB = {
[tableName]: ProfileAgg
}
132 changes: 123 additions & 9 deletions packages/pds/src/app-view/services/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import { CID } from 'multiformats/cid'
import { WriteOpAction } from '@atproto/repo'
import { AtUri } from '@atproto/uri'
import Database from '../../../db'
import DatabaseSchema from '../../../db/database-schema'
import { excluded } from '../../../db/util'
import * as Post from './plugins/post'
import * as Like from './plugins/like'
import * as Repost from './plugins/repost'
import * as Follow from './plugins/follow'
import * as Profile from './plugins/profile'
import { MessageQueue } from '../../../event-stream/types'
import { BackgroundQueue } from '../../../event-stream/background-queue'

export class IndexingService {
records: {
Expand All @@ -18,18 +20,18 @@ export class IndexingService {
profile: Profile.PluginType
}

constructor(public db: Database, public messageDispatcher: MessageQueue) {
constructor(public db: Database, public backgroundQueue: BackgroundQueue) {
this.records = {
post: Post.makePlugin(this.db.db),
like: Like.makePlugin(this.db.db),
repost: Repost.makePlugin(this.db.db),
follow: Follow.makePlugin(this.db.db),
profile: Profile.makePlugin(this.db.db),
post: Post.makePlugin(this.db, backgroundQueue),
like: Like.makePlugin(this.db, backgroundQueue),
repost: Repost.makePlugin(this.db, backgroundQueue),
follow: Follow.makePlugin(this.db, backgroundQueue),
profile: Profile.makePlugin(this.db, backgroundQueue),
}
}

static creator(messageDispatcher: MessageQueue) {
return (db: Database) => new IndexingService(db, messageDispatcher)
static creator(backgroundQueue: BackgroundQueue) {
return (db: Database) => new IndexingService(db, backgroundQueue)
}

async indexRecord(
Expand Down Expand Up @@ -97,6 +99,7 @@ export class IndexingService {
)
.execute(),
])
await removeActorAggregates(this.db.db, did)
await Promise.all([
this.db.db.deleteFrom('follow').where('creator', '=', did).execute(),
this.db.db.deleteFrom('post').where('creator', '=', did).execute(),
Expand All @@ -106,3 +109,114 @@ export class IndexingService {
])
}
}

/**
 * Brings the materialized aggregates in line with the removal of an actor.
 *
 * Must run while the actor's `post`, `follow`, `like` and `repost` rows still
 * exist: every query below locates the affected subjects through the rows the
 * actor created.
 *
 * Work happens in two phases:
 *  1. Recount, excluding the actor's own records, every counter the actor
 *     contributed to (reply counts of parents they replied to, follower counts
 *     of actors they followed, like/repost counts of posts they liked/reposted).
 *  2. Delete the aggregate rows owned by the actor (their profile row and the
 *     rows of their own posts).
 *
 * The deletes intentionally run after the recounts. The recounts also target
 * the actor's own posts whenever the actor replied to, liked or reposted
 * themselves; running the deletes first (or concurrently) would let those
 * upserts re-create rows for posts that are about to be deleted, leaving
 * orphaned `post_agg` entries behind.
 *
 * @param db  query builder bound to the database (or transaction) to use
 * @param did did of the actor being removed
 */
async function removeActorAggregates(db: DatabaseSchema, did: string) {
  // Phase 1: recount counters on subjects the actor interacted with.
  // Each query self-joins the table: `target` selects the actor's records,
  // the left-joined side counts the records on the same subject made by
  // anyone else. The left join keeps subjects with no remaining records so
  // that their counter is reset to 0 rather than left stale.
  const replyCountQb = db
    .insertInto('post_agg')
    .columns(['uri', 'replyCount'])
    .expression((exp) =>
      exp
        .selectFrom('post as target')
        .leftJoin('post', (join) =>
          join
            .onRef('post.replyParent', '=', 'target.replyParent')
            .on('post.creator', '!=', did),
        )
        .where('target.creator', '=', did)
        .where('target.replyParent', 'is not', null)
        .groupBy('target.replyParent')
        .select([
          'target.replyParent as uri',
          db.fn.count('post.uri').as('replyCount'),
        ]),
    )
    .onConflict((oc) =>
      oc.column('uri').doUpdateSet({ replyCount: excluded(db, 'replyCount') }),
    )
  const followersCountQb = db
    .insertInto('profile_agg')
    .columns(['did', 'followersCount'])
    .expression((exp) =>
      exp
        .selectFrom('follow as target')
        .leftJoin('follow', (join) =>
          join
            .onRef('follow.subjectDid', '=', 'target.subjectDid')
            .on('follow.creator', '!=', did),
        )
        .where('target.creator', '=', did)
        .groupBy('target.subjectDid')
        .select([
          'target.subjectDid as did',
          db.fn.count('follow.uri').as('followersCount'),
        ]),
    )
    .onConflict((oc) =>
      oc.column('did').doUpdateSet({
        followersCount: excluded(db, 'followersCount'),
      }),
    )
  const likeCountQb = db
    .insertInto('post_agg')
    .columns(['uri', 'likeCount'])
    .expression((exp) =>
      exp
        .selectFrom('like as target')
        .leftJoin('like', (join) =>
          join
            .onRef('like.subject', '=', 'target.subject')
            .on('like.creator', '!=', did),
        )
        .where('target.creator', '=', did)
        .groupBy('target.subject')
        .select([
          'target.subject as uri',
          db.fn.count('like.uri').as('likeCount'),
        ]),
    )
    .onConflict((oc) =>
      oc.column('uri').doUpdateSet({ likeCount: excluded(db, 'likeCount') }),
    )
  const repostCountQb = db
    .insertInto('post_agg')
    .columns(['uri', 'repostCount'])
    .expression((exp) =>
      exp
        .selectFrom('repost as target')
        .leftJoin('repost', (join) =>
          join
            .onRef('repost.subject', '=', 'target.subject')
            .on('repost.creator', '!=', did),
        )
        .where('target.creator', '=', did)
        .groupBy('target.subject')
        .select([
          'target.subject as uri',
          db.fn.count('repost.uri').as('repostCount'),
        ]),
    )
    .onConflict((oc) =>
      oc
        .column('uri')
        .doUpdateSet({ repostCount: excluded(db, 'repostCount') }),
    )
  await Promise.all([
    replyCountQb.execute(),
    followersCountQb.execute(),
    likeCountQb.execute(),
    repostCountQb.execute(),
  ])

  // Phase 2: drop the aggregate rows owned by the actor. Done last so that
  // nothing from phase 1 can resurrect a row for one of the actor's own posts.
  const ownProfileAggQb = db.deleteFrom('profile_agg').where('did', '=', did)
  const ownPostAggsQb = db
    .deleteFrom('post_agg')
    .where(
      'uri',
      'in',
      db
        .selectFrom('post')
        .where('post.creator', '=', did)
        .select('post.uri as uri'),
    )
  await Promise.all([ownProfileAggQb.execute(), ownPostAggsQb.execute()])
}
43 changes: 41 additions & 2 deletions packages/pds/src/app-view/services/indexing/plugins/follow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import { AtUri } from '@atproto/uri'
import { CID } from 'multiformats/cid'
import * as Follow from '../../../../lexicon/types/app/bsky/graph/follow'
import * as lex from '../../../../lexicon/lexicons'
import Database from '../../../../db'
import {
DatabaseSchema,
DatabaseSchemaType,
} from '../../../../db/database-schema'
import { countAll, excluded } from '../../../../db/util'
import { BackgroundQueue } from '../../../../event-stream/background-queue'
import RecordProcessor from '../processor'

const lexId = lex.ids.AppBskyGraphFollow
Expand Down Expand Up @@ -82,16 +85,52 @@ const notifsForDelete = (
return { notifs: [], toDelete }
}

/**
 * Refreshes the two `profile_agg` counters affected by a follow record:
 * the follower count of the followed actor and the follows count of the
 * follow's creator. Each counter is recomputed from the `follow` table and
 * written with an upsert keyed on `did`.
 */
const updateAggregates = async (db: DatabaseSchema, follow: IndexedFollow) => {
  const { subjectDid, creator } = follow

  // Upsert: how many follow records point at the followed actor.
  const upsertFollowers = db
    .insertInto('profile_agg')
    .values({
      did: subjectDid,
      followersCount: db
        .selectFrom('follow')
        .where('follow.subjectDid', '=', subjectDid)
        .select(countAll.as('count')),
    })
    .onConflict((conflict) =>
      conflict.column('did').doUpdateSet({
        followersCount: excluded(db, 'followersCount'),
      }),
    )

  // Upsert: how many follow records the creator has authored.
  const upsertFollows = db
    .insertInto('profile_agg')
    .values({
      did: creator,
      followsCount: db
        .selectFrom('follow')
        .where('follow.creator', '=', creator)
        .select(countAll.as('count')),
    })
    .onConflict((conflict) =>
      conflict.column('did').doUpdateSet({
        followsCount: excluded(db, 'followsCount'),
      }),
    )

  const pending = [upsertFollowers.execute(), upsertFollows.execute()]
  await Promise.all(pending)
}

export type PluginType = RecordProcessor<Follow.Record, IndexedFollow>

/**
 * Builds the record processor that indexes `app.bsky.graph.follow` records.
 *
 * @param db              database handle passed through to the processor
 * @param backgroundQueue queue passed through to the processor
 *   (NOTE(review): presumably where `updateAggregates` is scheduled — confirm in RecordProcessor)
 * @returns a processor wired with this module's insert/delete/notification/aggregate hooks
 */
export const makePlugin = (
  db: Database,
  backgroundQueue: BackgroundQueue,
): PluginType => {
  return new RecordProcessor(db, backgroundQueue, {
    lexId,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  })
}

Expand Down
27 changes: 25 additions & 2 deletions packages/pds/src/app-view/services/indexing/plugins/like.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import { AtUri } from '@atproto/uri'
import { CID } from 'multiformats/cid'
import * as Like from '../../../../lexicon/types/app/bsky/feed/like'
import * as lex from '../../../../lexicon/lexicons'
import Database from '../../../../db'
import {
DatabaseSchema,
DatabaseSchemaType,
} from '../../../../db/database-schema'
import { countAll, excluded } from '../../../../db/util'
import { BackgroundQueue } from '../../../../event-stream/background-queue'
import RecordProcessor from '../processor'

const lexId = lex.ids.AppBskyFeedLike
Expand Down Expand Up @@ -84,16 +87,36 @@ const notifsForDelete = (
return { notifs: [], toDelete }
}

/**
 * Refreshes the `likeCount` of the liked post in `post_agg`. The count is
 * recomputed from the `like` table for the like's subject and written with
 * an upsert keyed on `uri`.
 */
const updateAggregates = async (db: DatabaseSchema, like: IndexedLike) => {
  const { subject } = like

  // Subquery: total number of like records pointing at the subject.
  const likesOnSubject = db
    .selectFrom('like')
    .where('like.subject', '=', subject)
    .select(countAll.as('count'))

  await db
    .insertInto('post_agg')
    .values({ uri: subject, likeCount: likesOnSubject })
    .onConflict((conflict) =>
      conflict
        .column('uri')
        .doUpdateSet({ likeCount: excluded(db, 'likeCount') }),
    )
    .execute()
}

export type PluginType = RecordProcessor<Like.Record, IndexedLike>

/**
 * Builds the record processor that indexes `app.bsky.feed.like` records.
 *
 * @param db              database handle passed through to the processor
 * @param backgroundQueue queue passed through to the processor
 *   (NOTE(review): presumably where `updateAggregates` is scheduled — confirm in RecordProcessor)
 * @returns a processor wired with this module's insert/delete/notification/aggregate hooks
 */
export const makePlugin = (
  db: Database,
  backgroundQueue: BackgroundQueue,
): PluginType => {
  return new RecordProcessor(db, backgroundQueue, {
    lexId,
    insertFn,
    findDuplicate,
    deleteFn,
    notifsForInsert,
    notifsForDelete,
    updateAggregates,
  })
}

Expand Down
Loading

0 comments on commit 587b243

Please sign in to comment.