Skip to content

Commit

Permalink
Geode 6374 - Fixing DistributedNoAckRegionCCEOffHeapDUnitTest.testNoL…
Browse files Browse the repository at this point in the history
…oaderWithInvalidEntry (apache#4409)

- Changes to wait and single sourcing code.
- Cleaning out AttributeFactory usages
- Cleanup of RegionFactory usages
- Deleting an unneeded test
  • Loading branch information
mhansonp authored Dec 17, 2019
1 parent 76f7f85 commit b1093e1
Show file tree
Hide file tree
Showing 25 changed files with 2,805 additions and 3,881 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public DiskDistributedNoAckAsyncRegionDUnitTest() {
}

@Override
protected RegionAttributes getRegionAttributes() {
AttributesFactory factory = new AttributesFactory();
protected <K, V> RegionAttributes<K, V> getRegionAttributes() {
AttributesFactory<K, V> factory = new AttributesFactory<>();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);

File[] diskDirs = new File[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.util.Map;
import java.util.Properties;
Expand All @@ -37,18 +37,15 @@
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;


public class DistributedNoAckRegionCCEDUnitTest extends DistributedNoAckRegionDUnitTest {

static volatile boolean ListenerBlocking;
private static volatile boolean ListenerBlocking;

@Override
public Properties getDistributedSystemProperties() {
Expand All @@ -65,21 +62,21 @@ public Properties getDistributedSystemProperties() {
* Returns region attributes for a <code>GLOBAL</code> region
*/
@Override
protected RegionAttributes getRegionAttributes() {
AttributesFactory factory = new AttributesFactory();
protected <K, V> RegionAttributes<K, V> getRegionAttributes() {
AttributesFactory<K, V> factory = new AttributesFactory<>();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(true);
return factory.create();
}

@Override
protected RegionAttributes getRegionAttributes(String type) {
RegionAttributes ra = getCache().getRegionAttributes(type);
protected <K, V> RegionAttributes<K, V> getRegionAttributes(String type) {
RegionAttributes<K, V> ra = getCache().getRegionAttributes(type);
if (ra == null) {
throw new IllegalStateException("The region shortcut " + type + " has been removed.");
}
AttributesFactory factory = new AttributesFactory(ra);
AttributesFactory<K, V> factory = new AttributesFactory<>(ra);
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
factory.setConcurrencyChecksEnabled(true);
return factory.create();
Expand All @@ -99,11 +96,10 @@ public void testEntryTtlLocalDestroy() {

@Test
public void testClearWithManyEventsInFlight() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
VM vm0 = VM.getVM(0);
VM vm1 = VM.getVM(1);
VM vm2 = VM.getVM(2);
VM vm3 = VM.getVM(3);

// create replicated regions in VM 0 and 1, then perform concurrent ops
// on the same key while creating the region in VM2. Afterward make
Expand All @@ -114,48 +110,51 @@ public void testClearWithManyEventsInFlight() {
createRegionWithAttribute(vm1, name, false);
createRegionWithAttribute(vm2, name, false);
createRegionWithAttribute(vm3, name, false);
vm0.invoke(() -> DistributedNoAckRegionCCEDUnitTest.addBlockingListener());
vm1.invoke(() -> DistributedNoAckRegionCCEDUnitTest.addBlockingListener());
vm2.invoke(() -> DistributedNoAckRegionCCEDUnitTest.addBlockingListener());
AsyncInvocation vm0Ops = vm0.invokeAsync(() -> DistributedNoAckRegionCCEDUnitTest.doManyOps());
AsyncInvocation vm1Ops = vm1.invokeAsync(() -> DistributedNoAckRegionCCEDUnitTest.doManyOps());
AsyncInvocation vm2Ops = vm2.invokeAsync(() -> DistributedNoAckRegionCCEDUnitTest.doManyOps());
vm0.invoke(DistributedNoAckRegionCCEDUnitTest::addBlockingListener);
vm1.invoke(DistributedNoAckRegionCCEDUnitTest::addBlockingListener);
vm2.invoke(DistributedNoAckRegionCCEDUnitTest::addBlockingListener);
AsyncInvocation vm0Ops = vm0.invokeAsync(DistributedNoAckRegionCCEDUnitTest::doManyOps);
AsyncInvocation vm1Ops = vm1.invokeAsync(DistributedNoAckRegionCCEDUnitTest::doManyOps);
AsyncInvocation vm2Ops = vm2.invokeAsync(DistributedNoAckRegionCCEDUnitTest::doManyOps);
// pause to let a bunch of operations build up
Wait.pause(5000);
AsyncInvocation a0 = vm3.invokeAsync(() -> DistributedNoAckRegionCCEDUnitTest.clearRegion());
vm0.invoke(() -> DistributedNoAckRegionCCEDUnitTest.unblockListener());
vm1.invoke(() -> DistributedNoAckRegionCCEDUnitTest.unblockListener());
vm2.invoke(() -> DistributedNoAckRegionCCEDUnitTest.unblockListener());
AsyncInvocation a0 = vm3.invokeAsync(DistributedNoAckRegionCCEDUnitTest::clearRegion);
vm0.invoke(DistributedNoAckRegionCCEDUnitTest::unblockListener);
vm1.invoke(DistributedNoAckRegionCCEDUnitTest::unblockListener);
vm2.invoke(DistributedNoAckRegionCCEDUnitTest::unblockListener);
waitForAsyncProcessing(a0, "");
waitForAsyncProcessing(vm0Ops, "");
waitForAsyncProcessing(vm1Ops, "");
waitForAsyncProcessing(vm2Ops, "");

Wait.pause(2000);// this test has with noack, thus we should wait before validating entries
// check consistency of the regions
Map r0Contents = vm0.invoke(() -> getCCRegionContents());
Map r1Contents = vm1.invoke(() -> getCCRegionContents());
Map r2Contents = vm2.invoke(() -> getCCRegionContents());
Map r3Contents = vm3.invoke(() -> getCCRegionContents());
Map r0Contents = vm0.invoke(MultiVMRegionTestCase::getCCRegionContents);
Map r1Contents = vm1.invoke(MultiVMRegionTestCase::getCCRegionContents);
Map r2Contents = vm2.invoke(MultiVMRegionTestCase::getCCRegionContents);
Map r3Contents = vm3.invoke(MultiVMRegionTestCase::getCCRegionContents);

for (int i = 0; i < 10; i++) {
String key = "cckey" + i;
assertEquals("region contents are not consistent", r0Contents.get(key), r1Contents.get(key));
assertEquals("region contents are not consistent", r1Contents.get(key), r2Contents.get(key));
assertEquals("region contents are not consistent", r2Contents.get(key), r3Contents.get(key));
assertThat(r0Contents.get(key)).withFailMessage("region contents are not consistent")
.isEqualTo(r1Contents.get(key));
assertThat(r1Contents.get(key)).withFailMessage("region contents are not consistent")
.isEqualTo(r2Contents.get(key));
assertThat(r2Contents.get(key)).withFailMessage("region contents are not consistent")
.isEqualTo(r3Contents.get(key));
for (int subi = 1; subi < 3; subi++) {
String subkey = key + "-" + subi;
assertEquals("region contents are not consistent", r0Contents.get(subkey),
r1Contents.get(subkey));
assertEquals("region contents are not consistent", r1Contents.get(subkey),
r2Contents.get(subkey));
assertEquals("region contents are not consistent", r2Contents.get(subkey),
r3Contents.get(subkey));
assertThat(r0Contents.get(subkey)).withFailMessage("region contents are not consistent")
.isEqualTo(r1Contents.get(subkey));
assertThat(r1Contents.get(subkey)).withFailMessage("region contents are not consistent")
.isEqualTo(r2Contents.get(subkey));
assertThat(r2Contents.get(subkey)).withFailMessage("region contents are not consistent")
.isEqualTo(r3Contents.get(subkey));
}
}
}

static void addBlockingListener() {
private static void addBlockingListener() {
ListenerBlocking = true;
CCRegion.getAttributesMutator().addCacheListener(new CacheListenerAdapter() {
@Override
Expand All @@ -168,28 +167,27 @@ private void onEvent(EntryEvent event) {
if (event.isOriginRemote()) {
synchronized (this) {
while (ListenerBlocking) {
LogWriterUtils.getLogWriter()
logger
.info("blocking cache operations for " + event.getDistributedMember());
blocked = true;
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LogWriterUtils.getLogWriter().info("blocking cache listener interrupted");
logger.info("blocking cache listener interrupted");
return;
}
}
}
if (blocked) {
LogWriterUtils.getLogWriter()
.info("allowing cache operations for " + event.getDistributedMember());
logger.info("allowing cache operations for " + event.getDistributedMember());
}
}
}

@Override
public void close() {
LogWriterUtils.getLogWriter().info("closing blocking listener");
logger.info("closing blocking listener");
ListenerBlocking = false;
synchronized (this) {
notifyAll();
Expand All @@ -213,20 +211,20 @@ public void afterDestroy(EntryEvent event) {
});
}

static void doManyOps() {
private static void doManyOps() {
// do not include putAll, which requires an Ack to detect failures
doOpsLoopNoFlush(5000, false, false);
doOpsLoopNoFlush(false, false);
}

static void unblockListener() {
private static void unblockListener() {
CacheListener listener = CCRegion.getCacheListener();
ListenerBlocking = false;
synchronized (listener) {
listener.notifyAll();
}
}

static void clearRegion() {
private static void clearRegion() {
CCRegion.clear();
}

Expand All @@ -241,12 +239,6 @@ public void testGIISendsTombstones() {
versionTestGIISendsTombstones();
}


protected void do_version_recovery_if_necessary(final VM vm0, final VM vm1, final VM vm2,
final Object[] params) {
// do nothing here
}

/**
* This tests the concurrency versioning system to ensure that event conflation happens correctly
* and that the statistic is being updated properly
Expand Down Expand Up @@ -282,52 +274,60 @@ public void testTombstones() {

@Test
public void testOneHopKnownIssues() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3); // this VM, but treat as a remote for uniformity
VM vm0 = VM.getVM(0);
VM vm1 = VM.getVM(1);
VM vm2 = VM.getVM(2);

assertThat(vm0).isNotNull();
assertThat(vm1).isNotNull();
assertThat(vm2).isNotNull();
// create an empty region in vm0 and replicated regions in VM 1 and 3,
// then perform concurrent ops
// on the same key while creating the region in VM2. Afterward make
// sure that all three regions are consistent

final String name = this.getUniqueName() + "-CC";
SerializableRunnable createRegion = new SerializableRunnable("Create Region") {
@Override
public void run() {
try {
final RegionFactory f;
int vmNumber = VM.getCurrentVMNum();
switch (vmNumber) {
case 0:
f = getCache().createRegionFactory(
getRegionAttributes(RegionShortcut.REPLICATE_PROXY.toString()));
break;
case 1:
f = getCache()
.createRegionFactory(getRegionAttributes(RegionShortcut.REPLICATE.toString()));
f.setDataPolicy(DataPolicy.NORMAL);
break;
default:
f = getCache().createRegionFactory(getRegionAttributes());
break;
}
CCRegion = (LocalRegion) f.create(name);
} catch (CacheException ex) {
Assert.fail("While creating region", ex);
}
}
};

vm0.invoke(createRegion); // empty
vm1.invoke(createRegion); // normal
vm2.invoke(createRegion); // replicate
assertThat(vm0.invoke("Create Region", () -> {
try {
final RegionFactory f = getCache().createRegionFactory(
getRegionAttributes(RegionShortcut.REPLICATE_PROXY.toString()));

CCRegion = (LocalRegion) f.create(name);
assertThat(CCRegion).isNotNull();
} catch (CacheException ex) {
fail("While creating region", ex);
}
return true;
})).isTrue(); // empty

assertThat(vm1.invoke("Create Region", () -> {
try {
final RegionFactory f = getCache()
.createRegionFactory(getRegionAttributes(RegionShortcut.REPLICATE.toString()));
f.setDataPolicy(DataPolicy.NORMAL);

CCRegion = (LocalRegion) f.create(name);
assertThat(CCRegion).isNotNull();
} catch (CacheException ex) {
fail("While creating region", ex);
}
return true;
})).isTrue(); // normal

assertThat(vm2.invoke("Create Region", () -> {
try {
final RegionFactory f = getCache().createRegionFactory(getRegionAttributes());
CCRegion = (LocalRegion) f.create(name);
assertThat(CCRegion).isNotNull();
} catch (CacheException ex) {
fail("While creating region", ex);
}
return true;
})).isTrue(); // replicate

// case 1: entry already invalid on vm2 (replicate) is invalidated by vm0 (empty)
final String invalidationKey = "invalidationKey";
final String destroyKey = "destroyKey";
SerializableRunnable test =
new SerializableRunnable() {
@Override
Expand All @@ -337,7 +337,8 @@ public void run() {
long invalidationCount = CCRegion.getCachePerfStats().getInvalidates();
CCRegion.invalidate(invalidationKey);
CCRegion.invalidate(invalidationKey);
assertEquals(invalidationCount + 1, CCRegion.getCachePerfStats().getInvalidates());
assertThat(invalidationCount + 1)
.isEqualTo(CCRegion.getCachePerfStats().getInvalidates());

// also test destroy() while we're at it. It should throw an exception
long destroyCount = CCRegion.getCachePerfStats().getDestroys();
Expand All @@ -348,7 +349,7 @@ public void run() {
} catch (EntryNotFoundException e) {
// expected
}
assertEquals(destroyCount + 1, CCRegion.getCachePerfStats().getDestroys());
assertThat(destroyCount + 1).isEqualTo(CCRegion.getCachePerfStats().getDestroys());
}
};
vm0.invoke("case 1: second invalidation not applied or distributed", test);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,18 @@ public Properties getDistributedSystemProperties() {
return props;
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
protected RegionAttributes getRegionAttributes() {
RegionAttributes attrs = super.getRegionAttributes();
AttributesFactory factory = new AttributesFactory(attrs);
protected <K, V> RegionAttributes<K, V> getRegionAttributes() {
RegionAttributes<K, V> attrs = super.getRegionAttributes();
AttributesFactory<K, V> factory = new AttributesFactory<>(attrs);
factory.setOffHeap(true);
return factory.create();
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
protected RegionAttributes getRegionAttributes(String type) {
RegionAttributes ra = super.getRegionAttributes(type);
AttributesFactory factory = new AttributesFactory(ra);
protected <K, V> RegionAttributes<K, V> getRegionAttributes(String type) {
RegionAttributes<K, V> ra = super.getRegionAttributes(type);
AttributesFactory<K, V> factory = new AttributesFactory<>(ra);
if (!ra.getDataPolicy().isEmpty()) {
factory.setOffHeap(true);
}
Expand Down
Loading

0 comments on commit b1093e1

Please sign in to comment.