Skip to content

Commit

Permalink
feat: create a system to track network propagation delay (farcasterxyz#970)

Browse files Browse the repository at this point in the history

* Add new proto definitions

* Add ping and ack functions

* Add gossip event handler

* Add cli argument for network latency mode

* Add ping message scheduler

* Fix imports

* Add tests for message handler

* Add metrics store

* Fix cli arg name

* Add multinode setup and fix metric logging

* Add docker-compose file for multinode test setup

* Drop hard coded id paths

* preserve backward compatibility of process file path

* Enable gossip network diagnostics

* Add tests 1

* Clean up gossipNode tests

* Fix lint

* Lower frequency for latency pings

* Add tests for metrics

* Improve coverage computation algo

* Clean up

* Fixes from PR feedback

* Fixes from PR feedback

* Fixes from PR feedback

* Rename field for consistency

* Add jitter to ping cron

Add jitter to ping cron

* Log legible peerIds

* Fix import paths

* Update log field name

* Consolidate metrics recorder and ping job

* Fix docker compose

* Add time-to-merge metric

* Confine metrics logic to single class

* Clean up

* Fix div by zero

* Use peerId string as metrics key

* Drop unused getter

* Refactor metrics to be keyed by peerId

* Fix tests

* Clean up and fix tests

* Store metrics in DB

* Replace interfaces with types

* Fix tests

* WIP: Adding metrics serde

* Replace map with object to allow for metrics serde

* Add test for serde

* Fix security warnings

* Fix coverage logging

* Add test

* Move message handling to hub

* Fix timestamp overflow

* Lower ping freq and fix coverage logging

* Add peer network to logs

* Fix merge time metric

* Revert "Fix merge time metric"

This reverts commit 76beb5f.

* Expire metrics after logging

* Rename command line arguments

* Fixes + test updates

* Fixes + test updates

* Add test for expiry

* Add test for ping function

* Drop unused function

* Record merge times for all merges

* Don't accumulate message count metrics

* Add check for successful message merge

* Re-add generated code

* Fix types

* Re-add generated code

* Fix imports

* fix imports
  • Loading branch information
akshaan authored Jul 8, 2023
1 parent dbe6074 commit 2bfcaf4
Show file tree
Hide file tree
Showing 11 changed files with 1,036 additions and 85 deletions.
4 changes: 3 additions & 1 deletion apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ app
.option("--commit-lock-max-pending <number>", "Commit lock max pending jobs (default: 1000)", parseNumber)
.option("-i, --id <filepath>", "Path to the PeerId file")
.option("-n --network <network>", "Farcaster network ID", parseNetwork)
.option("--gossip-metrics-enabled", "Enable gossip network tracing and metrics. (default: disabled)")
.option("--process-file-prefix <prefix>", 'Prefix for file to which hub process number is written. (default: "")')
.action(async (cliOptions) => {
const teardown = async (hub: Hub) => {
Expand Down Expand Up @@ -325,7 +326,8 @@ app
commitLockMaxPending: cliOptions.commitLockMaxPending ?? hubConfig.commitLockMaxPending,
adminServerEnabled: cliOptions.adminServerEnabled ?? hubConfig.adminServerEnabled,
adminServerHost: cliOptions.adminServerHost ?? hubConfig.adminServerHost,
testUsers,
testUsers: testUsers,
gossipMetricsEnabled: cliOptions.gossipMetricsEnabled ?? false,
};

const hubResult = Result.fromThrowable(
Expand Down
51 changes: 47 additions & 4 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import {
getSSLHubRpcClient,
getInsecureHubRpcClient,
UserNameProof,
AckMessageBody,
NetworkLatencyMessage,
} from "@farcaster/hub-nodejs";
import { PeerId } from "@libp2p/interface-peer-id";
import { peerIdFromBytes } from "@libp2p/peer-id";
Expand Down Expand Up @@ -62,6 +64,7 @@ import { GossipContactInfoJobScheduler } from "./storage/jobs/gossipContactInfoJ
import { MAINNET_ALLOWED_PEERS } from "./allowedPeers.mainnet.js";
import StoreEventHandler from "./storage/stores/storeEventHandler.js";
import { FNameRegistryClient, FNameRegistryEventsProvider } from "./eth/fnameRegistryEventsProvider.js";
import { GOSSIP_PROTOCOL_VERSION } from "./network/p2p/protocol.js";

export type HubSubmitSource = "gossip" | "rpc" | "eth-provider" | "sync" | "fname-registry";

Expand Down Expand Up @@ -184,6 +187,9 @@ export interface HubOptions {

/** Cron schedule for prune events job */
pruneEventsJobCron?: string;

/** Periodically send network latency ping messages to the gossip network and log metrics */
gossipMetricsEnabled?: boolean;
}

/** @returns A randomized string of the format `rocksdb.tmp.*` used for the DB Name */
Expand Down Expand Up @@ -222,7 +228,7 @@ export class Hub implements HubInterface {
constructor(options: HubOptions) {
this.options = options;
this.rocksDB = new RocksDB(options.rocksDBName ? options.rocksDBName : randomDbName());
this.gossipNode = new GossipNode(this.options.network);
this.gossipNode = new GossipNode(this.rocksDB, this.options.network, this.options.gossipMetricsEnabled);

// Create the ETH registry provider, which will fetch ETH events and push them into the engine.
// Defaults to Goerli testnet, which is currently used for Production Farcaster Hubs.
Expand Down Expand Up @@ -550,6 +556,8 @@ export class Hub implements HubInterface {
return Promise.resolve(err(peerIdResult.error));
}

this.gossipNode.recordMessageReceipt(gossipMessage);

if (gossipMessage.message) {
const message = gossipMessage.message;

Expand All @@ -564,12 +572,18 @@ export class Hub implements HubInterface {
}

// Merge the message
const submitStartTimestamp = Date.now();
const result = await this.submitMessage(message, "gossip");
if (result.isOk()) {
const submitEndTimestamp = Date.now();
this.gossipNode.recordMessageMerge(submitEndTimestamp - submitStartTimestamp);
}
return result.map(() => undefined);
} else if (gossipMessage.contactInfoContent) {
if (peerIdResult.isOk()) {
await this.handleContactInfo(peerIdResult.value, gossipMessage.contactInfoContent);
}
await this.handleContactInfo(peerIdResult.value, gossipMessage.contactInfoContent);
return ok(undefined);
} else if (gossipMessage.networkLatencyMessage) {
await this.handleNetworkLatencyMessage(gossipMessage.networkLatencyMessage);
return ok(undefined);
} else {
return err(new HubError("bad_request.invalid_param", "invalid message type"));
Expand Down Expand Up @@ -640,6 +654,35 @@ export class Hub implements HubInterface {
}
}

/**
 * Handles an incoming network-latency gossip message.
 *
 * An ack is handed to the gossip node's metrics recorder; a ping is
 * answered by publishing an ack (echoing the ping's origin and timestamp)
 * back onto the primary gossip topic.
 */
private async handleNetworkLatencyMessage(message: NetworkLatencyMessage) {
  // A peerId is required both to record receipts and to author ack replies.
  const selfPeerId = this.gossipNode.peerId;
  if (!selfPeerId) {
    log.error("gossipNode has no peerId");
    return;
  }

  if (message.ackMessage) {
    // Ack received: feed it to the metrics recorder for latency bookkeeping.
    this.gossipNode.recordLatencyAckMessageReceipt(message.ackMessage);
    return;
  }

  if (!message.pingMessage) {
    return;
  }

  // Ping received: respond with an ack message.
  const ping = message.pingMessage;
  const ack = AckMessageBody.create({
    pingOriginPeerId: ping.pingOriginPeerId,
    ackOriginPeerId: selfPeerId.toBytes(),
    pingTimestamp: ping.pingTimestamp,
    ackTimestamp: Date.now(),
  });
  const ackReply = GossipMessage.create({
    networkLatencyMessage: NetworkLatencyMessage.create({ ackMessage: ack }),
    topics: [this.gossipNode.primaryTopic()],
    peerId: selfPeerId.toBytes(),
    version: GOSSIP_PROTOCOL_VERSION,
  });
  await this.gossipNode.publish(ackReply);
}

/** Since we don't know if the peer is using SSL or not, we'll attempt to get the SSL version,
* and fall back to the non-SSL version
*/
Expand Down
157 changes: 157 additions & 0 deletions apps/hubble/src/network/p2p/gossipMetricsRecorder.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { createEd25519PeerId } from "@libp2p/peer-id-factory";
import { GossipMetricsRecorder, GossipMetrics, METRICS_TTL_MILLISECONDS } from "./gossipMetricsRecorder.js";
import { AckMessageBody, NetworkLatencyMessage, GossipMessage } from "@farcaster/hub-nodejs";
import { GossipNode } from "./gossipNode.js";
import { GOSSIP_PROTOCOL_VERSION } from "./protocol.js";
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import { RootPrefix } from "../../storage/db/types.js";
import { jest } from "@jest/globals";

// Test-scoped RocksDB instance shared by all tests in this suite
// (presumably set up/torn down by the jestRocksDB helper — confirm in jestUtils).
const db = jestRocksDB("network.p2p.gossipMetricsRecorder.test");

describe("Gossip metrics recorder accumulates metrics from messages", () => {
  afterEach(async () => {
    jest.resetAllMocks();
    // Clear persisted metrics so state does not leak between tests.
    await db.del(Buffer.from([RootPrefix.GossipMetrics]));
  });

  // Fix: this test previously had an empty name (`test("", ...)`), which makes
  // failures impossible to identify in test output.
  test("accumulates recent peers, latency metrics and message counts", async () => {
    const node = new GossipNode(db, undefined, true);
    await node.start([]);
    const nodePeerId = node.peerId ?? (await createEd25519PeerId());
    const otherPeerId = await createEd25519PeerId();
    let ackPeerId = await createEd25519PeerId();
    const pingTimestamp = Date.now();
    const recorder = node.metricsRecorder ?? new GossipMetricsRecorder(node, db);

    // First ack targets a different origin peer, so it must NOT update
    // recent-peer or latency metrics — only the raw message count.
    const timeTaken1 = 3600 * 1000;
    let ackMessage = AckMessageBody.create({
      pingOriginPeerId: otherPeerId.toBytes(),
      ackOriginPeerId: ackPeerId.toBytes(),
      pingTimestamp: pingTimestamp,
      ackTimestamp: pingTimestamp + timeTaken1,
    });
    const networkLatencyMessage = NetworkLatencyMessage.create({
      ackMessage,
    });
    const gossipMessage = GossipMessage.create({
      networkLatencyMessage,
      topics: [node.primaryTopic()],
      peerId: node.peerId?.toBytes() ?? new Uint8Array(),
      version: GOSSIP_PROTOCOL_VERSION,
    });
    // `recorder` is never undefined (see `??` fallback above), so no optional
    // chaining is needed here.
    recorder.recordMessageReceipt(gossipMessage);

    // Recent peers set should now have ack sender peerId
    expect(Object.keys(recorder.recentPeerIds)).toHaveLength(0);

    // Metrics map should have ack message with coverage
    expect(Object.keys(recorder.peerLatencyMetrics)).toHaveLength(0);

    // Message count should be incremented
    expect(Object.keys(recorder.peerMessageMetrics)).toHaveLength(1);
    expect(recorder.peerMessageMetrics[nodePeerId.toString()]?.messageCount).toEqual(1);

    // Second ack targets this node's own ping, so latency/coverage metrics update.
    ackPeerId = await createEd25519PeerId();
    const timeTaken2 = 7200 * 1000;
    ackMessage = AckMessageBody.create({
      pingOriginPeerId: nodePeerId.toBytes(),
      ackOriginPeerId: ackPeerId.toBytes(),
      pingTimestamp: pingTimestamp,
      ackTimestamp: pingTimestamp + timeTaken2,
    });
    recorder.recordLatencyAckMessageReceipt(ackMessage);

    // Recent peers set should have peerId from second ack
    expect(Object.keys(recorder.recentPeerIds)).toHaveLength(1);
    expect(recorder.recentPeerIds[ackPeerId.toString()]).toBeTruthy();

    // Metrics map should have ack with updates coverage
    const peerMetricsKey = `${ackPeerId.toString()}_${pingTimestamp}`;
    const updatedPeerLatencyMetrics = recorder.peerLatencyMetrics[peerMetricsKey.toString()];
    const updatedGlobalMetrics = recorder.globalMetrics;
    expect(Object.keys(recorder.peerLatencyMetrics)).toHaveLength(1);

    expect(updatedPeerLatencyMetrics?.numAcks).toEqual(1);
    expect(updatedPeerLatencyMetrics?.lastAckTimestamp).toEqual(pingTimestamp + timeTaken2);
    expect(Object.keys(recorder.peerMessageMetrics)).toHaveLength(1);
    expect(Object.keys(updatedGlobalMetrics.networkCoverage)).toHaveLength(1);
    expect(updatedGlobalMetrics.networkCoverage[pingTimestamp.toString()]).toEqual({
      seenPeerIds: {
        [ackPeerId.toString()]: timeTaken2,
      },
      coverageMap: {
        "0.5": timeTaken2,
        "0.75": timeTaken2,
        "0.9": timeTaken2,
        "0.99": timeTaken2,
      },
    });
  });

  test("GossipMetrics ser/de works correctly", async () => {
    // Round-trip a fully-populated metrics object through its buffer encoding.
    const recentPeerIds = { testPeerId: 1 };
    const peerLatencyMetrics = { testPeerId_123: { numAcks: 1, lastAckTimestamp: 12345 } };
    const peerMessageMetrics = { testPeerId: { messageCount: 11 } };
    const messageMergeTime = { sum: 112, numElements: 1 };
    const globalMetrics = { networkCoverage: {}, messageMergeTime: messageMergeTime };

    const metrics = new GossipMetrics(recentPeerIds, peerLatencyMetrics, peerMessageMetrics, globalMetrics);
    const buffer = metrics.toBuffer();
    const deserializedMetrics = GossipMetrics.fromBuffer(buffer);
    expect(deserializedMetrics).toEqual(metrics);
  });

  test("Message merge times are updated correctly", async () => {
    const node = new GossipNode(db, undefined, true);
    // Fix: `start` returns a promise and was previously not awaited, so the
    // test body could run against a node that had not finished starting.
    await node.start([]);
    const recorder = node.metricsRecorder ?? new GossipMetricsRecorder(node, db);
    await recorder.start();
    expect(recorder.globalMetrics.messageMergeTime).toEqual({ numElements: 0, sum: 0 });
    recorder.recordMessageMerge(10);
    expect(recorder.globalMetrics.messageMergeTime).toEqual({ numElements: 1, sum: 10 });
    recorder.recordMessageMerge(100);
    expect(recorder.globalMetrics.messageMergeTime).toEqual({ numElements: 2, sum: 110 });
  });

  test("Metrics are expired correctly", async () => {
    // Seed the DB with metrics exactly METRICS_TTL_MILLISECONDS old; the
    // recorder should drop all of them when it starts.
    const metricTime = Date.now() - METRICS_TTL_MILLISECONDS;
    const recentPeerIds = { testPeerId: metricTime };
    const peerLatencyMetrics = { testPeerId_123: { numAcks: 1, lastAckTimestamp: metricTime } };
    const peerMessageMetrics = { testPeerId: { messageCount: 11 } };
    const messageMergeTime = { sum: 112, numElements: 1 };
    const globalMetrics = {
      networkCoverage: { metricTime: { coverageMap: {}, seenPeerIds: {} } },
      messageMergeTime: messageMergeTime,
    };
    const metrics = new GossipMetrics(recentPeerIds, peerLatencyMetrics, peerMessageMetrics, globalMetrics);
    await db.put(Buffer.from([RootPrefix.GossipMetrics]), metrics.toBuffer());

    const node = new GossipNode(db, undefined, true);
    // Fix: previously not awaited (floating promise).
    await node.start([]);
    const recorder = new GossipMetricsRecorder(node, db);
    await recorder.start();

    expect(recorder.globalMetrics).toEqual({ networkCoverage: {}, messageMergeTime: { sum: 0, numElements: 0 } });
    expect(recorder.peerLatencyMetrics).toEqual({});
    expect(recorder.peerMessageMetrics).toEqual({});
    expect(recorder.recentPeerIds).toEqual({});
  });

  test("Metrics are logged and expired after ping is sent", async () => {
    const node = new GossipNode(db, undefined, true);
    await node.start([]);
    const recorder = new GossipMetricsRecorder(node, db);
    await recorder.start();

    jest.spyOn(recorder, "expireMetrics");
    jest.spyOn(recorder, "logMetrics");
    jest.spyOn(node, "publish");
    await recorder.sendPingAndLogMetrics(0);

    expect(recorder.expireMetrics).toHaveBeenCalledTimes(1);
    expect(recorder.logMetrics).toHaveBeenCalledTimes(1);
    expect(node.publish).toHaveBeenCalledTimes(1);
    expect(recorder.peerMessageMetrics).toEqual({});
  });
});
Loading

0 comments on commit 2bfcaf4

Please sign in to comment.