Skip to content

Commit

Permalink
[Java]: add duty cycle tracking to agent of driver, archive, consensu…
Browse files Browse the repository at this point in the history
…s module, and service container. The default tracker is a stall tracker that records the maximum duty cycle time (ns) and the number of times the threshold is exceeded as counters.
  • Loading branch information
tmontgomery committed Jul 8, 2022
1 parent 31b7e47 commit 5b9d847
Show file tree
Hide file tree
Showing 22 changed files with 1,080 additions and 34 deletions.
275 changes: 275 additions & 0 deletions aeron-archive/src/main/java/io/aeron/archive/Archive.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.aeron.archive.checksum.Checksums;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.driver.status.DutyCycleStallTracker;
import io.aeron.exceptions.ConcurrentConcludeException;
import io.aeron.exceptions.ConfigurationException;
import io.aeron.security.Authenticator;
Expand Down Expand Up @@ -388,6 +390,36 @@ public static final class Configuration
public static final long REPLAY_LINGER_TIMEOUT_DEFAULT_NS =
io.aeron.driver.Configuration.publicationLingerTimeoutNs();

/**
* Property name for the threshold which the conductor work cycle time is tracked against for being exceeded.
*
* @see #conductorCycleThresholdNs()
*/
public static final String CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME = "aeron.archive.conductor.cycle.threshold";

/**
* Default threshold, in nanoseconds, which the conductor work cycle time is tracked against for being exceeded.
* The value is 1000 milliseconds expressed in nanoseconds.
*/
public static final long CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Property name for the threshold which the recorder work cycle time is tracked against for being exceeded.
*
* @see #recorderCycleThresholdNs()
*/
public static final String RECORDER_CYCLE_THRESHOLD_PROP_NAME = "aeron.archive.recorder.cycle.threshold";

/**
* Default threshold, in nanoseconds, which the recorder work cycle time is tracked against for being exceeded.
* The value is 1000 milliseconds expressed in nanoseconds.
*/
public static final long RECORDER_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Property name for the threshold which the replayer work cycle time is tracked against for being exceeded.
*
* @see #replayerCycleThresholdNs()
*/
public static final String REPLAYER_CYCLE_THRESHOLD_PROP_NAME = "aeron.archive.replayer.cycle.threshold";

/**
* Default threshold, in nanoseconds, which the replayer work cycle time is tracked against for being exceeded.
* The value is 1000 milliseconds expressed in nanoseconds.
*/
public static final long REPLAYER_CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Should the archive delete existing files on start. Default is false and should only be true for testing.
*/
Expand Down Expand Up @@ -439,6 +471,18 @@ public static final class Configuration
*/
public static final int ARCHIVE_CONTROL_SESSIONS_TYPE_ID = AeronCounters.ARCHIVE_CONTROL_SESSIONS_TYPE_ID;

/**
* The type id of the {@link Counter} used for keeping track of the maximum duty cycle time of an archive agent.
* This is an alias for {@link AeronCounters#ARCHIVE_MAX_CYCLE_TIME_TYPE_ID}.
*/
public static final int ARCHIVE_MAX_CYCLE_TIME_TYPE_ID = AeronCounters.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID;

/**
* The type id of the {@link Counter} used for keeping track of the number of times the cycle time threshold
* has been exceeded by an archive agent. This is an alias for
* {@link AeronCounters#ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID}.
*/
public static final int ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID =
AeronCounters.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID;

/**
* Size in bytes of the error buffer for the archive when not externally provided.
*/
Expand Down Expand Up @@ -663,6 +707,36 @@ public static long replayLingerTimeoutNs()
return getDurationInNanos(REPLAY_LINGER_TIMEOUT_PROP_NAME, REPLAY_LINGER_TIMEOUT_DEFAULT_NS);
}

/**
 * Threshold for the conductor work cycle time which is tracked for being exceeded. The value is looked up
 * under {@link #CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME} with {@link #CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS}
 * supplied as the default.
 *
 * @return threshold value in nanoseconds.
 */
public static long conductorCycleThresholdNs()
{
    final long thresholdNs = getDurationInNanos(
        CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME, CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS);

    return thresholdNs;
}

/**
 * Threshold for the recorder work cycle time which is tracked for being exceeded. The value is looked up
 * under {@link #RECORDER_CYCLE_THRESHOLD_PROP_NAME} with {@link #RECORDER_CYCLE_THRESHOLD_DEFAULT_NS}
 * supplied as the default.
 *
 * @return threshold value in nanoseconds.
 */
public static long recorderCycleThresholdNs()
{
    final long thresholdNs = getDurationInNanos(
        RECORDER_CYCLE_THRESHOLD_PROP_NAME, RECORDER_CYCLE_THRESHOLD_DEFAULT_NS);

    return thresholdNs;
}

