From f6aef2b50f0176dc87617060c2c98e76c2968aa0 Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Wed, 12 Apr 2023 19:39:12 -0500 Subject: [PATCH] Fix race condition in labeling (#806) * Fix race condition in labeling * comment --- .../pds/src/app-view/services/indexing/index.ts | 12 +++--------- packages/pds/src/services/index.ts | 9 +++++++-- packages/pds/src/services/repo/index.ts | 15 ++++++++++++++- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/packages/pds/src/app-view/services/indexing/index.ts b/packages/pds/src/app-view/services/indexing/index.ts index 95f0d719321..e71160fef2b 100644 --- a/packages/pds/src/app-view/services/indexing/index.ts +++ b/packages/pds/src/app-view/services/indexing/index.ts @@ -8,7 +8,6 @@ import * as Repost from './plugins/repost' import * as Follow from './plugins/follow' import * as Profile from './plugins/profile' import { MessageQueue } from '../../../event-stream/types' -import { Labeler } from '../../../labeler' export class IndexingService { records: { @@ -19,11 +18,7 @@ export class IndexingService { profile: Profile.PluginType } - constructor( - public db: Database, - public labeler: Labeler, - public messageDispatcher: MessageQueue, - ) { + constructor(public db: Database, public messageDispatcher: MessageQueue) { this.records = { post: Post.makePlugin(this.db.db), like: Like.makePlugin(this.db.db), @@ -33,8 +28,8 @@ export class IndexingService { } } - static creator(labeler: Labeler, messageDispatcher: MessageQueue) { - return (db: Database) => new IndexingService(db, labeler, messageDispatcher) + static creator(messageDispatcher: MessageQueue) { + return (db: Database) => new IndexingService(db, messageDispatcher) } async indexRecord( @@ -51,7 +46,6 @@ export class IndexingService { } else { await indexer.updateRecord(uri, cid, obj, timestamp) } - this.labeler.processRecord(uri, obj) } async deleteRecord(uri: AtUri, cascading = false) { diff --git a/packages/pds/src/services/index.ts b/packages/pds/src/services/index.ts index fe3ecc3bb80..ae2f5188816 100644 --- a/packages/pds/src/services/index.ts +++ b/packages/pds/src/services/index.ts @@ -35,7 +35,12 @@ export function createServices(resources: { account: AccountService.creator(), auth: AuthService.creator(), record: RecordService.creator(messageDispatcher), - repo: RepoService.creator(repoSigningKey, messageDispatcher, blobstore), + repo: RepoService.creator( + repoSigningKey, + messageDispatcher, + blobstore, + labeler, + ), moderation: ModerationService.creator( messageDispatcher, blobstore, @@ -45,7 +50,7 @@ export function createServices(resources: { appView: { actor: ActorService.creator(imgUriBuilder), feed: FeedService.creator(imgUriBuilder), - indexing: IndexingService.creator(labeler, messageDispatcher), + indexing: IndexingService.creator(messageDispatcher), label: LabelService.creator(), }, } diff --git a/packages/pds/src/services/repo/index.ts b/packages/pds/src/services/repo/index.ts index a8ad1b94160..a2a985eb03c 100644 --- a/packages/pds/src/services/repo/index.ts +++ b/packages/pds/src/services/repo/index.ts @@ -21,6 +21,7 @@ import { RepoBlobs } from './blobs' import { createWriteToOp, writeToOp } from '../../repo' import { RecordService } from '../record' import { sequenceCommit, sequenceRebase } from '../../sequencer' +import { Labeler } from '../../labeler' export class RepoService { blobs: RepoBlobs @@ -30,6 +31,7 @@ export class RepoService { public repoSigningKey: crypto.Keypair, public messageDispatcher: MessageQueue, public blobstore: BlobStore, + public labeler: Labeler, ) { this.blobs = new RepoBlobs(db, blobstore) } @@ -38,9 +40,10 @@ export class RepoService { keypair: crypto.Keypair, messageDispatcher: MessageQueue, blobstore: BlobStore, + labeler: Labeler, ) { return (db: Database) => - new RepoService(db, keypair, messageDispatcher, blobstore) + new RepoService(db, keypair, messageDispatcher, blobstore, labeler) } services = { @@ -156,6 +159,16 @@ export class RepoService { this.blobs.processWriteBlobs(did, commitData.commit, writes), sequenceCommit(this.db, did, commitData, writes), ]) + + // @TODO move to appview + writes.map((write) => { + if ( + write.action === WriteOpAction.Create || + write.action === WriteOpAction.Update + ) { + this.labeler.processRecord(write.uri, write.record) + } + }) } async rebaseRepo(did: string, now: string, swapCommit?: CID) {