Skip to content

Commit

Permalink
GEODE-8365: Redis Delta not propagating updated hash values properly (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
sabbey37 authored Jul 17, 2020
1 parent 85ab541 commit 28fb073
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public static void setup() {
ports.put(SERVER1, redisContainer.getFirstMappedPort());

jedisConnetedToServer1 = new Jedis("localhost", ports.get(SERVER1), JEDIS_TIMEOUT);
startSpringApp(APP1, SERVER1, DEFAULT_SESSION_TIMEOUT);
startSpringApp(APP2, SERVER1, DEFAULT_SESSION_TIMEOUT);
startSpringApp(APP1, DEFAULT_SESSION_TIMEOUT, ports.get(SERVER1));
startSpringApp(APP2, DEFAULT_SESSION_TIMEOUT, ports.get(SERVER1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public static void setup() {
ports.put(SERVER1, redisContainer.getFirstMappedPort());

jedisConnetedToServer1 = new Jedis("localhost", ports.get(SERVER1), JEDIS_TIMEOUT);
startSpringApp(APP1, SERVER1, SHORT_SESSION_TIMEOUT);
startSpringApp(APP2, SERVER1, SHORT_SESSION_TIMEOUT);
startSpringApp(APP1, SHORT_SESSION_TIMEOUT, ports.get(SERVER1));
startSpringApp(APP2, SHORT_SESSION_TIMEOUT, ports.get(SERVER1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@
import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.Random;

import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -36,10 +32,9 @@
import org.junit.Test;
import redis.clients.jedis.Jedis;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.internal.cache.BucketDump;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
Expand All @@ -66,6 +61,7 @@ public class DeltaDUnitTest {

private static int redisServerPort1;
private static int redisServerPort2;
private static Random random;

@BeforeClass
public static void classSetup() {
Expand All @@ -81,6 +77,7 @@ public static void classSetup() {

jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
random = new Random();
}

@Before
Expand All @@ -104,59 +101,21 @@ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenAppending() {
jedis1.set(key, baseValue);
for (int i = 0; i < ITERATION_COUNT; i++) {
jedis1.append(key, String.valueOf(i));

byte[] server1LocalValue = server1.invoke(() -> (byte[]) getLocalData(key, r -> {
RedisData localValue = r.get(new ByteArrayWrapper(key.getBytes()));

try {
return BlobHelper.serializeToBlob(localValue);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));

byte[] server2LocalValue = server2.invoke(() -> (byte[]) getLocalData(key, r -> {
RedisData localValue = r.get(new ByteArrayWrapper(key.getBytes()));

try {
return BlobHelper.serializeToBlob(localValue);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));

assertThat(Arrays.equals(server1LocalValue, server2LocalValue));
}
compareBuckets();
}

@Test
public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenAddingToSet() {
String key = "key";

List<String> members = makeMemberList(ITERATION_COUNT, "member-");

for (String member : members) {
jedis1.sadd(key, member);
Set<ByteArrayWrapper> server1LocalSet =
server1.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.smembers();
}));

Set<ByteArrayWrapper> server2LocalSet =
server2.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.smembers();
}));

assertThat(server1LocalSet).containsExactlyInAnyOrder(server2LocalSet.toArray(
new ByteArrayWrapper[] {}));
}

compareBuckets();
}

@Test
Expand All @@ -168,31 +127,8 @@ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenRemovingFromSet()

for (String member : members) {
jedis1.srem(key, member);
Set<ByteArrayWrapper> server1LocalSet =
server1.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.smembers();
}));

Set<ByteArrayWrapper> server2LocalSet =
server2.invoke(() -> (Set<ByteArrayWrapper>) getLocalData(key, r -> {
RedisSet localSet = (RedisSet) r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.smembers();
}));

if (server1LocalSet == null || server2LocalSet == null) {
assertThat(server1LocalSet).isEqualTo(server2LocalSet);
} else {
assertThat(server1LocalSet).containsExactlyInAnyOrder(server2LocalSet.toArray(
new ByteArrayWrapper[] {}));
}
}
compareBuckets();
}

@Test
Expand All @@ -203,22 +139,26 @@ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenAddingToHash() {

for (String field : testMap.keySet()) {
jedis1.hset(key, field, testMap.get(field));
}
compareBuckets();
}

Collection<ByteArrayWrapper> server1LocalHash =
server1.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
return localSet.hgetall();
}));
@Test
public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenUpdatingHashValues() {
String key = "key";

Collection<ByteArrayWrapper> server2LocalHash =
server2.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
return localSet.hgetall();
}));
Map<String, String> testMap = makeHashMap(ITERATION_COUNT, "field-", "value-");
jedis1.hset(key, testMap);

