From 5b9d847791fe0da4fe7877f4f347b29d46771e7c Mon Sep 17 00:00:00 2001 From: "Todd L. Montgomery" Date: Fri, 8 Jul 2022 09:54:20 +0100 Subject: [PATCH] [Java]: add duty cycle tracking to the agents of the driver, archive, consensus module, and service container. The default tracker is a stall tracker that records the maximum duty cycle time (ns) and the number of times the cycle threshold is exceeded as counters. --- .../main/java/io/aeron/archive/Archive.java | 275 ++++++++++++++++++ .../io/aeron/archive/ArchiveConductor.java | 10 + .../DedicatedModeArchiveConductor.java | 56 +++- .../src/main/java/io/aeron/AeronCounters.java | 33 +++ .../io/aeron/cluster/ConsensusModule.java | 97 ++++++ .../aeron/cluster/ConsensusModuleAgent.java | 7 + .../service/ClusteredServiceAgent.java | 8 + .../service/ClusteredServiceContainer.java | 132 ++++++++- .../cluster/ConsensusModuleAgentTest.java | 5 +- .../java/io/aeron/driver/Configuration.java | 40 +++ .../java/io/aeron/driver/DriverConductor.java | 19 +- .../io/aeron/driver/DriverConductorProxy.java | 9 + .../io/aeron/driver/DutyCycleTracker.java | 69 +++++ .../java/io/aeron/driver/MediaDriver.java | 157 ++++++++++ .../main/java/io/aeron/driver/Receiver.java | 23 +- .../src/main/java/io/aeron/driver/Sender.java | 22 +- .../driver/status/DutyCycleStallTracker.java | 94 ++++++ .../status/SystemCounterDescriptor.java | 22 +- .../io/aeron/driver/DriverConductorTest.java | 23 +- .../io/aeron/driver/IpcPublicationTest.java | 4 +- .../java/io/aeron/driver/ReceiverTest.java | 6 +- .../test/java/io/aeron/driver/SenderTest.java | 3 +- 22 files changed, 1080 insertions(+), 34 deletions(-) create mode 100644 aeron-driver/src/main/java/io/aeron/driver/DutyCycleTracker.java create mode 100644 aeron-driver/src/main/java/io/aeron/driver/status/DutyCycleStallTracker.java diff --git a/aeron-archive/src/main/java/io/aeron/archive/Archive.java b/aeron-archive/src/main/java/io/aeron/archive/Archive.java index 46b3b03ef8..e99abf450c --- 
a/aeron-archive/src/main/java/io/aeron/archive/Archive.java +++ b/aeron-archive/src/main/java/io/aeron/archive/Archive.java @@ -20,6 +20,8 @@ import io.aeron.archive.checksum.Checksums; import io.aeron.archive.client.AeronArchive; import io.aeron.archive.client.ArchiveException; +import io.aeron.driver.DutyCycleTracker; +import io.aeron.driver.status.DutyCycleStallTracker; import io.aeron.exceptions.ConcurrentConcludeException; import io.aeron.exceptions.ConfigurationException; import io.aeron.security.Authenticator; @@ -388,6 +390,36 @@ public static final class Configuration public static final long REPLAY_LINGER_TIMEOUT_DEFAULT_NS = io.aeron.driver.Configuration.publicationLingerTimeoutNs(); + /** + * Property name for threshold value for the conductor work cycle threshold to track for being exceeded. + */ + public static final String CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME = "aeron.archive.conductor.cycle.threshold"; + + /** + * Default threshold value for the conductor work cycle threshold to track for being exceeded. + */ + public static final long CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + + /** + * Property name for threshold value for the recorder work cycle threshold to track for being exceeded. + */ + public static final String RECORDER_CYCLE_THRESHOLD_PROP_NAME = "aeron.archive.recorder.cycle.threshold"; + + /** + * Default threshold value for the recorder work cycle threshold to track for being exceeded. + */ + public static final long RECORDER_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + + /** + * Property name for threshold value for the replayer work cycle threshold to track for being exceeded. + */ + public static final String REPLAYER_CYCLE_THRESHOLD_PROP_NAME = "aeron.archive.replayer.cycle.threshold"; + + /** + * Default threshold value for the replayer work cycle threshold to track for being exceeded. 
+ */ + public static final long REPLAYER_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + /** * Should the archive delete existing files on start. Default is false and should only be true for testing. */ @@ -439,6 +471,18 @@ public static final class Configuration */ public static final int ARCHIVE_CONTROL_SESSIONS_TYPE_ID = AeronCounters.ARCHIVE_CONTROL_SESSIONS_TYPE_ID; + /** + * The type id of the {@link Counter} used for keeping track of the max duty cycle time of an archive agent. + */ + public static final int ARCHIVE_MAX_CYCLE_TIME_TYPE_ID = AeronCounters.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID; + + /** + * The type id of the {@link Counter} used for keeping track of the count of cycle time threshold exceeded of + * an archive agent. + */ + public static final int ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = + AeronCounters.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID; + /** * Size in bytes of the error buffer for the archive when not externally provided. */ @@ -663,6 +707,36 @@ public static long replayLingerTimeoutNs() return getDurationInNanos(REPLAY_LINGER_TIMEOUT_PROP_NAME, REPLAY_LINGER_TIMEOUT_DEFAULT_NS); } + /** + * Get threshold value for the conductor work cycle threshold to track for being exceeded. + * + * @return threshold value in nanoseconds. + */ + public static long conductorCycleThresholdNs() + { + return getDurationInNanos(CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME, CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS); + } + + /** + * Get threshold value for the recorder work cycle threshold to track for being exceeded. + * + * @return threshold value in nanoseconds. + */ + public static long recorderCycleThresholdNs() + { + return getDurationInNanos(RECORDER_CYCLE_THRESHOLD_PROP_NAME, RECORDER_CYCLE_THRESHOLD_DEFAULT_NS); + } + + /** + * Get threshold value for the replayer work cycle threshold to track for being exceeded. + * + * @return threshold value in nanoseconds. 
+ */ + public static long replayerCycleThresholdNs() + { + return getDurationInNanos(REPLAYER_CYCLE_THRESHOLD_PROP_NAME, REPLAYER_CYCLE_THRESHOLD_DEFAULT_NS); + } + /** * Whether to delete directory on start or not. * @@ -816,6 +890,9 @@ public static final class Context implements Cloneable private long connectTimeoutNs = Configuration.connectTimeoutNs(); private long replayLingerTimeoutNs = Configuration.replayLingerTimeoutNs(); + private long conductorCycleThresholdNs = Configuration.conductorCycleThresholdNs(); + private long recorderCycleThresholdNs = Configuration.recorderCycleThresholdNs(); + private long replayerCycleThresholdNs = Configuration.replayerCycleThresholdNs(); private long maxCatalogEntries = Configuration.maxCatalogEntries(); private long catalogCapacity = Configuration.catalogCapacity(); private long lowStorageSpaceThreshold = Configuration.lowStorageSpaceThreshold(); @@ -852,6 +929,9 @@ public static final class Context implements Cloneable private UnsafeBuffer dataBuffer; private UnsafeBuffer replayBuffer; private UnsafeBuffer recordChecksumBuffer; + private DutyCycleTracker conductorDutyCycleTracker; + private DutyCycleTracker recorderDutyCycleTracker; + private DutyCycleTracker replayerDutyCycleTracker; /** * Perform a shallow copy of the object. 
@@ -1017,6 +1097,19 @@ public void conclude() idleStrategySupplier = Configuration.idleStrategySupplier(null); } + if (null == conductorDutyCycleTracker) + { + conductorDutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + aeron.addCounter( + Archive.Configuration.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID, "archive-conductor max cycle time (ns)"), + aeron.addCounter( + Archive.Configuration.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID, + "archive-conductor work cycle time exceeded count: threshold=" + + conductorCycleThresholdNs + "ns"), + conductorCycleThresholdNs); + } + if (DEDICATED == threadingMode) { if (null == recorderIdleStrategySupplier) @@ -1036,6 +1129,34 @@ public void conclude() replayerIdleStrategySupplier = idleStrategySupplier; } } + + if (null == recorderDutyCycleTracker) + { + recorderDutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + aeron.addCounter( + Archive.Configuration.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID, + "archive-recorder max cycle time (ns)"), + aeron.addCounter( + Archive.Configuration.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID, + "archive-recorder work cycle time exceeded count: threshold=" + + recorderCycleThresholdNs + "ns"), + recorderCycleThresholdNs); + } + + if (null == replayerDutyCycleTracker) + { + replayerDutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + aeron.addCounter( + Archive.Configuration.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID, + "archive-replayer max cycle time (ns)"), + aeron.addCounter( + Archive.Configuration.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID, + "archive-replayer work cycle time exceeded count: threshold=" + + replayerCycleThresholdNs + "ns"), + replayerCycleThresholdNs); + } } if (!isPowerOfTwo(segmentFileLength)) @@ -1536,6 +1657,154 @@ public long replayLingerTimeoutNs() return replayLingerTimeoutNs; } + /** + * Set a threshold for the conductor work cycle time which when exceed it will increment the + * conductor cycle time exceeded count. 
+ * + * @param thresholdNs value in nanoseconds + * @return this for fluent API. + * @see io.aeron.archive.Archive.Configuration#CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME + * @see io.aeron.archive.Archive.Configuration#CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS + */ + public Context conductorCycleThresholdNs(final long thresholdNs) + { + this.conductorCycleThresholdNs = thresholdNs; + return this; + } + + /** + * Threshold for the conductor work cycle time which when exceed it will increment the + * conductor cycle time exceeded count. + * + * @return threshold to track for the conductor work cycle time. + */ + public long conductorCycleThresholdNs() + { + return conductorCycleThresholdNs; + } + + /** + * Set a threshold for the recorder work cycle time which when exceed it will increment the + * recorder cycle time exceeded count. + * + * @param thresholdNs value in nanoseconds + * @return this for fluent API. + * @see io.aeron.archive.Archive.Configuration#RECORDER_CYCLE_THRESHOLD_PROP_NAME + * @see io.aeron.archive.Archive.Configuration#RECORDER_CYCLE_THRESHOLD_DEFAULT_NS + */ + public Context recorderCycleThresholdNs(final long thresholdNs) + { + this.recorderCycleThresholdNs = thresholdNs; + return this; + } + + /** + * Threshold for the recorder work cycle time which when exceed it will increment the + * recorder cycle time exceeded count. + * + * @return threshold to track for the recorder work cycle time. + */ + public long recorderCycleThresholdNs() + { + return recorderCycleThresholdNs; + } + + /** + * Set a threshold for the replayer work cycle time which when exceed it will increment the + * replayer cycle time exceeded count. + * + * @param thresholdNs value in nanoseconds + * @return this for fluent API. 
+ * @see io.aeron.archive.Archive.Configuration#REPLAYER_CYCLE_THRESHOLD_PROP_NAME + * @see io.aeron.archive.Archive.Configuration#REPLAYER_CYCLE_THRESHOLD_DEFAULT_NS + */ + public Context replayerCycleThresholdNs(final long thresholdNs) + { + this.replayerCycleThresholdNs = thresholdNs; + return this; + } + + /** + * Threshold for the replayer work cycle time which when exceed it will increment the + * replayer cycle time exceeded count. + * + * @return threshold to track for the replayer work cycle time. + */ + public long replayerCycleThresholdNs() + { + return replayerCycleThresholdNs; + } + + /** + * Set the duty cycle tracker for the conductor. + * + * @param dutyCycleTracker for the conductor. + * @return this for a fluent API. + */ + public Context conductorDutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.conductorDutyCycleTracker = dutyCycleTracker; + return this; + } + + /** + * The duty cycle tracker for the conductor. + * + * @return duty cycle tracker for the conductor. + */ + public DutyCycleTracker conductorDutyCycleTracker() + { + return conductorDutyCycleTracker; + } + + /** + * Set the duty cycle tracker for the recorder. + * NOTE: Only used in DEDICATED threading mode. + * + * @param dutyCycleTracker for the recorder. + * @return this for a fluent API. + */ + public Context recorderDutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.recorderDutyCycleTracker = dutyCycleTracker; + return this; + } + + /** + * The duty cycle tracker for the recorder. + * NOTE: Only used in DEDICATED threading mode. + * + * @return duty cycle tracker for the recorder. + */ + public DutyCycleTracker recorderDutyCycleTracker() + { + return recorderDutyCycleTracker; + } + + /** + * Set the duty cycle tracker for the replayer. + * NOTE: Only used in DEDICATED threading mode. + * + * @param dutyCycleTracker for the replayer. + * @return this for a fluent API. 
+ */ + public Context replayerDutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.replayerDutyCycleTracker = dutyCycleTracker; + return this; + } + + /** + * The duty cycle tracker for the replayer. + * NOTE: Only used in DEDICATED threading mode. + * + * @return duty cycle tracker for the replayer. + */ + public DutyCycleTracker replayerDutyCycleTracker() + { + return replayerDutyCycleTracker; + } + /** * Provides an explicit {@link Checksum} for checksum computation during recording. * @@ -2537,6 +2806,12 @@ public String toString() "\n dataBuffer=" + dataBuffer + "\n replayBuffer=" + replayBuffer + "\n recordChecksumBuffer=" + recordChecksumBuffer + + "\n conductorCycleThresholdNs=" + conductorCycleThresholdNs + + "\n recorderCycleThresholdNs=" + recorderCycleThresholdNs + + "\n replayerCycleThresholdNs=" + replayerCycleThresholdNs + + "\n conductorDutyCycleTracker=" + conductorDutyCycleTracker + + "\n recorderDutyCycleTracker=" + recorderDutyCycleTracker + + "\n replayerDutyCycleTracker=" + replayerDutyCycleTracker + "\n}"; } } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java index febf76014a..25375aa95f 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java @@ -23,6 +23,7 @@ import io.aeron.archive.codecs.RecordingSignal; import io.aeron.archive.codecs.SourceLocation; import io.aeron.archive.status.RecordingPos; +import io.aeron.driver.DutyCycleTracker; import io.aeron.exceptions.AeronException; import io.aeron.exceptions.TimeoutException; import io.aeron.logbuffer.LogBufferDescriptor; @@ -96,6 +97,7 @@ abstract class ArchiveConductor private final AgentInvoker aeronAgentInvoker; private final AgentInvoker driverAgentInvoker; private final EpochClock epochClock; + private final NanoClock nanoClock; private final CachedEpochClock cachedEpochClock = 
new CachedEpochClock(); private final File archiveDir; private final Subscription controlSubscription; @@ -107,6 +109,7 @@ abstract class ArchiveConductor private final AuthorisationService authorisationService; private final ControlResponseProxy controlResponseProxy = new ControlResponseProxy(); private final ControlSessionProxy controlSessionProxy = new ControlSessionProxy(controlResponseProxy); + private final DutyCycleTracker dutyCycleTracker; final Archive.Context ctx; SessionWorker recorder; SessionWorker replayer; @@ -121,10 +124,12 @@ abstract class ArchiveConductor aeronAgentInvoker = aeron.conductorAgentInvoker(); driverAgentInvoker = ctx.mediaDriverAgentInvoker(); epochClock = ctx.epochClock(); + nanoClock = ctx.nanoClock(); archiveDir = ctx.archiveDir(); connectTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.connectTimeoutNs()); catalog = ctx.catalog(); markFile = ctx.archiveMarkFile(); + dutyCycleTracker = ctx.conductorDutyCycleTracker(); cachedEpochClock.update(epochClock.time()); authenticator = ctx.authenticatorSupplier().get(); @@ -156,6 +161,8 @@ public void onStart() { recorder = newRecorder(); replayer = newReplayer(); + + dutyCycleTracker.update(nanoClock.nanoTime()); } public void onAvailableImage(final Image image) @@ -265,6 +272,7 @@ protected void abort() */ public int doWork() { + final long nowNs = nanoClock.nanoTime(); int workCount = 0; if (isAbort) @@ -272,6 +280,8 @@ public int doWork() throw new AgentTerminationException("unexpected Aeron close"); } + dutyCycleTracker.measureAndUpdateClock(nowNs); + final long nowMs = epochClock.time(); if (cachedEpochClock.time() != nowMs) { diff --git a/aeron-archive/src/main/java/io/aeron/archive/DedicatedModeArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/DedicatedModeArchiveConductor.java index 5edf852039..2018c313d1 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/DedicatedModeArchiveConductor.java +++ 
b/aeron-archive/src/main/java/io/aeron/archive/DedicatedModeArchiveConductor.java @@ -15,6 +15,7 @@ */ package io.aeron.archive; +import io.aeron.driver.DutyCycleTracker; import org.agrona.CloseHelper; import org.agrona.concurrent.*; import org.agrona.concurrent.status.AtomicCounter; @@ -77,12 +78,24 @@ protected void closeSessionWorkers() SessionWorker newRecorder() { - return new DedicatedModeRecorder(errorHandler, ctx.errorCounter(), closeQueue, ctx.abortLatch()); + return new DedicatedModeRecorder( + errorHandler, + ctx.errorCounter(), + closeQueue, + ctx.abortLatch(), + ctx.recorderDutyCycleTracker(), + ctx.nanoClock()); } SessionWorker newReplayer() { - return new DedicatedModeReplayer(errorHandler, ctx.errorCounter(), closeQueue, ctx.abortLatch()); + return new DedicatedModeReplayer( + errorHandler, + ctx.errorCounter(), + closeQueue, + ctx.abortLatch(), + ctx.replayerDutyCycleTracker(), + ctx.nanoClock()); } private int processCloseQueue() @@ -114,13 +127,17 @@ static class DedicatedModeRecorder extends SessionWorker private final ManyToOneConcurrentLinkedQueue closeQueue; private final AtomicCounter errorCounter; private final CountDownLatch abortLatch; + private final DutyCycleTracker dutyCycleTracker; + private final NanoClock nanoClock; private volatile boolean isAbort; DedicatedModeRecorder( final CountedErrorHandler errorHandler, final AtomicCounter errorCounter, final ManyToOneConcurrentLinkedQueue closeQueue, - final CountDownLatch abortLatch) + final CountDownLatch abortLatch, + final DutyCycleTracker dutyCycleTracker, + final NanoClock nanoClock) { super("archive-recorder", errorHandler); @@ -128,6 +145,8 @@ static class DedicatedModeRecorder extends SessionWorker this.errorCounter = errorCounter; this.sessionsQueue = new ManyToOneConcurrentLinkedQueue<>(); this.abortLatch = abortLatch; + this.dutyCycleTracker = dutyCycleTracker; + this.nanoClock = nanoClock; } /** @@ -138,6 +157,16 @@ protected void abort() isAbort = true; } + /** + * 
{@inheritDoc} + */ + public void onStart() + { + super.onStart(); + + dutyCycleTracker.update(nanoClock.nanoTime()); + } + /** * {@inheritDoc} */ @@ -148,6 +177,8 @@ public int doWork() throw new AgentTerminationException(); } + dutyCycleTracker.measureAndUpdateClock(nanoClock.nanoTime()); + return drainSessionsQueue() + super.doWork(); } @@ -236,13 +267,17 @@ static class DedicatedModeReplayer extends SessionWorker private final ManyToOneConcurrentLinkedQueue closeQueue; private final AtomicCounter errorCounter; private final CountDownLatch abortLatch; + private final DutyCycleTracker dutyCycleTracker; + private final NanoClock nanoClock; private volatile boolean isAbort; DedicatedModeReplayer( final CountedErrorHandler errorHandler, final AtomicCounter errorCounter, final ManyToOneConcurrentLinkedQueue closeQueue, - final CountDownLatch abortLatch) + final CountDownLatch abortLatch, + final DutyCycleTracker dutyCycleTracker, + final NanoClock nanoClock) { super("archive-replayer", errorHandler); @@ -250,6 +285,8 @@ static class DedicatedModeReplayer extends SessionWorker this.errorCounter = errorCounter; this.sessionsQueue = new ManyToOneConcurrentLinkedQueue<>(); this.abortLatch = abortLatch; + this.dutyCycleTracker = dutyCycleTracker; + this.nanoClock = nanoClock; } /** @@ -268,6 +305,15 @@ protected void addSession(final ReplaySession session) send(session); } + /** + * {@inheritDoc} + */ + public void onStart() + { + super.onStart(); + dutyCycleTracker.update(nanoClock.nanoTime()); + } + /** * {@inheritDoc} */ @@ -278,6 +324,8 @@ public int doWork() throw new AgentTerminationException(); } + dutyCycleTracker.measureAndUpdateClock(nanoClock.nanoTime()); + return drainSessionQueue() + super.doWork(); } diff --git a/aeron-client/src/main/java/io/aeron/AeronCounters.java b/aeron-client/src/main/java/io/aeron/AeronCounters.java index 881862f3f7..ffa3d2b352 100644 --- a/aeron-client/src/main/java/io/aeron/AeronCounters.java +++ 
b/aeron-client/src/main/java/io/aeron/AeronCounters.java @@ -147,6 +147,17 @@ public final class AeronCounters */ public static final int ARCHIVE_CONTROL_SESSIONS_TYPE_ID = 102; + /** + * The type id of the {@link Counter} used for keeping track of the max duty cycle time of an archive agent. + */ + public static final int ARCHIVE_MAX_CYCLE_TIME_TYPE_ID = 103; + + /** + * The type id of the {@link Counter} used for keeping track of the count of cycle time threshold exceeded of + * an archive agent. + */ + public static final int ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 104; + // Cluster counters /** @@ -224,6 +235,28 @@ public final class AeronCounters */ public static final int CLUSTER_CLUSTERED_SERVICE_ERROR_COUNT_TYPE_ID = 215; + /** + * The type id of the {@link Counter} used for keeping track of the max duty cycle time of the consensus module. + */ + public static final int CLUSTER_MAX_CYCLE_TIME_TYPE_ID = 216; + + /** + * The type id of the {@link Counter} used for keeping track of the count of cycle time threshold exceeded of + * the consensus module. + */ + public static final int CLUSTER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 217; + + /** + * The type id of the {@link Counter} used for keeping track of the max duty cycle time of the service container. + */ + public static final int CLUSTER_CLUSTERED_SERVICE_MAX_CYCLE_TIME_TYPE_ID = 218; + + /** + * The type id of the {@link Counter} used for keeping track of the count of cycle time threshold exceeded of + * the service container. 
+ */ + public static final int CLUSTER_CLUSTERED_SERVICE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 219; + private AeronCounters() { } diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java index 8fc09a21b7..bfb05a77c7 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java @@ -22,6 +22,8 @@ import io.aeron.cluster.client.ClusterException; import io.aeron.cluster.codecs.mark.ClusterComponentType; import io.aeron.cluster.service.*; +import io.aeron.driver.DutyCycleTracker; +import io.aeron.driver.status.DutyCycleStallTracker; import io.aeron.exceptions.ConcurrentConcludeException; import io.aeron.exceptions.ConfigurationException; import io.aeron.security.Authenticator; @@ -529,6 +531,18 @@ public static final class Configuration public static final int CLUSTER_INVALID_REQUEST_COUNT_TYPE_ID = AeronCounters.CLUSTER_INVALID_REQUEST_COUNT_TYPE_ID; + /** + * Counter type id used for keeping track of the max duty cycle time of the agent. + */ + public static final int CLUSTER_MAX_CYCLE_TIME_TYPE_ID = + AeronCounters.CLUSTER_MAX_CYCLE_TIME_TYPE_ID; + + /** + * Counter type id used for keeping track of the count of cycle time threshold exceeded of an agent. + */ + public static final int CLUSTER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = + AeronCounters.CLUSTER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID; + /** * The number of services in this cluster instance. * @@ -664,6 +678,17 @@ public static final class Configuration */ public static final String TERMINATION_TIMEOUT_PROP_NAME = "aeron.cluster.termination.timeout"; + /** + * Property name for threshold value for the consensus module agent work cycle threshold to track + * for being exceeded. 
+ */ + public static final String CYCLE_THRESHOLD_PROP_NAME = "aeron.cluster.cycle.threshold"; + + /** + * Default threshold value for the consensus module agent work cycle threshold to track for being exceeded. + */ + public static final long CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + /** * Default timeout a leader will wait on getting termination ACKs from followers. */ @@ -989,6 +1014,16 @@ public static long terminationTimeoutNs() return getDurationInNanos(TERMINATION_TIMEOUT_PROP_NAME, TERMINATION_TIMEOUT_DEFAULT_NS); } + /** + * Get threshold value for the consensus module agent work cycle threshold to track for being exceeded. + * + * @return threshold value in nanoseconds. + */ + public static long cycleThresholdNs() + { + return getDurationInNanos(CYCLE_THRESHOLD_PROP_NAME, CYCLE_THRESHOLD_DEFAULT_NS); + } + /** * Size in bytes of the error buffer in the mark file. * @@ -1240,6 +1275,7 @@ public static final class Context implements Cloneable private long electionStatusIntervalNs = Configuration.electionStatusIntervalNs(); private long dynamicJoinIntervalNs = Configuration.dynamicJoinIntervalNs(); private long terminationTimeoutNs = Configuration.terminationTimeoutNs(); + private long cycleThresholdNs = Configuration.cycleThresholdNs(); private String agentRoleName = Configuration.agentRoleName(); private ThreadFactory threadFactory; @@ -1270,6 +1306,7 @@ public static final class Context implements Cloneable private AuthorisationServiceSupplier authorisationServiceSupplier; private LogPublisher logPublisher; private EgressPublisher egressPublisher; + private DutyCycleTracker dutyCycleTracker; private boolean isLogMdc; /** @@ -1454,6 +1491,18 @@ public void conclude() aeron, buffer, "Cluster timed out client count", CLUSTER_CLIENT_TIMEOUT_COUNT_TYPE_ID, clusterId); } + if (null == dutyCycleTracker) + { + dutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + ClusterCounters.allocate( + aeron, buffer, "Cluster max 
cycle time (ns)", CLUSTER_MAX_CYCLE_TIME_TYPE_ID, clusterId), + ClusterCounters.allocate( + aeron, buffer, "Cluster work cycle time exceeded count: threshold=" + cycleThresholdNs + "ns", + CLUSTER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID, clusterId), + cycleThresholdNs); + } + if (null == threadFactory) { threadFactory = Thread::new; @@ -2672,6 +2721,54 @@ public long terminationTimeoutNs() return terminationTimeoutNs; } + /** + * Set a threshold for the consensus module agent work cycle time which when exceed it will increment the + * counter. + * + * @param thresholdNs value in nanoseconds + * @return this for fluent API. + * @see Configuration#CYCLE_THRESHOLD_PROP_NAME + * @see Configuration#CYCLE_THRESHOLD_DEFAULT_NS + */ + public Context cycleThresholdNs(final long thresholdNs) + { + this.cycleThresholdNs = thresholdNs; + return this; + } + + /** + * Threshold for the consensus module agent work cycle time which when exceed it will increment the + * counter. + * + * @return threshold to track for the consensus module agent work cycle time. + */ + public long cycleThresholdNs() + { + return cycleThresholdNs; + } + + /** + * Set a duty cycle tracker to be used for tracking the duty cycle time of the consensus module agent. + * + * @param dutyCycleTracker to use for tracking. + * @return this for fluent API. + */ + public Context dutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.dutyCycleTracker = dutyCycleTracker; + return this; + } + + /** + * The duty cycle tracker used to track the consensus module agent duty cycle. + * + * @return the duty cycle tracker. + */ + public DutyCycleTracker dutyCycleTracker() + { + return dutyCycleTracker; + } + /** * Get the {@link Agent#roleName()} to be used for the consensus module agent. If {@code null} then one will * be generated. 
diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java index 6994e27900..4434bb91c8 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java @@ -28,6 +28,7 @@ import io.aeron.cluster.codecs.*; import io.aeron.cluster.service.*; import io.aeron.driver.DefaultNameResolver; +import io.aeron.driver.DutyCycleTracker; import io.aeron.driver.media.UdpChannel; import io.aeron.exceptions.AeronException; import io.aeron.logbuffer.ControlledFragmentHandler; @@ -141,6 +142,7 @@ final class ConsensusModuleAgent implements Agent, TimerService.TimerHandler private final IdleStrategy idleStrategy; private final RecordingLog recordingLog; private final ArrayList dynamicJoinSnapshots = new ArrayList<>(); + private final DutyCycleTracker dutyCycleTracker; private RecordingLog.RecoveryPlan recoveryPlan; private AeronArchive archive; private RecordingSignalPoller recordingSignalPoller; @@ -182,6 +184,7 @@ final class ConsensusModuleAgent implements Agent, TimerService.TimerHandler Arrays.fill(serviceClientIds, NULL_VALUE); this.serviceAckQueues = ServiceAck.newArrayOfQueues(ctx.serviceCount()); this.highMemberId = ClusterMember.highMemberId(activeMembers); + this.dutyCycleTracker = ctx.dutyCycleTracker(); aeronClientInvoker = aeron.conductorAgentInvoker(); aeronClientInvoker.invoke(); @@ -316,6 +319,8 @@ public void onStart() } unavailableCounterHandlerRegistrationId = aeron.addUnavailableCounterHandler(this::onUnavailableCounter); + + dutyCycleTracker.update(clusterClock.timeNanos()); } /** @@ -327,6 +332,8 @@ public int doWork() final long nowNs = clusterTimeUnit.toNanos(timestamp); int workCount = 0; + dutyCycleTracker.measureAndUpdateClock(nowNs); + try { if (nowNs >= slowTickDeadlineNs) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java 
b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java index d654b4c43f..85c5a5f4ba 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java @@ -23,6 +23,7 @@ import io.aeron.cluster.client.ClusterException; import io.aeron.cluster.codecs.*; import io.aeron.driver.Configuration; +import io.aeron.driver.DutyCycleTracker; import io.aeron.exceptions.AeronException; import io.aeron.exceptions.TimeoutException; import io.aeron.logbuffer.BufferClaim; @@ -70,6 +71,7 @@ final class ClusteredServiceAgent implements Agent, Cluster, IdleStrategy private final ConsensusModuleProxy consensusModuleProxy; private final ServiceAdapter serviceAdapter; private final EpochClock epochClock; + private final NanoClock nanoClock; private final UnsafeBuffer messageBuffer = new UnsafeBuffer( new byte[Configuration.MAX_UDP_PAYLOAD_LENGTH]); private final UnsafeBuffer headerBuffer = new UnsafeBuffer( @@ -82,6 +84,7 @@ final class ClusteredServiceAgent implements Agent, Cluster, IdleStrategy private final Collection unmodifiableClientSessions = new UnmodifiableClientSessionCollection(sessionByIdMap.values()); private final BoundedLogAdapter logAdapter; + private final DutyCycleTracker dutyCycleTracker; private String activeLifecycleCallbackName; private ReadableCounter commitPosition; private ActiveLogEvent activeLogEvent; @@ -100,6 +103,8 @@ final class ClusteredServiceAgent implements Agent, Cluster, IdleStrategy idleStrategy = ctx.idleStrategy(); serviceId = ctx.serviceId(); epochClock = ctx.epochClock(); + nanoClock = ctx.nanoClock(); + dutyCycleTracker = ctx.dutyCycleTracker(); final String channel = ctx.controlChannel(); consensusModuleProxy = new ConsensusModuleProxy(aeron.addPublication(channel, ctx.consensusModuleStreamId())); @@ -114,6 +119,7 @@ public void onStart() commitPosition = awaitCommitPositionCounter(counters, 
ctx.clusterId()); recoverState(counters); + dutyCycleTracker.update(nanoClock.nanoTime()); } public void onClose() @@ -161,6 +167,8 @@ public int doWork() { int workCount = 0; + dutyCycleTracker.measureAndUpdateClock(nanoClock.nanoTime()); + try { if (checkForClockTick()) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java index ce914ceba7..687731336b 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java @@ -20,6 +20,8 @@ import io.aeron.cluster.client.ClusterException; import io.aeron.cluster.codecs.mark.ClusterComponentType; import io.aeron.cluster.codecs.mark.MarkFileHeaderEncoder; +import io.aeron.driver.DutyCycleTracker; +import io.aeron.driver.status.DutyCycleStallTracker; import io.aeron.exceptions.ConcurrentConcludeException; import io.aeron.exceptions.ConfigurationException; import org.agrona.*; @@ -35,11 +37,10 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; -import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.CLUSTERED_SERVICE_ERROR_COUNT_TYPE_ID; +import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.*; import static java.nio.charset.StandardCharsets.US_ASCII; import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater; -import static org.agrona.SystemUtil.getSizeAsInt; -import static org.agrona.SystemUtil.loadPropertiesFiles; +import static org.agrona.SystemUtil.*; /** * Container for a service in the cluster managed by the Consensus Module. 
This is where business logic resides and @@ -301,6 +302,17 @@ public static final class Configuration public static final String DELEGATING_ERROR_HANDLER_PROP_NAME = "aeron.cluster.service.delegating.error.handler"; + /** + * Property name for threshold value for the container work cycle threshold to track + * for being exceeded. + */ + public static final String CYCLE_THRESHOLD_PROP_NAME = "aeron.cluster.service.cycle.threshold"; + + /** + * Default threshold value for the container work cycle threshold to track for being exceeded. + */ + public static final long CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + /** * Counter type id for the cluster node role. */ @@ -317,6 +329,19 @@ public static final class Configuration public static final int CLUSTERED_SERVICE_ERROR_COUNT_TYPE_ID = AeronCounters.CLUSTER_CLUSTERED_SERVICE_ERROR_COUNT_TYPE_ID; + /** + * The type id of the {@link Counter} used for keeping track of the max duty cycle time of the service container. + */ + public static final int CLUSTER_CLUSTERED_SERVICE_MAX_CYCLE_TIME_TYPE_ID = + AeronCounters.CLUSTER_CLUSTERED_SERVICE_MAX_CYCLE_TIME_TYPE_ID; + + /** + * The type id of the {@link Counter} used for keeping track of the count of cycle time threshold exceeded of + * the service container. + */ + public static final int CLUSTER_CLUSTERED_SERVICE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = + AeronCounters.CLUSTER_CLUSTERED_SERVICE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID; + /** * The value {@link #CLUSTER_ID_DEFAULT} or system property {@link #CLUSTER_ID_PROP_NAME} if set. * @@ -500,6 +525,16 @@ public static int logFragmentLimit() return Integer.getInteger(LOG_FRAGMENT_LIMIT_PROP_NAME, LOG_FRAGMENT_LIMIT_DEFAULT); } + /** + * Get threshold value for the container work cycle threshold to track for being exceeded. + * + * @return threshold value in nanoseconds. 
+ */ + public static long cycleThresholdNs() + { + return getDurationInNanos(CYCLE_THRESHOLD_PROP_NAME, CYCLE_THRESHOLD_DEFAULT_NS); + } + /** * Create a new {@link ClusteredService} based on the configured {@link #SERVICE_CLASS_NAME_PROP_NAME}. * @@ -576,11 +611,13 @@ public static final class Context implements Cloneable private int errorBufferLength = Configuration.errorBufferLength(); private boolean isRespondingService = Configuration.isRespondingService(); private int logFragmentLimit = Configuration.logFragmentLimit(); + private long cycleThresholdNs = Configuration.cycleThresholdNs(); private CountDownLatch abortLatch; private ThreadFactory threadFactory; private Supplier idleStrategySupplier; private EpochClock epochClock; + private NanoClock nanoClock; private DistinctErrorLog errorLog; private ErrorHandler errorHandler; private DelegatingErrorHandler delegatingErrorHandler; @@ -591,6 +628,7 @@ public static final class Context implements Cloneable private File clusterDir; private String aeronDirectoryName = CommonContext.getAeronDirectoryName(); private Aeron aeron; + private DutyCycleTracker dutyCycleTracker; private boolean ownsAeronClient; private ClusteredService clusteredService; @@ -646,6 +684,11 @@ public void conclude() epochClock = SystemEpochClock.INSTANCE; } + if (null == nanoClock) + { + nanoClock = SystemNanoClock.INSTANCE; + } + if (null == clusterDir) { clusterDir = new File(clusterDirectoryName); @@ -719,6 +762,19 @@ public void conclude() } } + if (null == dutyCycleTracker) + { + dutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + aeron.addCounter(CLUSTER_CLUSTERED_SERVICE_MAX_CYCLE_TIME_TYPE_ID, + "Cluster container max cycle time (ns) - clusterId=" + clusterId + + " serviceId=" + serviceId), + aeron.addCounter(CLUSTER_CLUSTERED_SERVICE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID, + "Cluster work cycle time exceeded count: threshold=" + cycleThresholdNs + + "ns - clusterId=" + clusterId + " serviceId=" + serviceId), + 
cycleThresholdNs); + } + if (null == archiveContext) { archiveContext = new AeronArchive.Context() @@ -1508,6 +1564,76 @@ public DistinctErrorLog errorLog() return errorLog; } + /** + * The {@link NanoClock} as a source of time in nanoseconds for measuring duration. + * + * @return the {@link NanoClock} as a source of time in nanoseconds for measuring duration. + */ + public NanoClock nanoClock() + { + return nanoClock; + } + + /** + * The {@link NanoClock} as a source of time in nanoseconds for measuring duration. + * + * @param clock to be used. + * @return this for a fluent API. + */ + public Context nanoClock(final NanoClock clock) + { + nanoClock = clock; + return this; + } + + /** + * Set a threshold for the container work cycle time which when exceed it will increment the + * counter. + * + * @param thresholdNs value in nanoseconds + * @return this for fluent API. + * @see Configuration#CYCLE_THRESHOLD_PROP_NAME + * @see Configuration#CYCLE_THRESHOLD_DEFAULT_NS + */ + public Context cycleThresholdNs(final long thresholdNs) + { + this.cycleThresholdNs = thresholdNs; + return this; + } + + /** + * Threshold for the container work cycle time which when exceed it will increment the + * counter. + * + * @return threshold to track for the container work cycle time. + */ + public long cycleThresholdNs() + { + return cycleThresholdNs; + } + + /** + * Set a duty cycle tracker to be used for tracking the duty cycle time of the container. + * + * @param dutyCycleTracker to use for tracking. + * @return this for fluent API. + */ + public Context dutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.dutyCycleTracker = dutyCycleTracker; + return this; + } + + /** + * The duty cycle tracker used to track the container duty cycle. + * + * @return the duty cycle tracker. + */ + public DutyCycleTracker dutyCycleTracker() + { + return dutyCycleTracker; + } + /** * Delete the cluster container directory. 
*/ diff --git a/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java b/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java index 981477e19b..2bf88354f1 100644 --- a/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java +++ b/aeron-cluster/src/test/java/io/aeron/cluster/ConsensusModuleAgentTest.java @@ -23,6 +23,7 @@ import io.aeron.cluster.service.Cluster; import io.aeron.cluster.service.ClusterMarkFile; import io.aeron.cluster.service.ClusterTerminationException; +import io.aeron.driver.DutyCycleTracker; import io.aeron.security.AuthorisationService; import io.aeron.security.DefaultAuthenticatorSupplier; import io.aeron.status.ReadableCounter; @@ -30,6 +31,7 @@ import io.aeron.test.cluster.TestClusterClock; import org.agrona.collections.MutableLong; import org.agrona.concurrent.AgentInvoker; +import org.agrona.concurrent.CachedNanoClock; import org.agrona.concurrent.CountedErrorHandler; import org.agrona.concurrent.NoOpIdleStrategy; import org.agrona.concurrent.status.AtomicCounter; @@ -85,7 +87,8 @@ public class ConsensusModuleAgentTest .clusterMarkFile(mock(ClusterMarkFile.class)) .archiveContext(new AeronArchive.Context()) .logPublisher(mockLogPublisher) - .egressPublisher(mockEgressPublisher); + .egressPublisher(mockEgressPublisher) + .dutyCycleTracker(new DutyCycleTracker(new CachedNanoClock())); @BeforeEach public void before() diff --git a/aeron-driver/src/main/java/io/aeron/driver/Configuration.java b/aeron-driver/src/main/java/io/aeron/driver/Configuration.java index 91076100df..f07d0dd263 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/Configuration.java +++ b/aeron-driver/src/main/java/io/aeron/driver/Configuration.java @@ -713,6 +713,26 @@ public final class Configuration */ public static final long CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + /** + * Property name for threshold value for the sender work cycle threshold to track for being 
exceeded. + */ + public static final String SENDER_CYCLE_THRESHOLD_PROP_NAME = "aeron.driver.sender.cycle.threshold"; + + /** + * Default threshold value for the sender work cycle threshold to track for being exceeded. + */ + public static final long SENDER_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + + /** + * Property name for threshold value for the receiver work cycle threshold to track for being exceeded. + */ + public static final String RECEIVER_CYCLE_THRESHOLD_PROP_NAME = "aeron.driver.receiver.cycle.threshold"; + + /** + * Default threshold value for the receiver work cycle threshold to track for being exceeded. + */ + public static final long RECEIVER_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000); + /** * Should the driver configuration be printed on start. * @@ -1404,6 +1424,26 @@ public static long conductorCycleThresholdNs() return getDurationInNanos(CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME, CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS); } + /** + * Get threshold value for the sender work cycle threshold to track for being exceeded. + * + * @return threshold value in nanoseconds. + */ + public static long senderCycleThresholdNs() + { + return getDurationInNanos(SENDER_CYCLE_THRESHOLD_PROP_NAME, SENDER_CYCLE_THRESHOLD_DEFAULT_NS); + } + + /** + * Get threshold value for the receiver work cycle threshold to track for being exceeded. + * + * @return threshold value in nanoseconds. + */ + public static long receiverCycleThresholdNs() + { + return getDurationInNanos(RECEIVER_CYCLE_THRESHOLD_PROP_NAME, RECEIVER_CYCLE_THRESHOLD_DEFAULT_NS); + } + /** * Get the {@link IdleStrategy} that should be applied to {@link org.agrona.concurrent.Agent}s. 
* diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 075c9bc241..8643bcbb1a 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -114,8 +114,7 @@ public final class DriverConductor implements Agent private NameResolver nameResolver; private DriverNameResolver driverNameResolver; private final AtomicCounter errorCounter; - private final AtomicCounter maxCycleTime; - private final AtomicCounter cycleTimeThresholdExceededCount; + private final DutyCycleTracker dutyCycleTracker; DriverConductor(final MediaDriver.Context ctx) { @@ -134,6 +133,7 @@ public final class DriverConductor implements Agent clientProxy = ctx.clientProxy(); tempBuffer = ctx.tempBuffer(); errorCounter = ctx.systemCounters().get(ERRORS); + dutyCycleTracker = ctx.conductorDutyCycleTracker(); countersManager = ctx.countersManager(); @@ -145,8 +145,6 @@ public final class DriverConductor implements Agent this); lastConsumerCommandPosition = toDriverCommands.consumerPosition(); - maxCycleTime = ctx.systemCounters().get(CONDUCTOR_MAX_CYCLE_TIME); - cycleTimeThresholdExceededCount = ctx.systemCounters().get(CONDUCTOR_CYCLE_TIME_THRESHOLD_EXCEEDED); } /** @@ -169,14 +167,16 @@ public void onStart() ": driverName=" + ctx.resolverName() + " hostname=" + DriverNameResolver.getCanonicalName("")); + ctx.systemCounters().get(CONDUCTOR_MAX_CYCLE_TIME).appendToLabel(": " + ctx.threadingMode().name()); ctx.systemCounters().get(CONDUCTOR_CYCLE_TIME_THRESHOLD_EXCEEDED).appendToLabel( - ": threshold=" + ctx.conductorCycleThresholdNs() + "ns"); + ": threshold=" + ctx.conductorCycleThresholdNs() + "ns " + ctx.threadingMode().name()); nameResolver.init(ctx); final long nowNs = nanoClock.nanoTime(); cachedNanoClock.update(nowNs); cachedEpochClock.update(epochClock.time()); + dutyCycleTracker.update(nowNs); timerCheckDeadlineNs = 
nowNs + timerIntervalNs; clockUpdateDeadlineNs = nowNs + CLOCK_UPDATE_INTERNAL_NS; timeOfLastToDriverPositionChangeNs = nowNs; @@ -1992,21 +1992,14 @@ private void linkSpies(final ArrayList links, final NetworkPub private void trackTime(final long nowNs) { - final long cycleTimeNs = nowNs - cachedNanoClock.nanoTime(); - cachedNanoClock.update(nowNs); - maxCycleTime.proposeMaxOrdered(cycleTimeNs); + dutyCycleTracker.measureAndUpdateClock(nowNs); if (clockUpdateDeadlineNs - nowNs < 0) { clockUpdateDeadlineNs = nowNs + CLOCK_UPDATE_INTERNAL_NS; cachedEpochClock.update(epochClock.time()); } - - if (cycleTimeNs > ctx.conductorCycleThresholdNs()) - { - cycleTimeThresholdExceededCount.incrementOrdered(); - } } private int processTimers(final long nowNs) diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductorProxy.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductorProxy.java index 15154689e2..84b49e075a 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductorProxy.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductorProxy.java @@ -118,6 +118,15 @@ public boolean notConcurrent() return threadingMode == SHARED || threadingMode == INVOKER; } + /** + * Get the threading mode of the driver. + * @return ThreadingMode of the driver. + */ + public ThreadingMode threadingMode() + { + return threadingMode; + } + void driverConductor(final DriverConductor driverConductor) { this.driverConductor = driverConductor; diff --git a/aeron-driver/src/main/java/io/aeron/driver/DutyCycleTracker.java b/aeron-driver/src/main/java/io/aeron/driver/DutyCycleTracker.java new file mode 100644 index 0000000000..917edbaff6 --- /dev/null +++ b/aeron-driver/src/main/java/io/aeron/driver/DutyCycleTracker.java @@ -0,0 +1,69 @@ +/* + * Copyright 2014-2022 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.driver; + +import org.agrona.concurrent.CachedNanoClock; + +/** + * Tracker to handle tracking the duration of a duty cycle. + */ +public class DutyCycleTracker +{ + private final CachedNanoClock cachedNanoClock; + + /** + * Create a tracker using a given cached clock. + * + * @param cachedNanoClock to use. + */ + public DutyCycleTracker(final CachedNanoClock cachedNanoClock) + { + this.cachedNanoClock = cachedNanoClock; + } + + /** + * Update the cached clock time. + * + * @param nowNs to update with. + * @see CachedNanoClock#update(long) + */ + public void update(final long nowNs) + { + cachedNanoClock.update(nowNs); + } + + /** + * Pass measurement to tracker and report updating cached nano clock with time. + * + * @param nowNs of the measurement. + */ + public void measureAndUpdateClock(final long nowNs) + { + final long cycleTimeNs = nowNs - cachedNanoClock.nanoTime(); + + reportMeasurement(cycleTimeNs); + cachedNanoClock.update(nowNs); + } + + /** + * Callback called to report duration of cycle. + * + * @param durationNs of the duty cycle. 
+ */ + public void reportMeasurement(final long durationNs) + { + } +} diff --git a/aeron-driver/src/main/java/io/aeron/driver/MediaDriver.java b/aeron-driver/src/main/java/io/aeron/driver/MediaDriver.java index 48694f198e..e25b032fd4 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/MediaDriver.java +++ b/aeron-driver/src/main/java/io/aeron/driver/MediaDriver.java @@ -23,6 +23,7 @@ import io.aeron.driver.exceptions.ActiveDriverException; import io.aeron.driver.media.*; import io.aeron.driver.reports.LossReport; +import io.aeron.driver.status.DutyCycleStallTracker; import io.aeron.driver.status.SystemCounters; import io.aeron.exceptions.AeronException; import io.aeron.exceptions.ConcurrentConcludeException; @@ -450,6 +451,8 @@ public static final class Context extends CommonContext private long flowControlReceiverTimeoutNs = Configuration.flowControlReceiverTimeoutNs(); private long reResolutionCheckIntervalNs = Configuration.reResolutionCheckIntervalNs(); private long conductorCycleThresholdNs = Configuration.conductorCycleThresholdNs(); + private long senderCycleThresholdNs = Configuration.senderCycleThresholdNs(); + private long receiverCycleThresholdNs = Configuration.receiverCycleThresholdNs(); private int conductorBufferLength = Configuration.conductorBufferLength(); private int toClientsBufferLength = Configuration.toClientsBufferLength(); @@ -543,6 +546,10 @@ public static final class Context extends CommonContext private EpochNanoClock channelReceiveTimestampClock; private EpochNanoClock channelSendTimestampClock; + private DutyCycleTracker conductorDutyCycleTracker; + private DutyCycleTracker senderDutyCycleTracker; + private DutyCycleTracker receiverDutyCycleTracker; + /** * Perform a shallow copy of the object. 
* @@ -3082,6 +3089,58 @@ public Context channelReceiveTimestampClock(final EpochNanoClock clock) return this; } + /** + * Set a threshold for the sender work cycle time which when exceed it will increment the + * {@link io.aeron.driver.status.SystemCounterDescriptor#SENDER_CYCLE_TIME_THRESHOLD_EXCEEDED} counter. + * + * @param thresholdNs value in nanoseconds + * @return this for fluent API. + * @see Configuration#SENDER_CYCLE_THRESHOLD_PROP_NAME + * @see Configuration#SENDER_CYCLE_THRESHOLD_DEFAULT_NS + */ + public Context senderCycleThresholdNs(final long thresholdNs) + { + this.senderCycleThresholdNs = thresholdNs; + return this; + } + + /** + * Threshold for the sender work cycle time which when exceed it will increment the + * {@link io.aeron.driver.status.SystemCounterDescriptor#SENDER_CYCLE_TIME_THRESHOLD_EXCEEDED} counter. + * + * @return threshold to track for the sender work cycle time. + */ + public long senderCycleThresholdNs() + { + return senderCycleThresholdNs; + } + + /** + * Set a threshold for the receiver work cycle time which when exceed it will increment the + * {@link io.aeron.driver.status.SystemCounterDescriptor#RECEIVER_CYCLE_TIME_THRESHOLD_EXCEEDED} counter. + * + * @param thresholdNs value in nanoseconds + * @return this for fluent API. + * @see Configuration#RECEIVER_CYCLE_THRESHOLD_PROP_NAME + * @see Configuration#RECEIVER_CYCLE_THRESHOLD_DEFAULT_NS + */ + public Context receiverCycleThresholdNs(final long thresholdNs) + { + this.receiverCycleThresholdNs = thresholdNs; + return this; + } + + /** + * Threshold for the receiver work cycle time which when exceed it will increment the + * {@link io.aeron.driver.status.SystemCounterDescriptor#RECEIVER_CYCLE_TIME_THRESHOLD_EXCEEDED} counter. + * + * @return threshold to track for the receiver work cycle time. + */ + public long receiverCycleThresholdNs() + { + return receiverCycleThresholdNs; + } + /** * Clock used record channel receive timestamps. 
* @@ -3104,6 +3163,72 @@ public Context channelSendTimestampClock(final EpochNanoClock clock) return this; } + /** + * Duty cycle tracker used for the conductor. + * + * @return conductor duty cycle tracker. + */ + public DutyCycleTracker conductorDutyCycleTracker() + { + return conductorDutyCycleTracker; + } + + /** + * Set the duty cycle tracker used for the conductor. + * + * @param dutyCycleTracker for the conductor. + * @return this for a fluent API. + */ + public Context conductorDutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.conductorDutyCycleTracker = dutyCycleTracker; + return this; + } + + /** + * Duty cycle tracker used for the sender. + * + * @return sender duty cycle tracker. + */ + public DutyCycleTracker senderDutyCycleTracker() + { + return senderDutyCycleTracker; + } + + /** + * Set the duty cycle tracker used for the sender. + * + * @param dutyCycleTracker for the sender. + * @return this for a fluent API. + */ + public Context senderDutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.senderDutyCycleTracker = dutyCycleTracker; + return this; + } + + /** + * Duty cycle tracker used for the receiver. + * + * @return receiver duty cycle tracker. + */ + public DutyCycleTracker receiverDutyCycleTracker() + { + return receiverDutyCycleTracker; + } + + /** + * Set the duty cycle tracker used for the receiver. + * + * @param dutyCycleTracker for the receiver. + * @return this for a fluent API. + */ + public Context receiverDutyCycleTracker(final DutyCycleTracker dutyCycleTracker) + { + this.receiverDutyCycleTracker = dutyCycleTracker; + return this; + } + /** * Clock used record channel send timestamps. 
* @@ -3478,6 +3603,33 @@ private void concludeDependantProperties() lossReportBuffer = mapLossReport(aeronDirectoryName(), align(lossReportBufferLength, filePageSize)); lossReport = new LossReport(new UnsafeBuffer(lossReportBuffer)); } + + if (null == conductorDutyCycleTracker) + { + conductorDutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + systemCounters.get(CONDUCTOR_MAX_CYCLE_TIME), + systemCounters.get(CONDUCTOR_CYCLE_TIME_THRESHOLD_EXCEEDED), + conductorCycleThresholdNs); + } + + if (null == senderDutyCycleTracker) + { + senderDutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + systemCounters.get(SENDER_MAX_CYCLE_TIME), + systemCounters.get(SENDER_CYCLE_TIME_THRESHOLD_EXCEEDED), + senderCycleThresholdNs); + } + + if (null == receiverDutyCycleTracker) + { + receiverDutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + systemCounters.get(RECEIVER_MAX_CYCLE_TIME), + systemCounters.get(RECEIVER_CYCLE_TIME_THRESHOLD_EXCEEDED), + receiverCycleThresholdNs); + } } private void concludeCounters() @@ -3629,6 +3781,8 @@ public String toString() "\n statusMessageTimeoutNs=" + statusMessageTimeoutNs + "\n counterFreeToReuseTimeoutNs=" + counterFreeToReuseTimeoutNs + "\n conductorCycleThresholdNs=" + conductorCycleThresholdNs + + "\n senderCycleThresholdNs=" + senderCycleThresholdNs + + "\n receiverCycleThresholdNs=" + receiverCycleThresholdNs + "\n publicationTermBufferLength=" + publicationTermBufferLength + "\n ipcTermBufferLength=" + ipcTermBufferLength + "\n publicationTermWindowLength=" + publicationTermWindowLength + @@ -3705,6 +3859,9 @@ public String toString() "\n cncMetaDataBuffer=" + cncMetaDataBuffer + "\n channelSendTimestampClock=" + channelSendTimestampClock + "\n channelReceiveTimestampClock=" + channelReceiveTimestampClock + + "\n conductorDutyCycleTracker=" + conductorDutyCycleTracker + + "\n senderDutyCycleTracker=" + senderDutyCycleTracker + + "\n receiverDutyCycleTracker=" + 
receiverDutyCycleTracker + "\n}"; } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/Receiver.java b/aeron-driver/src/main/java/io/aeron/driver/Receiver.java index bba8fc7778..0915f8c39a 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/Receiver.java +++ b/aeron-driver/src/main/java/io/aeron/driver/Receiver.java @@ -19,10 +19,14 @@ import io.aeron.driver.media.ReceiveChannelEndpoint; import io.aeron.driver.media.ReceiveDestinationTransport; import io.aeron.driver.media.UdpChannel; +import io.aeron.driver.status.DutyCycleStallTracker; import org.agrona.CloseHelper; import org.agrona.collections.ArrayListUtil; import org.agrona.collections.ArrayUtil; -import org.agrona.concurrent.*; +import org.agrona.concurrent.Agent; +import org.agrona.concurrent.CachedNanoClock; +import org.agrona.concurrent.NanoClock; +import org.agrona.concurrent.OneToOneConcurrentArrayQueue; import org.agrona.concurrent.status.AtomicCounter; import java.net.InetSocketAddress; @@ -30,8 +34,7 @@ import java.util.ArrayList; import static io.aeron.driver.Configuration.PENDING_SETUPS_TIMEOUT_NS; -import static io.aeron.driver.status.SystemCounterDescriptor.BYTES_RECEIVED; -import static io.aeron.driver.status.SystemCounterDescriptor.RESOLUTION_CHANGES; +import static io.aeron.driver.status.SystemCounterDescriptor.*; /** * Agent that receives messages streams and rebuilds {@link PublicationImage}s, plus iterates over them sending status @@ -52,6 +55,7 @@ public final class Receiver implements Agent private PublicationImage[] publicationImages = EMPTY_IMAGES; private final ArrayList pendingSetupMessages = new ArrayList<>(); private final DriverConductorProxy conductorProxy; + private final DutyCycleTracker dutyCycleTracker; Receiver(final MediaDriver.Context ctx) { @@ -63,6 +67,7 @@ public final class Receiver implements Agent cachedNanoClock = ctx.receiverCachedNanoClock(); conductorProxy = ctx.driverConductorProxy(); reResolutionCheckIntervalNs = ctx.reResolutionCheckIntervalNs(); + 
dutyCycleTracker = ctx.receiverDutyCycleTracker(); } /** @@ -72,7 +77,18 @@ public void onStart() { final long nowNs = nanoClock.nanoTime(); cachedNanoClock.update(nowNs); + dutyCycleTracker.update(nowNs); reResolutionDeadlineNs = nowNs + reResolutionCheckIntervalNs; + + if (dutyCycleTracker instanceof DutyCycleStallTracker) + { + final DutyCycleStallTracker dutyCycleStallTracker = (DutyCycleStallTracker)dutyCycleTracker; + + dutyCycleStallTracker.maxCycleTime().appendToLabel(": " + conductorProxy.threadingMode().name()); + dutyCycleStallTracker.cycleTimeThresholdExceededCount().appendToLabel( + ": threshold=" + dutyCycleStallTracker.cycleTimeThresholdNs() + "ns " + + conductorProxy.threadingMode().name()); + } } /** @@ -98,6 +114,7 @@ public int doWork() { final long nowNs = nanoClock.nanoTime(); cachedNanoClock.update(nowNs); + dutyCycleTracker.measureAndUpdateClock(nowNs); int workCount = commandQueue.drain(Runnable::run, Configuration.COMMAND_DRAIN_LIMIT); diff --git a/aeron-driver/src/main/java/io/aeron/driver/Sender.java b/aeron-driver/src/main/java/io/aeron/driver/Sender.java index 6c1df2afd3..fd355b77de 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/Sender.java +++ b/aeron-driver/src/main/java/io/aeron/driver/Sender.java @@ -18,8 +18,12 @@ import io.aeron.ChannelUri; import io.aeron.driver.media.ControlTransportPoller; import io.aeron.driver.media.SendChannelEndpoint; +import io.aeron.driver.status.DutyCycleStallTracker; import org.agrona.collections.ArrayUtil; -import org.agrona.concurrent.*; +import org.agrona.concurrent.Agent; +import org.agrona.concurrent.CachedNanoClock; +import org.agrona.concurrent.NanoClock; +import org.agrona.concurrent.OneToOneConcurrentArrayQueue; import org.agrona.concurrent.status.AtomicCounter; import java.net.InetSocketAddress; @@ -69,6 +73,7 @@ public final class Sender extends SenderRhsPadding implements Agent private final NanoClock nanoClock; private final CachedNanoClock cachedNanoClock; private final 
DriverConductorProxy conductorProxy; + private final DutyCycleTracker dutyCycleTracker; Sender(final MediaDriver.Context ctx) { @@ -82,6 +87,8 @@ public final class Sender extends SenderRhsPadding implements Agent this.reResolutionCheckIntervalNs = ctx.reResolutionCheckIntervalNs(); this.dutyCycleRatio = ctx.sendToStatusMessagePollRatio(); this.conductorProxy = ctx.driverConductorProxy(); + + this.dutyCycleTracker = ctx.senderDutyCycleTracker(); } /** @@ -91,7 +98,19 @@ public void onStart() { final long nowNs = nanoClock.nanoTime(); cachedNanoClock.update(nowNs); + dutyCycleTracker.update(nowNs); reResolutionDeadlineNs = nowNs + reResolutionCheckIntervalNs; + + if (dutyCycleTracker instanceof DutyCycleStallTracker) + { + final DutyCycleStallTracker dutyCycleStallTracker = (DutyCycleStallTracker)dutyCycleTracker; + + dutyCycleStallTracker.maxCycleTime().appendToLabel( + ": " + conductorProxy.threadingMode().name()); + dutyCycleStallTracker.cycleTimeThresholdExceededCount().appendToLabel( + ": threshold=" + dutyCycleStallTracker.cycleTimeThresholdNs() + "ns " + + conductorProxy.threadingMode().name()); + } } /** @@ -109,6 +128,7 @@ public int doWork() { final long nowNs = nanoClock.nanoTime(); cachedNanoClock.update(nowNs); + dutyCycleTracker.measureAndUpdateClock(nowNs); final int workCount = commandQueue.drain(Runnable::run, Configuration.COMMAND_DRAIN_LIMIT); final int bytesSent = doSend(nowNs); diff --git a/aeron-driver/src/main/java/io/aeron/driver/status/DutyCycleStallTracker.java b/aeron-driver/src/main/java/io/aeron/driver/status/DutyCycleStallTracker.java new file mode 100644 index 0000000000..960a3da894 --- /dev/null +++ b/aeron-driver/src/main/java/io/aeron/driver/status/DutyCycleStallTracker.java @@ -0,0 +1,94 @@ +/* + * Copyright 2014-2022 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.driver.status; + +import io.aeron.driver.DutyCycleTracker; +import org.agrona.concurrent.CachedNanoClock; +import org.agrona.concurrent.status.AtomicCounter; + +/** + * Duty cycle tracker that detects when a cycle exceeds a threshold and tracks max cycle time reporting both through + * counters. + */ +public class DutyCycleStallTracker extends DutyCycleTracker +{ + private final AtomicCounter maxCycleTime; + private final AtomicCounter cycleTimeThresholdExceededCount; + private final long cycleTimeThresholdNs; + + /** + * Create a tracker to track max cycle time and excesses of a threshold. + * + * @param cachedNanoClock to use for tracking. + * @param maxCycleTime counter for tracking. + * @param cycleTimeThresholdExceededCount counter for tracking. + * @param cycleTimeThresholdNs to use for tracking excesses. + */ + public DutyCycleStallTracker( + final CachedNanoClock cachedNanoClock, + final AtomicCounter maxCycleTime, + final AtomicCounter cycleTimeThresholdExceededCount, + final long cycleTimeThresholdNs) + { + super(cachedNanoClock); + this.maxCycleTime = maxCycleTime; + this.cycleTimeThresholdExceededCount = cycleTimeThresholdExceededCount; + this.cycleTimeThresholdNs = cycleTimeThresholdNs; + } + + /** + * Get max cycle time counter. + * + * @return max cycle time counter. + */ + public AtomicCounter maxCycleTime() + { + return maxCycleTime; + } + + /** + * Get threshold exceeded counter. + * + * @return threshold exceeded counter. 
+ */ + public AtomicCounter cycleTimeThresholdExceededCount() + { + return cycleTimeThresholdExceededCount; + } + + /** + * Get threshold value. + * + * @return threshold value. + */ + public long cycleTimeThresholdNs() + { + return cycleTimeThresholdNs; + } + + /** + * {@inheritDoc} + */ + public void reportMeasurement(final long durationNs) + { + maxCycleTime.proposeMaxOrdered(durationNs); + + if (durationNs > cycleTimeThresholdNs) + { + cycleTimeThresholdExceededCount.incrementOrdered(); + } + } +} diff --git a/aeron-driver/src/main/java/io/aeron/driver/status/SystemCounterDescriptor.java b/aeron-driver/src/main/java/io/aeron/driver/status/SystemCounterDescriptor.java index 3100c11518..7289bf51c9 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/status/SystemCounterDescriptor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/status/SystemCounterDescriptor.java @@ -162,7 +162,27 @@ public enum SystemCounterDescriptor /** * Count of the number of times the cycle time threshold has been exceeded by the conductor in its work cycle. */ - CONDUCTOR_CYCLE_TIME_THRESHOLD_EXCEEDED(27, "Conductor work cycle exceeded threshold count"); + CONDUCTOR_CYCLE_TIME_THRESHOLD_EXCEEDED(27, "Conductor work cycle exceeded threshold count"), + + /** + * The maximum time spent by the sender between work cycles. + */ + SENDER_MAX_CYCLE_TIME(28, "Sender max cycle time doing its work (ns)"), + + /** + * Count of the number of times the cycle time threshold has been exceeded by the sender in its work cycle. + */ + SENDER_CYCLE_TIME_THRESHOLD_EXCEEDED(29, "Sender work cycle exceeded threshold count"), + + /** + * The maximum time spent by the receiver between work cycles. + */ + RECEIVER_MAX_CYCLE_TIME(30, "Receiver max cycle time doing its work (ns)"), + + /** + * Count of the number of times the cycle time threshold has been exceeded by the receiver in its work cycle. 
+ */ + RECEIVER_CYCLE_TIME_THRESHOLD_EXCEEDED(31, "Receiver work cycle exceeded threshold count"); /** * All system counters have the same type id, i.e. system counters are the same type. Other types can exist. diff --git a/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java b/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java index 9cddd84497..a99c10e1e2 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java @@ -23,17 +23,24 @@ import io.aeron.driver.exceptions.InvalidChannelException; import io.aeron.driver.media.ReceiveChannelEndpoint; import io.aeron.driver.media.ReceiveChannelEndpointThreadLocals; +import io.aeron.driver.status.DutyCycleStallTracker; import io.aeron.driver.status.SystemCounterDescriptor; import io.aeron.driver.status.SystemCounters; import io.aeron.logbuffer.HeaderWriter; import io.aeron.logbuffer.LogBufferDescriptor; import io.aeron.logbuffer.TermAppender; import io.aeron.protocol.StatusMessageFlyweight; -import org.agrona.*; -import org.agrona.concurrent.*; +import org.agrona.CloseHelper; +import org.agrona.ErrorHandler; +import org.agrona.concurrent.CachedEpochClock; +import org.agrona.concurrent.CachedNanoClock; +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue; +import org.agrona.concurrent.UnsafeBuffer; import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer; import org.agrona.concurrent.ringbuffer.RingBuffer; -import org.agrona.concurrent.status.*; +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersManager; +import org.agrona.concurrent.status.CountersReader; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -143,6 +150,12 @@ public void before() when(spySystemCounters.get(SystemCounterDescriptor.ERRORS)).thenReturn(mockErrorCounter); 
when(mockErrorCounter.appendToLabel(any())).thenReturn(mockErrorCounter); + final DutyCycleStallTracker conductorDutyCycleTracker = new DutyCycleStallTracker( + new CachedNanoClock(), + spySystemCounters.get(CONDUCTOR_MAX_CYCLE_TIME), + spySystemCounters.get(CONDUCTOR_CYCLE_TIME_THRESHOLD_EXCEEDED), + 600_000_000); + final MediaDriver.Context ctx = new MediaDriver.Context() .tempBuffer(new UnsafeBuffer(new byte[METADATA_LENGTH])) .timerIntervalNs(DEFAULT_TIMER_INTERVAL_NS) @@ -172,7 +185,9 @@ public void before() .driverConductorProxy(driverConductorProxy) .receiveChannelEndpointThreadLocals(new ReceiveChannelEndpointThreadLocals()) .conductorCycleThresholdNs(600_000_000) - .nameResolver(DefaultNameResolver.INSTANCE); + .nameResolver(DefaultNameResolver.INSTANCE) + .threadingMode(ThreadingMode.DEDICATED) + .conductorDutyCycleTracker(conductorDutyCycleTracker); driverProxy = new DriverProxy(toDriverCommands, toDriverCommands.nextCorrelationId()); driverConductor = new DriverConductor(ctx); diff --git a/aeron-driver/src/test/java/io/aeron/driver/IpcPublicationTest.java b/aeron-driver/src/test/java/io/aeron/driver/IpcPublicationTest.java index 3fc45a6e0d..3b2d8b9b31 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/IpcPublicationTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/IpcPublicationTest.java @@ -78,7 +78,9 @@ public void setUp() .countersManager(countersManager) .systemCounters(systemCounters) .nameResolver(DefaultNameResolver.INSTANCE) - .nanoClock(new CachedNanoClock()); + .nanoClock(new CachedNanoClock()) + .threadingMode(ThreadingMode.DEDICATED) + .conductorDutyCycleTracker(new DutyCycleTracker(new CachedNanoClock())); ctx.countersValuesBuffer(counterBuffer); diff --git a/aeron-driver/src/test/java/io/aeron/driver/ReceiverTest.java b/aeron-driver/src/test/java/io/aeron/driver/ReceiverTest.java index f0589ac910..8615648ab0 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/ReceiverTest.java +++ 
b/aeron-driver/src/test/java/io/aeron/driver/ReceiverTest.java @@ -119,7 +119,8 @@ public class ReceiverTest .cachedNanoClock(nanoClock) .senderCachedNanoClock(nanoClock) .receiverCachedNanoClock(nanoClock) - .lossReport(mockLossReport); + .lossReport(mockLossReport) + .receiverDutyCycleTracker(new DutyCycleTracker(new CachedNanoClock())); private ReceiveChannelEndpoint receiveChannelEndpoint; @@ -154,7 +155,8 @@ public void setUp() throws Exception .senderCachedNanoClock(nanoClock) .receiverCachedNanoClock(nanoClock) .receiveChannelEndpointThreadLocals(new ReceiveChannelEndpointThreadLocals()) - .driverConductorProxy(driverConductorProxy); + .driverConductorProxy(driverConductorProxy) + .receiverDutyCycleTracker(new DutyCycleTracker(new CachedNanoClock())); receiverProxy = new ReceiverProxy( ThreadingMode.DEDICATED, ctx.receiverCommandQueue(), mock(AtomicCounter.class)); diff --git a/aeron-driver/src/test/java/io/aeron/driver/SenderTest.java b/aeron-driver/src/test/java/io/aeron/driver/SenderTest.java index 3108bd808c..118e2efd51 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/SenderTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/SenderTest.java @@ -124,7 +124,8 @@ public void setUp() .systemCounters(mockSystemCounters) .senderCommandQueue(senderCommandQueue) .nanoClock(nanoClock) - .errorHandler(errorHandler); + .errorHandler(errorHandler) + .senderDutyCycleTracker(new DutyCycleTracker(new CachedNanoClock())); sender = new Sender(ctx); LogBufferDescriptor.initialiseTailWithTermId(rawLog.metaData(), 0, INITIAL_TERM_ID);