Skip to content

Commit

Permalink
[Java]: basic mechanics of adding passive member to cluster. If follo…
Browse files Browse the repository at this point in the history
…wer, redirect add to leader. Add check for add/remove response to duty cycle. Message contents refinement.
  • Loading branch information
tmontgomery committed Sep 1, 2018
1 parent c4f41cd commit 01ddf59
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 45 deletions.
128 changes: 127 additions & 1 deletion aeron-cluster/src/main/java/io/aeron/cluster/ClusterMember.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.aeron.Publication;
import io.aeron.cluster.client.ClusterException;
import org.agrona.CloseHelper;
import org.agrona.collections.ArrayUtil;

import static io.aeron.CommonContext.ENDPOINT_PARAM_NAME;
import static io.aeron.CommonContext.UDP_MEDIA;
Expand All @@ -37,6 +38,7 @@ public final class ClusterMember
private long logPosition = NULL_POSITION;
private long candidateTermId = Aeron.NULL_VALUE;
private long catchupReplaySessionId = Aeron.NULL_VALUE;
private long changeCorrelationId = Aeron.NULL_VALUE;
private final String clientFacingEndpoint;
private final String memberFacingEndpoint;
private final String logEndpoint;
Expand Down Expand Up @@ -243,6 +245,17 @@ public long catchupReplaySessionId()
return catchupReplaySessionId;
}

/**
 * Set the correlation id associated with a membership change for this member.
 *
 * @param correlationId to be stored against this member.
 * @return this for a fluent API.
 */
public ClusterMember changeCorrelationId(final long correlationId)
{
    changeCorrelationId = correlationId;

    return this;
}

/**
 * Get the correlation id associated with a membership change for this member.
 *
 * @return the stored correlation id, which is initialised to {@link Aeron#NULL_VALUE}.
 */
public long changeCorrelationId()
{
    return this.changeCorrelationId;
}

/**
* The address:port endpoint for this cluster member that clients will connect to.
*
Expand Down Expand Up @@ -349,19 +362,73 @@ public static ClusterMember[] parse(final String value)
throw new ClusterException("invalid member value: " + endpointsDetail + " within: " + value);
}

final String justEndpoints = String.join(
",",
memberAttributes[1],
memberAttributes[2],
memberAttributes[3],
memberAttributes[4],
memberAttributes[5]);

members[i] = new ClusterMember(
Integer.parseInt(memberAttributes[0]),
memberAttributes[1],
memberAttributes[2],
memberAttributes[3],
memberAttributes[4],
memberAttributes[5],
endpointsDetail);
justEndpoints);
}

return members;
}

/**
 * Parse the endpoints-only details for a single member, i.e. the five comma separated endpoints
 * without a leading member id, and construct a {@link ClusterMember} with the supplied id.
 *
 * @param id              to assign to the new member.
 * @param endpointsDetail comma separated list of exactly five endpoints.
 * @return the newly constructed {@link ClusterMember}.
 * @throws ClusterException if the value does not contain exactly five comma separated attributes.
 */
public static ClusterMember parseEndpoints(final int id, final String endpointsDetail)
{
    final String[] memberAttributes = endpointsDetail.split(",");
    if (memberAttributes.length != 5)
    {
        throw new ClusterException("invalid member value: " + endpointsDetail);
    }

    // Unlike parse(), there is no id in position 0, so the five endpoints occupy indices 0..4.
    return new ClusterMember(
        id,
        memberAttributes[0],
        memberAttributes[1],
        memberAttributes[2],
        memberAttributes[3],
        memberAttributes[4],
        endpointsDetail);
}

/**
 * Build a single string describing every member in the supplied array.
 * <p>
 * Each member is written as its id, a comma, and its endpoints detail; members are separated by '|'.
 *
 * @param clusterMembers to take the details from.
 * @return String representation suitable for use with {@link ClusterMember#parse}.
 */
public static String membersString(final ClusterMember[] clusterMembers)
{
    final StringBuilder sb = new StringBuilder();
    boolean isFirst = true;

    for (final ClusterMember clusterMember : clusterMembers)
    {
        // The separator goes between entries only, never after the last one.
        if (isFirst)
        {
            isFirst = false;
        }
        else
        {
            sb.append('|');
        }

        sb.append(clusterMember.id());
        sb.append(',');
        sb.append(clusterMember.endpointsDetail());
    }

    return sb.toString();
}

/**
* Add the publications for sending status messages to the other members of the cluster.
*
Expand Down Expand Up @@ -402,6 +469,17 @@ public static void closeMemberPublications(final ClusterMember[] clusterMembers)
}
}

/**
 * Add an exclusive publication for sending status messages to a single member and store it on that member.
 * <p>
 * Note that the supplied {@link ChannelUri} is modified: its endpoint parameter is set to the
 * member facing endpoint of the member.
 *
 * @param member     to which the publication will be assigned.
 * @param channelUri template for the channel, the endpoint parameter of which is overwritten.
 * @param streamId   for the publication.
 * @param aeron      client used to add the publication.
 */
