Skip to content

Commit

Permalink
Always assign on-chain events to shard 0 for linear ordering (#2229)
Browse files Browse the repository at this point in the history
## Why is this change needed?

Sharding works when the ordering of events in different shards are
independent of each other.

In the case of onchain events and username proofs, however, these events
can affect multiple FIDs at a time
(e.g. transfers) which means depending on which shard they are assigned
to, they might get processed out of order.

Fix this by always using the same shard (shard 0) for onchain events +
username proofs. This isn't perfect since it means shard 0 is a SPOF,
but that's better than a potential race condition in the processing of
events, and the volume of onchain events is much lower than regular hub
events.

## Merge Checklist

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on ensuring onchain and username proofs are always
assigned to the same shard for linear ordering.

### Detailed summary
- Updated logic to assign onchain and username proofs to the same shard
for linear ordering
- Added a function to check if an event is ordered
- Adjusted tests to reflect the new shard assignment logic

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
sds authored Jul 31, 2024
1 parent 5d5b327 commit 501ceff
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 18 deletions.
5 changes: 5 additions & 0 deletions .changeset/modern-cows-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Ensure onchain + username proofs are always assigned to the same shard for linear ordering
15 changes: 11 additions & 4 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1367,9 +1367,11 @@ export default class Server {
statsd().increment("rpc.subscribe.out_of_order");
}
lastEventId = event.id;
if (totalShards === 0) {
const isOrderedEvent =
event.type === HubEventType.MERGE_ON_CHAIN_EVENT || event.type === HubEventType.MERGE_USERNAME_PROOF;
if (totalShards === 0 || (isOrderedEvent && shardIndex === 0)) {
bufferedStreamWriter.writeToStream(event);
} else {
} else if (!isOrderedEvent) {
const fid = fidFromEvent(event);
if (jumpConsistentHash(fid, totalShards) === shardIndex) {
bufferedStreamWriter.writeToStream(event);
Expand Down Expand Up @@ -1420,9 +1422,14 @@ export default class Server {

await this.engine.getDb().forEachIteratorByOpts(eventsIteratorOpts.value, async (_key, value) => {
const event = HubEvent.decode(Uint8Array.from(value as Buffer));
const isOrderedEvent =
event.type === HubEventType.MERGE_ON_CHAIN_EVENT || event.type === HubEventType.MERGE_USERNAME_PROOF;
const isRequestedType = request.eventTypes.length === 0 || request.eventTypes.includes(event.type);
const isRequestedFid = totalShards === 0 || fidFromEvent(event) % totalShards === shardIndex;
const shouldWriteEvent = isRequestedType && isRequestedFid;
const shouldWriteEvent =
isRequestedType &&
(totalShards === 0 ||
(isOrderedEvent && shardIndex === 0) ||
(!isOrderedEvent && jumpConsistentHash(fidFromEvent(event), totalShards) === shardIndex));
if (shouldWriteEvent) {
const writeResult = bufferedStreamWriter.writeToStream(event);

Expand Down
32 changes: 20 additions & 12 deletions apps/hubble/src/rpc/test/eventService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,20 @@ describe("sharded event stream", () => {
await engine.mergeUserNameProof(usernameProof);
await sleep(100); // Wait for server to send events over stream

expect(shard0Events.length).toEqual(5);
expect(shard1Events.length).toEqual(5);
expect(shard0Events.length).toEqual(9); // Onchain/username proofs always assigned to shard 0 for linear ordering
expect(shard1Events.length).toEqual(1);

shard0Events.map(([, event]) => {
expect(event.fid || event.data.fid).toBe(202);
function orderedEvent(eventType: number) {
return eventType === HubEventType.MERGE_ON_CHAIN_EVENT || eventType === HubEventType.MERGE_USERNAME_PROOF;
}

shard0Events.map(([eventType, event]) => {
const fid = event.fid || event.data.fid;
expect(fid === 202 || orderedEvent(eventType)).toEqual(true);
});
shard1Events.map(([, event]) => {
expect(event.fid || event.data.fid).toBe(301);
shard1Events.map(([eventType, event]) => {
const fid = event.fid || event.data.fid;
expect(fid === 301 && !orderedEvent(eventType)).toEqual(true);
});

// Should also work when requesting events from the past
Expand All @@ -363,13 +369,15 @@ describe("sharded event stream", () => {
await setupSubscription(shard1HistoricalEvents, { totalShards: 2, shardIndex: 1, fromId: 0 });
await sleep(100);

expect(shard0HistoricalEvents).toHaveLength(5);
expect(shard1HistoricalEvents).toHaveLength(5);
shard0HistoricalEvents.map(([, event]) => {
expect(event.fid || event.data.fid).toBe(202);
expect(shard0HistoricalEvents).toHaveLength(9);
expect(shard1HistoricalEvents).toHaveLength(1);
shard0HistoricalEvents.map(([eventType, event]) => {
const fid = event.fid || event.data.fid;
expect(fid === 202 || orderedEvent(eventType)).toEqual(true);
});
shard1HistoricalEvents.map(([, event]) => {
expect(event.fid || event.data.fid).toBe(301);
shard1HistoricalEvents.map(([eventType, event]) => {
const fid = event.fid || event.data.fid;
expect(fid === 301 && !orderedEvent(eventType)).toEqual(true);
});
});
});
2 changes: 0 additions & 2 deletions packages/shuttle/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.9'

services:
postgres:
image: 'postgres:16-alpine'
Expand Down

0 comments on commit 501ceff

Please sign in to comment.