/**
 * Threshold for the replayer work cycle time which is tracked for being exceeded. The value is looked up
 * under {@link #REPLAYER_CYCLE_THRESHOLD_PROP_NAME} with {@link #REPLAYER_CYCLE_THRESHOLD_DEFAULT_NS}
 * supplied as the default.
 *
 * @return threshold value in nanoseconds.
 */
public static long replayerCycleThresholdNs()
{
    final long thresholdNs = getDurationInNanos(
        REPLAYER_CYCLE_THRESHOLD_PROP_NAME, REPLAYER_CYCLE_THRESHOLD_DEFAULT_NS);

    return thresholdNs;
}

/**
* Whether to delete directory on start or not.
*
Expand Down Expand Up @@ -816,6 +890,9 @@ public static final class Context implements Cloneable

private long connectTimeoutNs = Configuration.connectTimeoutNs();
private long replayLingerTimeoutNs = Configuration.replayLingerTimeoutNs();
// Work cycle time thresholds in nanoseconds, one per archive agent, initialised from Configuration.
private long conductorCycleThresholdNs = Configuration.conductorCycleThresholdNs();
private long recorderCycleThresholdNs = Configuration.recorderCycleThresholdNs();
private long replayerCycleThresholdNs = Configuration.replayerCycleThresholdNs();
private long maxCatalogEntries = Configuration.maxCatalogEntries();
private long catalogCapacity = Configuration.catalogCapacity();
private long lowStorageSpaceThreshold = Configuration.lowStorageSpaceThreshold();
Expand Down Expand Up @@ -852,6 +929,9 @@ public static final class Context implements Cloneable
private UnsafeBuffer dataBuffer;
private UnsafeBuffer replayBuffer;
private UnsafeBuffer recordChecksumBuffer;
// Duty cycle trackers, one per archive agent. No initial value is assigned here; conclude() creates a
// DutyCycleStallTracker for a tracker which is still null.
// NOTE(review): the recorder and replayer defaults appear to be created only for DEDICATED threading mode - confirm.
private DutyCycleTracker conductorDutyCycleTracker;
private DutyCycleTracker recorderDutyCycleTracker;
private DutyCycleTracker replayerDutyCycleTracker;

/**
* Perform a shallow copy of the object.
Expand Down Expand Up @@ -1017,6 +1097,19 @@ public void conclude()
idleStrategySupplier = Configuration.idleStrategySupplier(null);
}

if (null == conductorDutyCycleTracker)
{
conductorDutyCycleTracker = new DutyCycleStallTracker(
new CachedNanoClock(),
aeron.addCounter(
Archive.Configuration.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID, "archive-conductor max cycle time (ns)"),
aeron.addCounter(
Archive.Configuration.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID,
"archive-conductor work cycle time exceeded count: threshold=" +
conductorCycleThresholdNs + "ns"),
conductorCycleThresholdNs);
}

if (DEDICATED == threadingMode)
{
if (null == recorderIdleStrategySupplier)
Expand All @@ -1036,6 +1129,34 @@ public void conclude()
replayerIdleStrategySupplier = idleStrategySupplier;
}
}

if (null == recorderDutyCycleTracker)
{
recorderDutyCycleTracker = new DutyCycleStallTracker(
new CachedNanoClock(),
aeron.addCounter(
Archive.Configuration.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID,
"archive-recorder max cycle time (ns)"),
aeron.addCounter(
Archive.Configuration.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID,
"archive-recorder work cycle time exceeded count: threshold=" +
recorderCycleThresholdNs + "ns"),
recorderCycleThresholdNs);
}

if (null == replayerDutyCycleTracker)
{
replayerDutyCycleTracker = new DutyCycleStallTracker(
new CachedNanoClock(),
aeron.addCounter(
Archive.Configuration.ARCHIVE_MAX_CYCLE_TIME_TYPE_ID,
"archive-replayer max cycle time (ns)"),
aeron.addCounter(
Archive.Configuration.ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID,
"archive-replayer work cycle time exceeded count: threshold=" +
replayerCycleThresholdNs + "ns"),
replayerCycleThresholdNs);
}
}

if (!isPowerOfTwo(segmentFileLength))
Expand Down Expand Up @@ -1536,6 +1657,154 @@ public long replayLingerTimeoutNs()
return replayLingerTimeoutNs;
}

/**
 * Set the threshold for the conductor work cycle time which, when exceeded, will increment the
 * conductor cycle time exceeded count.
 *
 * @param thresholdNs value in nanoseconds.
 * @return this for a fluent API.
 * @see io.aeron.archive.Archive.Configuration#CONDUCTOR_CYCLE_THRESHOLD_PROP_NAME
 * @see io.aeron.archive.Archive.Configuration#CONDUCTOR_CYCLE_THRESHOLD_DEFAULT_NS
 */
public Context conductorCycleThresholdNs(final long thresholdNs)
{
    conductorCycleThresholdNs = thresholdNs;

    return this;
}

/**
 * Threshold for the conductor work cycle time which, when exceeded, will increment the
 * conductor cycle time exceeded count.
 *
 * @return threshold in nanoseconds to track for the conductor work cycle time.
 */
