Skip to content

Commit

Permalink
feat(shuttle): Support running before/after hooks when processing eve…
Browse files Browse the repository at this point in the history
…nts (#2176)

## Why is this change needed?

We want the ability to provide custom pre/post-processing logic when
processing events. Expose optional hooks that can be specified. This has
the added benefit that the `beforeProcess` hook can be used to skip
processing of an event.

## Merge Checklist

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [ ] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [x] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.


<!-- start pr-codex -->

---

## PR-Codex overview
This PR enhances the `@farcaster/shuttle` package by adding support for
running before/after hooks when processing events.

### Detailed summary
- Added `beforeProcess` and `afterProcess` handlers to
`EventStreamHubSubscriber` and `HubEventStreamConsumer`
- Implemented handling of preprocess and postprocess actions for event
processing
- Introduced `EventStreamHubSubscriberOptions` and
`EventStreamConsumerOptions` interfaces
- Updated `HubEventStreamConsumer` to extend `TypedEmitter` for custom
event tracking

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
sds authored Jul 13, 2024
1 parent fb0a083 commit 41f32cd
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changeset/dry-insects-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---

Add support for running before/after hooks when processing events
34 changes: 31 additions & 3 deletions packages/shuttle/src/shuttle/eventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { log } from "../log";
import { pino } from "pino";
import { ProcessResult } from "./index";
import { Result } from "neverthrow";
import { TypedEmitter } from "tiny-typed-emitter";

// Dummy name since we don't need unique names to get desired semantics
const DUMMY_CONSUMER_GROUP = "x";
Expand Down Expand Up @@ -203,15 +204,25 @@ const MESSAGE_PROCESSING_CONCURRENCY = 10;
const EVENT_PROCESSING_TIMEOUT = 10_000; // How long before retrying processing (millis)
const EVENT_DELETION_THRESHOLD = 1000 * 60 * 60 * 24; // 1 day

type PreProcessHandler = (event: HubEvent, eventBytes: Uint8Array) => Promise<ProcessResult>;
type PostProcessHandler = (event: HubEvent, eventBytes: Uint8Array) => Promise<void>;

export type EventStreamConsumerOptions = {
maxEventsPerFetch?: number;
messageProcessingConcurrency?: number;
groupName?: string;
eventProcessingTimeout?: number;
eventDeletionThreshold?: number;
beforeProcess?: PreProcessHandler;
afterProcess?: PostProcessHandler;
};

export class HubEventStreamConsumer {
// Provides a way for custom tracking to be implemented
interface HubEventStreamConsumerEventsEmitter {
onError: (hubEvent: HubEvent, error: Error) => void;
}

export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerEventsEmitter> {
public hub: HubClient;
private stream: EventStreamConnection;
public readonly streamKey: string;
Expand All @@ -223,6 +234,8 @@ export class HubEventStreamConsumer {
public stopped = true;
public readonly groupName: string;
private log: pino.Logger;
private beforeProcess?: PreProcessHandler;
private afterProcess?: PostProcessHandler;

constructor(
hub: HubClient,
Expand All @@ -231,6 +244,7 @@ export class HubEventStreamConsumer {
options: EventStreamConsumerOptions = {},
logger: pino.Logger = log,
) {
super();
this.hub = hub;
this.stream = eventStream;
this.streamKey = `hub:${this.hub.host}:evt:msg:${shardKey}`;
Expand All @@ -241,6 +255,8 @@ export class HubEventStreamConsumer {
this.eventDeletionThreshold = options.eventDeletionThreshold || EVENT_DELETION_THRESHOLD;
this.shardKey = shardKey;
this.log = logger;
this.beforeProcess = options.beforeProcess;
this.afterProcess = options.afterProcess;
}

async start(onEvent: (event: HubEvent) => Promise<Result<ProcessResult, Error>>) {
Expand Down Expand Up @@ -294,6 +310,13 @@ export class HubEventStreamConsumer {

const startTime = Date.now();
const hubEvent = HubEvent.decode(streamEvent.data);

const preprocessResult = await this.beforeProcess?.call(this, hubEvent, streamEvent.data);
if (preprocessResult?.skipped) {
eventIdsProcessed.push(streamEvent.id);
return; // Skip event
}

const result = await onEvent(hubEvent);
const processingTime = Date.now() - startTime;
statsd.timing("hub.event.stream.time", processingTime, {
Expand All @@ -302,15 +325,20 @@ export class HubEventStreamConsumer {
hubEventType: hubEvent.type.toString(),
});

if (result.isErr()) throw result.error;
if (result.isErr()) {
this.emit("onError", hubEvent, result.error);
throw result.error;
}

eventIdsProcessed.push(streamEvent.id);
if (result.value.skipped) {
statsd.increment("hub.event.stream.skipped", 1, {
hub: this.hub.host,
source: this.shardKey,
});
} else {
await this.afterProcess?.call(this, hubEvent, streamEvent.data);
}
eventIdsProcessed.push(streamEvent.id);

if (!result.value.skipped) {
const e2eTime = Date.now() - extractEventTimestamp(hubEvent.id);
Expand Down
25 changes: 24 additions & 1 deletion packages/shuttle/src/shuttle/hubSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import { EventStreamConnection } from "./eventStream";
import { sleep } from "../utils";
import { RedisClient } from "./redis";
import { HubClient } from "./hub";
import { ProcessResult } from "./index";

interface HubEventsEmitter {
event: (hubEvent: HubEvent) => void;
onError: (error: Error, stopped: boolean) => void;
}

Expand Down Expand Up @@ -170,6 +170,14 @@ export class BaseHubSubscriber extends HubSubscriber {
}
}

type PreProcessHandler = (event: HubEvent, eventBytes: Uint8Array) => Promise<ProcessResult>;
type PostProcessHandler = (event: HubEvent, eventBytes: Uint8Array) => Promise<void>;

type EventStreamHubSubscriberOptions = {
beforeProcess?: PreProcessHandler;
afterProcess?: PostProcessHandler;
};

export class EventStreamHubSubscriber extends BaseHubSubscriber {
private eventStream: EventStreamConnection;
private redis: RedisClient;
Expand All @@ -181,6 +189,8 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
public maxTimeBetweenBatchFlushes = 200; // Millis
public maxBatchBytesBeforeForceFlush = 2 ** 20; // 2 MiB
private eventBatchBytes = 0;
private beforeProcess?: PreProcessHandler;
private afterProcess?: PostProcessHandler;

constructor(
label: string,
Expand All @@ -192,13 +202,16 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
eventTypes?: HubEventType[],
totalShards?: number,
shardIndex?: number,
options?: EventStreamHubSubscriberOptions,
) {
super(label, hubClient.client, log, eventTypes, totalShards, shardIndex);
this.eventStream = eventStream;
this.redis = redis;
this.streamKey = `hub:${hubClient.host}:evt:msg:${shardKey}`;
this.redisKey = `${hubClient.host}:${shardKey}`;
this.eventsToAdd = [];
this.beforeProcess = options?.beforeProcess;
this.afterProcess = options?.afterProcess;
}

public override async getLastEventId(): Promise<number | undefined> {
Expand All @@ -213,6 +226,10 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {

public override async processHubEvent(event: HubEvent): Promise<boolean> {
const eventBytes = Buffer.from(HubEvent.encode(event).finish());

const preprocessResult = await this.beforeProcess?.call(this, event, eventBytes);
if (preprocessResult?.skipped) return false; // Skip event

this.eventBatchBytes += eventBytes.length;
this.eventsToAdd.push([event, eventBytes]);
if (
Expand All @@ -236,6 +253,12 @@ export class EventStreamHubSubscriber extends BaseHubSubscriber {
const [evt, eventBytes] = eventBatch[eventBatch.length - 1]!;
const lastEventId = evt.id;
await this.redis.setLastProcessedEvent(this.redisKey, lastEventId);

if (this.afterProcess) {
for (const [evt, evtBytes] of eventBatch) {
await this.afterProcess.call(this, evt, evtBytes);
}
}
}

return true;
Expand Down

0 comments on commit 41f32cd

Please sign in to comment.