public static void addMemberStatusPublication(
    final ClusterMember member,
    final ChannelUri channelUri,
    final int streamId,
    final Aeron aeron)
{
    channelUri.put(ENDPOINT_PARAM_NAME, member.memberFacingEndpoint());

    member.publication(aeron.addExclusivePublication(channelUri.toString(), streamId));
}

/**
* The threshold of clusters members required to achieve quorum given a count of cluster members.
*
Expand Down Expand Up @@ -712,6 +790,54 @@ public static int compareLog(final ClusterMember lhsMember, final ClusterMember
lhsMember.leadershipTermId, lhsMember.logPosition, rhsMember.leadershipTermId, rhsMember.logPosition);
}

/**
 * Check that no member in the supplied array already has the given endpoints detail.
 *
 * @param passiveMembers  to be searched.
 * @param memberEndpoints to compare against the endpoints detail of each member.
 * @return false if any member has an equal endpoints detail, otherwise true.
 */
public static boolean isNotDuplicateMember(final ClusterMember[] passiveMembers, final String memberEndpoints)
{
    for (final ClusterMember existingMember : passiveMembers)
    {
        final String existingEndpoints = existingMember.endpointsDetail();
        if (existingEndpoints.equals(memberEndpoints))
        {
            return false;
        }
    }

    return true;
}

/**
 * Add a member to an array of members by delegating to {@link ArrayUtil#add(Object[], Object)}.
 *
 * @param oldMembers to which the member is to be added.
 * @param newMember  to be added.
 * @return the array returned by {@link ArrayUtil#add(Object[], Object)}.
 */
public static ClusterMember[] addMember(final ClusterMember[] oldMembers, final ClusterMember newMember)
{
    final ClusterMember[] newMembers = ArrayUtil.add(oldMembers, newMember);

    return newMembers;
}

/**
 * Remove the member with the given id from an array of members by delegating to
 * {@link ArrayUtil#remove(Object[], int)}.
 * <p>
 * The whole array is scanned, so if more than one member has the id then the last match is the one removed.
 * If no member matches then {@link ArrayUtil#UNKNOWN_INDEX} is passed as the index to remove.
 *
 * @param oldMembers to be searched.
 * @param memberId   of the member to be removed.
 * @return the array returned by {@link ArrayUtil#remove(Object[], int)}.
 */
public static ClusterMember[] removeMember(final ClusterMember[] oldMembers, final int memberId)
{
    int matchIndex = ArrayUtil.UNKNOWN_INDEX;
    int cursor = 0;

    while (cursor < oldMembers.length)
    {
        if (memberId == oldMembers[cursor].id())
        {
            matchIndex = cursor;
        }

        cursor++;
    }

    return ArrayUtil.remove(oldMembers, matchIndex);
}

/**
 * Find the highest member id in an array of members.
 *
 * @param clusterMembers to be searched.
 * @return the highest id found, or {@link Aeron#NULL_VALUE} if no member has a higher id than that,
 * e.g. when the array is empty.
 */
public static int highMemberId(final ClusterMember[] clusterMembers)
{
    int highest = Aeron.NULL_VALUE;

    for (final ClusterMember clusterMember : clusterMembers)
    {
        final int candidateId = clusterMember.id();
        if (candidateId > highest)
        {
            highest = candidateId;
        }
    }

    return highest;
}

