Skip to content

Commit

Permalink
[Java] ClusterBackup should be using the correct stream id when exten…
Browse files Browse the repository at this point in the history
…ding the log.
  • Loading branch information
mjpt777 committed Dec 17, 2019
1 parent b7fe230 commit f90c802
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
31 changes: 28 additions & 3 deletions aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public static class Context
private String memberStatusChannel = Configuration.MEMBER_STATUS_CHANNEL_DEFAULT;
private int memberStatusStreamId = ConsensusModule.Configuration.memberStatusStreamId();
private int replayStreamId = ClusteredServiceContainer.Configuration.replayStreamId();
private int logStreamId = ConsensusModule.Configuration.logStreamId();
private String transferEndpoint = Configuration.TRANSFER_ENDPOINT_DEFAULT;

private long clusterBackupIntervalNs = Configuration.clusterBackupIntervalNs();
Expand Down Expand Up @@ -859,7 +860,7 @@ public int memberStatusStreamId()
}

/**
* Set the stream id for the cluster log and snapshot replay channel.
* Set the stream id for the cluster snapshot replay channel.
*
* @param streamId for the cluster log replay channel.
* @return this for a fluent API
Expand All @@ -872,16 +873,40 @@ public Context replayStreamId(final int streamId)
}

/**
* Get the stream id for the cluster log and snapshot replay channel.
* Get the stream id for the cluster snapshot replay channel.
*
* @return the stream id for the cluster log replay channel.
* @return the stream id for the cluster snapshot replay channel.
* @see io.aeron.cluster.service.ClusteredServiceContainer.Configuration#REPLAY_STREAM_ID_PROP_NAME
*/
public int replayStreamId()
{
return replayStreamId;
}

/**
* Set the stream id for the cluster log channel.
*
* @param streamId for the cluster log channel.
* @return this for a fluent API
* @see ConsensusModule.Configuration#LOG_STREAM_ID_PROP_NAME
*/
public Context logStreamId(final int streamId)
{
logStreamId = streamId;
return this;
}

/**
* Get the stream id for the cluster log channel.
*
* @return the stream id for the cluster log channel.
* @see ConsensusModule.Configuration#LOG_STREAM_ID_PROP_NAME
*/
public int logStreamId()
{
return logStreamId;
}

/**
* Set the transfer endpoint to use for snapshot and log retrieval.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,7 @@ private int liveLogReplay(final long nowMs)
final long replayId = ctx.aeron().nextCorrelationId();
final RecordingLog.Entry logEntry = recordingLog.findLastTerm();
final long startPosition = null == logEntry ?
leaderLogEntry.termBaseLogPosition :
backupArchive.getStopPosition(logEntry.recordingId);
leaderLogEntry.termBaseLogPosition : backupArchive.getStopPosition(logEntry.recordingId);

final String transferChannel = "aeron:udp?endpoint=" + ctx.transferEndpoint();

Expand All @@ -653,7 +652,7 @@ private int liveLogReplay(final long nowMs)
NULL_LENGTH,
leaderCommitPositionCounterId,
transferChannel,
ctx.replayStreamId(),
ctx.logStreamId(),
replayId,
clusterArchive.controlSessionId()))
{
Expand Down Expand Up @@ -691,12 +690,12 @@ else if (pollForResponse(clusterArchive, correlationId))
if (null == logEntry)
{
liveLogReplaySubscriptionId = backupArchive.startRecording(
replaySubscriptionChannel, ctx.replayStreamId(), SourceLocation.REMOTE);
replaySubscriptionChannel, ctx.logStreamId(), SourceLocation.REMOTE);
}
else
{
liveLogReplaySubscriptionId = backupArchive.extendRecording(
logEntry.recordingId, replaySubscriptionChannel, ctx.replayStreamId(), SourceLocation.REMOTE);
logEntry.recordingId, replaySubscriptionChannel, ctx.logStreamId(), SourceLocation.REMOTE);
}
}
}
Expand Down

0 comments on commit f90c802

Please sign in to comment.