assertThat(server1LocalHash).containsExactlyInAnyOrder(server2LocalHash.toArray(
new ByteArrayWrapper[] {}));
for (int i = 0; i < 100; i++) {
Map<String, String> retrievedMap = jedis1.hgetAll(key);
int rand = random.nextInt(retrievedMap.size());
String fieldToUpdate = "field-" + rand;
String valueToUpdate = retrievedMap.get(fieldToUpdate);
retrievedMap.put(fieldToUpdate, valueToUpdate + " updated");
jedis1.hset(key, retrievedMap);
}
compareBuckets();
}

@Test
Expand All @@ -230,32 +170,8 @@ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenRemovingFromHash(

for (String field : testMap.keySet()) {
jedis1.hdel(key, field, testMap.get(field));

Collection<ByteArrayWrapper> server1LocalHash =
server1.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.hgetall();
}));

Collection<ByteArrayWrapper> server2LocalHash =
server2.invoke(() -> (Collection<ByteArrayWrapper>) getLocalData(key, r -> {
RedisHash localSet = (RedisHash) r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.hgetall();
}));

if (server1LocalHash == null || server2LocalHash == null) {
assertThat(server1LocalHash).isEqualTo(server2LocalHash);
} else {
assertThat(server1LocalHash).containsExactlyInAnyOrder(server2LocalHash.toArray(
new ByteArrayWrapper[] {}));
}
}
compareBuckets();
}

@Test
Expand All @@ -266,34 +182,34 @@ public void shouldCorrectlyPropagateDeltaToSecondaryServer_whenExpiring() {
String key = baseKey + i;
jedis1.set(key, "value");
jedis1.expire(key, 20);
long server1LocalExpirtionTimestamp = server1.invoke(() -> (long) getLocalData(key, r -> {
RedisData localSet = r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.getExpirationTimestamp();
}));

long server2LocalExpirationTimestamp = server2.invoke(() -> (long) getLocalData(key, r -> {
RedisData localSet = r.get(new ByteArrayWrapper(key.getBytes()));
if (localSet == null) {
return null;
}
return localSet.getExpirationTimestamp();
}));

assertThat(server1LocalExpirtionTimestamp).isEqualTo(server2LocalExpirationTimestamp);
}
}

private static Object getLocalData(String key,
Function<Region<ByteArrayWrapper, RedisData>, Object> func) {
InternalCache cache = ClusterStartupRule.getCache();
Region<ByteArrayWrapper, RedisData> region = cache.getRegion("__REDIS_DATA");
Region<ByteArrayWrapper, RedisData> localRegion =
PartitionRegionHelper.getLocalData(region);
for (int i = 0; i < ITERATION_COUNT; i++) {
String key = baseKey + i;
jedis1.expire(key, 80);
}
compareBuckets();
}

return func.apply(localRegion);
private void compareBuckets() {
server1.invoke(() -> {
for (int j = 0; j < 113; j++) {
InternalCache cache = ClusterStartupRule.getCache();
PartitionedRegion region = (PartitionedRegion) cache.getRegion("__REDIS_DATA");
List<BucketDump> buckets = region.getAllBucketEntries(j);
assertThat(buckets.size()).isEqualTo(2);
Map<Object, Object> bucket1 = buckets.get(0).getValues();
Map<Object, Object> bucket2 = buckets.get(1).getValues();
assertThat(bucket1).containsExactlyInAnyOrderEntriesOf(bucket2);

bucket1.keySet().forEach(key -> {
RedisData value1 = (RedisData) bucket1.get(key);
RedisData value2 = (RedisData) bucket2.get(key);

assertThat(value1.getExpirationTimestamp()).isEqualTo(value2.getExpirationTimestamp());
});
}
});
}

