Skip to content

Commit

Permalink
Ozone cdn invalidation (bluesky-social#2087)
Browse files Browse the repository at this point in the history
* hooking up invalidator to ozone

* test + fix

* wire up service entry

* add cid to invalidation url

* add aws to ozone service package.json

* build branch

* fix build

---------

Co-authored-by: Devin Ivy <[email protected]>
  • Loading branch information
dholms and devinivy authored Feb 29, 2024
1 parent 6fe00a4 commit 81370d7
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 8 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-push-ozone-aws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
push:
branches:
- main
- ozone-cdn-invalidation
env:
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
Expand Down
4 changes: 3 additions & 1 deletion packages/dev-env/src/ozone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ export class TestOzone {
const secrets = ozone.envToSecrets(env)

// api server
const server = await ozone.OzoneService.create(cfg, secrets)
const server = await ozone.OzoneService.create(cfg, secrets, {
imgInvalidator: config.imgInvalidator,
})
await server.start()

const daemon = await ozone.OzoneDaemon.create(cfg, secrets)
Expand Down
1 change: 1 addition & 0 deletions packages/dev-env/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export type OzoneConfig = Partial<ozone.OzoneEnvironment> & {
dbPostgresUrl: string
migration?: string
signingKey?: ExportableKeypair
imgInvalidator?: ozone.ImageInvalidator
}

export type TestServerParams = {
Expand Down
10 changes: 10 additions & 0 deletions packages/ozone/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => {
did: env.pdsDid,
}

const cdnCfg: OzoneConfig['cdn'] = {
paths: env.cdnPaths,
}

assert(env.didPlcUrl)
const identityCfg: OzoneConfig['identity'] = {
plcUrl: env.didPlcUrl,
Expand All @@ -48,6 +52,7 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => {
db: dbCfg,
appview: appviewCfg,
pds: pdsCfg,
cdn: cdnCfg,
identity: identityCfg,
}
}
Expand All @@ -57,6 +62,7 @@ export type OzoneConfig = {
db: DatabaseConfig
appview: AppviewConfig
pds: PdsConfig | null
cdn: CdnConfig
identity: IdentityConfig
}

Expand Down Expand Up @@ -88,3 +94,7 @@ export type PdsConfig = {
export type IdentityConfig = {
plcUrl: string
}

export type CdnConfig = {
paths?: string[]
}
4 changes: 3 additions & 1 deletion packages/ozone/src/config/env.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { envInt, envStr } from '@atproto/common'
import { envInt, envList, envStr } from '@atproto/common'

export const readEnv = (): OzoneEnvironment => {
return {
Expand All @@ -17,6 +17,7 @@ export const readEnv = (): OzoneEnvironment => {
dbPoolMaxUses: envInt('OZONE_DB_POOL_MAX_USES'),
dbPoolIdleTimeoutMs: envInt('OZONE_DB_POOL_IDLE_TIMEOUT_MS'),
didPlcUrl: envStr('OZONE_DID_PLC_URL'),
cdnPaths: envList('OZONE_CDN_PATHS'),
adminPassword: envStr('OZONE_ADMIN_PASSWORD'),
moderatorPassword: envStr('OZONE_MODERATOR_PASSWORD'),
triagePassword: envStr('OZONE_TRIAGE_PASSWORD'),
Expand All @@ -40,6 +41,7 @@ export type OzoneEnvironment = {
dbPoolMaxUses?: number
dbPoolIdleTimeoutMs?: number
didPlcUrl?: string
cdnPaths?: string[]
adminPassword?: string
moderatorPassword?: string
triagePassword?: string
Expand Down
4 changes: 4 additions & 0 deletions packages/ozone/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
CommunicationTemplateService,
CommunicationTemplateServiceCreator,
} from './communication-service/template'
import { ImageInvalidator } from './image-invalidator'

export type AppContextOptions = {
db: Database
Expand All @@ -25,6 +26,7 @@ export type AppContextOptions = {
pdsAgent: AtpAgent | undefined
signingKey: Keypair
idResolver: IdResolver
imgInvalidator?: ImageInvalidator
backgroundQueue: BackgroundQueue
sequencer: Sequencer
}
Expand Down Expand Up @@ -71,6 +73,8 @@ export class AppContext {
appviewAgent,
appviewAuth,
cfg.service.did,
overrides?.imgInvalidator,
cfg.cdn.paths,
)

const communicationTemplateService = CommunicationTemplateService.creator()
Expand Down
7 changes: 7 additions & 0 deletions packages/ozone/src/image-invalidator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Invalidation is a general interface for propagating an image blob
// takedown through any caches where a representation of it may be stored.
// @NOTE this does not remove the blob from storage: just invalidates it from caches.
// @NOTE keep in sync with same interface in aws/src/cloudfront.ts
export interface ImageInvalidator {
invalidate(subject: string, paths: string[]): Promise<void>
}
1 change: 1 addition & 0 deletions packages/ozone/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { createServer } from './lexicon'
import AppContext, { AppContextOptions } from './context'

export * from './config'
export { type ImageInvalidator } from './image-invalidator'
export { Database } from './db'
export { OzoneDaemon, EventPusher, EventReverser } from './daemon'
export { AppContext } from './context'
Expand Down
42 changes: 37 additions & 5 deletions packages/ozone/src/mod-service/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ import {
RepoSubject,
subjectFromStatusRow,
} from './subject'
import { jsonb } from '../db/types'
import { LabelChannel } from '../db/schema/label'
import { BlobPushEvent } from '../db/schema/blob_push_event'
import { BackgroundQueue } from '../background'
import { EventPusher } from '../daemon'
import { jsonb } from '../db/types'
import { LabelChannel } from '../db/schema/label'
import { ImageInvalidator } from '../image-invalidator'
import { httpLogger as log } from '../logger'

export type ModerationServiceCreator = (db: Database) => ModerationService

Expand All @@ -55,6 +57,8 @@ export class ModerationService {
public appviewAgent: AtpAgent,
private appviewAuth: AppviewAuth,
public serverDid: string,
public imgInvalidator?: ImageInvalidator,
public cdnPaths?: string[],
) {}

static creator(
Expand All @@ -63,6 +67,8 @@ export class ModerationService {
appviewAgent: AtpAgent,
appviewAuth: AppviewAuth,
serverDid: string,
imgInvalidator?: ImageInvalidator,
cdnPaths?: string[],
) {
return (db: Database) =>
new ModerationService(
Expand All @@ -72,6 +78,8 @@ export class ModerationService {
appviewAgent,
appviewAuth,
serverDid,
imgInvalidator,
cdnPaths,
)
}

Expand Down Expand Up @@ -556,14 +564,38 @@ export class ModerationService {
lastAttempted: null,
}),
)
.returning('id')
.returning(['id', 'subjectDid', 'subjectBlobCid', 'eventType'])
.execute()

this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await Promise.all(
blobEvts.map((evt) => this.eventPusher.attemptBlobEvent(evt.id)),
await Promise.allSettled(
blobEvts.map((evt) =>
this.eventPusher
.attemptBlobEvent(evt.id)
.catch((err) =>
log.error({ err, ...evt }, 'failed to push blob event'),
),
),
)

if (this.imgInvalidator) {
await Promise.allSettled(
(subject.blobCids ?? []).map((cid) => {
const paths = (this.cdnPaths ?? []).map((path) =>
path.replace('%s', subject.did).replace('%s', cid),
)
return this.imgInvalidator
?.invalidate(cid, paths)
.catch((err) =>
log.error(
{ err, paths, cid },
'failed to invalidate blob on cdn',
),
)
}),
)
}
})
})
}
Expand Down
27 changes: 27 additions & 0 deletions packages/ozone/tests/moderation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
} from '../src/lexicon/types/com/atproto/admin/defs'
import { EventReverser } from '../src'
import { TestOzone } from '@atproto/dev-env/src/ozone'
import { ImageInvalidator } from '../src/image-invalidator'
import {
UNSPECCED_TAKEDOWN_BLOBS_LABEL,
UNSPECCED_TAKEDOWN_LABEL,
Expand All @@ -43,6 +44,7 @@ type TakedownParams = BaseCreateReportParams &
describe('moderation', () => {
let network: TestNetwork
let ozone: TestOzone
let mockInvalidator: MockInvalidator
let agent: AtpAgent
let bskyAgent: AtpAgent
let pdsAgent: AtpAgent
Expand Down Expand Up @@ -155,8 +157,13 @@ describe('moderation', () => {
}

beforeAll(async () => {
mockInvalidator = new MockInvalidator()
network = await TestNetwork.create({
dbPostgresSchema: 'ozone_moderation',
ozone: {
imgInvalidator: mockInvalidator,
cdnPaths: ['/path1/%s/%s', '/path2/%s/%s'],
},
})
ozone = network.ozone
agent = network.ozone.getClient()
Expand Down Expand Up @@ -981,6 +988,18 @@ describe('moderation', () => {
expect(await fetchImage.json()).toEqual({ message: 'Image not found' })
})

it('invalidates the image in the cdn', async () => {
const blobCid = blob.image.ref.toString()
expect(mockInvalidator.invalidated.length).toBe(1)
expect(mockInvalidator.invalidated.at(0)?.subject).toBe(blobCid)
expect(mockInvalidator.invalidated.at(0)?.paths.at(0)).toEqual(
`/path1/${sc.dids.carol}/${blobCid}`,
)
expect(mockInvalidator.invalidated.at(0)?.paths.at(1)).toEqual(
`/path2/${sc.dids.carol}/${blobCid}`,
)
})

it('fans takedown out to pds', async () => {
const res = await pdsAgent.api.com.atproto.admin.getSubjectStatus(
{
Expand Down Expand Up @@ -1059,3 +1078,11 @@ describe('moderation', () => {
})
})
})

class MockInvalidator implements ImageInvalidator {
invalidated: { subject: string; paths: string[] }[] = []

async invalidate(subject: string, paths: string[]) {
this.invalidated.push({ subject, paths })
}
}
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 37 additions & 1 deletion services/ozone/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ require('dd-trace') // Only works with commonjs

// Tracer code above must come before anything else
const path = require('path')
const {
BunnyInvalidator,
CloudfrontInvalidator,
MultiImageInvalidator,
} = require('@atproto/aws')
const {
OzoneService,
envToCfg,
Expand All @@ -24,7 +29,38 @@ const main = async () => {
const env = readEnv()
const cfg = envToCfg(env)
const secrets = envToSecrets(env)
const ozone = await OzoneService.create(cfg, secrets)

// configure zero, one, or more image invalidators
const imgUriEndpoint = process.env.OZONE_IMG_URI_ENDPOINT
const bunnyAccessKey = process.env.OZONE_BUNNY_ACCESS_KEY
const cfDistributionId = process.env.OZONE_CF_DISTRIBUTION_ID

const imgInvalidators = []

if (bunnyAccessKey) {
imgInvalidators.push(
new BunnyInvalidator({
accessKey: bunnyAccessKey,
urlPrefix: imgUriEndpoint,
}),
)
}

if (cfDistributionId) {
imgInvalidators.push(
new CloudfrontInvalidator({
distributionId: cfDistributionId,
pathPrefix: imgUriEndpoint && new URL(imgUriEndpoint).pathname,
}),
)
}

const imgInvalidator =
imgInvalidators.length > 1
? new MultiImageInvalidator(imgInvalidators)
: imgInvalidators[0]

const ozone = await OzoneService.create(cfg, secrets, { imgInvalidator })

await ozone.start()

Expand Down
1 change: 1 addition & 0 deletions services/ozone/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"name": "ozone-service",
"private": true,
"dependencies": {
"@atproto/aws": "workspace:^",
"@atproto/ozone": "workspace:^",
"dd-trace": "3.13.2"
}
Expand Down

0 comments on commit 81370d7

Please sign in to comment.