public String toString()
{
return "ClusterMember{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ class ConsensusModuleAgent implements Agent, MemberStatusListener
private int logPublicationInitialTermId = NULL_VALUE;
private int logPublicationTermBufferLength = NULL_VALUE;
private int logPublicationMtuLength = NULL_VALUE;
private int highMemberId = NULL_VALUE;
private ReadableCounter appendedPosition;
private Counter commitPosition;
private ConsensusModule.State state = ConsensusModule.State.INIT;
private Cluster.Role role;
private ClusterMember[] clusterMembers;
private ClusterMember[] passiveMembers = new ClusterMember[0];
private ClusterMember leaderMember;
private final ClusterMember thisMember;
private long[] rankedPositions;
Expand Down Expand Up @@ -145,6 +147,7 @@ class ConsensusModuleAgent implements Agent, MemberStatusListener
this.tempBuffer = ctx.tempBuffer();
this.serviceHeartbeats = ctx.serviceHeartbeatCounters();
this.serviceAcks = ServiceAck.newArray(ctx.serviceCount());
this.highMemberId = ClusterMember.highMemberId(clusterMembers);

aeronClientInvoker = aeron.conductorAgentInvoker();
aeronClientInvoker.invoke();
Expand Down Expand Up @@ -566,40 +569,56 @@ public void onRecordingLog(final RecordingLogDecoder recordingLogDecoder)
{
}

public void onAddClusterMember(final String memberEndpoints)
/**
 * Handle a request to add a member to the cluster.
 * <p>
 * When no election is in progress and this node is the leader, a member with endpoints not already
 * present in the passive members is assigned the next member id, tagged with the correlation id,
 * appended to the passive members, and given a member status publication.
 * When no election is in progress and this node is a follower, the request is forwarded to the leader.
 * In any other case the request is ignored.
 *
 * @param correlationId   of the add request.
 * @param memberEndpoints endpoints detail for the member to be added.
 */
public void onAddClusterMember(final long correlationId, final String memberEndpoints)
{
if (null == election && Cluster.Role.LEADER == role)
{
// The member is added to the passive list only, so the active cluster members are left untouched.
// The ClusterMembersChange response is sent later by processPassiveMembers while the
// change correlation id set below is not NULL_VALUE.
if (ClusterMember.isNotDuplicateMember(passiveMembers, memberEndpoints))
{
// Pre-increment so the new member gets an id one above the highest id seen so far.
final ClusterMember newMember = ClusterMember.parseEndpoints(++highMemberId, memberEndpoints);

newMember.changeCorrelationId(correlationId);
passiveMembers = ClusterMember.addMember(passiveMembers, newMember);

final ChannelUri memberStatusUri = ChannelUri.parse(ctx.memberStatusChannel());

ClusterMember.addMemberStatusPublication(
newMember, memberStatusUri, ctx.memberStatusStreamId(), aeron);
}
}
else if (null == election && Cluster.Role.FOLLOWER == role)
{
// redirect add to leader. Leader will respond
memberStatusPublisher.addClusterMember(leaderMember.publication(), correlationId, memberEndpoints);
}
}

public void onRemoveClusterMember(final int memberId)
/**
 * Handle a request to remove a member from the cluster.
 * <p>
 * Not yet implemented: the leader branch contains only TODO notes and all other cases are ignored.
 *
 * @param correlationId of the remove request.
 * @param memberId      of the member to be removed.
 */
public void onRemoveClusterMember(final long correlationId, final int memberId)
{
if (null == election && Cluster.Role.LEADER == role)
{
// TODO: remove cluster member from list (no effect on group size)
// TODO: send ClusterMembersChange to all cluster members
// TODO: close publication for old cluster member
// TODO: remove cluster member from passive list
}
}

public void onClusterMembersChange(final String clusterMembers)
/**
 * Handle notification of a change to the cluster membership.
 * <p>
 * Not yet implemented: the body contains only a TODO note.
 *
 * @param correlationId  carried by the change message.
 * @param leaderMemberId carried by the change message.
 * @param activeMembers  string carried by the change message; presumably in the format produced by
 *                       ClusterMember.membersString - TODO confirm.
 * @param passiveMembers string carried by the change message; presumably in the same format - TODO confirm.
 */
public void onClusterMembersChange(
final long correlationId, final int leaderMemberId, final String activeMembers, final String passiveMembers)
{
// TODO: send snapshotRecordingQuery if was added to passiveMembers
}

public void onSnapshotRecordingQuery(final int requestMemberId)
/**
 * Handle a query for snapshot recordings. Not yet implemented: the body is empty.
 *
 * @param correlationId   carried by the query.
 * @param requestMemberId carried by the query.
 */
public void onSnapshotRecordingQuery(final long correlationId, final int requestMemberId)
{
}

public void onSnapshotRecordings(final SnapshotRecordingsDecoder snapshotRecordingsDecoder)
/**
 * Handle a snapshot recordings message. Not yet implemented: the body is empty.
 *
 * @param correlationId             carried by the message.
 * @param snapshotRecordingsDecoder for reading the remainder of the message.
 */
public void onSnapshotRecordings(
final long correlationId, final SnapshotRecordingsDecoder snapshotRecordingsDecoder)
{
}

/**
 * Handle a request from a member to join the cluster.
 * <p>
 * Not yet implemented: the body contains only TODO notes.
 *
 * @param leadershipTermId carried by the join request.
 * @param memberId         of the member wishing to join.
 */
public void onJoinCluster(final long leadershipTermId, final int memberId)
{
// TODO: add member officially from passive. Keep existing publication.
// TODO: send ClusterChange event to log
}

Expand Down Expand Up @@ -1278,6 +1297,7 @@ private int slowTickCycle(final long nowMs)
{
workCount += processPendingSessions(pendingSessions, nowMs);
workCount += checkSessions(sessionByIdMap, nowMs);
workCount += processPassiveMembers(passiveMembers);
}
}
else
Expand Down Expand Up @@ -1467,6 +1487,33 @@ private int processRedirectSessions(final ArrayList<ClusterSession> redirectSess
return workCount;
}

