Skip to content

Commit da42607

Browse files
GEODE-4285: Get a distributed lock if we can't find a PDX type
If we are unable to find a PDX type during a get, will we get a distributed lock and try again. This prevents races where the type may be in the middle of distribution. Adding a dunit test for the race condition that requires this fix.
1 parent 6267bb5 commit da42607

File tree

2 files changed

+206
-9
lines changed

2 files changed

+206
-9
lines changed

geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -446,13 +446,27 @@ private void checkDistributedTypeRegistryState() {
446446
}
447447

448448
public PdxType getType(int typeId) {
449+
return getById(typeId);
450+
}
451+
452+
private <T> T getById(Object typeId) {
449453
verifyConfiguration();
450454
TXStateProxy currentState = suspendTX();
451455
try {
452-
return (PdxType) getIdToType().get(typeId);
456+
T pdxType = (T) getIdToType().get(typeId);
457+
if (pdxType == null) {
458+
lock();
459+
try {
460+
pdxType = (T) getIdToType().get(typeId);
461+
} finally {
462+
unlock();
463+
}
464+
}
465+
return pdxType;
453466
} finally {
454467
resumeTX(currentState);
455468
}
469+
456470
}
457471

458472
public void addRemoteType(int typeId, PdxType type) {
@@ -466,7 +480,7 @@ public void addRemoteType(int typeId, PdxType type) {
466480
// the distributed lock.
467481
lock();
468482
try {
469-
r.put(typeId, type);
483+
r.putIfAbsent(typeId, type);
470484
} finally {
471485
unlock();
472486
}
@@ -703,14 +717,8 @@ public int defineEnum(EnumInfo newInfo) {
703717
}
704718

705719
public EnumInfo getEnumById(int id) {
706-
verifyConfiguration();
707720
EnumId enumId = new EnumId(id);
708-
TXStateProxy currentState = suspendTX();
709-
try {
710-
return (EnumInfo) getIdToType().get(enumId);
711-
} finally {
712-
resumeTX(currentState);
713-
}
721+
return getById(enumId);
714722
}
715723

716724
@Override

geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java

+189
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,31 @@
1616

1717
import static org.junit.Assert.*;
1818

19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutionException;
1921
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
2023

2124
import org.awaitility.Awaitility;
2225
import org.junit.Ignore;
2326
import org.junit.Test;
2427
import org.junit.experimental.categories.Category;
2528

29+
import org.apache.geode.cache.Region;
30+
import org.apache.geode.cache.partition.PartitionRegionHelper;
31+
import org.apache.geode.distributed.DistributedMember;
32+
import org.apache.geode.distributed.internal.ClusterDistributionManager;
33+
import org.apache.geode.distributed.internal.DistributionManager;
34+
import org.apache.geode.distributed.internal.DistributionMessage;
35+
import org.apache.geode.distributed.internal.DistributionMessageObserver;
36+
import org.apache.geode.internal.cache.UpdateOperation;
2637
import org.apache.geode.internal.cache.wan.WANTestBase;
38+
import org.apache.geode.pdx.PdxClientServerDUnitTest;
39+
import org.apache.geode.pdx.PdxReader;
40+
import org.apache.geode.pdx.PdxSerializable;
41+
import org.apache.geode.pdx.PdxWriter;
42+
import org.apache.geode.pdx.internal.PeerTypeRegistration;
43+
import org.apache.geode.test.dunit.AsyncInvocation;
2744
import org.apache.geode.test.dunit.IgnoredException;
2845
import org.apache.geode.test.dunit.Wait;
2946
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
@@ -35,6 +52,7 @@
3552
public class PDXNewWanDUnitTest extends WANTestBase {
3653

3754
private static final long serialVersionUID = 1L;
55+
public static final String KEY_0 = "Key_0";
3856

3957
public PDXNewWanDUnitTest() {
4058
super();
@@ -496,6 +514,115 @@ public void testWANPDX_PR_ParallelSender() {
496514
vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_PR", 1));
497515
}
498516

517+
@Test
518+
public void testWANPDX_PR_ParallelSender_WithDelayedTypeRegistry()
519+
throws InterruptedException, ExecutionException {
520+
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
521+
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
522+
523+
// Create the receiver side of the WAN gateway. Only vm2 will be a receiver, vm3 is
524+
// just a peer
525+
createCacheInVMs(nyPort, vm2, vm3);
526+
vm2.invoke(() -> WANTestBase.createReceiver());
527+
vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 0, 4,
528+
isOffHeap()));
529+
vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 0, 4,
530+
isOffHeap()));
531+
532+
AsyncInvocation deserializationFuture;
533+
try {
534+
// Delay processing of sending type registry update from vm2
535+
vm2.invoke(() -> {
536+
DistributionMessageObserver.setInstance(new BlockingPdxTypeUpdateObserver());
537+
});
538+
539+
// Create the sender side of the WAN connection. 2 VMs, with paused senders
540+
vm4.invoke(() -> WANTestBase.createCache(lnPort));
541+
vm5.invoke(() -> WANTestBase.createCache(lnPort));
542+
543+
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false));
544+
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false));
545+
546+
// Create the partitioned region in vm4
547+
vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 0, 4,
548+
isOffHeap()));
549+
550+
vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 0, 4,
551+
isOffHeap()));
552+
553+
vm5.invoke(() -> {
554+
Region region = cache.getRegion(getTestMethodName() + "_PR");
555+
PartitionRegionHelper.assignBucketsToPartitions(region);
556+
});
557+
558+
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
559+
vm5.invoke(() -> WANTestBase.pauseSender("ln"));
560+
561+
// Do some puts to fill up our queues
562+
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 20));
563+
564+
vm4.invoke(() -> {
565+
final Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
566+
PdxValue result = (PdxValue) r.put(KEY_0, new PdxValue(0));
567+
});
568+
569+
// Force VM4 to be the primary
570+
vm4.invoke(() -> {
571+
final Region region = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
572+
DistributedMember primary = PartitionRegionHelper.getPrimaryMemberForKey(region, KEY_0);
573+
// If we are not the primary
574+
DistributedMember localMember = cache.getDistributedSystem().getDistributedMember();
575+
if (!primary.equals(localMember)) {
576+
PartitionRegionHelper.moveBucketByKey(region, primary, localMember, KEY_0);
577+
578+
}
579+
});
580+
581+
vm5.invoke(() -> WANTestBase.resumeSender("ln"));
582+
583+
boolean blocking = vm2.invoke(() -> {
584+
BlockingPdxTypeUpdateObserver observer =
585+
(BlockingPdxTypeUpdateObserver) DistributionMessageObserver.getInstance();
586+
return observer.startedBlocking.await(1, TimeUnit.MINUTES);
587+
});
588+
589+
assertTrue(blocking);
590+
591+
vm4.invoke(() -> WANTestBase.resumeSender("ln"));
592+
593+
vm2.invoke(() -> {
594+
final Region region = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
595+
Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> region.containsKey(KEY_0));
596+
597+
});
598+
599+
// Make sure vm3 can deserialize the value
600+
deserializationFuture = vm3.invokeAsync(() -> {
601+
final Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
602+
PdxValue result = (PdxValue) r.get(KEY_0);
603+
assertEquals(result, new PdxValue(0));
604+
});
605+
606+
try {
607+
deserializationFuture.await(10, TimeUnit.SECONDS);
608+
fail("Get should have been blocked waiting for PDX type to be distributed");
609+
} catch (TimeoutException e) {
610+
// This is what we hope will happen. The get will be blocked by some sort of lock, rather
611+
// than failing due to a missing type.
612+
}
613+
614+
} finally {
615+
616+
vm2.invoke(() -> {
617+
BlockingPdxTypeUpdateObserver observer =
618+
(BlockingPdxTypeUpdateObserver) DistributionMessageObserver.getInstance();
619+
observer.latch.countDown();
620+
});
621+
}
622+
623+
deserializationFuture.get();
624+
}
625+
499626
@Test
500627
public void testWANPDX_PR_ParallelSender_47826() {
501628
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
@@ -724,5 +851,67 @@ public static void verifyFilterInvocation(int invocation) {
724851
}
725852

726853

854+
private static class BlockingPdxTypeUpdateObserver extends DistributionMessageObserver {
855+
private CountDownLatch latch = new CountDownLatch(1);
856+
private CountDownLatch startedBlocking = new CountDownLatch(1);
857+
858+
@Override
859+
public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
860+
if (message instanceof UpdateOperation.UpdateMessage
861+
&& ((UpdateOperation.UpdateMessage) message).getRegionPath()
862+
.contains(PeerTypeRegistration.REGION_FULL_PATH)) {
863+
startedBlocking.countDown();
864+
try {
865+
latch.await();
866+
} catch (InterruptedException e) {
867+
throw new RuntimeException("Interrupted", e);
868+
}
869+
870+
}
871+
}
872+
}
873+
874+
public static class PdxValue implements PdxSerializable {
875+
public int value;
876+
877+
public PdxValue() {
878+
879+
}
880+
881+
public PdxValue(int value) {
882+
this.value = value;
883+
}
884+
885+
@Override
886+
public void toData(PdxWriter writer) {
887+
writer.writeInt("value", value);
888+
889+
}
890+
891+
@Override
892+
public void fromData(PdxReader reader) {
893+
value = reader.readInt("value");
894+
}
895+
896+
@Override
897+
public boolean equals(Object o) {
898+
if (this == o) {
899+
return true;
900+
}
901+
if (o == null || getClass() != o.getClass()) {
902+
return false;
903+
}
904+
905+
PdxValue pdxValue = (PdxValue) o;
906+
907+
return value == pdxValue.value;
908+
}
909+
910+
@Override
911+
public int hashCode() {
912+
return value;
913+
}
914+
}
915+
727916

728917
}

0 commit comments

Comments
 (0)