Skip to content

Commit

Permalink
Refresh nodes when renewSlotCache (redis#2642)
Browse files Browse the repository at this point in the history
This PR solves redis#2504 redis#2550, when renewSlotCache, we also remove dead nodes according to the
latest query.
  • Loading branch information
yangbodong22011 authored Sep 30, 2021
1 parent 211d903 commit c056a11
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 7 deletions.
41 changes: 34 additions & 7 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -173,6 +177,7 @@ private void discoverClusterSlots(Jedis jedis) {
w.lock();
try {
this.slots.clear();
Set<String> hostAndPortKeys = new HashSet<>();

for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
Expand All @@ -183,15 +188,37 @@ private void discoverClusterSlots(Jedis jedis) {

List<Integer> slotNums = getAssignedSlotArray(slotInfo);

// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
if (hostInfos.isEmpty()) {
continue;
int size = slotInfo.size();
for (int i = MASTER_NODE_INDEX; i < size; i++) {
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
if (hostInfos.isEmpty()) {
continue;
}

HostAndPort targetNode = generateHostAndPort(hostInfos);
hostAndPortKeys.add(getNodeKey(targetNode));
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
}
}
}

// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);
assignSlotsToNode(slotNums, targetNode);
// Remove dead nodes according to the latest query
Iterator<Entry<String, JedisPool>> entryIt = nodes.entrySet().iterator();
while (entryIt.hasNext()) {
Entry<String, JedisPool> entry = entryIt.next();
if (!hostAndPortKeys.contains(entry.getKey())) {
JedisPool pool = entry.getValue();
try {
if (pool != null) {
pool.destroy();
}
} catch (Exception e) {
// pass, may be this node dead
}
entryIt.remove();
}
}
} finally {
w.unlock();
Expand Down
58 changes: 58 additions & 0 deletions src/test/java/redis/clients/jedis/tests/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static redis.clients.jedis.tests.utils.AssertUtil.assertByteArraySetEquals;

Expand Down Expand Up @@ -805,6 +806,63 @@ public void nullKeys() {
}
}


@Test
public void clusterRefreshNodes() throws Exception {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(nodeInfo1);
jedisClusterNode.add(nodeInfo2);
jedisClusterNode.add(nodeInfo3);

try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT,
DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, "cluster", DEFAULT_POOL_CONFIG)) {
assertEquals(3, cluster.getClusterNodes().size());
cleanUp(); // cleanup and add node4

// at first, join node4 to cluster
node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());
// split available slots across the three nodes
int slotsPerNode = JedisCluster.HASHSLOTS / 4;
int[] node1Slots = new int[slotsPerNode];
int[] node2Slots = new int[slotsPerNode];
int[] node3Slots = new int[slotsPerNode];
int[] node4Slots = new int[slotsPerNode];
for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < JedisCluster.HASHSLOTS; i++) {
if (i < slotsPerNode) {
node1Slots[slot1++] = i;
} else if (i >= slotsPerNode && i < slotsPerNode*2) {
node2Slots[slot2++] = i;
} else if (i >= slotsPerNode*2 && i < slotsPerNode*3) {
node3Slots[slot3++] = i;
} else {
node4Slots[slot4++] = i;
}
}

node1.clusterAddSlots(node1Slots);
node2.clusterAddSlots(node2Slots);
node3.clusterAddSlots(node3Slots);
node4.clusterAddSlots(node4Slots);
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);

// cluster.set("key", "value"); will get JedisMovedDataException and renewSlotCache
cluster.set("key", "value");

assertEquals(4, cluster.getClusterNodes().size());
String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort();
assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4));

// make 4 nodes to 3 nodes
cleanUp();
setUp();
// cluster.set("bar", "foo") will get JedisMovedDataException and renewSlotCache
cluster.set("bar", "foo");
assertEquals(3, cluster.getClusterNodes().size());
}
}

@Test
public void georadiusStore() {
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
Expand Down

0 comments on commit c056a11

Please sign in to comment.