private Map<String, String> makeHashMap(int hashSize, String baseFieldName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class RedisSessionDUnitTest extends SessionDUnitTest {
@BeforeClass
public static void setup() {
SessionDUnitTest.setup();
startSpringApp(APP1, SERVER1, SERVER2, DEFAULT_SESSION_TIMEOUT);
startSpringApp(APP2, SERVER2, SERVER1, DEFAULT_SESSION_TIMEOUT);
startSpringApp(APP1, DEFAULT_SESSION_TIMEOUT, ports.get(SERVER1), ports.get(SERVER2));
startSpringApp(APP2, DEFAULT_SESSION_TIMEOUT, ports.get(SERVER2), ports.get(SERVER1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,32 +112,20 @@ protected static void startRedisServer(int server) {
});
}

protected static void startSpringApp(int sessionApp, int primaryServer, long sessionTimeout) {
int primaryRedisPort = ports.get(primaryServer);
protected static void startSpringApp(int sessionApp, long sessionTimeout, int... serverPorts) {
int httpPort = ports.get(sessionApp);
VM host = cluster.getVM(sessionApp);
host.invoke("Start a Spring app", () -> {
System.setProperty("server.port", "" + httpPort);
System.setProperty("spring.redis.port", "" + primaryRedisPort);
System.setProperty("spring.redis.port", "" + serverPorts[0]);
System.setProperty("server.servlet.session.timeout", "" + sessionTimeout + "s");
springApplicationContext = SpringApplication.run(
RedisSpringTestApplication.class, "" + primaryRedisPort);
});
}
String[] args = new String[serverPorts.length];

static void startSpringApp(int sessionApp, int primaryServer, int secondaryServer,
long sessionTimeout) {
int primaryRedisPort = ports.get(primaryServer);
int failoverRedisPort = ports.get(secondaryServer);
int httpPort = ports.get(sessionApp);
VM host = cluster.getVM(sessionApp);
host.invoke("Start a Spring app", () -> {
System.setProperty("server.port", "" + httpPort);
System.setProperty("spring.redis.port", "" + primaryRedisPort);
System.setProperty("server.servlet.session.timeout", "" + sessionTimeout + "s");
for (int i = 0; i < serverPorts.length; i++) {
args[i] = "" + serverPorts[i];
}
springApplicationContext = SpringApplication.run(
RedisSpringTestApplication.class,
"" + primaryRedisPort, "" + failoverRedisPort);
RedisSpringTestApplication.class, args);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class SessionExpirationDUnitTest extends SessionDUnitTest {
@BeforeClass
public static void setup() {
SessionDUnitTest.setup();
startSpringApp(APP1, SERVER1, SERVER2, SHORT_SESSION_TIMEOUT);
startSpringApp(APP2, SERVER2, SERVER1, SHORT_SESSION_TIMEOUT);
startSpringApp(APP1, SHORT_SESSION_TIMEOUT, ports.get(SERVER1), ports.get(SERVER2));
startSpringApp(APP2, SHORT_SESSION_TIMEOUT, ports.get(SERVER2), ports.get(SERVER1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,24 @@ public int hset(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key
while (iterator.hasNext()) {
ByteArrayWrapper field = iterator.next();
ByteArrayWrapper value = iterator.next();
boolean added;
boolean added = true;
boolean newField;
if (nx) {
added = hashPutIfAbsent(field, value) == null;
newField = hashPutIfAbsent(field, value) == null;
added = newField;
} else {
added = hashPut(field, value) == null;
newField = hashPut(field, value) == null;
}

if (added) {
if (deltaInfo == null) {
deltaInfo = new AddsDeltaInfo();
}
deltaInfo.add(field);
deltaInfo.add(value);
}

if (newField) {
fieldsAdded++;
}
}
Expand Down

0 comments on commit 28fb073

Please sign in to comment.