Skip to content

Commit

Permalink
GEODE-2954 Old client gets null memberID in cache listener
Browse files Browse the repository at this point in the history
I've added a new test that demonstrates that a new-version server sends
an EventID to a client that the client is unable to deserialize
completely.  It gets an error when deserializing its member ID,
causing cache listeners to get a null when requesting the ID of
the member that effected the change.

The fix is to reserialize the member ID in EventID.toData if the
destination stream is for an older version, such as a 1.1.0 client.
This ensures the proper on-wire format is used for that version of Geode.

I've also bumped up the version ordinal for 1.2 since version 59 is
marked as unusable in Version.java.

I've changed the Banner to show the version ordinal because the other
version information in the banner isn't completely trustworthy.  It
looks for a GemFireVersion.properties file on the classpath to get
this information and so it may not get it from the Geode jar file
as expected.
  • Loading branch information
bschuchardt committed May 25, 2017
1 parent 096c22d commit e79d27d
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ static void print(PrintWriter out, String args[]) {

GemFireVersion.print(out);

out.println("Communications version: " + Version.CURRENT_ORDINAL);

out.println("Process ID: " + processId);
out.println("User: " + sp.get("user.name"));
sp.remove("user.name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class Version implements Comparable<Version> {
/** byte used as ordinal to represent this <code>Version</code> */
private final short ordinal;

public static final int HIGHEST_VERSION = 60;
public static final int HIGHEST_VERSION = 65;

private static final Version[] VALUES = new Version[HIGHEST_VERSION + 1];

Expand Down Expand Up @@ -190,7 +190,7 @@ public class Version implements Comparable<Version> {
public static final Version GEODE_111 =
new Version("GEODE", "1.1.1", (byte) 1, (byte) 1, (byte) 1, (byte) 0, GEODE_111_ORDINAL);

private static final byte GEODE_120_ORDINAL = 60;
private static final byte GEODE_120_ORDINAL = 65;

public static final Version GEODE_120 =
new Version("GEODE", "1.2.0", (byte) 1, (byte) 2, (byte) 0, (byte) 0, GEODE_120_ORDINAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.geode.internal.InternalDataSerializer;
import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
Expand Down Expand Up @@ -322,7 +323,15 @@ public int getDSFID() {
}

public void toData(DataOutput dop) throws IOException {
DataSerializer.writeByteArray(this.membershipID, dop);
Version version = InternalDataSerializer.getVersionForDataStream(dop);
if (version.compareTo(Version.GFE_90) <= 0) {
InternalDistributedMember member = getDistributedMember();
HeapDataOutputStream hdos = new HeapDataOutputStream(version);
member.writeEssentialData(hdos);
DataSerializer.writeByteArray(hdos.toByteArray(), dop);
} else {
DataSerializer.writeByteArray(this.membershipID, dop);
}
DataSerializer.writeByteArray(getOptimizedByteArrayForEventID(this.threadID, this.sequenceID),
dop);
dop.writeInt(this.bucketID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,28 @@
*/
package org.apache.geode.internal.cache.tier.sockets;

import static org.junit.Assert.assertFalse;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.standalone.VersionManager;
import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Category({DistributedTest.class, ClientServerTest.class, BackwardCompatibilityTest.class})
@RunWith(Parameterized.class)
Expand All @@ -46,4 +57,37 @@ public ClientServerMiscBCDUnitTest(String version) {
testVersion = version;
}

@Test
public void testSubscriptionWithCurrentServerAndOldClients() throws Exception {
// start server first
int serverPort = initServerCache(true);
VM client1 = Host.getHost(0).getVM(testVersion, 1);
VM client2 = Host.getHost(0).getVM(testVersion, 3);
String hostname = NetworkUtils.getServerHostName(Host.getHost(0));
client1.invoke("create client1 cache", () -> {
createClientCache(hostname, serverPort);
populateCache();
registerInterest();
});
client2.invoke("create client2 cache", () -> {
Pool ignore = createClientCache(hostname, serverPort);
});

client2.invoke("putting data in client2", () -> putForClient());

// client1 will receive client2's updates asynchronously
client1.invoke(() -> {
Region r2 = getCache().getRegion(REGION_NAME2);
MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> verifier.eventReceived);
});

// client2's update should have included a memberID - GEODE-2954
client1.invoke(() -> {
Region r2 = getCache().getRegion(REGION_NAME2);
MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
assertFalse(verifier.memberIDNotReceived);
});
}

}
Loading

0 comments on commit e79d27d

Please sign in to comment.