Skip to content

Commit

Permalink
GEODE-5631: failedBatchRemovalMessageKeys not used after GII (apache#…
Browse files Browse the repository at this point in the history
…2375)

        * After GII a flag is set to indicate that failedBatchRemovalMessageKeys has been processed
	* If this flag is set, no more entries will be put into failedBatchRemovalMessageKeys.
  • Loading branch information
nabarunnag authored Aug 27, 2018
1 parent a383053 commit 85953f0
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,18 @@ public void addToFailedBatchRemovalMessageKeys(Object key) {
failedBatchRemovalMessageKeys.add(key);
}

public boolean isFailedBatchRemovalMessageKeysClearedFlag() {
return failedBatchRemovalMessageKeysClearedFlag;
}

public void setFailedBatchRemovalMessageKeysClearedFlag(
boolean failedBatchRemovalMessageKeysClearedFlag) {
this.failedBatchRemovalMessageKeysClearedFlag = failedBatchRemovalMessageKeysClearedFlag;
}

private boolean failedBatchRemovalMessageKeysClearedFlag = false;


public ConcurrentHashSet<Object> getFailedBatchRemovalMessageKeys() {
return this.failedBatchRemovalMessageKeys;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private void destroyFailedBatchRemovalMessageKeys() {
}
}
}
setFailedBatchRemovalMessageKeysClearedFlag(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void afterAckForSecondary_EventInBucket(AbstractGatewaySender abstractSe
}
}

private void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key,
void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key,
PartitionedRegion prQ) {
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
Expand All @@ -207,8 +207,12 @@ private void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key
}
// add the key to failedBatchRemovalMessageQueue.
// This is to handle the last scenario in #49196
brq.addToFailedBatchRemovalMessageKeys(key);

// But if GII is already completed and FailedBatchRemovalMessageKeys
// are already cleared then no keys should be added to it as they will
// never be cleared and increase the memory footprint.
if (!brq.isFailedBatchRemovalMessageKeysClearedFlag()) {
brq.addToFailedBatchRemovalMessageKeys(key);
}
} catch (ForceReattemptException fe) {
if (isDebugEnabled) {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
Expand All @@ -37,6 +41,7 @@

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Operation;
Expand All @@ -46,11 +51,13 @@
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketAdvisor;
import org.apache.geode.internal.cache.BucketRegionQueue;
import org.apache.geode.internal.cache.BucketRegionQueueHelper;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EvictionAttributesImpl;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.KeyInfo;
Expand Down Expand Up @@ -179,6 +186,23 @@ public String answer(final InvocationOnMock invocation) throws Throwable {
new BucketRegionQueueHelper(this.cache, this.queueRegion, this.bucketRegionQueue);
}

@Test
public void ifIsFailedBatchRemovalMessageKeysClearedFlagSetThenAddToFailedBatchRemovalMessageKeysNotCalled()
throws ForceReattemptException {
ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage();
Object object = new Object();
PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
AbstractBucketRegionQueue brq = mock(AbstractBucketRegionQueue.class);
doThrow(new EntryNotFoundException("ENTRY NOT FOUND")).when(brq).destroyKey(object);
when(brq.isFailedBatchRemovalMessageKeysClearedFlag()).thenReturn(true);
doNothing().when(brq).addToFailedBatchRemovalMessageKeys(object);
pqrm.destroyKeyFromBucketQueue(brq, object, partitionedRegion);
verify(brq, times(1)).destroyKey(object);
verify(brq, times(1)).isFailedBatchRemovalMessageKeysClearedFlag();
verify(brq, times(0)).addToFailedBatchRemovalMessageKeys(object);

}

@Test
public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue()
throws Exception {
Expand Down

0 comments on commit 85953f0

Please sign in to comment.