From 11e4c3a4ca4bf7ef2203e0fdd111e536e5721e50 Mon Sep 17 00:00:00 2001 From: Xiaojian Zhou Date: Fri, 9 Jul 2021 10:54:17 -0700 Subject: [PATCH] =?UTF-8?q?GEODE-9346:=20When=20client=20received=20incorr?= =?UTF-8?q?ect=20byte=20array=20of=20PdxType=20due=20=E2=80=A6=20(#6561)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cache/query/dunit/PDXQueryTestBase.java | 19 +- .../query/dunit/PdxLocalQueryDUnitTest.java | 6 +- .../dunit/PdxMultiThreadQueryDUnitTest.java | 368 ++++++++++++++++++ .../cache/query/dunit/PdxQueryDUnitTest.java | 67 ++-- .../geode/cache/client/internal/QueryOp.java | 28 +- 5 files changed, 442 insertions(+), 46 deletions(-) create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java index 228362aa6692..3f59d97e224e 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.geode.LogWriter; import org.apache.geode.cache.AttributesFactory; @@ -82,7 +83,7 @@ public final void preTearDownCacheTestCase() throws Exception { preTearDownPDXQueryTestBase(); disconnectAllFromDS(); // tests all expect to create a new ds // Reset the testObject numinstance for the next test. - TestObject.numInstance = 0; + TestObject.numInstance.set(0); // In all VM. resetTestObjectInstanceCount(); } @@ -96,11 +97,11 @@ private void resetTestObjectInstanceCount() { vm.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - TestObject.numInstance = 0; + TestObject.numInstance.set(0); PortfolioPdx.numInstance = 0; PositionPdx.numInstance = 0; PositionPdx.cnt = 0; - TestObject2.numInstance = 0; + TestObject2.numInstance.set(0); } }); } @@ -306,15 +307,15 @@ protected void startCacheServer(int port, boolean notifyBySubscription) throws I public static class TestObject2 implements PdxSerializable { public int _id; - public static int numInstance = 0; + public static AtomicInteger numInstance = new AtomicInteger(); public TestObject2() { - numInstance++; + numInstance.incrementAndGet(); } public TestObject2(int id) { this._id = id; - numInstance++; + numInstance.incrementAndGet(); } public int getId() { @@ -359,7 +360,7 @@ public static class TestObject implements PdxSerializable { public int important; public int selection; public int select; - public static int numInstance = 0; + public static final AtomicInteger numInstance = new AtomicInteger(); public Map idTickers = new HashMap(); public HashMap positions = new HashMap(); public TestObject2 test; @@ -368,7 +369,7 @@ public TestObject() { if (log != null) { log.info("TestObject ctor stack trace", new Exception()); } - numInstance++; + numInstance.incrementAndGet(); } public TestObject(int id, String ticker) { @@ -381,7 +382,7 @@ public TestObject(int id, String ticker) { this.important = id; this.selection = id; this.select = id; - numInstance++; + numInstance.incrementAndGet(); idTickers.put(id + "", ticker); this.test = new TestObject2(id); } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java index 1c4ce101e873..60484c126e2a 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java @@ -899,7 +899,7 @@ public void run2() throws CacheException { protected final void preTearDownPDXQueryTestBase() throws Exception { disconnectAllFromDS(); // tests all expect to create a new ds // Reset the testObject numinstance for the next test. - TestObject.numInstance = 0; + TestObject.numInstance.set(0); PortfolioPdx.DEBUG = false; // In all VM. resetTestObjectInstanceCount(); @@ -917,11 +917,11 @@ private void resetTestObjectInstanceCount() { vm.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - TestObject.numInstance = 0; + TestObject.numInstance.set(0); PortfolioPdx.numInstance = 0; PositionPdx.numInstance = 0; PositionPdx.cnt = 0; - TestObject2.numInstance = 0; + TestObject2.numInstance.set(0); } }); } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java new file mode 100644 index 000000000000..6e1bc7eb62e6 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxMultiThreadQueryDUnitTest.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.dunit; + +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.SerializationException; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.cache.client.ServerConnectivityException; +import org.apache.geode.cache.client.ServerOperationException; +import org.apache.geode.cache.query.FunctionDomainException; +import org.apache.geode.cache.query.NameResolutionException; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryInvocationTargetException; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.TypeMismatchException; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.pdx.PdxReader; +import org.apache.geode.pdx.PdxSerializable; +import org.apache.geode.pdx.PdxSerializationException; +import org.apache.geode.pdx.PdxWriter; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.NetworkUtils; +import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.OQLQueryTest; +import org.apache.geode.test.version.VersionManager; +import org.apache.geode.util.internal.GeodeGlossary; + +@Category({OQLQueryTest.class}) +public class PdxMultiThreadQueryDUnitTest extends PDXQueryTestBase { + public static final Logger logger = LogService.getLogger(); + static final int numberOfEntries = 100; + final String poolName = "testClientServerQueryPool"; + final String hostName = NetworkUtils.getServerHostName(); + private VM server0; + private VM server1; + private VM server2; + private VM client; + private int port0; + private int port1; + private int port2; + + public PdxMultiThreadQueryDUnitTest() { + super(); + } + + @Before + public void startUpServersAndClient() { + final Host host = Host.getHost(0); + + server0 = host.getVM(VersionManager.CURRENT_VERSION, 0); + server1 = host.getVM(VersionManager.CURRENT_VERSION, 1); + server2 = host.getVM(VersionManager.CURRENT_VERSION, 2); + client = host.getVM(VersionManager.CURRENT_VERSION, 3); + + // Start servers + for (VM vm : Arrays.asList(server0, server1, server2)) { + vm.invoke((SerializableRunnableIF) this::configAndStartBridgeServer); + } + + port0 = server0.invoke(PdxQueryDUnitTest::getCacheServerPort); + port1 = server1.invoke(PdxQueryDUnitTest::getCacheServerPort); + port2 = server2.invoke(PdxQueryDUnitTest::getCacheServerPort); + } + + @After + public void closeServersAndClient() { + closeClient(client); + closeClient(server2); + closeClient(server1); + closeClient(server0); + } + + @Test + public void testClientServerQuery() throws CacheException { + // create pdx instance at servers + server0.invoke(() -> { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < numberOfEntries; i++) { + region.put("key-" + i, new TestObject(i, "vmware")); + } + }); + + // Create client region + client.invoke(() -> { + ClientCacheFactory cf = new ClientCacheFactory(); + cf.addPoolServer(hostName, port1); + cf.addPoolServer(hostName, port2); + cf.addPoolServer(hostName, port0); + cf.setPdxReadSerialized(false); + ClientCache clientCache = getClientCache(cf); + Region region = + clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName); + + logger.info("### Executing Query on server: " + queryString[1] + ": from client region: " + + region.getFullPath()); + final int size = 100; + IntStream.range(0, size).parallel().forEach(a -> { + try { + SelectResults selectResults = + region.query(queryString[1]); + assertThat(selectResults.size()).isEqualTo(numberOfEntries); + } catch (FunctionDomainException | TypeMismatchException | NameResolutionException + | QueryInvocationTargetException e) { + fail("Unexpected query exception:" + e.getMessage()); + } + }); + await().until(() -> TestObject.numInstance.get() == size * numberOfEntries); + }); + } + + @Test + public void testClientServerQueryUsingRemoteQueryService() + throws CacheException, InterruptedException { + // create pdx instance at servers + server0.invoke(() -> { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < numberOfEntries; i++) { + region.put("key-" + i, new TestObject(i, "vmware")); + } + }); + + // Create client pool. + createPool(client, poolName, new String[] {hostName, hostName, hostName}, + new int[] {port0, port1, port2}, true); + + final int size = 100; + AsyncInvocation[] asyncInvocationArray = new AsyncInvocation[size]; + for (int i = 0; i < size; i++) { + asyncInvocationArray[i] = + client.invokeAsync(() -> { + QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService(); + logger.info("### Executing Query on server: " + queryString[1]); + Query query = remoteQueryService.newQuery(queryString[1]); + SelectResults selectResults = + (SelectResults) query.execute(); + assertThat(selectResults.size()).isEqualTo(numberOfEntries); + }); + } + + for (int i = 0; i < size; i++) { + asyncInvocationArray[i].await(); + } + client + .invoke(() -> await().until(() -> TestObject.numInstance.get() == size * numberOfEntries)); + } + + @Test + public void testRetrySucceedWithPdxSerializationException() throws CacheException { + // create pdx instance at servers + server0.invoke(() -> { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < numberOfEntries; i++) { + region.put("key-" + i, new TestObjectThrowsPdxSerializationException()); + } + }); + + // Create client pool with 3 servers + createPool(client, poolName, new String[] {hostName, hostName, hostName}, + new int[] {port0, port1, port2}, true); + + client.invoke(() -> { + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true"); + try { + TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true; + QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService(); + logger.info("### Executing Query on server: " + queryString[1]); + Query query = remoteQueryService.newQuery(queryString[1]); + SelectResults selectResults = + (SelectResults) query.execute(); + assertThat(selectResults.size()).isEqualTo(numberOfEntries); + // the 2 failed try incremented numInstance + assertThat(numberOfEntries + 2) + .isEqualTo(TestObjectThrowsPdxSerializationException.numInstance.get()); + } finally { + assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization) + .isFalse(); + TestObjectThrowsPdxSerializationException.numInstance.set(0); + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false"); + } + }); + } + + @Test + public void testRetryNotEnabledBySystemProperty() throws CacheException { + // create pdx instance at servers + server0.invoke(() -> { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < numberOfEntries; i++) { + region.put("key-" + i, new TestObjectThrowsPdxSerializationException()); + } + }); + + // Create client pool with only 2 servers to test that retry will not run forever + createPool(client, poolName, new String[] {hostName, hostName}, + new int[] {port0, port1}, true); + + client.invoke(() -> { + try { + // If the client did not explicitly specify GeodeGlossary.GEMFIRE_PREFIX + + // "enableQueryRetryOnPdxSerializationException" to true, retry will not be enabled + TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true; + QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService(); + logger.info("### Executing Query on server: " + queryString[1]); + Query query = remoteQueryService.newQuery(queryString[1]); + assertThatThrownBy(query::execute).isInstanceOf(ServerOperationException.class) + .hasCauseInstanceOf(SerializationException.class); + assertThat(TestObjectThrowsPdxSerializationException.numInstance.get()).isEqualTo(1); + assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization) + .isTrue(); + } finally { + TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = false; + TestObjectThrowsPdxSerializationException.numInstance.set(0); + } + }); + } + + @Test + public void testNotToRetryOnRegularSerializationException() throws CacheException { + // create pdx instance at servers + server0.invoke(() -> { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < numberOfEntries; i++) { + region.put("key-" + i, new TestObjectThrowsSerializationException()); + } + }); + + // Create client pool with only 2 servers to test that retry will not run forever + createPool(client, poolName, new String[] {hostName, hostName}, + new int[] {port0, port1}, true); + + client.invoke(() -> { + try { + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true"); + TestObjectThrowsSerializationException.throwExceptionOnDeserialization = true; + QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService(); + logger.info("### Executing Query on server: " + queryString[1]); + Query query = remoteQueryService.newQuery(queryString[1]); + assertThatThrownBy(query::execute).isInstanceOf(ServerOperationException.class) + .hasCauseInstanceOf(SerializationException.class); + assertThat(TestObjectThrowsSerializationException.numInstance.get()).isEqualTo(1); + assertThat(TestObjectThrowsSerializationException.throwExceptionOnDeserialization) + .isTrue(); + } finally { + TestObjectThrowsSerializationException.throwExceptionOnDeserialization = false; + TestObjectThrowsSerializationException.numInstance.set(0); + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false"); + } + }); + } + + @Test + public void testRetryFailedWithServerConnectivityException() throws CacheException { + // create pdx instance at servers + server0.invoke(() -> { + Region region = getRootRegion().getSubregion(regionName); + for (int i = 0; i < numberOfEntries; i++) { + region.put("key-" + i, new TestObjectThrowsPdxSerializationException()); + } + }); + + // Create client pool with only 2 servers to test that retry will not run forever + createPool(client, poolName, new String[] {hostName, hostName}, + new int[] {port0, port1}, true); + + client.invoke(() -> { + try { + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "true"); + TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization = true; + QueryService remoteQueryService = (PoolManager.find(poolName)).getQueryService(); + logger.info("### Executing Query on server: " + queryString[1]); + Query query = remoteQueryService.newQuery(queryString[1]); + assertThatThrownBy(query::execute).isInstanceOf(ServerConnectivityException.class); + assertThat(TestObjectThrowsPdxSerializationException.numInstance.get()).isEqualTo(2); + assertThat(TestObjectThrowsPdxSerializationException.throwExceptionOnDeserialization) + .isFalse(); + } finally { + TestObjectThrowsPdxSerializationException.numInstance.set(0); + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException", "false"); + } + }); + } + + public static class TestObjectThrowsPdxSerializationException implements PdxSerializable { + private static boolean throwExceptionOnDeserialization = false; + public static AtomicInteger numInstance = new AtomicInteger(); + + public TestObjectThrowsPdxSerializationException() { + numInstance.incrementAndGet(); + } + + @Override + public void toData(PdxWriter writer) {} + + @Override + public void fromData(PdxReader reader) { + if (throwExceptionOnDeserialization) { + if (numInstance.get() >= 2) { + // after retried 2 servers, let the retry to 3rd server succeed + throwExceptionOnDeserialization = false; + } + throw new PdxSerializationException("Deserialization is expected to fail in this VM"); + } + } + } + + public static class TestObjectThrowsSerializationException implements PdxSerializable { + private static boolean throwExceptionOnDeserialization = false; + public static AtomicInteger numInstance = new AtomicInteger(); + + public TestObjectThrowsSerializationException() { + numInstance.incrementAndGet(); + } + + @Override + public void toData(PdxWriter writer) {} + + @Override + public void fromData(PdxReader reader) { + if (throwExceptionOnDeserialization) { + if (numInstance.get() >= 2) { + // after retried 2 servers, let the retry to 3rd server succeed + throwExceptionOnDeserialization = false; + } + throw new SerializationException("Deserialization is expected to fail in this VM"); + } + } + } +} diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java index 0fa0067badf7..9ff3777da231 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/PdxQueryDUnitTest.java @@ -113,7 +113,7 @@ public void run2() throws CacheException { public void run2() throws CacheException { configAndStartBridgeServer(); Region region = getRootRegion().getSubregion(regionName); - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); // Execute query with different type of Results. QueryService qs = getCache().getQueryService(); @@ -140,7 +140,7 @@ public void run2() throws CacheException { } // Pdx objects for local queries now get deserialized when results are iterated. // So the deserialized objects are no longer cached in VMCachedDeserializable. - assertEquals(numberOfEntries * 2, TestObject.numInstance); + assertEquals(numberOfEntries * 2, TestObject.numInstance.get()); } }); @@ -188,7 +188,7 @@ public void run2() throws CacheException { configAndStartBridgeServer(); Region region = getRootRegion().getSubregion(regionName); System.out.println("##### Region size is: " + region.size()); - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -232,7 +232,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -280,7 +280,7 @@ public void run2() throws CacheException { configAndStartBridgeServer(); Region region = getRootRegion().getSubregion(regionName); System.out.println("##### Region size is: " + region.size()); - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -328,7 +328,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -380,7 +380,7 @@ public void run2() throws CacheException { configAndStartBridgeServer(); Region region = getRootRegion().getSubregion(regionName); System.out.println("##### Region size is: " + region.size()); - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -426,7 +426,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -550,7 +550,7 @@ public void run2() throws CacheException { } } - assertEquals(2 * numberOfEntries, TestObject.numInstance); + assertEquals(2 * numberOfEntries, TestObject.numInstance.get()); } }; @@ -561,7 +561,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -856,7 +856,7 @@ public void run2() throws CacheException { Assert.fail("Failed executing " + queryStr, e); } - assertEquals(numberOfEntries, TestObject.numInstance); + assertEquals(numberOfEntries, TestObject.numInstance.get()); } }; @@ -867,7 +867,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1149,7 +1149,8 @@ public void run2() throws CacheException { } } - assertEquals(2 * (numberOfEntries + 5), (TestObject.numInstance + TestObject2.numInstance)); + assertEquals(2 * (numberOfEntries + 5), + (TestObject.numInstance.get() + TestObject2.numInstance.get())); } }; @@ -1160,7 +1161,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1246,7 +1247,7 @@ public void run2() throws CacheException { Assert.fail("Failed executing " + queryString[i], e); } } - assertEquals(numberOfEntries, TestObject.numInstance); + assertEquals(numberOfEntries, TestObject.numInstance.get()); } }); @@ -1254,7 +1255,7 @@ public void run2() throws CacheException { vm0.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(numberOfEntries, TestObject.numInstance); + assertEquals(numberOfEntries, TestObject.numInstance.get()); } }); @@ -1263,7 +1264,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1272,7 +1273,7 @@ public void run2() throws CacheException { vm2.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1302,7 +1303,7 @@ public void run2() throws CacheException { } } } - if (TestObject.numInstance <= 0) { + if (TestObject.numInstance.get() <= 0) { fail("Expected TestObject instance to be >= 0."); } } @@ -1313,7 +1314,7 @@ public void run2() throws CacheException { vm2.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1623,7 +1624,7 @@ public void run2() throws CacheException { } catch (Exception ex) { fail("Unable to create index. " + ex.getMessage()); } - assertEquals(numberOfEntries, TestObject.numInstance); + assertEquals(numberOfEntries, TestObject.numInstance.get()); } }); @@ -1643,7 +1644,7 @@ public void run2() throws CacheException { } catch (Exception ex) { fail("Unable to create index. " + ex.getMessage()); } - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1683,14 +1684,14 @@ public void run2() throws CacheException { vm0.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(numberOfEntries, TestObject.numInstance); + assertEquals(numberOfEntries, TestObject.numInstance.get()); } }); vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1731,7 +1732,7 @@ public void run2() throws CacheException { } } - assertEquals(4 * numberOfEntries, TestObject.numInstance); + assertEquals(4 * numberOfEntries, TestObject.numInstance.get()); for (int i = 3; i < queryString.length; i++) { try { @@ -1763,7 +1764,7 @@ public void run2() throws CacheException { vm0.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(numberOfEntries, TestObject.numInstance); + assertEquals(numberOfEntries, TestObject.numInstance.get()); } }); @@ -1772,7 +1773,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1892,7 +1893,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -1920,7 +1921,7 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("Create cache server") { @Override public void run2() throws CacheException { - assertEquals(0, TestObject.numInstance); + assertEquals(0, TestObject.numInstance.get()); } }); @@ -2560,14 +2561,14 @@ public void run2() throws CacheException { vm1.invoke(new CacheSerializableRunnable("validate") { @Override public void run2() throws CacheException { - assertEquals(testObjectCnt, TestObject.numInstance); + assertEquals(testObjectCnt, TestObject.numInstance.get()); assertEquals(positionObjectCnt, PositionPdx.numInstance); - assertEquals(testObjCnt, TestObject2.numInstance); + assertEquals(testObjCnt, TestObject2.numInstance.get()); // Reset the instances - TestObject.numInstance = 0; + TestObject.numInstance.set(0); PositionPdx.numInstance = 0; - TestObject2.numInstance = 0; + TestObject2.numInstance.set(0); } }); } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java index 0a37be084ff8..1675cae93aa8 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueryOp.java @@ -14,6 +14,7 @@ */ package org.apache.geode.cache.client.internal; +import java.io.IOException; import java.util.Arrays; import org.apache.geode.SerializationException; @@ -31,6 +32,9 @@ import org.apache.geode.internal.cache.tier.sockets.ObjectPartList; import org.apache.geode.internal.cache.tier.sockets.Part; import org.apache.geode.internal.serialization.KnownVersion; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.pdx.PdxSerializationException; +import org.apache.geode.util.internal.GeodeGlossary; /** * Does a region query on a server @@ -120,7 +124,29 @@ protected Object processResponse(Message msg) throws Exception { queryResult = resultPart.getObject(); } catch (Exception e) { String s = "While deserializing " + getOpName() + " result"; - exceptionRef[0] = new SerializationException(s, e); + + // Enable the workaround to convert PdxSerializationException into IOException to retry. + // It only worked when the client is configured to connect to more than one cache server + // AND the pool's "retry-attempts" is -1 (the default which means try each server) or > 0. + // It is possible that if application closed the current connection and got a new + // connection to the same server and retried the query to it, that it would also + // workaround this issue and it would not have the limitations of needing multiple servers + // and would not depend on the retry-attempts configuration. + boolean enableQueryRetryOnPdxSerializationException = Boolean.getBoolean( + GeodeGlossary.GEMFIRE_PREFIX + "enableQueryRetryOnPdxSerializationException"); + if (e instanceof PdxSerializationException + && enableQueryRetryOnPdxSerializationException) { + // IOException will allow the client to retry next server in the connection pool until + // exhausted all the servers (so it will not retry forever). Why retry: + // The byte array of the pdxInstance is always the same at the server. Other clients can + // get a correct one from query response message. Even this client can get it correctly + // before and after the PdxSerializationException. + exceptionRef[0] = new IOException(s, e); + LogService.getLogger().warn( + "Encountered unexpected PdxSerializationException, retrying on another server"); + } else { + exceptionRef[0] = new SerializationException(s, e); + } return; } if (queryResult instanceof Throwable) {