Skip to content

Commit

Permalink
[Java]: rework DynamicJoin to query snapshot lengths asynchronously b…
Browse files Browse the repository at this point in the history
…efore retrieveing them.
  • Loading branch information
tmontgomery committed Nov 25, 2019
1 parent 106fba2 commit 6bcee21
Showing 1 changed file with 99 additions and 39 deletions.
138 changes: 99 additions & 39 deletions aeron-cluster/src/main/java/io/aeron/cluster/DynamicJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.aeron.logbuffer.Header;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.concurrent.status.CountersReader;

import java.util.ArrayList;
Expand All @@ -45,6 +46,7 @@ enum State
{
INIT,
PASSIVE_FOLLOWER,
SNAPSHOT_LENGTH_RETRIEVE,
SNAPSHOT_RETRIEVE,
SNAPSHOT_LOAD,
JOIN_CLUSTER,
Expand All @@ -61,6 +63,7 @@ enum State
private final String memberStatusEndpoint;
private final String transferEndpoint;
private final ArrayList<RecordingLog.Snapshot> leaderSnapshots = new ArrayList<>(4);
private final Long2LongHashMap leaderSnapshotLengthMap = new Long2LongHashMap(NULL_LENGTH);
private final long intervalNs;

private ExclusivePublication memberStatusPublication;
Expand Down Expand Up @@ -141,6 +144,10 @@ int doWork(final long nowNs)
workCount += passiveFollower(nowNs);
break;

case SNAPSHOT_LENGTH_RETRIEVE:
workCount += snapshotLengthRetrieve();
break;

case SNAPSHOT_RETRIEVE:
workCount += snapshotRetrieve();
break;
Expand Down Expand Up @@ -240,7 +247,7 @@ void onSnapshotRecordings(
.controlResponseStreamId(ctx.archiveContext().controlResponseStreamId());

leaderArchiveAsyncConnect = AeronArchive.asyncConnect(leaderArchiveCtx);
state(State.SNAPSHOT_RETRIEVE);
state(State.SNAPSHOT_LENGTH_RETRIEVE);
}
}
}
Expand Down Expand Up @@ -297,6 +304,54 @@ private int passiveFollower(final long nowNs)
return 0;
}

private int snapshotLengthRetrieve()
{
int workCount = 0;

if (null == leaderArchive)
{
leaderArchive = leaderArchiveAsyncConnect.poll();
return null == leaderArchive ? 0 : 1;
}

if (NULL_VALUE == correlationId)
{
final long stopPositionCorrelationId = ctx.aeron().nextCorrelationId();
final RecordingLog.Snapshot snapshot = leaderSnapshots.get(snapshotCursor);

if (leaderArchive.archiveProxy().getStopPosition(
snapshot.recordingId,
stopPositionCorrelationId,
leaderArchive.controlSessionId()))
{
correlationId = stopPositionCorrelationId;
workCount++;
}
}
else if (pollForResponse(leaderArchive, correlationId))
{
final long snapshotStopPosition = (int)leaderArchive.controlResponsePoller().relevantId();

correlationId = NULL_VALUE;

if (NULL_POSITION == snapshotStopPosition)
{
throw new ClusterException("snapshot stopPosition is NULL_POSITION");
}

leaderSnapshotLengthMap.put(snapshotCursor, snapshotStopPosition);
if (++snapshotCursor >= leaderSnapshots.size())
{
snapshotCursor = 0;
state(State.SNAPSHOT_RETRIEVE);
}

workCount++;
}

return workCount;
}

private int snapshotRetrieve()
{
int workCount = 0;
Expand Down Expand Up @@ -345,7 +400,8 @@ else if (null == snapshotRetrieveImage && null != snapshotRetrieveSubscription)
snapshotRetrieveImage = snapshotRetrieveSubscription.imageBySessionId(snapshotReplaySessionId);
if (null != snapshotRetrieveImage)
{
snapshotReader = new SnapshotReader(snapshotRetrieveImage, ctx.aeron().countersReader());
snapshotReader = new SnapshotReader(
snapshotRetrieveImage, ctx.aeron().countersReader(), leaderSnapshotLengthMap.get(snapshotCursor));
workCount++;
}
}
Expand Down Expand Up @@ -452,27 +508,28 @@ static class SnapshotReader implements ControlledFragmentHandler
{
private static final int FRAGMENT_LIMIT = 10;

private boolean inSnapshot = false;
private boolean isDone = false;
private long endPosition = 0;
private final long endPosition;
private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
private final SnapshotMarkerDecoder snapshotMarkerDecoder = new SnapshotMarkerDecoder();
private final CountersReader countersReader;
private final Image image;
private long recordingId = RecordingPos.NULL_RECORDING_ID;
private long recordingPosition = NULL_POSITION;
private int counterId;
private boolean inSnapshot = false;
private boolean inHeader = false;

SnapshotReader(final Image image, final CountersReader countersReader)
SnapshotReader(final Image image, final CountersReader countersReader, final long endPosition)
{
this.countersReader = countersReader;
this.image = image;
counterId = RecordingPos.findCounterIdBySession(countersReader, image.sessionId());
this.counterId = RecordingPos.findCounterIdBySession(countersReader, image.sessionId());
this.endPosition = endPosition;
}

boolean isDone()
{
return isDone && endPosition <= recordingPosition && image.isEndOfStream();
return endPosition <= recordingPosition;
}

long recordingId()
Expand Down Expand Up @@ -505,41 +562,44 @@ int poll()

public Action onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
messageHeaderDecoder.wrap(buffer, offset);

if (messageHeaderDecoder.templateId() == SnapshotMarkerDecoder.TEMPLATE_ID)
if (inHeader)
{
snapshotMarkerDecoder.wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());

final long typeId = snapshotMarkerDecoder.typeId();
if (typeId != ConsensusModule.Configuration.SNAPSHOT_TYPE_ID &&
typeId != ClusteredServiceContainer.SNAPSHOT_TYPE_ID)
{
throw new ClusterException("unexpected snapshot type: " + typeId);
}
messageHeaderDecoder.wrap(buffer, offset);

switch (snapshotMarkerDecoder.mark())
if (messageHeaderDecoder.templateId() == SnapshotMarkerDecoder.TEMPLATE_ID)
{
case BEGIN:
if (inSnapshot)
{
throw new ClusterException("already in snapshot");
}
inSnapshot = true;
return Action.CONTINUE;
snapshotMarkerDecoder.wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());

final long typeId = snapshotMarkerDecoder.typeId();
if (typeId != ConsensusModule.Configuration.SNAPSHOT_TYPE_ID &&
typeId != ClusteredServiceContainer.SNAPSHOT_TYPE_ID)
{
throw new ClusterException("unexpected snapshot type: " + typeId);
}

case END:
if (!inSnapshot)
{
throw new ClusterException("missing begin snapshot");
}
isDone = true;
endPosition = header.position();
return Action.BREAK;
switch (snapshotMarkerDecoder.mark())
{
case BEGIN:
if (inSnapshot)
{
throw new ClusterException("already in snapshot");
}
inSnapshot = true;
inHeader = true;
return Action.CONTINUE;

case END:
if (!inSnapshot)
{
throw new ClusterException("missing begin snapshot");
}
inHeader = false;
return Action.CONTINUE;
}
}
}

Expand Down

0 comments on commit 6bcee21

Please sign in to comment.