Skip to content

Commit

Permalink
GEODE-10083: Fix RedisProxy to correctly process MOVED response (apac…
Browse files Browse the repository at this point in the history
…he#7394)

- When a MOVED response is received, the already queued processor needs
  to be discarded otherwise subsequent requests may possibly be
  corrupted.
  • Loading branch information
jdeppe-pivotal authored Feb 25, 2022
1 parent 45cbe7f commit b729702
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.apache.geode.redis.internal.proxy;

import java.util.Map;
import java.util.Queue;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -26,14 +27,17 @@ public class MovedResponseHandler extends ChannelInboundHandlerAdapter {

private final Map<HostPort, HostPort> mappings;
private final Channel inboundChannel;
private final Queue<RedisResponseProcessor> processors;

public MovedResponseHandler(Channel inboundChannel, Map<HostPort, HostPort> mappings) {
public MovedResponseHandler(Channel inboundChannel, Map<HostPort, HostPort> mappings,
Queue<RedisResponseProcessor> processors) {
this.inboundChannel = inboundChannel;
this.mappings = mappings;
this.processors = processors;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ErrorRedisMessage) {
String content = ((ErrorRedisMessage) msg).content();
if (content.startsWith("MOVED")) {
Expand All @@ -44,6 +48,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
String newHostPort = entry.getValue().getHost() + ":" + entry.getValue().getPort();
String response = content.substring(0, index) + newHostPort;
inboundChannel.writeAndFlush(new ErrorRedisMessage(response));
// No need to have a processor deal with this now
processors.poll();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -53,6 +55,7 @@ public class RedisProxyInboundHandler extends ChannelInboundHandlerAdapter {
private final ClusterSlotsResponseProcessor slotsResponseProcessor;
private final ClusterNodesResponseProcessor nodesResponseProcessor;
private MovedResponseHandler movedResponseHandler;
private Queue<RedisResponseProcessor> processors;

public RedisProxyInboundHandler(Channel inboundChannel, String remoteHost, int remotePort,
Map<HostPort, HostPort> mappings) {
Expand All @@ -67,8 +70,9 @@ public RedisProxyInboundHandler(Channel inboundChannel, String remoteHost, int r
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel inboundChannel = ctx.channel();
outboundHandler = new RedisProxyOutboundHandler(inboundChannel);
movedResponseHandler = new MovedResponseHandler(inboundChannel, mappings);
processors = new LinkedBlockingQueue<>();
outboundHandler = new RedisProxyOutboundHandler(inboundChannel, processors);
movedResponseHandler = new MovedResponseHandler(inboundChannel, mappings, processors);

// Start the connection attempt.
Bootstrap b = new Bootstrap();
Expand Down Expand Up @@ -128,9 +132,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
case "cluster":
String sub = getArg(rMessage, 1);
if ("slots".equals(sub)) {
outboundHandler.addResponseProcessor(slotsResponseProcessor);
processors.add(slotsResponseProcessor);
} else if ("nodes".equals(sub)) {
outboundHandler.addResponseProcessor(nodesResponseProcessor);
processors.add(nodesResponseProcessor);
}
break;
case "hello":
Expand All @@ -141,7 +145,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
inboundChannel.writeAndFlush(error);
return;
default:
outboundHandler.addResponseProcessor(NoopRedisResponseProcessor.INSTANCE);
processors.add(NoopRedisResponseProcessor.INSTANCE);
}

outboundChannel.writeAndFlush(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.apache.geode.redis.internal.proxy;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -29,11 +28,13 @@
public class RedisProxyOutboundHandler extends ChannelInboundHandlerAdapter {

private static final Logger logger = LogService.getLogger();
private final Queue<RedisResponseProcessor> processors = new LinkedBlockingQueue<>();
private final Queue<RedisResponseProcessor> processors;
private final Channel inboundChannel;

public RedisProxyOutboundHandler(Channel inboundChannel) {
public RedisProxyOutboundHandler(Channel inboundChannel,
Queue<RedisResponseProcessor> processors) {
this.inboundChannel = inboundChannel;
this.processors = processors;
}

@Override
Expand Down Expand Up @@ -71,7 +72,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
RedisProxyInboundHandler.closeOnFlush(ctx.channel());
}

public void addResponseProcessor(RedisResponseProcessor processor) {
processors.add(processor);
}
}

0 comments on commit b729702

Please sign in to comment.