Skip to content

Commit

Permalink
GEODE-9172: Add Junit rule to start docker-based redis cluster (apach…
Browse files Browse the repository at this point in the history
…e#6379)

The fun part here is that some redis commands will respond with internal
IP addresses which need to be rewritten so that clients are able to make
use of them. Currently CLUSTER SLOTS and CLUSTER NODES are implemented
with this functionality.
  • Loading branch information
jdeppe-pivotal authored Apr 27, 2021
1 parent 3d12601 commit 3daa950
Show file tree
Hide file tree
Showing 13 changed files with 1,044 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.redis;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

import java.util.List;
import java.util.stream.Collectors;

import org.junit.ClassRule;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.exceptions.JedisMovedDataException;

import org.apache.geode.test.dunit.rules.ClusterNode;
import org.apache.geode.test.dunit.rules.NativeRedisClusterTestRule;

/**
* This class serves merely as an example of using the {@link NativeRedisClusterTestRule}.
* Eventually it can be deleted since we'll end up with more comprehensive tests for various
* {@code CLUSTER} commands.
*/
public class NativeRedisClusterTest {

@ClassRule
public static NativeRedisClusterTestRule cluster = new NativeRedisClusterTestRule();

@Test
public void testEachProxyReturnsExposedPorts() {
for (Integer port : cluster.getExposedPorts()) {
try (Jedis jedis = new Jedis("localhost", port)) {
List<ClusterNode> nodes =
NativeRedisClusterTestRule.parseClusterNodes(jedis.clusterNodes());
List<Integer> ports = nodes.stream().map(f -> f.port).collect(Collectors.toList());
assertThat(ports).containsExactlyInAnyOrderElementsOf(cluster.getExposedPorts());
}
}
}

@Test
public void testClusterAwareClient() {
try (JedisCluster jedis =
new JedisCluster(new HostAndPort("localhost", cluster.getExposedPorts().get(0)))) {
jedis.set("a", "0"); // slot 15495
jedis.set("b", "1"); // slot 3300
jedis.set("c", "2"); // slot 7365
jedis.set("d", "3"); // slot 11298
jedis.set("e", "4"); // slot 15363
jedis.set("f", "5"); // slot 3168
jedis.set("g", "6"); // slot 7233
jedis.set("h", "7"); // slot 11694
jedis.set("i", "8"); // slot 15759
jedis.set("j", "9"); // slot 3564
}
}

@Test
public void testMoved() {
try (Jedis jedis =
new Jedis("localhost", cluster.getExposedPorts().get(0), 100000)) {
assertThatThrownBy(() -> jedis.set("a", "A"))
.isInstanceOf(JedisMovedDataException.class)
.hasMessageContaining("127.0.0.1");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.redis.internal.proxy;

import java.util.Map;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.handler.codec.redis.ErrorRedisMessage;
import io.netty.handler.codec.redis.FullBulkStringRedisMessage;
import io.netty.util.CharsetUtil;

import org.apache.geode.redis.internal.netty.Coder;
import org.apache.geode.redis.internal.netty.CoderException;

public class ClusterNodesResponseProcessor implements RedisResponseProcessor {

private final Map<HostPort, HostPort> mappings;

public ClusterNodesResponseProcessor(Map<HostPort, HostPort> mappings) {
this.mappings = mappings;
}

@Override
public Object process(Object message, Channel channel) {
if (message instanceof ErrorRedisMessage) {
return message;
}

ByteBuf buf = ((FullBulkStringRedisMessage) message).content();
String input = buf.toString(CharsetUtil.UTF_8);

for (Map.Entry<HostPort, HostPort> entry : mappings.entrySet()) {
String findHostPort = entry.getKey().getHost() + ":" + entry.getKey().getPort();
String replaceHostPort = entry.getValue().getHost() + ":" + entry.getValue().getPort();

input = input.replace(findHostPort, replaceHostPort);
}

buf.release();

ByteBuf response;
try {
response = Coder.getBulkStringResponse(channel.alloc().buffer(), input);
} catch (CoderException e) {
throw new RuntimeException(e);
}

return response;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.redis.internal.proxy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.netty.channel.Channel;
import io.netty.handler.codec.redis.ArrayRedisMessage;
import io.netty.handler.codec.redis.ErrorRedisMessage;
import io.netty.handler.codec.redis.FullBulkStringRedisMessage;
import io.netty.handler.codec.redis.IntegerRedisMessage;
import io.netty.handler.codec.redis.RedisMessage;
import io.netty.handler.codec.redis.SimpleStringRedisMessage;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.tuple.Pair;

public class ClusterSlotsResponseProcessor implements RedisResponseProcessor {

private final Map<HostPort, HostPort> mappings;

public ClusterSlotsResponseProcessor(Map<HostPort, HostPort> mappings) {
this.mappings = mappings;
}

/**
* CLUSTER SLOTS looks something like this:
*
* <pre>
* 1) 1) (integer) 0
* 2) (integer) 5460
* 3) 1) "127.0.0.1"
* 2) (integer) 30001
* 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
* 4) 1) "127.0.0.1"
* 2) (integer) 30004
* 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
* 2) 1) (integer) 5461
* 2) (integer) 10922
* 3) 1) "127.0.0.1"
* 2) (integer) 30002
* 3) "c9d93d9f2c0c524ff34cc11838c2003d8c29e013"
* 4) 1) "127.0.0.1"
* 2) (integer) 30005
* 3) "faadb3eb99009de4ab72ad6b6ed87634c7ee410f"
* </pre>
*/
@Override
public Object process(Object message, Channel channel) {
if (message instanceof ErrorRedisMessage) {
return message;
}

ArrayRedisMessage input = (ArrayRedisMessage) message;
List<RedisMessage> response = new ArrayList<>();

for (RedisMessage entry : input.children()) {
List<RedisMessage> newInner = new ArrayList<>();
ArrayRedisMessage inner = (ArrayRedisMessage) entry;

// slot start
newInner.add(inner.children().get(0));
// slot end
newInner.add(inner.children().get(1));

for (int i = 2; i < inner.children().size(); i++) {
ArrayRedisMessage hostPortArray = (ArrayRedisMessage) inner.children().get(i);
String host = ((FullBulkStringRedisMessage) hostPortArray.children().get(0))
.content().toString(CharsetUtil.UTF_8);
Integer port = (int) ((IntegerRedisMessage) hostPortArray.children().get(1)).value();
Pair<String, Integer> newMapping = getMapping(host, port);

List<RedisMessage> newHostPortArray = new ArrayList<>();
newHostPortArray.add(new SimpleStringRedisMessage(newMapping.getLeft()));
newHostPortArray.add(new IntegerRedisMessage(newMapping.getRight()));
for (int j = 2; j < hostPortArray.children().size(); j++) {
newHostPortArray.add(hostPortArray.children().get(j));
}

newInner.add(new ArrayRedisMessage(newHostPortArray));
}

response.add(new ArrayRedisMessage(newInner));
}

return new ArrayRedisMessage(response);
}

private Pair<String, Integer> getMapping(String host, Integer port) {
for (Map.Entry<HostPort, HostPort> entry : mappings.entrySet()) {
HostPort from = entry.getKey();
if (from.getHost().equals(host) && from.getPort().equals(port)) {
return Pair.of(entry.getValue().getHost(), entry.getValue().getPort());
}
}

throw new IllegalArgumentException("Unable to map host and port " + host + ":" + port);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.redis.internal.proxy;

import java.util.Objects;

public class HostPort {

private final String host;
private final Integer port;

public HostPort(String host, Integer port) {
this.host = host;
this.port = port;
}

public String getHost() {
return host;
}

public Integer getPort() {
return port;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof HostPort)) {
return false;
}
HostPort hostPort = (HostPort) o;
return Objects.equals(host, hostPort.host)
&& Objects.equals(port, hostPort.port);
}

@Override
public int hashCode() {
return Objects.hash(host, port);
}

@Override
public String toString() {
return host + ":" + port;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.redis.internal.proxy;

import java.util.Map;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.redis.ErrorRedisMessage;

public class MovedResponseHandler extends ChannelInboundHandlerAdapter {

private final Map<HostPort, HostPort> mappings;
private final Channel inboundChannel;

public MovedResponseHandler(Channel inboundChannel, Map<HostPort, HostPort> mappings) {
this.inboundChannel = inboundChannel;
this.mappings = mappings;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ErrorRedisMessage) {
String content = ((ErrorRedisMessage) msg).content();
if (content.startsWith("MOVED")) {
for (Map.Entry<HostPort, HostPort> entry : mappings.entrySet()) {
String hostPort = entry.getKey().getHost() + ":" + entry.getKey().getPort();
int index = content.indexOf(hostPort);
if (index >= 0) {
String newHostPort = entry.getValue().getHost() + ":" + entry.getValue().getPort();
String response = content.substring(0, index) + newHostPort;
inboundChannel.writeAndFlush(new ErrorRedisMessage(response));
return;
}
}

throw new IllegalStateException("Unmapped MOVED received: " + content);
}
}

// Hand off to next handler
ctx.fireChannelRead(msg);
}
}
Loading

0 comments on commit 3daa950

Please sign in to comment.