public long conductorCycleThresholdNs()
{
    return this.conductorCycleThresholdNs;
}

/**
 * Set the threshold for the recorder work cycle time which, when exceeded, will increment the
 * recorder cycle time exceeded count.
 *
 * @param thresholdNs value in nanoseconds.
 * @return this for a fluent API.
 * @see io.aeron.archive.Archive.Configuration#RECORDER_CYCLE_THRESHOLD_PROP_NAME
 * @see io.aeron.archive.Archive.Configuration#RECORDER_CYCLE_THRESHOLD_DEFAULT_NS
 */
public Context recorderCycleThresholdNs(final long thresholdNs)
{
    recorderCycleThresholdNs = thresholdNs;

    return this;
}

/**
 * Threshold for the recorder work cycle time which, when exceeded, will increment the
 * recorder cycle time exceeded count.
 *
 * @return threshold in nanoseconds to track for the recorder work cycle time.
 */
public long recorderCycleThresholdNs()
{
    return this.recorderCycleThresholdNs;
}

/**
 * Set the threshold for the replayer work cycle time which, when exceeded, will increment the
 * replayer cycle time exceeded count.
 *
 * @param thresholdNs value in nanoseconds.
 * @return this for a fluent API.
 * @see io.aeron.archive.Archive.Configuration#REPLAYER_CYCLE_THRESHOLD_PROP_NAME
 * @see io.aeron.archive.Archive.Configuration#REPLAYER_CYCLE_THRESHOLD_DEFAULT_NS
 */
public Context replayerCycleThresholdNs(final long thresholdNs)
{
    replayerCycleThresholdNs = thresholdNs;

    return this;
}

/**
 * Threshold for the replayer work cycle time which, when exceeded, will increment the
 * replayer cycle time exceeded count.
 *
 * @return threshold in nanoseconds to track for the replayer work cycle time.
 */
public long replayerCycleThresholdNs()
{
    return this.replayerCycleThresholdNs;
}

/**
 * Set the duty cycle tracker to be used for the conductor.
 *
 * @param dutyCycleTracker for the conductor.
 * @return this for a fluent API.
 */
public Context conductorDutyCycleTracker(final DutyCycleTracker dutyCycleTracker)
{
    conductorDutyCycleTracker = dutyCycleTracker;

    return this;
}

/**
 * The duty cycle tracker used for the conductor.
 *
 * @return duty cycle tracker for the conductor.
 */
public DutyCycleTracker conductorDutyCycleTracker()
{
    return this.conductorDutyCycleTracker;
}

/**
 * Set the duty cycle tracker to be used for the recorder.
 * NOTE: Only used in DEDICATED threading mode.
 *
 * @param dutyCycleTracker for the recorder.
 * @return this for a fluent API.
 */
public Context recorderDutyCycleTracker(final DutyCycleTracker dutyCycleTracker)
{
    recorderDutyCycleTracker = dutyCycleTracker;

    return this;
}

/**
 * The duty cycle tracker used for the recorder.
 * NOTE: Only used in DEDICATED threading mode.
 *
 * @return duty cycle tracker for the recorder.
 */
public DutyCycleTracker recorderDutyCycleTracker()
{
    return this.recorderDutyCycleTracker;
}

/**
 * Set the duty cycle tracker to be used for the replayer.
 * NOTE: Only used in DEDICATED threading mode.
 *
 * @param dutyCycleTracker for the replayer.
 * @return this for a fluent API.
 */
public Context replayerDutyCycleTracker(final DutyCycleTracker dutyCycleTracker)
{
    replayerDutyCycleTracker = dutyCycleTracker;

    return this;
}

/**
 * The duty cycle tracker used for the replayer.
 * NOTE: Only used in DEDICATED threading mode.
 *
 * @return duty cycle tracker for the replayer.
 */
public DutyCycleTracker replayerDutyCycleTracker()
{
    return this.replayerDutyCycleTracker;
}

/**
* Provides an explicit {@link Checksum} for checksum computation during recording.
*
Expand Down Expand Up @@ -2537,6 +2806,12 @@ public String toString()
"\n dataBuffer=" + dataBuffer +
"\n replayBuffer=" + replayBuffer +
"\n recordChecksumBuffer=" + recordChecksumBuffer +
"\n conductorCycleThresholdNs=" + conductorCycleThresholdNs +
"\n recorderCycleThresholdNs=" + recorderCycleThresholdNs +
"\n replayerCycleThresholdNs=" + replayerCycleThresholdNs +
"\n conductorDutyCycleTracker=" + conductorDutyCycleTracker +
"\n recorderDutyCycleTracker=" + recorderDutyCycleTracker +
"\n replayerDutyCycleTracker=" + replayerDutyCycleTracker +
"\n}";
}
}
Expand Down
Loading

0 comments on commit 5b9d847

Please sign in to comment.