From c70cb2b6e5a350233d451c3d9a5f6cd5f2c003a4 Mon Sep 17 00:00:00 2001 From: devin ivy Date: Tue, 18 Apr 2023 17:55:03 -0400 Subject: [PATCH] Notifications from the appview (#829) * Update notif lexicons for stateless seenAt param * In-progress work on appview notifs * Fix-up bsky notification methods * Add appview notification table table * Process notifications in bsky appview, test notif indexing * Test bsky appview notification methods * Tidy bsky notification tests * Explicitly don't support seenAt notif params on pds * Tidy bsky notifs tests * Sync bsky notif handling with pds * Remove stale comment * Switch bsky notifs tests to use testenv --- .../app/bsky/notification/getUnreadCount.json | 6 + .../bsky/notification/listNotifications.json | 3 +- packages/api/src/client/lexicons.ts | 13 + .../app/bsky/notification/getUnreadCount.ts | 4 +- .../bsky/notification/listNotifications.ts | 1 + .../app/bsky/notification/getUnreadCount.ts | 35 ++ .../bsky/notification/listNotifications.ts | 87 ++++ packages/bsky/src/api/index.ts | 4 + packages/bsky/src/db/database-schema.ts | 4 +- .../20230408T152211201Z-notification-init.ts | 25 + packages/bsky/src/db/migrations/index.ts | 1 + packages/bsky/src/db/tables/notification.ts | 16 + packages/bsky/src/lexicon/lexicons.ts | 13 + .../app/bsky/notification/getUnreadCount.ts | 4 +- .../bsky/notification/listNotifications.ts | 1 + packages/bsky/src/services/indexing/index.ts | 15 +- .../bsky/src/services/indexing/messages.ts | 43 -- .../src/services/indexing/plugins/follow.ts | 23 +- .../src/services/indexing/plugins/like.ts | 41 +- .../src/services/indexing/plugins/post.ts | 47 +- .../src/services/indexing/plugins/profile.ts | 10 +- .../src/services/indexing/plugins/repost.ts | 33 +- .../bsky/src/services/indexing/processor.ts | 66 ++- .../tests/__snapshots__/indexing.test.ts.snap | 35 +- packages/bsky/tests/indexing.test.ts | 41 +- .../__snapshots__/notifications.test.ts.snap | 433 ++++++++++++++++++ .../bsky/tests/views/notifications.test.ts | 177 +++++++ .../app/bsky/notification/getUnreadCount.ts | 7 +- .../bsky/notification/listNotifications.ts | 5 +- packages/pds/src/lexicon/lexicons.ts | 13 + .../app/bsky/notification/getUnreadCount.ts | 4 +- .../bsky/notification/listNotifications.ts | 1 + 32 files changed, 1010 insertions(+), 201 deletions(-) create mode 100644 packages/bsky/src/api/app/bsky/notification/getUnreadCount.ts create mode 100644 packages/bsky/src/api/app/bsky/notification/listNotifications.ts create mode 100644 packages/bsky/src/db/migrations/20230408T152211201Z-notification-init.ts create mode 100644 packages/bsky/src/db/tables/notification.ts delete mode 100644 packages/bsky/src/services/indexing/messages.ts create mode 100644 packages/bsky/tests/views/__snapshots__/notifications.test.ts.snap create mode 100644 packages/bsky/tests/views/notifications.test.ts diff --git a/lexicons/app/bsky/notification/getUnreadCount.json b/lexicons/app/bsky/notification/getUnreadCount.json index f8c57d824be..ba213622872 100644 --- a/lexicons/app/bsky/notification/getUnreadCount.json +++ b/lexicons/app/bsky/notification/getUnreadCount.json @@ -4,6 +4,12 @@ "defs": { "main": { "type": "query", + "parameters": { + "type": "params", + "properties": { + "seenAt": { "type": "string", "format": "datetime"} + } + }, "output": { "encoding": "application/json", "schema": { diff --git a/lexicons/app/bsky/notification/listNotifications.json b/lexicons/app/bsky/notification/listNotifications.json index 174388f7d5b..d3775e6177c 100644 --- a/lexicons/app/bsky/notification/listNotifications.json +++ b/lexicons/app/bsky/notification/listNotifications.json @@ -8,7 +8,8 @@ "type": "params", "properties": { "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 50}, - "cursor": {"type": "string"} + "cursor": {"type": "string"}, + "seenAt": { "type": "string", "format": "datetime"} } }, "output": { diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index 77b55413c8b..dfcd418edd9 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -4692,6 +4692,15 @@ export const schemaDict = { defs: { main: { type: 'query', + parameters: { + type: 'params', + properties: { + seenAt: { + type: 'string', + format: 'datetime', + }, + }, + }, output: { encoding: 'application/json', schema: { @@ -4725,6 +4734,10 @@ export const schemaDict = { cursor: { type: 'string', }, + seenAt: { + type: 'string', + format: 'datetime', + }, }, }, output: { diff --git a/packages/api/src/client/types/app/bsky/notification/getUnreadCount.ts b/packages/api/src/client/types/app/bsky/notification/getUnreadCount.ts index 0f086b41df1..c05b420ae18 100644 --- a/packages/api/src/client/types/app/bsky/notification/getUnreadCount.ts +++ b/packages/api/src/client/types/app/bsky/notification/getUnreadCount.ts @@ -7,7 +7,9 @@ import { isObj, hasProp } from '../../../../util' import { lexicons } from '../../../../lexicons' import { CID } from 'multiformats/cid' -export interface QueryParams {} +export interface QueryParams { + seenAt?: string +} export type InputSchema = undefined diff --git a/packages/api/src/client/types/app/bsky/notification/listNotifications.ts b/packages/api/src/client/types/app/bsky/notification/listNotifications.ts index 929cfbc9131..149ad58e475 100644 --- a/packages/api/src/client/types/app/bsky/notification/listNotifications.ts +++ b/packages/api/src/client/types/app/bsky/notification/listNotifications.ts @@ -12,6 +12,7 @@ import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs' export interface QueryParams { limit?: number cursor?: string + seenAt?: string } export type InputSchema = undefined diff --git a/packages/bsky/src/api/app/bsky/notification/getUnreadCount.ts b/packages/bsky/src/api/app/bsky/notification/getUnreadCount.ts new file mode 100644 index 00000000000..a87c64b9d33 --- /dev/null +++ b/packages/bsky/src/api/app/bsky/notification/getUnreadCount.ts @@ -0,0 +1,35 @@ +import { Server } from '../../../../lexicon' +import { countAll, notSoftDeletedClause } from '../../../../db/util' +import AppContext from '../../../../context' +import { authVerifier } from '../util' + +export default function (server: Server, ctx: AppContext) { + server.app.bsky.notification.getUnreadCount({ + auth: authVerifier, + handler: async ({ auth, params }) => { + const requester = auth.credentials.did + const { seenAt } = params + + const { ref } = ctx.db.db.dynamic + const result = await ctx.db.db + .selectFrom('notification') + .select(countAll.as('count')) + .innerJoin('actor', 'actor.did', 'notification.did') + .innerJoin('record', 'record.uri', 'notification.recordUri') + .where(notSoftDeletedClause(ref('actor'))) + .where(notSoftDeletedClause(ref('record'))) + .where('notification.did', '=', requester) + .if(!!seenAt, (qb) => + qb.where('notification.sortAt', '>', String(seenAt)), + ) + .executeTakeFirst() + + const count = result?.count ?? 0 + + return { + encoding: 'application/json', + body: { count }, + } + }, + }) +} diff --git a/packages/bsky/src/api/app/bsky/notification/listNotifications.ts b/packages/bsky/src/api/app/bsky/notification/listNotifications.ts new file mode 100644 index 00000000000..9fae1187f1e --- /dev/null +++ b/packages/bsky/src/api/app/bsky/notification/listNotifications.ts @@ -0,0 +1,87 @@ +import { jsonStringToLex } from '@atproto/lexicon' +import { Server } from '../../../../lexicon' +import { paginate, TimeCidKeyset } from '../../../../db/pagination' +import AppContext from '../../../../context' +import { notSoftDeletedClause } from '../../../../db/util' +import { authVerifier } from '../util' + +export default function (server: Server, ctx: AppContext) { + server.app.bsky.notification.listNotifications({ + auth: authVerifier, + handler: async ({ params, auth }) => { + const { limit, cursor } = params + const requester = auth.credentials.did + const { seenAt } = params + + const { ref } = ctx.db.db.dynamic + let notifBuilder = ctx.db.db + .selectFrom('notification as notif') + .innerJoin('record', 'record.uri', 'notif.recordUri') + .innerJoin('actor as author', 'author.did', 'notif.author') + .where(notSoftDeletedClause(ref('record'))) + .where(notSoftDeletedClause(ref('author'))) + .where('notif.did', '=', requester) + .select([ + 'notif.recordUri as uri', + 'notif.recordCid as cid', + 'author.did as authorDid', + 'author.handle as authorHandle', + 'author.indexedAt as authorIndexedAt', + 'author.takedownId as authorTakedownId', + 'notif.reason as reason', + 'notif.reasonSubject as reasonSubject', + 'notif.sortAt as indexedAt', + 'record.json as recordJson', + ]) + + const keyset = new NotifsKeyset( + ref('notif.sortAt'), + ref('notif.recordCid'), + ) + notifBuilder = paginate(notifBuilder, { + cursor, + limit, + keyset, + }) + + const notifs = await notifBuilder.execute() + + const actorService = ctx.services.actor(ctx.db) + const authors = await actorService.views.profile( + notifs.map((notif) => ({ + did: notif.authorDid, + handle: notif.authorHandle, + indexedAt: notif.authorIndexedAt, + takedownId: notif.authorTakedownId, + })), + requester, + ) + + const notifications = notifs.map((notif, i) => ({ + uri: notif.uri, + cid: notif.cid, + author: authors[i], + reason: notif.reason, + reasonSubject: notif.reasonSubject || undefined, + record: jsonStringToLex(notif.recordJson) as Record, + isRead: seenAt ? notif.indexedAt <= seenAt : false, + indexedAt: notif.indexedAt, + })) + + return { + encoding: 'application/json', + body: { + notifications, + cursor: keyset.packFromResult(notifs), + }, + } + }, + }) +} + +type NotifRow = { indexedAt: string; cid: string } +class NotifsKeyset extends TimeCidKeyset { + labelResult(result: NotifRow) { + return { primary: result.indexedAt, secondary: result.cid } + } +} diff --git a/packages/bsky/src/api/index.ts b/packages/bsky/src/api/index.ts index 825adc7f819..33ad60725e7 100644 --- a/packages/bsky/src/api/index.ts +++ b/packages/bsky/src/api/index.ts @@ -12,6 +12,8 @@ import getFollows from './app/bsky/graph/getFollows' import searchActors from './app/bsky/actor/searchActors' import searchActorsTypeahead from './app/bsky/actor/searchActorsTypeahead' import getSuggestions from './app/bsky/actor/getSuggestions' +import getUnreadCount from './app/bsky/notification/getUnreadCount' +import listNotifications from './app/bsky/notification/listNotifications' import unspecced from './app/bsky/unspecced' export * as health from './health' @@ -31,6 +33,8 @@ export default function (server: Server, ctx: AppContext) { searchActors(server, ctx) searchActorsTypeahead(server, ctx) getSuggestions(server, ctx) + getUnreadCount(server, ctx) + listNotifications(server, ctx) unspecced(server, ctx) return server } diff --git a/packages/bsky/src/db/database-schema.ts b/packages/bsky/src/db/database-schema.ts index d40e88b484b..cfc2dc7822e 100644 --- a/packages/bsky/src/db/database-schema.ts +++ b/packages/bsky/src/db/database-schema.ts @@ -12,6 +12,7 @@ import * as subscription from './tables/subscription' import * as actor from './tables/actor' import * as actorSync from './tables/actor-sync' import * as record from './tables/record' +import * as notification from './tables/notification' export type DatabaseSchemaType = duplicateRecord.PartialDB & profile.PartialDB & @@ -25,7 +26,8 @@ export type DatabaseSchemaType = duplicateRecord.PartialDB & subscription.PartialDB & actor.PartialDB & actorSync.PartialDB & - record.PartialDB + record.PartialDB & + notification.PartialDB export type DatabaseSchema = Kysely diff --git a/packages/bsky/src/db/migrations/20230408T152211201Z-notification-init.ts b/packages/bsky/src/db/migrations/20230408T152211201Z-notification-init.ts new file mode 100644 index 00000000000..9c11ed9835f --- /dev/null +++ b/packages/bsky/src/db/migrations/20230408T152211201Z-notification-init.ts @@ -0,0 +1,25 @@ +import { Kysely } from 'kysely' + +export async function up(db: Kysely): Promise { + // Notifications + await db.schema + .createTable('notification') + .addColumn('id', 'bigserial', (col) => col.primaryKey()) + .addColumn('did', 'varchar', (col) => col.notNull()) + .addColumn('recordUri', 'varchar', (col) => col.notNull()) + .addColumn('recordCid', 'varchar', (col) => col.notNull()) + .addColumn('author', 'varchar', (col) => col.notNull()) + .addColumn('reason', 'varchar', (col) => col.notNull()) + .addColumn('reasonSubject', 'varchar') + .addColumn('sortAt', 'varchar', (col) => col.notNull()) + .execute() + await db.schema + .createIndex('notification_did_sortat_idx') + .on('notification') + .columns(['did', 'sortAt']) + .execute() +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('notification').execute() +} diff --git a/packages/bsky/src/db/migrations/index.ts b/packages/bsky/src/db/migrations/index.ts index 649ac3da20a..4f75b0812dd 100644 --- a/packages/bsky/src/db/migrations/index.ts +++ b/packages/bsky/src/db/migrations/index.ts @@ -3,3 +3,4 @@ // this with kysely's FileMigrationProvider, but it doesn't play nicely with the build process. export * as _20230309T045948368Z from './20230309T045948368Z-init' +export * as _20230408T152211201Z from './20230408T152211201Z-notification-init' diff --git a/packages/bsky/src/db/tables/notification.ts b/packages/bsky/src/db/tables/notification.ts new file mode 100644 index 00000000000..12531a75a69 --- /dev/null +++ b/packages/bsky/src/db/tables/notification.ts @@ -0,0 +1,16 @@ +import { Generated } from 'kysely' + +export const tableName = 'notification' + +export interface Notification { + id: Generated + did: string + recordUri: string + recordCid: string + author: string + reason: string + reasonSubject: string | null + sortAt: string +} + +export type PartialDB = { [tableName]: Notification } diff --git a/packages/bsky/src/lexicon/lexicons.ts b/packages/bsky/src/lexicon/lexicons.ts index 76c912cfa6e..884d3adb1c0 100644 --- a/packages/bsky/src/lexicon/lexicons.ts +++ b/packages/bsky/src/lexicon/lexicons.ts @@ -4557,6 +4557,15 @@ export const schemaDict = { defs: { main: { type: 'query', + parameters: { + type: 'params', + properties: { + seenAt: { + type: 'string', + format: 'datetime', + }, + }, + }, output: { encoding: 'application/json', schema: { @@ -4590,6 +4599,10 @@ export const schemaDict = { cursor: { type: 'string', }, + seenAt: { + type: 'string', + format: 'datetime', + }, }, }, output: { diff --git a/packages/bsky/src/lexicon/types/app/bsky/notification/getUnreadCount.ts b/packages/bsky/src/lexicon/types/app/bsky/notification/getUnreadCount.ts index 2b7cbad8cc0..2f49ca1e78e 100644 --- a/packages/bsky/src/lexicon/types/app/bsky/notification/getUnreadCount.ts +++ b/packages/bsky/src/lexicon/types/app/bsky/notification/getUnreadCount.ts @@ -8,7 +8,9 @@ import { isObj, hasProp } from '../../../../util' import { CID } from 'multiformats/cid' import { HandlerAuth } from '@atproto/xrpc-server' -export interface QueryParams {} +export interface QueryParams { + seenAt?: string +} export type InputSchema = undefined diff --git a/packages/bsky/src/lexicon/types/app/bsky/notification/listNotifications.ts b/packages/bsky/src/lexicon/types/app/bsky/notification/listNotifications.ts index 2b5416931aa..09f913a37f3 100644 --- a/packages/bsky/src/lexicon/types/app/bsky/notification/listNotifications.ts +++ b/packages/bsky/src/lexicon/types/app/bsky/notification/listNotifications.ts @@ -13,6 +13,7 @@ import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs' export interface QueryParams { limit: number cursor?: string + seenAt?: string } export type InputSchema = undefined diff --git a/packages/bsky/src/services/indexing/index.ts b/packages/bsky/src/services/indexing/index.ts index 0829fbd2f0e..02c75a3cc3e 100644 --- a/packages/bsky/src/services/indexing/index.ts +++ b/packages/bsky/src/services/indexing/index.ts @@ -57,21 +57,18 @@ export class IndexingService { this.db.assertTransaction() const indexer = this.findIndexerForCollection(uri.collection) if (!indexer) return - // @TODO(bsky) direct notifs - const notifs = - action === WriteOpAction.Create - ? await indexer.insertRecord(uri, cid, obj, timestamp) - : await indexer.updateRecord(uri, cid, obj, timestamp) - return notifs + if (action === WriteOpAction.Create) { + await indexer.insertRecord(uri, cid, obj, timestamp) + } else { + await indexer.updateRecord(uri, cid, obj, timestamp) + } } async deleteRecord(uri: AtUri, cascading = false) { this.db.assertTransaction() const indexer = this.findIndexerForCollection(uri.collection) if (!indexer) return - // @TODO(bsky) direct notifs - const notifs = await indexer.deleteRecord(uri, cascading) - return notifs + await indexer.deleteRecord(uri, cascading) } async indexHandle(did: string, timestamp: string, force = false) { diff --git a/packages/bsky/src/services/indexing/messages.ts b/packages/bsky/src/services/indexing/messages.ts deleted file mode 100644 index 61ce24c18ac..00000000000 --- a/packages/bsky/src/services/indexing/messages.ts +++ /dev/null @@ -1,43 +0,0 @@ -// @TODO(bsky) direct notifications to the right channels - -export type CreateNotification = NotificationInfo & { - type: 'create_notification' -} - -export type NotificationInfo = { - userDid: string - author: string - recordUri: string - recordCid: string - reason: NotificationReason - reasonSubject?: string -} - -export type NotificationReason = - | 'like' - | 'repost' - | 'follow' - | 'mention' - | 'reply' - | 'quote' - -export type DeleteNotifications = { - type: 'delete_notifications' - recordUri: string -} - -export type Message = CreateNotification | DeleteNotifications - -export const createNotification = ( - notif: NotificationInfo, -): CreateNotification => ({ - type: 'create_notification', - ...notif, -}) - -export const deleteNotifications = ( - recordUri: string, -): DeleteNotifications => ({ - type: 'delete_notifications', - recordUri, -}) diff --git a/packages/bsky/src/services/indexing/plugins/follow.ts b/packages/bsky/src/services/indexing/plugins/follow.ts index e3f0b75cede..31a28136825 100644 --- a/packages/bsky/src/services/indexing/plugins/follow.ts +++ b/packages/bsky/src/services/indexing/plugins/follow.ts @@ -4,7 +4,6 @@ import { CID } from 'multiformats/cid' import * as Follow from '../../../lexicon/types/app/bsky/graph/follow' import * as lex from '../../../lexicon/lexicons' import { DatabaseSchema, DatabaseSchemaType } from '../../../db/database-schema' -import * as messages from '../messages' import RecordProcessor from '../processor' const lexId = lex.ids.AppBskyGraphFollow @@ -47,15 +46,17 @@ const findDuplicate = async ( return found ? new AtUri(found.uri) : null } -const eventsForInsert = (obj: IndexedFollow) => { +const notifsForInsert = (obj: IndexedFollow) => { return [ - messages.createNotification({ - userDid: obj.subjectDid, + { + did: obj.subjectDid, author: obj.creator, recordUri: obj.uri, recordCid: obj.cid, - reason: 'follow', - }), + reason: 'follow' as const, + reasonSubject: null, + sortAt: obj.indexedAt, + }, ] } @@ -71,12 +72,12 @@ const deleteFn = async ( return deleted || null } -const eventsForDelete = ( +const notifsForDelete = ( deleted: IndexedFollow, replacedBy: IndexedFollow | null, ) => { - if (replacedBy) return [] - return [messages.deleteNotifications(deleted.uri)] + const toDelete = replacedBy ? [] : [deleted.uri] + return { notifs: [], toDelete } } export type PluginType = RecordProcessor @@ -87,8 +88,8 @@ export const makePlugin = (db: DatabaseSchema): PluginType => { insertFn, findDuplicate, deleteFn, - eventsForInsert, - eventsForDelete, + notifsForInsert, + notifsForDelete, }) } diff --git a/packages/bsky/src/services/indexing/plugins/like.ts b/packages/bsky/src/services/indexing/plugins/like.ts index 7da6a519fed..3e2bdca94a8 100644 --- a/packages/bsky/src/services/indexing/plugins/like.ts +++ b/packages/bsky/src/services/indexing/plugins/like.ts @@ -4,8 +4,6 @@ import { CID } from 'multiformats/cid' import * as Like from '../../../lexicon/types/app/bsky/feed/like' import * as lex from '../../../lexicon/lexicons' import { DatabaseSchema, DatabaseSchemaType } from '../../../db/database-schema' -import * as messages from '../messages' -import { Message } from '../messages' import RecordProcessor from '../processor' const lexId = lex.ids.AppBskyFeedLike @@ -49,20 +47,19 @@ const findDuplicate = async ( return found ? new AtUri(found.uri) : null } -const createNotif = (obj: IndexedLike) => { +const notifsForInsert = (obj: IndexedLike) => { const subjectUri = new AtUri(obj.subject) - return messages.createNotification({ - userDid: subjectUri.host, - author: obj.creator, - recordUri: obj.uri, - recordCid: obj.cid, - reason: 'like', - reasonSubject: subjectUri.toString(), - }) -} - -const eventsForInsert = (obj: IndexedLike) => { - return [createNotif(obj)] + return [ + { + did: subjectUri.host, + author: obj.creator, + recordUri: obj.uri, + recordCid: obj.cid, + reason: 'like' as const, + reasonSubject: subjectUri.toString(), + sortAt: obj.indexedAt, + }, + ] } const deleteFn = async ( @@ -77,14 +74,12 @@ const deleteFn = async ( return deleted || null } -const eventsForDelete = ( +const notifsForDelete = ( deleted: IndexedLike, replacedBy: IndexedLike | null, -): Message[] => { - if (!replacedBy) { - return [messages.deleteNotifications(deleted.uri)] - } - return [] +) => { + const toDelete = replacedBy ? [] : [deleted.uri] + return { notifs: [], toDelete } } export type PluginType = RecordProcessor @@ -95,8 +90,8 @@ export const makePlugin = (db: DatabaseSchema): PluginType => { insertFn, findDuplicate, deleteFn, - eventsForInsert, - eventsForDelete, + notifsForInsert, + notifsForDelete, }) } diff --git a/packages/bsky/src/services/indexing/plugins/post.ts b/packages/bsky/src/services/indexing/plugins/post.ts index 2b5118073bb..747c05679ba 100644 --- a/packages/bsky/src/services/indexing/plugins/post.ts +++ b/packages/bsky/src/services/indexing/plugins/post.ts @@ -1,4 +1,4 @@ -import { Selectable, sql } from 'kysely' +import { Insertable, Selectable, sql } from 'kysely' import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/uri' import { Record as PostRecord } from '../../../lexicon/types/app/bsky/feed/post' @@ -11,12 +11,12 @@ import { isLink, } from '../../../lexicon/types/app/bsky/richtext/facet' import * as lex from '../../../lexicon/lexicons' -import * as messages from '../messages' -import { Message } from '../messages' import { DatabaseSchema, DatabaseSchemaType } from '../../../db/database-schema' import RecordProcessor from '../processor' import { PostHierarchy } from '../../../db/tables/post-hierarchy' +import { Notification } from '../../../db/tables/notification' +type Notif = Insertable type Post = Selectable type PostEmbedImage = DatabaseSchemaType['post_embed_image'] type PostEmbedExternal = DatabaseSchemaType['post_embed_external'] @@ -222,23 +222,24 @@ const findDuplicate = async (): Promise => { return null } -const eventsForInsert = (obj: IndexedPost) => { - const notifs: Message[] = [] +const notifsForInsert = (obj: IndexedPost) => { + const notifs: Notif[] = [] const notified = new Set([obj.post.creator]) - const maybeNotify = (notif: messages.NotificationInfo) => { - if (!notified.has(notif.userDid)) { - notified.add(notif.userDid) - notifs.push(messages.createNotification(notif)) + const maybeNotify = (notif: Notif) => { + if (!notified.has(notif.did)) { + notified.add(notif.did) + notifs.push(notif) } } for (const facet of obj.facets ?? []) { if (facet.type === 'mention') { maybeNotify({ - userDid: facet.value, + did: facet.value, reason: 'mention', author: obj.post.creator, recordUri: obj.post.uri, recordCid: obj.post.cid, + sortAt: obj.post.indexedAt, }) } } @@ -247,12 +248,13 @@ const eventsForInsert = (obj: IndexedPost) => { const embedUri = new AtUri(embed.embedUri) if (embedUri.collection === lex.ids.AppBskyFeedPost) { maybeNotify({ - userDid: embedUri.host, + did: embedUri.host, reason: 'quote', reasonSubject: embedUri.toString(), author: obj.post.creator, recordUri: obj.post.uri, recordCid: obj.post.cid, + sortAt: obj.post.indexedAt, }) } } @@ -264,19 +266,20 @@ const eventsForInsert = (obj: IndexedPost) => { for (const relation of ancestors) { const ancestorUri = new AtUri(relation.ancestorUri) maybeNotify({ - userDid: ancestorUri.host, + did: ancestorUri.host, reason: 'reply', reasonSubject: ancestorUri.toString(), author: obj.post.creator, recordUri: obj.post.uri, recordCid: obj.post.cid, + sortAt: obj.post.indexedAt, }) } if (obj.descendents) { // May generate notifications for out-of-order indexing of replies for (const descendent of obj.descendents) { - notifs.push(...eventsForInsert(descendent)) + notifs.push(...notifsForInsert(descendent)) } } @@ -344,15 +347,15 @@ const deleteFn = async ( : null } -const eventsForDelete = ( +const notifsForDelete = ( deleted: IndexedPost, replacedBy: IndexedPost | null, -): Message[] => { - const replacedNotifications = replacedBy ? eventsForInsert(replacedBy) : [] - return [ - messages.deleteNotifications(deleted.post.uri), - ...replacedNotifications, - ] +) => { + const notifs = replacedBy ? notifsForInsert(replacedBy) : [] + return { + notifs, + toDelete: [deleted.post.uri], + } } export type PluginType = RecordProcessor @@ -363,8 +366,8 @@ export const makePlugin = (db: DatabaseSchema): PluginType => { insertFn, findDuplicate, deleteFn, - eventsForInsert, - eventsForDelete, + notifsForInsert, + notifsForDelete, }) } diff --git a/packages/bsky/src/services/indexing/plugins/profile.ts b/packages/bsky/src/services/indexing/plugins/profile.ts index 08fca8793c2..8aa21e156cd 100644 --- a/packages/bsky/src/services/indexing/plugins/profile.ts +++ b/packages/bsky/src/services/indexing/plugins/profile.ts @@ -38,7 +38,7 @@ const findDuplicate = async (): Promise => { return null } -const eventsForInsert = () => { +const notifsForInsert = () => { return [] } @@ -54,8 +54,8 @@ const deleteFn = async ( return deleted || null } -const eventsForDelete = () => { - return [] +const notifsForDelete = () => { + return { notifs: [], toDelete: [] } } export type PluginType = RecordProcessor @@ -66,8 +66,8 @@ export const makePlugin = (db: DatabaseSchema): PluginType => { insertFn, findDuplicate, deleteFn, - eventsForInsert, - eventsForDelete, + notifsForInsert, + notifsForDelete, }) } diff --git a/packages/bsky/src/services/indexing/plugins/repost.ts b/packages/bsky/src/services/indexing/plugins/repost.ts index ae04d201967..c90e2b6ee20 100644 --- a/packages/bsky/src/services/indexing/plugins/repost.ts +++ b/packages/bsky/src/services/indexing/plugins/repost.ts @@ -4,7 +4,6 @@ import { AtUri } from '@atproto/uri' import * as Repost from '../../../lexicon/types/app/bsky/feed/repost' import * as lex from '../../../lexicon/lexicons' import { DatabaseSchema, DatabaseSchemaType } from '../../../db/database-schema' -import * as messages from '../messages' import RecordProcessor from '../processor' const lexId = lex.ids.AppBskyFeedRepost @@ -67,17 +66,19 @@ const findDuplicate = async ( return found ? new AtUri(found.uri) : null } -const eventsForInsert = (obj: IndexedRepost) => { +const notifsForInsert = (obj: IndexedRepost) => { const subjectUri = new AtUri(obj.subject) - const notif = messages.createNotification({ - userDid: subjectUri.host, - author: obj.creator, - recordUri: obj.uri, - recordCid: obj.cid, - reason: 'repost', - reasonSubject: subjectUri.toString(), - }) - return [notif] + return [ + { + did: subjectUri.host, + author: obj.creator, + recordUri: obj.uri, + recordCid: obj.cid, + reason: 'repost' as const, + reasonSubject: subjectUri.toString(), + sortAt: obj.indexedAt, + }, + ] } const deleteFn = async ( @@ -96,12 +97,12 @@ const deleteFn = async ( return deleted || null } -const eventsForDelete = ( +const notifsForDelete = ( deleted: IndexedRepost, replacedBy: IndexedRepost | null, ) => { - if (replacedBy) return [] - return [messages.deleteNotifications(deleted.uri)] + const toDelete = replacedBy ? [] : [deleted.uri] + return { notifs: [], toDelete } } export type PluginType = RecordProcessor @@ -112,8 +113,8 @@ export const makePlugin = (db: DatabaseSchema): PluginType => { insertFn, findDuplicate, deleteFn, - eventsForInsert, - eventsForDelete, + notifsForInsert, + notifsForDelete, }) } diff --git a/packages/bsky/src/services/indexing/processor.ts b/packages/bsky/src/services/indexing/processor.ts index 7f86ae35668..1613652625d 100644 --- a/packages/bsky/src/services/indexing/processor.ts +++ b/packages/bsky/src/services/indexing/processor.ts @@ -1,9 +1,11 @@ +import { Insertable } from 'kysely' import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/uri' import { jsonStringToLex, stringifyLex } from '@atproto/lexicon' import DatabaseSchema from '../../db/database-schema' import { lexicons } from '../../lexicon/lexicons' -import { Message } from './messages' +import { Notification } from '../../db/tables/notification' +import { chunkArray } from '@atproto/common' // @NOTE re: insertions and deletions. Due to how record updates are handled, // (insertFn) should have the same effect as (insertFn -> deleteFn -> insertFn). @@ -22,10 +24,15 @@ type RecordProcessorParams = { obj: T, ) => Promise deleteFn: (db: DatabaseSchema, uri: AtUri) => Promise - eventsForInsert: (obj: S) => Message[] - eventsForDelete: (prev: S, replacedBy: S | null) => Message[] + notifsForInsert: (obj: S) => Notif[] + notifsForDelete: ( + prev: S, + replacedBy: S | null, + ) => { notifs: Notif[]; toDelete: string[] } } +type Notif = Insertable + export class RecordProcessor { collection: string constructor( @@ -48,12 +55,7 @@ export class RecordProcessor { lexicons.assertValidRecord(this.params.lexId, obj) } - async insertRecord( - uri: AtUri, - cid: CID, - obj: unknown, - timestamp: string, - ): Promise { + async insertRecord(uri: AtUri, cid: CID, obj: unknown, timestamp: string) { this.assertValidRecord(obj) await this.db .insertInto('record') @@ -75,7 +77,7 @@ export class RecordProcessor { ) // if this was a new record, return events if (inserted) { - return this.params.eventsForInsert(inserted) + return this.handleNotifs({ inserted }) } // if duplicate, insert into duplicates table with no events const found = await this.params.findDuplicate(this.db, uri, obj) @@ -91,19 +93,13 @@ export class RecordProcessor { .onConflict((oc) => oc.doNothing()) .execute() } - return [] } // Currently using a very simple strategy for updates: purge the existing index // for the uri then replace it. The main upside is that this allows the indexer // for each collection to avoid bespoke logic for in-place updates, which isn't // straightforward in the general case. We still get nice control over notifications. - async updateRecord( - uri: AtUri, - cid: CID, - obj: unknown, - timestamp: string, - ): Promise { + async updateRecord(uri: AtUri, cid: CID, obj: unknown, timestamp: string) { this.assertValidRecord(obj) await this.db .updateTable('record') @@ -150,10 +146,10 @@ export class RecordProcessor { 'Record update failed: removed from index but could not be replaced', ) } - return this.params.eventsForDelete(deleted, inserted) + await this.handleNotifs({ inserted, deleted }) } - async deleteRecord(uri: AtUri, cascading = false): Promise { + async deleteRecord(uri: AtUri, cascading = false) { await this.db .deleteFrom('record') .where('uri', '=', uri.toString()) @@ -163,13 +159,13 @@ export class RecordProcessor { .where('uri', '=', uri.toString()) .execute() const deleted = await this.params.deleteFn(this.db, uri) - if (!deleted) return [] + if (!deleted) return if (cascading) { await this.db .deleteFrom('duplicate_record') .where('duplicateOf', '=', uri.toString()) .execute() - return this.params.eventsForDelete(deleted, null) + return this.handleNotifs({ deleted }) } else { const found = await this.db .selectFrom('duplicate_record') @@ -181,11 +177,11 @@ export class RecordProcessor { .executeTakeFirst() if (!found) { - return this.params.eventsForDelete(deleted, null) + return this.handleNotifs({ deleted }) } const record = jsonStringToLex(found.json) if (!this.matchesSchema(record)) { - return this.params.eventsForDelete(deleted, null) + return this.handleNotifs({ deleted }) } const inserted = await this.params.insertFn( this.db, @@ -194,7 +190,29 @@ export class RecordProcessor { record, found.indexedAt, ) - return this.params.eventsForDelete(deleted, inserted) + await this.handleNotifs({ deleted, inserted: inserted ?? undefined }) + } + } + + async handleNotifs(op: { deleted?: S; inserted?: S }) { + let notifs: Notif[] = [] + if (op.deleted) { + const forDelete = this.params.notifsForDelete( + op.deleted, + op.inserted ?? null, + ) + if (forDelete.toDelete.length > 0) { + await this.db + .deleteFrom('notification') + .where('recordUri', 'in', forDelete.toDelete) + .execute() + } + notifs = forDelete.notifs + } else if (op.inserted) { + notifs = this.params.notifsForInsert(op.inserted) + } + for (const chunk of chunkArray(notifs, 500)) { + await this.db.insertInto('notification').values(chunk).execute() } } } diff --git a/packages/bsky/tests/__snapshots__/indexing.test.ts.snap b/packages/bsky/tests/__snapshots__/indexing.test.ts.snap index 2b67be5d4d7..415fde245f9 100644 --- a/packages/bsky/tests/__snapshots__/indexing.test.ts.snap +++ b/packages/bsky/tests/__snapshots__/indexing.test.ts.snap @@ -445,34 +445,29 @@ Object { exports[`indexing indexes posts. 3`] = ` Object { - "createMessages": Array [ + "createNotifications": Array [ Object { "author": "user(1)", + "did": "user(0)", + "id": 0, "reason": "mention", + "reasonSubject": null, "recordCid": "cids(0)", "recordUri": "record(0)", - "type": "create_notification", - "userDid": "user(0)", - }, - ], - "deletedMessages": Array [ - Object { - "recordUri": "record(0)", - "type": "delete_notifications", + "sortAt": "1970-01-01T00:00:00.000Z", }, ], - "updateMessages": Array [ - Object { - "recordUri": "record(0)", - "type": "delete_notifications", - }, + "deleteNotifications": Array [], + "updateNotifications": Array [ Object { "author": "user(1)", + "did": "user(2)", + "id": 0, "reason": "mention", + "reasonSubject": null, "recordCid": "cids(1)", "recordUri": "record(0)", - "type": "create_notification", - "userDid": "user(2)", + "sortAt": "1970-01-01T00:00:00.000Z", }, ], } @@ -514,11 +509,3 @@ Object { "viewer": Object {}, } `; - -exports[`indexing indexes profiles. 4`] = ` -Object { - "createMessages": Array [], - "deletedMessages": Array [], - "updateMessages": Array [], -} -`; diff --git a/packages/bsky/tests/indexing.test.ts b/packages/bsky/tests/indexing.test.ts index c081165ec01..53985effb61 100644 --- a/packages/bsky/tests/indexing.test.ts +++ b/packages/bsky/tests/indexing.test.ts @@ -1,3 +1,4 @@ +import { sql } from 'kysely' import { CID } from 'multiformats/cid' import { cidForCbor, TID } from '@atproto/common' import * as pdsRepo from '@atproto/pds/src/repo/prepare' @@ -11,6 +12,7 @@ import { SeedClient } from './seeds/client' import usersSeed from './seeds/users' import basicSeed from './seeds/basic' import { ids } from '../src/lexicon/lexicons' +import { Database } from '../src/db' describe('indexing', () => { let testEnv: TestEnvInfo @@ -89,7 +91,7 @@ describe('indexing', () => { }) // Create - const createMessages = await db.transaction(async (tx) => { + await db.transaction(async (tx) => { return await services.indexing(tx).indexRecord(...createRecord) }) @@ -98,9 +100,10 @@ describe('indexing', () => { { headers: sc.getHeaders(sc.dids.alice, true) }, ) expect(forSnapshot(getAfterCreate.data)).toMatchSnapshot() + const createNotifications = await getNotifications(db, uri) // Update - const updateMessages = await db.transaction(async (tx) => { + await db.transaction(async (tx) => { return await services.indexing(tx).indexRecord(...updateRecord) }) @@ -109,9 +112,10 @@ describe('indexing', () => { { headers: sc.getHeaders(sc.dids.alice, true) }, ) expect(forSnapshot(getAfterUpdate.data)).toMatchSnapshot() + const updateNotifications = await getNotifications(db, uri) // Delete - const deletedMessages = await db.transaction(async (tx) => { + await db.transaction(async (tx) => { return await services.indexing(tx).deleteRecord(...deleteRecord) }) @@ -120,12 +124,13 @@ describe('indexing', () => { { headers: sc.getHeaders(sc.dids.alice, true) }, ) await expect(getAfterDelete).rejects.toThrow(/Post not found:/) + const deleteNotifications = await getNotifications(db, uri) expect( forSnapshot({ - createMessages, - updateMessages, - deletedMessages, + createNotifications, + updateNotifications, + deleteNotifications, }), ).toMatchSnapshot() }) @@ -158,7 +163,7 @@ describe('indexing', () => { }) // Create - const createMessages = await db.transaction(async (tx) => { + await db.transaction(async (tx) => { return await services.indexing(tx).indexRecord(...createRecord) }) @@ -169,7 +174,7 @@ describe('indexing', () => { expect(forSnapshot(getAfterCreate.data)).toMatchSnapshot() // Update - const updateMessages = await db.transaction(async (tx) => { + await db.transaction(async (tx) => { return await services.indexing(tx).indexRecord(...updateRecord) }) @@ -180,7 +185,7 @@ describe('indexing', () => { expect(forSnapshot(getAfterUpdate.data)).toMatchSnapshot() // Delete - const deletedMessages = await db.transaction(async (tx) => { + await db.transaction(async (tx) => { return await services.indexing(tx).deleteRecord(...deleteRecord) }) @@ -189,14 +194,6 @@ describe('indexing', () => { { headers: sc.getHeaders(sc.dids.alice, true) }, ) expect(forSnapshot(getAfterDelete.data)).toMatchSnapshot() - - expect( - forSnapshot({ - createMessages, - updateMessages, - deletedMessages, - }), - ).toMatchSnapshot() }) describe('indexRepo', () => { @@ -427,6 +424,16 @@ describe('indexing', () => { await expect(getProfileAfter).rejects.toThrow('Profile not found') }) }) + + async function getNotifications(db: Database, uri: AtUri) { + return await db.db + .selectFrom('notification') + .selectAll() + .select(sql`0`.as('id')) // Ignore notification ids in comparisons + .where('recordUri', '=', uri.toString()) + .orderBy('sortAt') + .execute() + } }) async function prepareCreate(opts: { diff --git a/packages/bsky/tests/views/__snapshots__/notifications.test.ts.snap b/packages/bsky/tests/views/__snapshots__/notifications.test.ts.snap new file mode 100644 index 00000000000..effd0aa7725 --- /dev/null +++ b/packages/bsky/tests/views/__snapshots__/notifications.test.ts.snap @@ -0,0 +1,433 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`notification views fetches notifications without a last-seen 1`] = ` +Array [ + Object { + "author": Object { + "did": "user(0)", + "handle": "dan.test", + "viewer": Object { + "following": "record(1)", + }, + }, + "cid": "cids(0)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "repost", + "reasonSubject": "record(2)", + "record": Object { + "$type": "app.bsky.feed.repost", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + }, + "uri": "record(0)", + }, + Object { + "author": Object { + "did": "user(0)", + "handle": "dan.test", + "viewer": Object { + "following": "record(1)", + }, + }, + "cid": "cids(2)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "mention", + "record": Object { + "$type": "app.bsky.feed.post", + "createdAt": "1970-01-01T00:00:00.000Z", + "embed": Object { + "$type": "app.bsky.embed.record", + "record": Object { + "cid": "cids(3)", + "uri": "record(4)", + }, + }, + "facets": Array [ + Object { + "features": Array [ + Object { + "$type": "app.bsky.richtext.facet#mention", + "did": "user(1)", + }, + ], + "index": Object { + "byteEnd": 18, + "byteStart": 0, + }, + }, + ], + "text": "@alice.bluesky.xyz is the best", + }, + "uri": "record(3)", + }, + Object { + "author": Object { + "did": "user(0)", + "handle": "dan.test", + "viewer": Object { + "following": "record(1)", + }, + }, + "cid": "cids(4)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "like", + "reasonSubject": "record(2)", + "record": Object { + "$type": "app.bsky.feed.like", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + }, + "uri": "record(5)", + }, + Object { + "author": Object { + "did": "user(2)", + "handle": "carol.test", + "viewer": Object { + "followedBy": "record(6)", + "following": "record(7)", + }, + }, + "cid": "cids(5)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "follow", + "record": Object { + "$type": "app.bsky.graph.follow", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": "user(1)", + }, + "uri": "record(6)", + }, + Object { + "author": Object { + "did": "user(2)", + "handle": "carol.test", + "viewer": Object { + "followedBy": "record(6)", + "following": "record(7)", + }, + }, + "cid": "cids(6)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "reply", + "reasonSubject": "record(9)", + "record": Object { + "$type": "app.bsky.feed.post", + "createdAt": "1970-01-01T00:00:00.000Z", + "reply": Object { + "parent": Object { + "cid": "cids(7)", + "uri": "record(9)", + }, + "root": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + }, + "text": "indeed", + }, + "uri": "record(8)", + }, + Object { + "author": Object { + "did": "user(2)", + "handle": "carol.test", + "viewer": Object { + "followedBy": "record(6)", + "following": "record(7)", + }, + }, + "cid": "cids(8)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "reply", + "reasonSubject": "record(2)", + "record": Object { + "$type": "app.bsky.feed.post", + "createdAt": "1970-01-01T00:00:00.000Z", + "reply": Object { + "parent": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + "root": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + }, + "text": "of course", + }, + "uri": "record(10)", + }, + Object { + "author": Object { + "did": "user(2)", + "handle": "carol.test", + "viewer": Object { + "followedBy": "record(6)", + "following": "record(7)", + }, + }, + "cid": "cids(9)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "like", + "reasonSubject": "record(12)", + "record": Object { + "$type": "app.bsky.feed.like", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": Object { + "cid": "cids(10)", + "uri": "record(12)", + }, + }, + "uri": "record(11)", + }, + Object { + "author": Object { + "did": "user(2)", + "handle": "carol.test", + "viewer": Object { + "followedBy": "record(6)", + "following": "record(7)", + }, + }, + "cid": "cids(11)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "like", + "reasonSubject": "record(2)", + "record": Object { + "$type": "app.bsky.feed.like", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + }, + "uri": "record(13)", + }, + Object { + "author": Object { + "avatar": "https://bsky.public.url/image/sig()/rs:fill:1000:1000:1:0/plain/user(4)/cids(13)@jpeg", + "description": "hi im bob", + "did": "user(3)", + "displayName": "bobby", + "handle": "bob.test", + "indexedAt": "1970-01-01T00:00:00.000Z", + "viewer": Object { + "followedBy": "record(14)", + "following": "record(15)", + }, + }, + "cid": "cids(12)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "follow", + "record": Object { + "$type": "app.bsky.graph.follow", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": "user(1)", + }, + "uri": "record(14)", + }, + Object { + "author": Object { + "avatar": "https://bsky.public.url/image/sig()/rs:fill:1000:1000:1:0/plain/user(4)/cids(13)@jpeg", + "description": "hi im bob", + "did": "user(3)", + "displayName": "bobby", + "handle": "bob.test", + "indexedAt": "1970-01-01T00:00:00.000Z", + "viewer": Object { + "followedBy": "record(14)", + "following": "record(15)", + }, + }, + "cid": "cids(14)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "reply", + "reasonSubject": "record(2)", + "record": Object { + "$type": "app.bsky.feed.post", + "createdAt": "1970-01-01T00:00:00.000Z", + "embed": Object { + "$type": "app.bsky.embed.images", + "images": Array [ + Object { + "alt": "tests/image/fixtures/key-landscape-small.jpg", + "image": Object { + "$type": "blob", + "mimeType": "image/jpeg", + "ref": Object { + "$link": "cids(15)", + }, + "size": 4114, + }, + }, + ], + }, + "reply": Object { + "parent": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + "root": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + }, + "text": "hear that", + }, + "uri": "record(16)", + }, + Object { + "author": Object { + "avatar": "https://bsky.public.url/image/sig()/rs:fill:1000:1000:1:0/plain/user(4)/cids(13)@jpeg", + "description": "hi im bob", + "did": "user(3)", + "displayName": "bobby", + "handle": "bob.test", + "indexedAt": "1970-01-01T00:00:00.000Z", + "viewer": Object { + "followedBy": "record(14)", + "following": "record(15)", + }, + }, + "cid": "cids(16)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "like", + "reasonSubject": "record(12)", + "record": Object { + "$type": "app.bsky.feed.like", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": Object { + "cid": "cids(10)", + "uri": "record(12)", + }, + }, + "uri": "record(17)", + }, + Object { + "author": Object { + "avatar": "https://bsky.public.url/image/sig()/rs:fill:1000:1000:1:0/plain/user(4)/cids(13)@jpeg", + "description": "hi im bob", + "did": "user(3)", + "displayName": "bobby", + "handle": "bob.test", + "indexedAt": "1970-01-01T00:00:00.000Z", + "viewer": Object { + "followedBy": "record(14)", + "following": "record(15)", + }, + }, + "cid": "cids(17)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "like", + "reasonSubject": "record(2)", + "record": Object { + "$type": "app.bsky.feed.like", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": Object { + "cid": "cids(1)", + "uri": "record(2)", + }, + }, + "uri": "record(18)", + }, +] +`; + +exports[`notification views generates notifications for quotes 1`] = ` +Array [ + Object { + "author": Object { + "did": "user(0)", + "handle": "carol.test", + "viewer": Object {}, + }, + "cid": "cids(0)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "repost", + "reasonSubject": "record(1)", + "record": Object { + "$type": "app.bsky.feed.repost", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": Object { + "cid": "cids(1)", + "uri": "record(1)", + }, + }, + "uri": "record(0)", + }, + Object { + "author": Object { + "avatar": "https://bsky.public.url/image/sig()/rs:fill:1000:1000:1:0/plain/user(2)/cids(3)@jpeg", + "description": "its me!", + "did": "user(1)", + "displayName": "ali", + "handle": "alice.test", + "indexedAt": "1970-01-01T00:00:00.000Z", + "viewer": Object { + "followedBy": "record(2)", + }, + }, + "cid": "cids(2)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "follow", + "record": Object { + "$type": "app.bsky.graph.follow", + "createdAt": "1970-01-01T00:00:00.000Z", + "subject": "user(3)", + }, + "uri": "record(2)", + }, + Object { + "author": Object { + "avatar": "https://bsky.public.url/image/sig()/rs:fill:1000:1000:1:0/plain/user(2)/cids(3)@jpeg", + "description": "its me!", + "did": "user(1)", + "displayName": "ali", + "handle": "alice.test", + "indexedAt": "1970-01-01T00:00:00.000Z", + "viewer": Object { + "followedBy": "record(2)", + }, + }, + "cid": "cids(4)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "isRead": false, + "reason": "quote", + "reasonSubject": "record(1)", + "record": Object { + "$type": "app.bsky.feed.post", + "createdAt": "1970-01-01T00:00:00.000Z", + "embed": Object { + "$type": "app.bsky.embed.record", + "record": Object { + "cid": "cids(1)", + "uri": "record(1)", + }, + }, + "text": "yoohoo", + }, + "uri": "record(3)", + }, +] +`; diff --git a/packages/bsky/tests/views/notifications.test.ts b/packages/bsky/tests/views/notifications.test.ts new file mode 100644 index 00000000000..26f9bf9432a --- /dev/null +++ b/packages/bsky/tests/views/notifications.test.ts @@ -0,0 +1,177 @@ +import AtpAgent from '@atproto/api' +import { TestEnvInfo, runTestEnv } from '@atproto/dev-env' +import { forSnapshot, paginateAll, processAll } from '../_util' +import { SeedClient } from '../seeds/client' +import basicSeed from '../seeds/basic' +import { Notification } from '../../src/lexicon/types/app/bsky/notification/listNotifications' + +describe('notification views', () => { + let testEnv: TestEnvInfo + let agent: AtpAgent + let sc: SeedClient + + // account dids, for convenience + let alice: string + + beforeAll(async () => { + testEnv = await runTestEnv({ + dbPostgresSchema: 'views_notifications', + }) + agent = new AtpAgent({ service: testEnv.bsky.url }) + const pdsAgent = new AtpAgent({ service: testEnv.pds.url }) + sc = new SeedClient(pdsAgent) + await basicSeed(sc) + await processAll(testEnv) + alice = sc.dids.alice + }) + + afterAll(async () => { + await testEnv.close() + }) + + const sort = (notifs: Notification[]) => { + // Need to sort because notification ordering is not well-defined + return notifs.sort((a, b) => { + const stableUriA = a.uri.replace( + /\/did:plc:.+?\//, + `/${a.author.handle}/`, + ) + const stableUriB = b.uri.replace( + /\/did:plc:.+?\//, + `/${b.author.handle}/`, + ) + if (stableUriA === stableUriB) { + return a.indexedAt > b.indexedAt ? -1 : 1 + } + return stableUriA > stableUriB ? -1 : 1 + }) + } + + it('fetches notification count without a last-seen', async () => { + const notifCountAlice = + await agent.api.app.bsky.notification.getUnreadCount( + {}, + { headers: sc.getHeaders(alice, true) }, + ) + + expect(notifCountAlice.data.count).toBe(11) + + const notifCountBob = await agent.api.app.bsky.notification.getUnreadCount( + {}, + { headers: sc.getHeaders(sc.dids.bob, true) }, + ) + + expect(notifCountBob.data.count).toBe(4) + }) + + it('generates notifications for all reply ancestors', async () => { + // Add to reply chain, post ancestors: alice -> bob -> alice -> carol. + // Should have added one notification for each of alice and bob. + await sc.reply( + sc.dids.carol, + sc.posts[alice][1].ref, + sc.replies[alice][0].ref, + 'indeed', + ) + await processAll(testEnv) + + const notifCountAlice = + await agent.api.app.bsky.notification.getUnreadCount( + {}, + { headers: sc.getHeaders(alice, true) }, + ) + + expect(notifCountAlice.data.count).toBe(12) + + const notifCountBob = await agent.api.app.bsky.notification.getUnreadCount( + {}, + { headers: sc.getHeaders(sc.dids.bob, true) }, + ) + + expect(notifCountBob.data.count).toBe(5) + }) + + it('generates notifications for quotes', async () => { + // Dan was quoted by alice + const notifsDan = await agent.api.app.bsky.notification.listNotifications( + {}, + { headers: sc.getHeaders(sc.dids.dan, true) }, + ) + expect(forSnapshot(sort(notifsDan.data.notifications))).toMatchSnapshot() + }) + + it('fetches notifications without a last-seen', async () => { + const notifRes = await agent.api.app.bsky.notification.listNotifications( + {}, + { headers: sc.getHeaders(alice, true) }, + ) + + const notifs = notifRes.data.notifications + expect(notifs.length).toBe(12) + + const readStates = notifs.map((notif) => notif.isRead) + expect(readStates).toEqual(notifs.map(() => false)) + + expect(forSnapshot(sort(notifs))).toMatchSnapshot() + }) + + it('paginates', async () => { + const results = (results) => + sort(results.flatMap((res) => res.notifications)) + const paginator = async (cursor?: string) => { + const res = await agent.api.app.bsky.notification.listNotifications( + { cursor, limit: 6 }, + { headers: sc.getHeaders(alice, true) }, + ) + return res.data + } + + const paginatedAll = await paginateAll(paginator) + paginatedAll.forEach((res) => + expect(res.notifications.length).toBeLessThanOrEqual(6), + ) + + const full = await agent.api.app.bsky.notification.listNotifications( + {}, + { headers: sc.getHeaders(alice, true) }, + ) + + expect(full.data.notifications.length).toEqual(12) + expect(results(paginatedAll)).toEqual(results([full.data])) + }) + + it('fetches notification count with a last-seen', async () => { + const full = await agent.api.app.bsky.notification.listNotifications( + {}, + { headers: sc.getHeaders(alice, true) }, + ) + const seenAt = full.data.notifications[3].indexedAt + const notifCount = await agent.api.app.bsky.notification.getUnreadCount( + { seenAt }, + { headers: sc.getHeaders(alice, true) }, + ) + + expect(notifCount.data.count).toBe( + full.data.notifications.filter((n) => n.indexedAt > seenAt).length, + ) + expect(notifCount.data.count).toBeGreaterThan(0) + }) + + it('fetches notifications with a last-seen', async () => { + const full = await agent.api.app.bsky.notification.listNotifications( + {}, + { headers: sc.getHeaders(alice, true) }, + ) + const seenAt = full.data.notifications[3].indexedAt + const notifRes = await agent.api.app.bsky.notification.listNotifications( + { seenAt }, + { headers: sc.getHeaders(alice, true) }, + ) + + const notifs = notifRes.data.notifications + expect(notifs.length).toBe(12) + + const readStates = notifs.map((notif) => notif.isRead) + expect(readStates).toEqual(notifs.map((n) => n.indexedAt <= seenAt)) + }) +}) diff --git a/packages/pds/src/api/app/bsky/notification/getUnreadCount.ts b/packages/pds/src/api/app/bsky/notification/getUnreadCount.ts index f932dc4ac96..c3481e7c5c2 100644 --- a/packages/pds/src/api/app/bsky/notification/getUnreadCount.ts +++ b/packages/pds/src/api/app/bsky/notification/getUnreadCount.ts @@ -1,3 +1,4 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../lexicon' import { countAll, notSoftDeletedClause } from '../../../../db/util' import AppContext from '../../../../context' @@ -5,9 +6,13 @@ import AppContext from '../../../../context' export default function (server: Server, ctx: AppContext) { server.app.bsky.notification.getUnreadCount({ auth: ctx.accessVerifier, - handler: async ({ auth }) => { + handler: async ({ auth, params }) => { + const { seenAt } = params const requester = auth.credentials.did const { ref } = ctx.db.db.dynamic + if (seenAt) { + throw new InvalidRequestError('The seenAt parameter is unsupported') + } const result = await ctx.db.db .selectFrom('user_notification as notif') diff --git a/packages/pds/src/api/app/bsky/notification/listNotifications.ts b/packages/pds/src/api/app/bsky/notification/listNotifications.ts index c1813dc2d81..5d02d34d66e 100644 --- a/packages/pds/src/api/app/bsky/notification/listNotifications.ts +++ b/packages/pds/src/api/app/bsky/notification/listNotifications.ts @@ -9,9 +9,12 @@ export default function (server: Server, ctx: AppContext) { server.app.bsky.notification.listNotifications({ auth: ctx.accessVerifier, handler: async ({ params, auth }) => { - const { limit, cursor } = params + const { limit, cursor, seenAt } = params const requester = auth.credentials.did const { ref } = ctx.db.db.dynamic + if (seenAt) { + throw new InvalidRequestError('The seenAt parameter is unsupported') + } let notifBuilder = ctx.db.db .selectFrom('user_notification as notif') diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index 77b55413c8b..dfcd418edd9 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -4692,6 +4692,15 @@ export const schemaDict = { defs: { main: { type: 'query', + parameters: { + type: 'params', + properties: { + seenAt: { + type: 'string', + format: 'datetime', + }, + }, + }, output: { encoding: 'application/json', schema: { @@ -4725,6 +4734,10 @@ export const schemaDict = { cursor: { type: 'string', }, + seenAt: { + type: 'string', + format: 'datetime', + }, }, }, output: { diff --git a/packages/pds/src/lexicon/types/app/bsky/notification/getUnreadCount.ts b/packages/pds/src/lexicon/types/app/bsky/notification/getUnreadCount.ts index 2b7cbad8cc0..2f49ca1e78e 100644 --- a/packages/pds/src/lexicon/types/app/bsky/notification/getUnreadCount.ts +++ b/packages/pds/src/lexicon/types/app/bsky/notification/getUnreadCount.ts @@ -8,7 +8,9 @@ import { isObj, hasProp } from '../../../../util' import { CID } from 'multiformats/cid' import { HandlerAuth } from '@atproto/xrpc-server' -export interface QueryParams {} +export interface QueryParams { + seenAt?: string +} export type InputSchema = undefined diff --git a/packages/pds/src/lexicon/types/app/bsky/notification/listNotifications.ts b/packages/pds/src/lexicon/types/app/bsky/notification/listNotifications.ts index 2b5416931aa..09f913a37f3 100644 --- a/packages/pds/src/lexicon/types/app/bsky/notification/listNotifications.ts +++ b/packages/pds/src/lexicon/types/app/bsky/notification/listNotifications.ts @@ -13,6 +13,7 @@ import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs' export interface QueryParams { limit: number cursor?: string + seenAt?: string } export type InputSchema = undefined