Skip to content

Commit

Permalink
Fix race condition in labeling (bluesky-social#806)
Browse files Browse the repository at this point in the history
* Fix race condition in labeling

* comment
  • Loading branch information
dholms authored Apr 13, 2023
1 parent d6ac47d commit f6aef2b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
12 changes: 3 additions & 9 deletions packages/pds/src/app-view/services/indexing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import * as Repost from './plugins/repost'
import * as Follow from './plugins/follow'
import * as Profile from './plugins/profile'
import { MessageQueue } from '../../../event-stream/types'
import { Labeler } from '../../../labeler'

export class IndexingService {
records: {
Expand All @@ -19,11 +18,7 @@ export class IndexingService {
profile: Profile.PluginType
}

constructor(
public db: Database,
public labeler: Labeler,
public messageDispatcher: MessageQueue,
) {
constructor(public db: Database, public messageDispatcher: MessageQueue) {
this.records = {
post: Post.makePlugin(this.db.db),
like: Like.makePlugin(this.db.db),
Expand All @@ -33,8 +28,8 @@ export class IndexingService {
}
}

static creator(labeler: Labeler, messageDispatcher: MessageQueue) {
return (db: Database) => new IndexingService(db, labeler, messageDispatcher)
static creator(messageDispatcher: MessageQueue) {
return (db: Database) => new IndexingService(db, messageDispatcher)
}

async indexRecord(
Expand All @@ -51,7 +46,6 @@ export class IndexingService {
} else {
await indexer.updateRecord(uri, cid, obj, timestamp)
}
this.labeler.processRecord(uri, obj)
}

async deleteRecord(uri: AtUri, cascading = false) {
Expand Down
9 changes: 7 additions & 2 deletions packages/pds/src/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ export function createServices(resources: {
account: AccountService.creator(),
auth: AuthService.creator(),
record: RecordService.creator(messageDispatcher),
repo: RepoService.creator(repoSigningKey, messageDispatcher, blobstore),
repo: RepoService.creator(
repoSigningKey,
messageDispatcher,
blobstore,
labeler,
),
moderation: ModerationService.creator(
messageDispatcher,
blobstore,
Expand All @@ -45,7 +50,7 @@ export function createServices(resources: {
appView: {
actor: ActorService.creator(imgUriBuilder),
feed: FeedService.creator(imgUriBuilder),
indexing: IndexingService.creator(labeler, messageDispatcher),
indexing: IndexingService.creator(messageDispatcher),
label: LabelService.creator(),
},
}
Expand Down
15 changes: 14 additions & 1 deletion packages/pds/src/services/repo/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { RepoBlobs } from './blobs'
import { createWriteToOp, writeToOp } from '../../repo'
import { RecordService } from '../record'
import { sequenceCommit, sequenceRebase } from '../../sequencer'
import { Labeler } from '../../labeler'

export class RepoService {
blobs: RepoBlobs
Expand All @@ -30,6 +31,7 @@ export class RepoService {
public repoSigningKey: crypto.Keypair,
public messageDispatcher: MessageQueue,
public blobstore: BlobStore,
public labeler: Labeler,
) {
this.blobs = new RepoBlobs(db, blobstore)
}
Expand All @@ -38,9 +40,10 @@ export class RepoService {
keypair: crypto.Keypair,
messageDispatcher: MessageQueue,
blobstore: BlobStore,
labeler: Labeler,
) {
return (db: Database) =>
new RepoService(db, keypair, messageDispatcher, blobstore)
new RepoService(db, keypair, messageDispatcher, blobstore, labeler)
}

services = {
Expand Down Expand Up @@ -156,6 +159,16 @@ export class RepoService {
this.blobs.processWriteBlobs(did, commitData.commit, writes),
sequenceCommit(this.db, did, commitData, writes),
])

// @TODO move to appview
writes.map((write) => {
if (
write.action === WriteOpAction.Create ||
write.action === WriteOpAction.Update
) {
this.labeler.processRecord(write.uri, write.record)
}
})
}

async rebaseRepo(did: string, now: string, swapCommit?: CID) {
Expand Down

0 comments on commit f6aef2b

Please sign in to comment.