Skip to content

Commit

Permalink
Issue netty#242: Add the ability to send many commands with a single …
Browse files Browse the repository at this point in the history
…call

Also:
* Code cleanup
* Hide internal constants from a user
  • Loading branch information
trustin committed Mar 30, 2012
1 parent fd0b0a4 commit a065b1c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 56 deletions.
10 changes: 5 additions & 5 deletions codec/src/main/java/io/netty/handler/codec/redis/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
* Command serialization.
*/
public class Command {
public static final byte[] ARGS_PREFIX = "*".getBytes();
public static final byte[] CRLF = "\r\n".getBytes();
public static final byte[] BYTES_PREFIX = "$".getBytes();
public static final byte[] EMPTY_BYTES = new byte[0];
public static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);
static final byte[] ARGS_PREFIX = "*".getBytes();
static final byte[] CRLF = "\r\n".getBytes();
static final byte[] BYTES_PREFIX = "$".getBytes();
static final byte[] EMPTY_BYTES = new byte[0];
static final byte[] NEG_ONE_AND_CRLF = convertWithCRLF(-1);

private byte[][] arguments;
private Object[] objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class RedisDecoder extends ReplayingDecoder<State> {
* via the {@link #readInteger(ChannelBuffer)} method
*
* @param is the {@link ChannelBuffer} to read from
* @return content
* @throws IOException is thrown if the line-ending is not CRLF
*/
public static byte[] readBytes(ChannelBuffer is) throws IOException {
Expand All @@ -64,10 +63,6 @@ public static byte[] readBytes(ChannelBuffer is) throws IOException {

/**
* Read an {@link Integer} from the {@link ChannelBuffer}
*
* @param is
* @return integer
* @throws IOException
*/
public static int readInteger(ChannelBuffer is) throws IOException {
int size = 0;
Expand Down
63 changes: 17 additions & 46 deletions codec/src/main/java/io/netty/handler/codec/redis/RedisEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,71 +18,42 @@
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelDownstreamHandler;
import io.netty.channel.ChannelHandler.Sharable;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* {@link SimpleChannelDownstreamHandler} which encodes {@link Command}'s to {@link ChannelBuffer}'s
*
*
*/
@Sharable
public class RedisEncoder extends SimpleChannelDownstreamHandler {

private final Queue<ChannelBuffer> pool;

/**
* Calls {@link #RedisEncoder(boolean)} with <code>false</code>
*/
public RedisEncoder() {
this(false);
}

/**
* Create a new {@link RedisEncoder} instance
*
* @param poolBuffers <code>true</code> if the {@link ChannelBuffer}'s should be pooled. This should be used with caution as this
* can lead to unnecessary big memory consummation if one of the written values is very big and the rest is very small.
*/
public RedisEncoder(boolean poolBuffers) {
if (poolBuffers) {
pool = new ConcurrentLinkedQueue<ChannelBuffer>();
} else {
pool = null;
}
}


@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object o = e.getMessage();
if (o instanceof Command) {
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();

Command command = (Command) o;
ChannelBuffer cb = null;
if (pool != null) {
cb = pool.poll();
}
if (cb == null) {
cb = ChannelBuffers.dynamicBuffer();
}
command.write(cb);
Channels.write(ctx, future, cb);

} else if (o instanceof Iterable) {
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
ChannelFuture future = e.getFuture();

if (pool != null) {
final ChannelBuffer finalCb = cb;
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
finalCb.clear();
pool.add(finalCb);
}
});
// Useful for transactions and database select
for (Object i : (Iterable<?>) o) {
if (i instanceof Command) {
Command command = (Command) i;
command.write(cb);
} else {
super.writeRequested(ctx, e);
return;
}
}
Channels.write(ctx, future, cb);
} else {
Expand Down

0 comments on commit a065b1c

Please sign in to comment.