/**
 * Send a cluster members change message to each passive member which has a change correlation id set.
 * <p>
 * When the publisher reports the message as sent, the correlation id on the member is reset to
 * {@link Aeron#NULL_VALUE} so it is not sent again; otherwise it is left in place to be retried on a later call.
 *
 * @param passiveMembers to be checked for outstanding change responses.
 * @return the count of messages successfully sent.
 */
private int processPassiveMembers(final ClusterMember[] passiveMembers)
{
    int sentCount = 0;

    for (final ClusterMember passiveMember : passiveMembers)
    {
        final long correlationId = passiveMember.changeCorrelationId();
        if (Aeron.NULL_VALUE == correlationId)
        {
            continue;
        }

        // TODO: differentiate leave and add as leave needs to remove close publication and member on success.
        final boolean isSent = memberStatusPublisher.clusterMemberChange(
            passiveMember.publication(),
            correlationId,
            leaderMember.id(),
            ClusterMember.membersString(clusterMembers),
            ClusterMember.membersString(passiveMembers));

        if (isSent)
        {
            passiveMember.changeCorrelationId(Aeron.NULL_VALUE);
            ++sentCount;
        }
    }

    return sentCount;
}

private int checkSessions(final Long2ObjectHashMap<ClusterSession> sessionByIdMap, final long nowMs)
{
int workCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());

memberStatusListener.onAddClusterMember(addClusterMemberDecoder.memberEndpoints());
memberStatusListener.onAddClusterMember(
addClusterMemberDecoder.correlationId(), addClusterMemberDecoder.memberEndpoints());
break;

case RemoveClusterMemberDecoder.TEMPLATE_ID:
Expand All @@ -254,7 +255,8 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());

memberStatusListener.onRemoveClusterMember(removeClusterMemberDecoder.memberId());
memberStatusListener.onRemoveClusterMember(
removeClusterMemberDecoder.correlationId(), removeClusterMemberDecoder.memberId());
break;

case ClusterMembersChangeDecoder.TEMPLATE_ID:
Expand All @@ -264,7 +266,11 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());

memberStatusListener.onClusterMembersChange(clusterMembersChangeDecoder.clusterMembers());
memberStatusListener.onClusterMembersChange(
clusterMembersChangeDecoder.correlationId(),
clusterMembersChangeDecoder.leaderMemberId(),
clusterMembersChangeDecoder.activeMembers(),
clusterMembersChangeDecoder.passiveMembers());
break;

case SnapshotRecordingQueryDecoder.TEMPLATE_ID:
Expand All @@ -274,7 +280,8 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());

memberStatusListener.onSnapshotRecordingQuery(snapshotRecordingQueryDecoder.requestMemberId());
memberStatusListener.onSnapshotRecordingQuery(
snapshotRecordingQueryDecoder.correlationId(), snapshotRecordingQueryDecoder.requestMemberId());
break;

case SnapshotRecordingsDecoder.TEMPLATE_ID:
Expand All @@ -284,7 +291,8 @@ public void onFragment(final DirectBuffer buffer, final int offset, final int le
messageHeaderDecoder.blockLength(),
messageHeaderDecoder.version());

memberStatusListener.onSnapshotRecordings(snapshotRecordingsDecoder);
memberStatusListener.onSnapshotRecordings(
snapshotRecordingsDecoder.correlationId(), snapshotRecordingsDecoder);
break;

case JoinClusterDecoder.TEMPLATE_ID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ void onRecordingLogQuery(

void onRecordingLog(RecordingLogDecoder recordingLogDecoder);

void onAddClusterMember(String memberEndpoints);
void onAddClusterMember(long correlationId, String memberEndpoints);

void onRemoveClusterMember(int memberId);
void onRemoveClusterMember(long correlationId, int memberId);

void onClusterMembersChange(String clusterMembers);
void onClusterMembersChange(long correlationId, int leaderMemberId, String activeMembers, String passiveMembers);

void onSnapshotRecordingQuery(int requestMemberId);
void onSnapshotRecordingQuery(long correlationId, int requestMemberId);

void onSnapshotRecordings(SnapshotRecordingsDecoder snapshotRecordingsDecoder);
void onSnapshotRecordings(long correlationId, SnapshotRecordingsDecoder snapshotRecordingsDecoder);

void onJoinCluster(long leadershipTermId, int memberId);

Expand Down
Loading

0 comments on commit 01ddf59

Please sign in to comment.