Skip to content

Commit

Permalink
Merge remote branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
danbev committed Jul 19, 2013
2 parents 8c0dfd6 + eb3283c commit 87399f1
Show file tree
Hide file tree
Showing 36 changed files with 905 additions and 792 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
package io.netty.handler.codec.http.websocketx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

/**
* Encodes a {@link WebSocketFrame} into a {@link ByteBuf}.
Expand All @@ -27,56 +30,71 @@
* <tt>WebSocketServer</tt> example located in the {@code io.netty.example.http.websocket} package.
*/
@Sharable
public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame> implements WebSocketFrameEncoder {
public class WebSocket00FrameEncoder extends MessageToMessageEncoder<WebSocketFrame> implements WebSocketFrameEncoder {
private static final ByteBuf _0X00 = Unpooled.unreleasableBuffer(
Unpooled.directBuffer(1, 1).writeByte((byte) 0x00));
private static final ByteBuf _0XFF = Unpooled.unreleasableBuffer(
Unpooled.directBuffer(1, 1).writeByte((byte) 0xFF));
private static final ByteBuf _0XFF_0X00 = Unpooled.unreleasableBuffer(
Unpooled.directBuffer(2, 2).writeByte((byte) 0xFF).writeByte((byte) 0x00));

@Override
protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
if (msg instanceof TextWebSocketFrame) {
// Text frame
ByteBuf data = msg.content();
out.writeByte((byte) 0x00);
out.writeBytes(data, data.readerIndex(), data.readableBytes());
out.writeByte((byte) 0xFF);

out.add(_0X00.duplicate());
out.add(data.retain());
out.add(_0XFF.duplicate());
} else if (msg instanceof CloseWebSocketFrame) {
// Close frame
out.writeByte((byte) 0xFF);
out.writeByte((byte) 0x00);
out.add(_0XFF_0X00);
} else {
// Binary frame
ByteBuf data = msg.content();
int dataLen = data.readableBytes();
out.ensureWritable(dataLen + 5);

// Encode type.
out.writeByte((byte) 0x80);
ByteBuf buf = ctx.alloc().buffer(5);
boolean release = true;
try {
// Encode type.
buf.writeByte((byte) 0x80);

// Encode length.
int b1 = dataLen >>> 28 & 0x7F;
int b2 = dataLen >>> 14 & 0x7F;
int b3 = dataLen >>> 7 & 0x7F;
int b4 = dataLen & 0x7F;
if (b1 == 0) {
if (b2 == 0) {
if (b3 == 0) {
out.writeByte(b4);
// Encode length.
int b1 = dataLen >>> 28 & 0x7F;
int b2 = dataLen >>> 14 & 0x7F;
int b3 = dataLen >>> 7 & 0x7F;
int b4 = dataLen & 0x7F;
if (b1 == 0) {
if (b2 == 0) {
if (b3 == 0) {
buf.writeByte(b4);
} else {
buf.writeByte(b3 | 0x80);
buf.writeByte(b4);
}
} else {
out.writeByte(b3 | 0x80);
out.writeByte(b4);
buf.writeByte(b2 | 0x80);
buf.writeByte(b3 | 0x80);
buf.writeByte(b4);
}
} else {
out.writeByte(b2 | 0x80);
out.writeByte(b3 | 0x80);
out.writeByte(b4);
buf.writeByte(b1 | 0x80);
buf.writeByte(b2 | 0x80);
buf.writeByte(b3 | 0x80);
buf.writeByte(b4);
}
} else {
out.writeByte(b1 | 0x80);
out.writeByte(b2 | 0x80);
out.writeByte(b3 | 0x80);
out.writeByte(b4);
}

// Encode binary data.
out.writeBytes(data, data.readerIndex(), dataLen);
// Encode binary data.
out.add(buf);
out.add(data.retain());
release = false;
} finally {
if (release) {
buf.release();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,21 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;
import java.util.List;

/**
* <p>
* Encodes a web socket frame into wire protocol version 8 format. This code was forked from <a
* href="https://github.com/joewalnes/webbit">webbit</a> and modified.
* </p>
*/
public class WebSocket08FrameEncoder extends MessageToByteEncoder<WebSocketFrame> implements WebSocketFrameEncoder {
public class WebSocket08FrameEncoder extends MessageToMessageEncoder<WebSocketFrame> implements WebSocketFrameEncoder {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket08FrameEncoder.class);

Expand All @@ -94,7 +95,7 @@ public WebSocket08FrameEncoder(boolean maskPayload) {
}

@Override
protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {

byte[] mask;

Expand Down Expand Up @@ -138,38 +139,61 @@ protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out
+ length);
}

int maskLength = maskPayload ? 4 : 0;
if (length <= 125) {
out.ensureWritable(2 + maskLength + length);
out.writeByte(b0);
byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
out.writeByte(b);
} else if (length <= 0xFFFF) {
out.ensureWritable(4 + maskLength + length);
out.writeByte(b0);
out.writeByte(maskPayload ? 0xFE : 126);
out.writeByte(length >>> 8 & 0xFF);
out.writeByte(length & 0xFF);
} else {
out.ensureWritable(10 + maskLength + length);
out.writeByte(b0);
out.writeByte(maskPayload ? 0xFF : 127);
out.writeLong(length);
}

// Write payload
if (maskPayload) {
int random = (int) (Math.random() * Integer.MAX_VALUE);
mask = ByteBuffer.allocate(4).putInt(random).array();
out.writeBytes(mask);
boolean release = true;
ByteBuf buf = null;
try {
int maskLength = maskPayload ? 4 : 0;
if (length <= 125) {
int size = 2 + maskLength;
if (maskPayload) {
size += length;
}
buf = ctx.alloc().buffer(size);
buf.writeByte(b0);
byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length);
buf.writeByte(b);
} else if (length <= 0xFFFF) {
int size = 4 + maskLength;
if (maskPayload) {
size += length;
}
buf = ctx.alloc().buffer(size);
buf.writeByte(b0);
buf.writeByte(maskPayload ? 0xFE : 126);
buf.writeByte(length >>> 8 & 0xFF);
buf.writeByte(length & 0xFF);
} else {
int size = 10 + maskLength;
if (maskPayload) {
size += length;
}
buf = ctx.alloc().buffer(size);
buf.writeByte(b0);
buf.writeByte(maskPayload ? 0xFF : 127);
buf.writeLong(length);
}

int counter = 0;
for (int i = data.readerIndex(); i < data.writerIndex(); i ++) {
byte byteData = data.getByte(i);
out.writeByte(byteData ^ mask[counter++ % 4]);
// Write payload
if (maskPayload) {
int random = (int) (Math.random() * Integer.MAX_VALUE);
mask = ByteBuffer.allocate(4).putInt(random).array();
buf.writeBytes(mask);

int counter = 0;
for (int i = data.readerIndex(); i < data.writerIndex(); i ++) {
byte byteData = data.getByte(i);
buf.writeByte(byteData ^ mask[counter++ % 4]);
}
out.add(buf);
} else {
out.add(buf);
out.add(data.retain());
}
release = false;
} finally {
if (release && buf != null) {
buf.release();
}
} else {
out.writeBytes(data, data.readerIndex(), data.readableBytes());
}
}
}
61 changes: 44 additions & 17 deletions codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,7 @@
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {

private final TypeParameterMatcher outboundMsgMatcher;
private final MessageToByteEncoder<I> encoder = new MessageToByteEncoder<I>() {
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return ByteToMessageCodec.this.acceptOutboundMessage(msg);
}

@Override
protected void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception {
ByteToMessageCodec.this.encode(ctx, msg, out);
}
};
private final MessageToByteEncoder<I> encoder;

private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
@Override
Expand All @@ -59,22 +49,43 @@ protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> ou
};

/**
* Create a new instance which will try to detect the types to encode out of the type parameter
* of the class.
* @see {@link #ByteToMessageCodec(boolean)} with {@code true} as boolean parameter.
*/
protected ByteToMessageCodec() {
checkForSharableAnnotation();
this(true);
}

/**
* @see {@link #ByteToMessageCodec(Class, boolean)} with {@code true} as boolean value.
*/
protected ByteToMessageCodec(Class<? extends I> outboundMessageType) {
this(outboundMessageType, true);
}

/**
* Create a new instance which will try to detect the types to match out of the type parameter of the class.
*
* @param preferDirect {@code true} if a direct {@link ByteBuf} should be tried to be used as target for
* the encoded messages. If {@code false} is used it will allocate a heap
* {@link ByteBuf}, which is backed by an byte array.
*/
protected ByteToMessageCodec(boolean preferDirect) {
outboundMsgMatcher = TypeParameterMatcher.find(this, ByteToMessageCodec.class, "I");
encoder = new Encoder(preferDirect);
}

/**
* Create a new instance.
* Create a new instance
*
* @param outboundMessageType The type of messages to encode
* @param outboundMessageType The type of messages to match
* @param preferDirect {@code true} if a direct {@link ByteBuf} should be tried to be used as target for
* the encoded messages. If {@code false} is used it will allocate a heap
* {@link ByteBuf}, which is backed by an byte array.
*/
protected ByteToMessageCodec(Class<? extends I> outboundMessageType) {
protected ByteToMessageCodec(Class<? extends I> outboundMessageType, boolean preferDirect) {
checkForSharableAnnotation();
outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType);
encoder = new Encoder(preferDirect);
}

private void checkForSharableAnnotation() {
Expand Down Expand Up @@ -118,4 +129,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decode(ctx, in, out);
}

private final class Encoder extends MessageToByteEncoder<I> {
Encoder(boolean preferDirect) {
super(preferDirect);
}

@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return ByteToMessageCodec.this.acceptOutboundMessage(msg);
}

@Override
protected void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception {
ByteToMessageCodec.this.encode(ctx, msg, out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testFailSlowTooLongFrameRecovery() throws Exception {
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' }));
ByteBuf buf = (ByteBuf) ch.readInbound();
assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
buf.release();
}
}

Expand All @@ -63,6 +64,7 @@ public void testFailFastTooLongFrameRecovery() throws Exception {
ch.writeInbound(Unpooled.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' }));
ByteBuf buf = (ByteBuf) ch.readInbound();
assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
buf.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,27 @@ public void setUp() throws Exception {
public void testPrependLength() throws Exception {
final EmbeddedChannel ch = new EmbeddedChannel(new LengthFieldPrepender(4));
ch.writeOutbound(msg);
assertThat((ByteBuf) ch.readOutbound(), is(wrappedBuffer(new byte[]{0, 0, 0, 1, 'A'})));
final ByteBuf buf = (ByteBuf) ch.readOutbound();
assertThat(buf, is(wrappedBuffer(new byte[]{0, 0, 0, 1, 'A'})));
buf.release();
}

@Test
public void testPrependLengthIncludesLengthFieldLength() throws Exception {
final EmbeddedChannel ch = new EmbeddedChannel(new LengthFieldPrepender(4, true));
ch.writeOutbound(msg);
assertThat((ByteBuf) ch.readOutbound(), is(wrappedBuffer(new byte[]{0, 0, 0, 5, 'A'})));
final ByteBuf buf = (ByteBuf) ch.readOutbound();
assertThat(buf, is(wrappedBuffer(new byte[]{0, 0, 0, 5, 'A'})));
buf.release();
}

@Test
public void testPrependAdjustedLength() throws Exception {
final EmbeddedChannel ch = new EmbeddedChannel(new LengthFieldPrepender(4, -1));
ch.writeOutbound(msg);
assertThat((ByteBuf) ch.readOutbound(), is(wrappedBuffer(new byte[]{0, 0, 0, 0, 'A'})));
final ByteBuf buf = (ByteBuf) ch.readOutbound();
assertThat(buf, is(wrappedBuffer(new byte[]{0, 0, 0, 0, 'A'})));
buf.release();
}

@Test
Expand Down
Loading

0 comments on commit 87399f1

Please sign in to comment.