Skip to content

Commit

Permalink
Notifications from the appview (bluesky-social#829)
Browse files Browse the repository at this point in the history
* Update notif lexicons for stateless seenAt param

* In-progress work on appview notifs

* Fix-up bsky notification methods

* Add appview notification table table

* Process notifications in bsky appview, test notif indexing

* Test bsky appview notification methods

* Tidy bsky notification tests

* Explicitly don't support seenAt notif params on pds

* Tidy bsky notifs tests

* Sync bsky notif handling with pds

* Remove stale comment

* Switch bsky notifs tests to use testenv
  • Loading branch information
devinivy authored Apr 18, 2023
1 parent 6446e8d commit c70cb2b
Show file tree
Hide file tree
Showing 32 changed files with 1,010 additions and 201 deletions.
6 changes: 6 additions & 0 deletions lexicons/app/bsky/notification/getUnreadCount.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
"defs": {
"main": {
"type": "query",
"parameters": {
"type": "params",
"properties": {
"seenAt": { "type": "string", "format": "datetime"}
}
},
"output": {
"encoding": "application/json",
"schema": {
Expand Down
3 changes: 2 additions & 1 deletion lexicons/app/bsky/notification/listNotifications.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"type": "params",
"properties": {
"limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 50},
"cursor": {"type": "string"}
"cursor": {"type": "string"},
"seenAt": { "type": "string", "format": "datetime"}
}
},
"output": {
Expand Down
13 changes: 13 additions & 0 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4692,6 +4692,15 @@ export const schemaDict = {
defs: {
main: {
type: 'query',
parameters: {
type: 'params',
properties: {
seenAt: {
type: 'string',
format: 'datetime',
},
},
},
output: {
encoding: 'application/json',
schema: {
Expand Down Expand Up @@ -4725,6 +4734,10 @@ export const schemaDict = {
cursor: {
type: 'string',
},
seenAt: {
type: 'string',
format: 'datetime',
},
},
},
output: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { isObj, hasProp } from '../../../../util'
import { lexicons } from '../../../../lexicons'
import { CID } from 'multiformats/cid'

export interface QueryParams {}
export interface QueryParams {
seenAt?: string
}

export type InputSchema = undefined

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs'
export interface QueryParams {
limit?: number
cursor?: string
seenAt?: string
}

export type InputSchema = undefined
Expand Down
35 changes: 35 additions & 0 deletions packages/bsky/src/api/app/bsky/notification/getUnreadCount.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Server } from '../../../../lexicon'
import { countAll, notSoftDeletedClause } from '../../../../db/util'
import AppContext from '../../../../context'
import { authVerifier } from '../util'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.notification.getUnreadCount({
auth: authVerifier,
handler: async ({ auth, params }) => {
const requester = auth.credentials.did
const { seenAt } = params

const { ref } = ctx.db.db.dynamic
const result = await ctx.db.db
.selectFrom('notification')
.select(countAll.as('count'))
.innerJoin('actor', 'actor.did', 'notification.did')
.innerJoin('record', 'record.uri', 'notification.recordUri')
.where(notSoftDeletedClause(ref('actor')))
.where(notSoftDeletedClause(ref('record')))
.where('notification.did', '=', requester)
.if(!!seenAt, (qb) =>
qb.where('notification.sortAt', '>', String(seenAt)),
)
.executeTakeFirst()

const count = result?.count ?? 0

return {
encoding: 'application/json',
body: { count },
}
},
})
}
87 changes: 87 additions & 0 deletions packages/bsky/src/api/app/bsky/notification/listNotifications.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { jsonStringToLex } from '@atproto/lexicon'
import { Server } from '../../../../lexicon'
import { paginate, TimeCidKeyset } from '../../../../db/pagination'
import AppContext from '../../../../context'
import { notSoftDeletedClause } from '../../../../db/util'
import { authVerifier } from '../util'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.notification.listNotifications({
auth: authVerifier,
handler: async ({ params, auth }) => {
const { limit, cursor } = params
const requester = auth.credentials.did
const { seenAt } = params

const { ref } = ctx.db.db.dynamic
let notifBuilder = ctx.db.db
.selectFrom('notification as notif')
.innerJoin('record', 'record.uri', 'notif.recordUri')
.innerJoin('actor as author', 'author.did', 'notif.author')
.where(notSoftDeletedClause(ref('record')))
.where(notSoftDeletedClause(ref('author')))
.where('notif.did', '=', requester)
.select([
'notif.recordUri as uri',
'notif.recordCid as cid',
'author.did as authorDid',
'author.handle as authorHandle',
'author.indexedAt as authorIndexedAt',
'author.takedownId as authorTakedownId',
'notif.reason as reason',
'notif.reasonSubject as reasonSubject',
'notif.sortAt as indexedAt',
'record.json as recordJson',
])

const keyset = new NotifsKeyset(
ref('notif.sortAt'),
ref('notif.recordCid'),
)
notifBuilder = paginate(notifBuilder, {
cursor,
limit,
keyset,
})

const notifs = await notifBuilder.execute()

const actorService = ctx.services.actor(ctx.db)
const authors = await actorService.views.profile(
notifs.map((notif) => ({
did: notif.authorDid,
handle: notif.authorHandle,
indexedAt: notif.authorIndexedAt,
takedownId: notif.authorTakedownId,
})),
requester,
)

const notifications = notifs.map((notif, i) => ({
uri: notif.uri,
cid: notif.cid,
author: authors[i],
reason: notif.reason,
reasonSubject: notif.reasonSubject || undefined,
record: jsonStringToLex(notif.recordJson) as Record<string, unknown>,
isRead: seenAt ? notif.indexedAt <= seenAt : false,
indexedAt: notif.indexedAt,
}))

return {
encoding: 'application/json',
body: {
notifications,
cursor: keyset.packFromResult(notifs),
},
}
},
})
}

type NotifRow = { indexedAt: string; cid: string }
class NotifsKeyset extends TimeCidKeyset<NotifRow> {
labelResult(result: NotifRow) {
return { primary: result.indexedAt, secondary: result.cid }
}
}
4 changes: 4 additions & 0 deletions packages/bsky/src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import getFollows from './app/bsky/graph/getFollows'
import searchActors from './app/bsky/actor/searchActors'
import searchActorsTypeahead from './app/bsky/actor/searchActorsTypeahead'
import getSuggestions from './app/bsky/actor/getSuggestions'
import getUnreadCount from './app/bsky/notification/getUnreadCount'
import listNotifications from './app/bsky/notification/listNotifications'
import unspecced from './app/bsky/unspecced'

export * as health from './health'
Expand All @@ -31,6 +33,8 @@ export default function (server: Server, ctx: AppContext) {
searchActors(server, ctx)
searchActorsTypeahead(server, ctx)
getSuggestions(server, ctx)
getUnreadCount(server, ctx)
listNotifications(server, ctx)
unspecced(server, ctx)
return server
}
4 changes: 3 additions & 1 deletion packages/bsky/src/db/database-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import * as subscription from './tables/subscription'
import * as actor from './tables/actor'
import * as actorSync from './tables/actor-sync'
import * as record from './tables/record'
import * as notification from './tables/notification'

export type DatabaseSchemaType = duplicateRecord.PartialDB &
profile.PartialDB &
Expand All @@ -25,7 +26,8 @@ export type DatabaseSchemaType = duplicateRecord.PartialDB &
subscription.PartialDB &
actor.PartialDB &
actorSync.PartialDB &
record.PartialDB
record.PartialDB &
notification.PartialDB

export type DatabaseSchema = Kysely<DatabaseSchemaType>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Kysely } from 'kysely'

export async function up(db: Kysely<unknown>): Promise<void> {
// Notifications
await db.schema
.createTable('notification')
.addColumn('id', 'bigserial', (col) => col.primaryKey())
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('recordUri', 'varchar', (col) => col.notNull())
.addColumn('recordCid', 'varchar', (col) => col.notNull())
.addColumn('author', 'varchar', (col) => col.notNull())
.addColumn('reason', 'varchar', (col) => col.notNull())
.addColumn('reasonSubject', 'varchar')
.addColumn('sortAt', 'varchar', (col) => col.notNull())
.execute()
await db.schema
.createIndex('notification_did_sortat_idx')
.on('notification')
.columns(['did', 'sortAt'])
.execute()
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable('notification').execute()
}
1 change: 1 addition & 0 deletions packages/bsky/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
// this with kysely's FileMigrationProvider, but it doesn't play nicely with the build process.

export * as _20230309T045948368Z from './20230309T045948368Z-init'
export * as _20230408T152211201Z from './20230408T152211201Z-notification-init'
16 changes: 16 additions & 0 deletions packages/bsky/src/db/tables/notification.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Generated } from 'kysely'

export const tableName = 'notification'

export interface Notification {
id: Generated<number>
did: string
recordUri: string
recordCid: string
author: string
reason: string
reasonSubject: string | null
sortAt: string
}

export type PartialDB = { [tableName]: Notification }
13 changes: 13 additions & 0 deletions packages/bsky/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4557,6 +4557,15 @@ export const schemaDict = {
defs: {
main: {
type: 'query',
parameters: {
type: 'params',
properties: {
seenAt: {
type: 'string',
format: 'datetime',
},
},
},
output: {
encoding: 'application/json',
schema: {
Expand Down Expand Up @@ -4590,6 +4599,10 @@ export const schemaDict = {
cursor: {
type: 'string',
},
seenAt: {
type: 'string',
format: 'datetime',
},
},
},
output: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import { isObj, hasProp } from '../../../../util'
import { CID } from 'multiformats/cid'
import { HandlerAuth } from '@atproto/xrpc-server'

export interface QueryParams {}
export interface QueryParams {
seenAt?: string
}

export type InputSchema = undefined

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs'
export interface QueryParams {
limit: number
cursor?: string
seenAt?: string
}

export type InputSchema = undefined
Expand Down
15 changes: 6 additions & 9 deletions packages/bsky/src/services/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,18 @@ export class IndexingService {
this.db.assertTransaction()
const indexer = this.findIndexerForCollection(uri.collection)
if (!indexer) return
// @TODO(bsky) direct notifs
const notifs =
action === WriteOpAction.Create
? await indexer.insertRecord(uri, cid, obj, timestamp)
: await indexer.updateRecord(uri, cid, obj, timestamp)
return notifs
if (action === WriteOpAction.Create) {
await indexer.insertRecord(uri, cid, obj, timestamp)
} else {
await indexer.updateRecord(uri, cid, obj, timestamp)
}
}

async deleteRecord(uri: AtUri, cascading = false) {
this.db.assertTransaction()
const indexer = this.findIndexerForCollection(uri.collection)
if (!indexer) return
// @TODO(bsky) direct notifs
const notifs = await indexer.deleteRecord(uri, cascading)
return notifs
await indexer.deleteRecord(uri, cascading)
}

async indexHandle(did: string, timestamp: string, force = false) {
Expand Down
43 changes: 0 additions & 43 deletions packages/bsky/src/services/indexing/messages.ts

This file was deleted.

Loading

0 comments on commit c70cb2b

Please sign in to comment.