Skip to content

Commit

Permalink
[Java]: use setting of clusterTimeMs as indication to reset timer sta…
Browse files Browse the repository at this point in the history
…rtTime if clusterTimeMs not set yet.
  • Loading branch information
tmontgomery committed Jul 26, 2018
1 parent b604866 commit 8874aa9
Showing 1 changed file with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ConsensusModuleAgent implements Agent, MemberStatusListener
private long followerCommitPosition = 0;
private long timeOfLastLogUpdateMs = 0;
private long cachedTimeMs;
private long clusterTimeMs;
private long clusterTimeMs = NULL_VALUE;
private long lastRecordingId = RecordingPos.NULL_RECORDING_ID;
private int logPublicationInitialTermId = NULL_VALUE;
private int logPublicationTermBufferLength = NULL_VALUE;
Expand Down Expand Up @@ -212,10 +212,6 @@ public void onStart()
{
recoverFromSnapshot(recoveryPlan.snapshots.get(0), archive);
}
else
{
timerService.resetStartTime(epochClock.time());
}

awaitServiceAcks(expectedAckPosition);
}
Expand Down Expand Up @@ -251,7 +247,7 @@ public int doWork()
cachedTimeMs = nowMs;
if (Cluster.Role.LEADER == role)
{
clusterTimeMs = nowMs;
clusterTimeMs(nowMs);
}

isSlowTickCycle = true;
Expand Down Expand Up @@ -736,13 +732,13 @@ void onReplaySessionMessage(
final int length,
final Header header)
{
clusterTimeMs = timestamp;
clusterTimeMs(timestamp);
sessionByIdMap.get(clusterSessionId).lastActivity(timestamp, correlationId);
}

void onReplayTimerEvent(final long correlationId, final long timestamp)
{
clusterTimeMs = timestamp;
clusterTimeMs(timestamp);

if (!timerService.cancelTimer(correlationId))
{
Expand All @@ -758,7 +754,7 @@ void onReplaySessionOpen(
final int responseStreamId,
final String responseChannel)
{
clusterTimeMs = timestamp;
clusterTimeMs(timestamp);

final ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, responseChannel);
session.open(logPosition);
Expand Down Expand Up @@ -793,15 +789,15 @@ void onLoadSession(
void onReplaySessionClose(
final long correlationId, final long clusterSessionId, final long timestamp, final CloseReason closeReason)
{
clusterTimeMs = timestamp;
clusterTimeMs(timestamp);
sessionByIdMap.remove(clusterSessionId).close();
}

@SuppressWarnings("unused")
void onReplayClusterAction(
final long leadershipTermId, final long logPosition, final long timestamp, final ClusterAction action)
{
clusterTimeMs = timestamp;
clusterTimeMs(timestamp);

switch (action)
{
Expand Down Expand Up @@ -835,7 +831,7 @@ void onReplayNewLeadershipTermEvent(
final int leaderMemberId,
final int logSessionId)
{
clusterTimeMs = timestamp;
clusterTimeMs(timestamp);
this.leadershipTermId = leadershipTermId;

if (null != election && null != appendedPosition)
Expand Down Expand Up @@ -1457,7 +1453,7 @@ private void createAppendPosition(final int logSessionId)

private void recoverFromSnapshot(final RecordingLog.Snapshot snapshot, final AeronArchive archive)
{
clusterTimeMs = snapshot.timestamp;
clusterTimeMs(snapshot.timestamp);
expectedAckPosition = snapshot.logPosition;
leadershipTermId = snapshot.leadershipTermId;

Expand All @@ -1466,8 +1462,6 @@ private void recoverFromSnapshot(final RecordingLog.Snapshot snapshot, final Aer
final int sessionId = (int)archive.startReplay(snapshot.recordingId, 0, NULL_LENGTH, channel, streamId);
final String replaySubscriptionChannel = ChannelUri.addSessionId(channel, sessionId);

timerService.resetStartTime(clusterTimeMs);

try (Subscription subscription = aeron.addSubscription(replaySubscriptionChannel, streamId))
{
final Image image = awaitImage(sessionId, subscription);
Expand Down Expand Up @@ -1840,4 +1834,14 @@ private void onUnavailableIngressImage(final Image image)
{
ingressAdapter.freeSessionBuffer(image.sessionId());
}

private void clusterTimeMs(final long nowMs)
{
if (NULL_VALUE == clusterTimeMs)
{
timerService.resetStartTime(nowMs);
}

clusterTimeMs = nowMs;
}
}

0 comments on commit 8874aa9

Please sign in to comment.