Skip to content

Commit

Permalink
Fixed PartitionManager Cmap indexing issue during member leave.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdogan committed Apr 16, 2012
1 parent 656ec95 commit c10f8b0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
24 changes: 22 additions & 2 deletions hazelcast/src/main/java/com/hazelcast/impl/PartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ && initialized && shouldCheckRepartitioning()) {

// for testing purposes only
public boolean activateMigration() {
return migrationActive.getAndSet(true);
return migrationActive.compareAndSet(false, true);
}

// for testing purposes only
public boolean inactivateMigration() {
migrationActive.getAndSet(false);
migrationActive.compareAndSet(true, false);
while (migratingPartition != null) {
try {
Thread.sleep(250);
Expand All @@ -145,6 +145,10 @@ private void sendClusterRuntimeState() {
|| !concurrentMapManager.node.joined()) {
return;
}
if (!migrationActive.get()) {
// migration is disabled because a member has left; wait until it is re-enabled!
return;
}
// do not send partition state until initialized!
// sending partition state makes nodes believe initialization completed.
if (!initialized) return;
Expand Down Expand Up @@ -376,6 +380,11 @@ public void syncForDead(MemberImpl deadMember) {
if (!hasStorageMember()) {
reset();
}
// inactivate migration and the sending of ClusterRuntimeState (@see #sendClusterRuntimeState)
// so that all members can notice the dead member and fix their own records and indexes.
// otherwise the new master may act quickly and send a new partition state
// before the other members notice the dead member and fix their records.
final boolean migrationStatus = migrationActive.getAndSet(false);
concurrentMapManager.partitionServiceImpl.reset();
checkMigratingPartitionForDead(deadAddress);
// list of partitions that have the dead member in their replicas
Expand All @@ -398,6 +407,17 @@ public void syncForDead(MemberImpl deadMember) {
}
fixCMapsForDead(deadAddress, indexesOfDead);
fixReplicasAndPartitionsForDead(deadMember, indexesOfDead);

final Node node = concurrentMapManager.node;
// re-activate migration after (connectionDropTime x 10) milliseconds, i.e.
// CONNECTION_MONITOR_INTERVAL x CONNECTION_MONITOR_MAX_FAULTS x 10,
// optimistically assuming that all nodes notice the dead member within this period.
final long waitBeforeMigrationActivate = node.groupProperties.CONNECTION_MONITOR_INTERVAL.getLong()
* node.groupProperties.CONNECTION_MONITOR_MAX_FAULTS.getInteger() * 10;
node.executorManager.getScheduledExecutorService().schedule(new Runnable() {
public void run() {
migrationActive.compareAndSet(false, migrationStatus);
}
}, waitBeforeMigrationActivate, TimeUnit.MILLISECONDS);
}

private void fixReplicasAndPartitionsForDead(final MemberImpl deadMember, final int[] indexesOfDead) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void testPartitionAndCMapRecordCounts() throws InterruptedException {
}
Collection<HazelcastInstance> set = Hazelcast.getAllHazelcastInstances();
final int replicaMax = set.size();
int wait = replicaMax * 3;
int wait = replicaMax * 5;
System.out.println("Waiting " + wait + " seconds for partition arrangement...");
Thread.sleep(1000 * wait);
int[] partitionCounts = new int[PartitionInfo.MAX_REPLICA_COUNT];
Expand Down

0 comments on commit c10f8b0

Please sign in to comment.