Skip to content

Commit

Permalink
Remove post hierarchy indexing from bsky appview (bluesky-social#1257)
Browse files Browse the repository at this point in the history
* remove post_hierarchy from db model and indexing in bsky appview

* update bsky appview getPostThread to use recursive query to build thread

* add covering index to speed up descendents query

* tidy post/notification processing w/o post_hierarchy

* tidy, disallow infinitely following reply cycles
  • Loading branch information
devinivy authored Jun 30, 2023
1 parent ee68a40 commit 2e4a114
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 262 deletions.
42 changes: 24 additions & 18 deletions packages/bsky/src/api/app/bsky/feed/getPostThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import {
ThreadViewPost,
isNotFoundPost,
} from '../../../../lexicon/types/app/bsky/feed/defs'
import {
getAncestorsAndSelfQb,
getDescendentsQb,
} from '../../../../services/util/post'

export type PostThread = {
post: FeedRow
Expand All @@ -32,12 +36,7 @@ export default function (server: Server, ctx: AppContext) {
const feedService = ctx.services.feed(ctx.db)
const labelService = ctx.services.label(ctx.db)

const threadData = await getThreadData(
feedService,
uri,
depth,
parentHeight,
)
const threadData = await getThreadData(ctx, uri, depth, parentHeight)
if (!threadData) {
throw new InvalidRequestError(`Post not found: ${uri}`, 'NotFound')
}
Expand Down Expand Up @@ -173,24 +172,31 @@ const getRelevantIds = (
}

const getThreadData = async (
feedService: FeedService,
ctx: AppContext,
uri: string,
depth: number,
parentHeight: number,
): Promise<PostThread | null> => {
const feedService = ctx.services.feed(ctx.db)
const [parents, children] = await Promise.all([
feedService
.selectPostQb()
.innerJoin('post_hierarchy', 'post_hierarchy.ancestorUri', 'post.uri')
.where('post_hierarchy.uri', '=', uri)
getAncestorsAndSelfQb(ctx.db.db, { uri, parentHeight })
.selectFrom('ancestor')
.innerJoin(
feedService.selectPostQb().as('post'),
'post.uri',
'ancestor.uri',
)
.selectAll('post')
.execute(),
feedService
.selectPostQb()
.innerJoin('post_hierarchy', 'post_hierarchy.uri', 'post.uri')
.where('post_hierarchy.uri', '!=', uri)
.where('post_hierarchy.ancestorUri', '=', uri)
.where('depth', '<=', depth)
.orderBy('post.createdAt', 'desc')
getDescendentsQb(ctx.db.db, { uri, depth })
.selectFrom('descendent')
.innerJoin(
feedService.selectPostQb().as('post'),
'post.uri',
'descendent.uri',
)
.selectAll('post')
.orderBy('sortAt', 'desc')
.execute(),
])
const parentsByUri = parents.reduce((acc, parent) => {
Expand Down
2 changes: 0 additions & 2 deletions packages/bsky/src/db/database-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as profile from './tables/profile'
import * as profileAgg from './tables/profile-agg'
import * as post from './tables/post'
import * as postEmbed from './tables/post-embed'
import * as postHierarchy from './tables/post-hierarchy'
import * as postAgg from './tables/post-agg'
import * as repost from './tables/repost'
import * as feedItem from './tables/feed-item'
Expand Down Expand Up @@ -34,7 +33,6 @@ export type DatabaseSchemaType = duplicateRecord.PartialDB &
profileAgg.PartialDB &
post.PartialDB &
postEmbed.PartialDB &
postHierarchy.PartialDB &
postAgg.PartialDB &
repost.PartialDB &
feedItem.PartialDB &
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Kysely, sql } from 'kysely'

/**
 * Forward migration: removes the `post_hierarchy` table and rebuilds
 * `post_replyparent_idx` on `post` so that it also carries `uri`.
 *
 * @param db Kysely handle the migration statements are executed against.
 */
export async function up(db: Kysely<unknown>): Promise<void> {
  const { schema } = db

  await schema.dropTable('post_hierarchy').execute()

  // The replyParent index (the one used when computing e.g. "replyCount") is
  // dropped and recreated with "uri" as an included column. That makes it a
  // covering index, so the recursive query for post descendents can be served
  // by an index-only scan.
  await schema.dropIndex('post_replyparent_idx').execute()
  const createCoveringIndex = sql`create index "post_replyparent_idx" on "post" ("replyParent") include ("uri")`
  await createCoveringIndex.execute(db)
}

/**
 * Reverse migration: recreates the `post_hierarchy` table together with its
 * ancestorUri index, then swaps `post_replyparent_idx` back to a plain
 * single-column index on `post.replyParent`.
 *
 * Only the schema is recreated here; no rows are inserted into
 * `post_hierarchy` by this function.
 *
 * @param db Kysely handle the migration statements are executed against.
 */
export async function down(db: Kysely<unknown>): Promise<void> {
  const { schema } = db

  // 1. Table: (uri, ancestorUri) is the primary key, all columns required.
  const hierarchyTable = schema
    .createTable('post_hierarchy')
    .addColumn('uri', 'varchar', (column) => column.notNull())
    .addColumn('ancestorUri', 'varchar', (column) => column.notNull())
    .addColumn('depth', 'integer', (column) => column.notNull())
    .addPrimaryKeyConstraint('post_hierarchy_pkey', ['uri', 'ancestorUri'])
  await hierarchyTable.execute()

  // 2. Secondary index on the ancestor side of the relation.
  const ancestorIndex = schema
    .createIndex('post_hierarchy_ancestoruri_idx')
    .on('post_hierarchy')
    .column('ancestorUri')
  await ancestorIndex.execute()

  // 3. Replace the covering index with the plain replyParent index.
  await schema.dropIndex('post_replyparent_idx').execute()
  const plainReplyParentIndex = schema
    .createIndex('post_replyparent_idx')
    .on('post')
    .column('replyParent')
  await plainReplyParentIndex.execute()
}
1 change: 1 addition & 0 deletions packages/bsky/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ export * as _20230610T203555962Z from './20230610T203555962Z-suggested-follows'
export * as _20230611T215300060Z from './20230611T215300060Z-actor-state'
export * as _20230620T161134972Z from './20230620T161134972Z-post-langs'
export * as _20230627T212437895Z from './20230627T212437895Z-optional-handle'
export * as _20230629T220835893Z from './20230629T220835893Z-remove-post-hierarchy'
11 changes: 0 additions & 11 deletions packages/bsky/src/db/tables/post-hierarchy.ts

This file was deleted.

187 changes: 60 additions & 127 deletions packages/bsky/src/services/indexing/plugins/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,43 @@ import {
import * as lex from '../../../lexicon/lexicons'
import { DatabaseSchema, DatabaseSchemaType } from '../../../db/database-schema'
import RecordProcessor from '../processor'
import { PostHierarchy } from '../../../db/tables/post-hierarchy'
import { Notification } from '../../../db/tables/notification'
import { toSimplifiedISOSafe } from '../util'
import Database from '../../../db'
import { countAll, excluded } from '../../../db/util'
import { BackgroundQueue } from '../../../background'
import { getAncestorsAndSelfQb, getDescendentsQb } from '../../util/post'

type Notif = Insertable<Notification>
type Post = Selectable<DatabaseSchemaType['post']>
type PostEmbedImage = DatabaseSchemaType['post_embed_image']
type PostEmbedExternal = DatabaseSchemaType['post_embed_external']
type PostEmbedRecord = DatabaseSchemaType['post_embed_record']
// Shape of a row read from the 'ancestor' result of getAncestorsAndSelfQb in
// insertFn. The inserted post's own uri can appear among these rows;
// notifsForInsert skips it.
type PostAncestor = {
uri: string
// Compared against REPLY_NOTIF_DEPTH when deciding whether to notify.
// NOTE(review): presumably the number of reply hops above the inserted post —
// confirm against getAncestorsAndSelfQb.
height: number
}
// Shape of a row read from the 'descendent' result of getDescendentsQb joined
// to 'post' in insertFn: every 'descendent' column plus cid, creator and
// indexedAt. notifsForInsert uses these rows to build reply notifications for
// replies that were indexed before the post being inserted.
type PostDescendent = {
uri: string
// Added to an ancestor's height to get the total distance that is checked
// against REPLY_NOTIF_DEPTH.
depth: number
cid: string
creator: string
indexedAt: string
}
type IndexedPost = {
post: Post
facets?: { type: 'mention' | 'link'; value: string }[]
embeds?: (PostEmbedImage[] | PostEmbedExternal | PostEmbedRecord)[]
ancestors?: PostHierarchy[]
descendents?: IndexedPost[]
ancestors?: PostAncestor[]
descendents?: PostDescendent[]
}

const lexId = lex.ids.AppBskyFeedPost

const REPLY_NOTIF_DEPTH = 5
const BLESSED_HELL_THREAD =
'at://did:plc:wgaezxqi2spqm3mhrb5xvkzi/app.bsky.feed.post/3juzlwllznd24'

const insertFn = async (
db: DatabaseSchema,
uri: AtUri,
Expand Down Expand Up @@ -135,94 +150,31 @@ const insertFn = async (
await db.insertInto('post_embed_record').values(recordEmbed).execute()
}
}
// Thread indexing

// Concurrent, out-of-order updates are difficult, so we essentially
// take a lock on the thread for indexing, by locking the thread's root post.
await db
.selectFrom('post')
.forUpdate()
const ancestors = await getAncestorsAndSelfQb(db, {
uri: post.uri,
parentHeight:
post.replyRoot === BLESSED_HELL_THREAD ? 100 : REPLY_NOTIF_DEPTH,
})
.selectFrom('ancestor')
.selectAll()
.where('uri', '=', post.replyRoot ?? post.uri)
.execute()

// Track the minimum we know about the post hierarchy: the post and its parent.
// Importantly, this works even if the parent hasn't been indexed yet.
const minimalPostHierarchy = [
{
uri: post.uri,
ancestorUri: post.uri,
depth: 0,
},
]
if (post.replyParent) {
minimalPostHierarchy.push({
uri: post.uri,
ancestorUri: post.replyParent,
depth: 1,
})
}
const ancestors = await db
.insertInto('post_hierarchy')
.values(minimalPostHierarchy)
.onConflict((oc) => oc.doNothing())
.returningAll()
.execute()

// Copy all the parent's relations down to the post.
// It's possible that the parent hasn't been indexed yet and this will no-op.
if (post.replyParent) {
const deepAncestors = await db
.insertInto('post_hierarchy')
.columns(['uri', 'ancestorUri', 'depth'])
.expression(
db
.selectFrom('post_hierarchy as parent_hierarchy')
.where('parent_hierarchy.uri', '=', post.replyParent)
.select([
sql`${post.uri}`.as('uri'),
'ancestorUri',
sql`depth + 1`.as('depth'),
]),
)
.onConflict((oc) => oc.doNothing())
.returningAll()
.execute()
ancestors.push(...deepAncestors)
}

// Copy all post's relations down to its descendents. This ensures
// that out-of-order indexing (i.e. descendent before parent) is resolved.
const descendents = await db
.insertInto('post_hierarchy')
.columns(['uri', 'ancestorUri', 'depth'])
.expression(
db
.selectFrom('post_hierarchy as target')
.innerJoin(
'post_hierarchy as source',
'source.ancestorUri',
'target.uri',
)
.where('target.uri', '=', post.uri)
.select([
'source.uri as uri',
'target.ancestorUri as ancestorUri',
sql`source.depth + target.depth`.as('depth'),
]),
)
.onConflict((oc) => oc.doNothing())
.returningAll()
const descendents = await getDescendentsQb(db, {
uri: post.uri,
depth: post.replyRoot === BLESSED_HELL_THREAD ? 100 : REPLY_NOTIF_DEPTH,
})
.selectFrom('descendent')
.innerJoin('post', 'post.uri', 'descendent.uri')
.selectAll('descendent')
.select(['cid', 'creator', 'indexedAt'])
.execute()

return {
post: insertedPost,
facets,
embeds,
ancestors,
descendents: await collateDescendents(db, descendents),
descendents,
}
// return { post: insertedPost, facets, embeds, ancestors }
}

const findDuplicate = async (): Promise<AtUri | null> => {
Expand Down Expand Up @@ -267,14 +219,13 @@ const notifsForInsert = (obj: IndexedPost) => {
}
}

const ancestors = (obj.ancestors ?? [])
.filter((a) => a.depth > 0) // no need to notify self
.sort((a, b) => a.depth - b.depth)
const BLESSED_HELL_THREAD =
'at://did:plc:wgaezxqi2spqm3mhrb5xvkzi/app.bsky.feed.post/3juzlwllznd24'
for (const relation of ancestors) {
if (relation.depth < 5 || obj.post.replyRoot === BLESSED_HELL_THREAD) {
const ancestorUri = new AtUri(relation.ancestorUri)
for (const ancestor of obj.ancestors ?? []) {
if (ancestor.uri === obj.post.uri) continue // no need to notify for own post
if (
ancestor.height < REPLY_NOTIF_DEPTH ||
obj.post.replyRoot === BLESSED_HELL_THREAD
) {
const ancestorUri = new AtUri(ancestor.uri)
maybeNotify({
did: ancestorUri.host,
reason: 'reply',
Expand All @@ -287,10 +238,26 @@ const notifsForInsert = (obj: IndexedPost) => {
}
}

if (obj.descendents) {
// May generate notifications for out-of-order indexing of replies
for (const descendent of obj.descendents) {
notifs.push(...notifsForInsert(descendent))
// descendents indicate out-of-order indexing: need to notify
// the current post and upwards.
for (const descendent of obj.descendents ?? []) {
for (const ancestor of obj.ancestors ?? []) {
const totalHeight = descendent.depth + ancestor.height
if (
totalHeight < REPLY_NOTIF_DEPTH ||
obj.post.replyRoot === BLESSED_HELL_THREAD
) {
const ancestorUri = new AtUri(ancestor.uri)
maybeNotify({
did: ancestorUri.host,
reason: 'reply',
reasonSubject: ancestorUri.toString(),
author: descendent.creator,
recordUri: descendent.uri,
recordCid: descendent.cid,
sortAt: descendent.indexedAt,
})
}
}
}

Expand Down Expand Up @@ -341,19 +308,11 @@ const deleteFn = async (
if (deletedPosts) {
deletedEmbeds.push(deletedPosts)
}
// Do not delete, maintain thread hierarchy even if post no longer exists
const ancestors = await db
.selectFrom('post_hierarchy')
.where('uri', '=', uriStr)
.where('depth', '>', 0)
.selectAll()
.execute()
return deleted
? {
post: deleted,
facets: [], // Not used
embeds: deletedEmbeds,
ancestors,
}
: null
}
Expand Down Expand Up @@ -429,29 +388,3 @@ function separateEmbeds(embed: PostRecord['embed']) {
}
return [embed]
}

/**
 * Groups hierarchy relations by the descendent post they belong to and pairs
 * each group with that post's row from the `post` table.
 *
 * @param db Database handle used to look up the descendent posts.
 * @param descendents Hierarchy relations, possibly several per descendent uri.
 * @returns One entry per post found, carrying the post and its relations as
 * `ancestors`; `undefined` when `descendents` is empty.
 */
async function collateDescendents(
  db: DatabaseSchema,
  descendents: PostHierarchy[],
): Promise<IndexedPost[] | undefined> {
  if (descendents.length === 0) {
    return undefined
  }

  // Bucket the relations under the uri of the post they describe.
  const relationsByUri: Record<string, PostHierarchy[]> = {}
  for (const relation of descendents) {
    const bucket = (relationsByUri[relation.uri] ??= [])
    bucket.push(relation)
  }

  const uris = Object.keys(relationsByUri)
  const posts = await db
    .selectFrom('post')
    .selectAll()
    .where('uri', 'in', uris)
    .execute()

  // Relations whose post row was not found are dropped here.
  return posts.map((post) => ({
    post,
    ancestors: relationsByUri[post.uri],
  }))
}
Loading

0 comments on commit 2e4a114

Please sign in to comment.