Skip to content

Commit

Permalink
GEODE-8687: Fix for handling of PdxSerializationException on client (a…
Browse files Browse the repository at this point in the history
…pache#5730)

* GEODE-8687: Improve handling of seralization error

* Improves handling of PdxSerializationException on client at the reception
of events from subscription queue

* Faulty behavior: At the reception of event for which
PdxSerializationException is thrown the client would always shutdown
CacheClientUpdater, destroy subscription queue connection
and try to perform failover to other server in cluster

* Behaviour with this fix: At the reception of event that provoke
PdxSerializationException client will only log the exception

* DurableClientCQAutoSerializer test updated

* Empty commit to trigger test

* Updates after review
  • Loading branch information
jvarenina authored Dec 3, 2020
1 parent a39c200 commit 2e7e456
Show file tree
Hide file tree
Showing 5 changed files with 567 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.pdx.PdxSerializationException;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
Expand Down Expand Up @@ -1793,7 +1794,7 @@ private Object deserialize(byte[] serializedBytes) {
// This is a debugging method so ignore all exceptions like ClassNotFoundException
try (ByteArrayDataInput dis = new ByteArrayDataInput(serializedBytes)) {
deserializedObject = DataSerializer.readObject(dis);
} catch (ClassNotFoundException | IOException ignore) {
} catch (ClassNotFoundException | IOException | PdxSerializationException e) {
}
return deserializedObject;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.Serializable;
import java.util.Map;
import java.util.Objects;

import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
import org.apache.geode.pdx.internal.AutoSerializableManager;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.apache.geode.test.junit.categories.SerializationTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;

@Category({ClientSubscriptionTest.class, SerializationTest.class})
public class DurableClientCQAutoSerializerDUnitTest implements Serializable {
private static final String REPLICATE_REGION_NAME = "ReplicateRegion";
private static final String PARTITION_REGION_NAME = "PartitionRegion";

private MemberVM server;
private MemberVM server2;
private MemberVM locator;
private ClientVM client;
private ClientVM client2;

private static TestAutoSerializerCqListener cqListener = null;

private static final String TEST_OBJECT1_CLASS_PATH =
"org.apache.geode.internal.cache.tier.sockets.TestAutoSerializerObject1";
private static final String TEST_OBJECT2_CLASS_PATH =
"org.apache.geode.internal.cache.tier.sockets.TestAutoSerializerObject2";
private static final String TEST_FAULTY_CLASS_PATH =
"org.apache.geode.internal.cache.tier.sockets.TestAutoSerializerObject2Faulty";
private static final String DURABLE_CLIENT_ID = "durableClient";

// Traffic data
static final Map<String, TestAutoSerializerObject1> LIST_TEST_OBJECT1 = ImmutableMap.of(
"key1", new TestAutoSerializerObject1("aa", "bb", 300),
"key2", new TestAutoSerializerObject1("aa", "bb", 600),
"key3", new TestAutoSerializerObject1("aaa", "bbb", 500));

static final Map<String, TestAutoSerializerObject2> LIST_TEST_OBJECT2 = ImmutableMap.of(
"key1", new TestAutoSerializerObject2("cc", "ddd", 300),
"key2", new TestAutoSerializerObject2("cc", "dddd", 400));

@Rule
public GfshCommandRule gfsh = new GfshCommandRule();

@Rule
public ClusterStartupRule cluster = new ClusterStartupRule(5);

@Before
public void setUp() throws Exception {
Invoke.invokeInEveryVM(
() -> System.setProperty(AutoSerializableManager.NO_HARDCODED_EXCLUDES_PARAM, "true"));

locator =
cluster.startLocatorVM(0);
int locatorPort = locator.getPort();
server = cluster.startServerVM(1,
s -> s.withConnectionToLocator(locatorPort));

server2 = cluster.startServerVM(2,
s -> s.withConnectionToLocator(locatorPort));

gfsh.connectAndVerify(locator);
gfsh.executeAndAssertThat(
"configure pdx --auto-serializable-classes='" + TEST_OBJECT1_CLASS_PATH + ", "
+ TEST_OBJECT2_CLASS_PATH + "'")
.statusIsSuccess();
gfsh.executeAndAssertThat("create region --name=" + REPLICATE_REGION_NAME + " --type=REPLICATE")
.statusIsSuccess();
gfsh.executeAndAssertThat("create region --name=" + PARTITION_REGION_NAME + " --type=PARTITION")
.statusIsSuccess();

locator.invoke(() -> {
ClusterStartupRule.memberStarter
.waitUntilRegionIsReadyOnExactlyThisManyServers(SEPARATOR + REPLICATE_REGION_NAME, 2);
ClusterStartupRule.memberStarter
.waitUntilRegionIsReadyOnExactlyThisManyServers(SEPARATOR + PARTITION_REGION_NAME, 2);
});
}

@Test
public void testCorrectClassPathsAutoSerializer()
throws Exception {

String query1 = "SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME;
String query2 = "SELECT * FROM " + SEPARATOR + PARTITION_REGION_NAME;

startDurableClient(TEST_OBJECT1_CLASS_PATH, TEST_OBJECT2_CLASS_PATH);
createDurableCQs(query1, query2);
verifyThatOnlyOneServerHostDurableSubscription();

// Start another client and provision data with traffic that should trigger CQs
startDataProvisionClient(TEST_OBJECT1_CLASS_PATH, TEST_OBJECT2_CLASS_PATH);
provisionRegionsWithData();

// Check that all events are received and successfully deserialized in cq listener
checkCqEvents(LIST_TEST_OBJECT1.size(), LIST_TEST_OBJECT2.size());
verifyThatOnlyOneServerHostDurableSubscription();
}

@Test
public void testFaultyClassPathAutoSerializer()
throws Exception {
String query1 = "SELECT * FROM " + SEPARATOR + REPLICATE_REGION_NAME;
String query2 = "SELECT * FROM " + SEPARATOR + PARTITION_REGION_NAME;
startDurableClient(TEST_FAULTY_CLASS_PATH, TEST_OBJECT2_CLASS_PATH);
createDurableCQs(query1, query2);
verifyThatOnlyOneServerHostDurableSubscription();

// Start another client and provision data with traffic that should trigger CQs
startDataProvisionClient(TEST_OBJECT1_CLASS_PATH, TEST_OBJECT2_CLASS_PATH);
provisionRegionsWithData();

// Check that only events for which ReflectionBasedAutoSerializer is correctly set are received
// and successfully deserialized in cq listener
checkCqEvents(0, LIST_TEST_OBJECT2.size());
verifyThatOnlyOneServerHostDurableSubscription();
}

private void startDataProvisionClient(String... patterns) throws Exception {
int locatorPort = locator.getPort();
client2 = cluster.startClientVM(4, ccf -> ccf
.withLocatorConnection(locatorPort).withCacheSetup(c -> c
.setPdxSerializer(new ReflectionBasedAutoSerializer(patterns))));
}

private void startDurableClient(String... patterns)
throws Exception {
int locatorPort = locator.getPort();
client = cluster.startClientVM(3, ccf -> ccf
.withPoolSubscription(true).withLocatorConnection(locatorPort).withCacheSetup(c -> c
.setPdxSerializer(new ReflectionBasedAutoSerializer(patterns))
.set("durable-client-id", DURABLE_CLIENT_ID)));
}

private void createDurableCQs(String... queries) {
client.invoke(() -> {
TestAutoSerializerCqListener cqListener = new TestAutoSerializerCqListener();
DurableClientCQAutoSerializerDUnitTest.cqListener = cqListener;
assertThat(ClusterStartupRule.getClientCache()).isNotNull();
QueryService queryService = ClusterStartupRule.getClientCache().getQueryService();
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
cqAttributesFactory.addCqListener(cqListener);

for (String query : queries) {
CqQuery cq = queryService.newCq(query, cqAttributesFactory.create(), true);
cq.execute();
}
ClusterStartupRule.getClientCache().readyForEvents();
});
}

boolean isPrimaryServer(int primaryPort, MemberVM member) {
return primaryPort == member.getPort();
}

private void verifyThatOnlyOneServerHostDurableSubscription() {
int primPort = getPrimaryServerPort(client);
verifyDurableClientPresence(server, isPrimaryServer(primPort, server));
verifyDurableClientPresence(server2, isPrimaryServer(primPort, server2));
}

private void checkCqEvents(int expectedTestAutoSerializerObject1,
int expectedTestAutoSerializerObject2) {
// Check if number of events is correct
client.invoke(() -> {
await().untilAsserted(() -> assertThat(
DurableClientCQAutoSerializerDUnitTest.cqListener.getNumEvents())
.isEqualTo(expectedTestAutoSerializerObject1 + expectedTestAutoSerializerObject2));

// Check if events are deserialized correctly
if (expectedTestAutoSerializerObject1 != 0) {
assertEquals(DurableClientCQAutoSerializerDUnitTest.cqListener.testAutoSerializerObject1,
LIST_TEST_OBJECT1);
}
if (expectedTestAutoSerializerObject2 != 0) {
assertEquals(DurableClientCQAutoSerializerDUnitTest.cqListener.testAutoSerializerObject2,
LIST_TEST_OBJECT2);
}
});
}

private void verifyDurableClientPresence(MemberVM serverVM, boolean isExpected) {
serverVM.invoke(() -> {
await()
.until(() -> isExpected == (getNumberOfClientProxies() == 1));

if (isExpected) {
// Get the CacheClientProxy or not (if proxy set is empty)
CacheClientProxy proxy = getClientProxy();
assertThat(proxy).isNotNull();
// Verify that it is durable and its properties are correct
assertThat(proxy.isDurable()).isTrue();
assertThat(DURABLE_CLIENT_ID).isEqualTo(proxy.getDurableId());
}
});
}

private static CacheClientProxy getClientProxy() {
// Get the CacheClientProxy or not (if proxy set is empty)
CacheClientProxy proxy = null;
java.util.Iterator<CacheClientProxy> i = getCacheClientNotifier().getClientProxies().iterator();
if (i.hasNext()) {
proxy = i.next();
}
return proxy;
}

private static CacheClientNotifier getCacheClientNotifier() {
// Get the CacheClientNotifier
CacheServerImpl cacheServer = (CacheServerImpl) Objects
.requireNonNull(ClusterStartupRule.getCache()).getCacheServers().iterator().next();
assertNotNull(cacheServer);

// Get the CacheClientNotifier
return cacheServer.getAcceptor().getCacheClientNotifier();
}

private static int getNumberOfClientProxies() {
return getCacheClientNotifier().getClientProxies().size();
}

private void provisionRegionsWithData() {
client2.invoke(() -> {
ClientRegionFactory factory =
ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY);
Region<String, TestAutoSerializerObject1> region = factory.create(REPLICATE_REGION_NAME);

// provision TestAutoSerializerObject1 data
for (Map.Entry<String, TestAutoSerializerObject1> entry : LIST_TEST_OBJECT1.entrySet()) {
region.put(entry.getKey(), entry.getValue());
}

Region<String, TestAutoSerializerObject2> region2 = factory.create(PARTITION_REGION_NAME);
// provision TestAutoSerializerObject2 data
for (Map.Entry<String, TestAutoSerializerObject2> entry : LIST_TEST_OBJECT2.entrySet()) {
region2.put(entry.getKey(), entry.getValue());
}
});
}

private int getPrimaryServerPort(ClientVM client) {
return client.invoke(() -> {
ClientCache cache = ClusterStartupRule.getClientCache();
PoolImpl pool = (PoolImpl) cache.getDefaultPool();
return pool.getPrimaryPort();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;

import java.util.HashMap;
import java.util.Map;

import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqListener;

public class TestAutoSerializerCqListener implements CqListener {
private int numEvents = 0;
private int numErrors = 0;
public Map<String, TestAutoSerializerObject1> testAutoSerializerObject1 = new HashMap<>();
public Map<String, TestAutoSerializerObject2> testAutoSerializerObject2 = new HashMap<>();

public int getNumEvents() {
return numEvents;
}

public int getNumErrors() {
return numErrors;
}

@Override
public void onEvent(CqEvent aCqEvent) {
Object obj = aCqEvent.getNewValue();
if (obj instanceof TestAutoSerializerObject1) {
testAutoSerializerObject1.put((String) aCqEvent.getKey(),
(TestAutoSerializerObject1) aCqEvent.getNewValue());
} else if (obj instanceof TestAutoSerializerObject2) {
testAutoSerializerObject2.put((String) aCqEvent.getKey(),
(TestAutoSerializerObject2) aCqEvent.getNewValue());
}
numEvents++;
}

@Override
public void onError(CqEvent aCqEvent) {
numErrors++;
}

@Override
public void close() {}
}
Loading

0 comments on commit 2e7e456

Please sign in to comment.