Skip to content

Commit

Permalink
chore: add more tags to event stream metrics (#2307)
Browse files Browse the repository at this point in the history
In order to better understand how the stale event processing and the
event-specific codepaths impact the message read latency, add tags to
identify these attributes.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR adds event kind and stale/not stale tags to event processing
metrics in the `HubEventStreamConsumer` class.

### Detailed summary
- Added `whenReceived` tag to event processing metrics
- Updated metrics for different stages of event processing
- Introduced distinction between current and stale events

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
aditiharini authored Sep 11, 2024
1 parent 4e897e9 commit b7a0f40
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/shiny-jobs-deliver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---

chore: add event kind and stale/not stale tags to event processing metrics
17 changes: 16 additions & 1 deletion packages/shuttle/src/shuttle/eventStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
eventChunk = eventChunk.filter((_evt, idx) => !preprocessResult[idx]?.skipped);
}

const whenReceived = "current";
await inBatchesOf(eventChunk, this.messageProcessingConcurrency, async (batchedEvents) => {
const eventIdsProcessed: string[] = [];
const eventIdsSkipped: string[] = [];
Expand All @@ -324,6 +325,7 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
statsd.timing("hub.event.stream.dequeue_delay", dequeueDelay, {
hub: this.hub.host,
source: this.shardKey,
whenReceived,
});

const startTime = Date.now();
Expand All @@ -333,6 +335,7 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
hub: this.hub.host,
source: this.shardKey,
hubEventType: hubEvent.type.toString(),
whenReceived,
});

if (result.isErr()) {
Expand All @@ -355,6 +358,7 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
hub: this.hub.host,
source: this.shardKey,
hubEventType: hubEvent.type.toString(),
whenReceived,
});
}
} catch (e: unknown) {
Expand All @@ -371,11 +375,13 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
statsd.timing("hub.event.stream.ack_time", Date.now() - startTime, {
hub: this.hub.host,
source: this.shardKey,
whenReceived,
});

statsd.increment("hub.event.stream.ack", eventIdsProcessed.length, {
hub: this.hub.host,
source: this.shardKey,
whenReceived,
});

if (this.afterProcess) {
Expand Down Expand Up @@ -421,6 +427,7 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE

await inBatchesOf(events, this.messageProcessingConcurrency, async (batchedEvents) => {
const eventIdsProcessed: string[] = [];
const whenReceived = "stale";
await Promise.allSettled(
batchedEvents.map((event) =>
(async (streamEvent) => {
Expand All @@ -430,19 +437,23 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
source: this.shardKey,
});

const hubEvent = HubEvent.decode(streamEvent.data);

const dequeueDelay = Date.now() - Number(streamEvent.id.split("-")[0]);
statsd.timing("hub.event.stream.dequeue_delay", dequeueDelay, {
hub: this.hub.host,
source: this.shardKey,
whenReceived,
});

const startTime = Date.now();
const hubEvent = HubEvent.decode(streamEvent.data);
const result = await onEvent(hubEvent);
const processingTime = Date.now() - startTime;
statsd.timing("hub.event.stream.time", processingTime, {
hub: this.hub.host,
source: this.shardKey,
hubEventType: hubEvent.type.toString(),
whenReceived,
});
if (result.isErr()) throw result.error;

Expand All @@ -451,6 +462,8 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
statsd.timing("hub.event.stream.e2e_time", Date.now() - extractEventTimestamp(hubEvent.id), {
hub: this.hub.host,
source: this.shardKey,
hubEventType: hubEvent.type.toString(),
whenReceived,
});
} catch (e: unknown) {
statsd.increment("hub.event.stream.errors", { hub: this.hub.host, source: this.shardKey });
Expand All @@ -468,11 +481,13 @@ export class HubEventStreamConsumer extends TypedEmitter<HubEventStreamConsumerE
statsd.timing("hub.event.stream.ack_time", Date.now() - startTime, {
hub: this.hub.host,
source: this.shardKey,
whenReceived,
});

statsd.increment("hub.event.stream.ack", eventIdsProcessed.length, {
hub: this.hub.host,
source: this.shardKey,
whenReceived,
});
statsd.increment("hub.event.stream.stale.processed", eventIdsProcessed.length, {
hub: this.hub.host,
Expand Down

0 comments on commit b7a0f40

Please sign in to comment.