Skip to content

Commit

Permalink
Make sure only direct ByteBuffer are passed to the underlying jdk Cha…
Browse files Browse the repository at this point in the history
…nnel.

This is needed because of otherwise the JDK itself will do an extra ByteBuffer copy with it's own pool implementation. Even worth it will be done
multiple times if the ByteBuffer is always only partial written. With this change the copy is done inside of netty using it's own allocator and
only be done one time in all cases.
  • Loading branch information
Norman Maurer committed Sep 2, 2013
1 parent 6f79291 commit a52bbd2
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 24 deletions.
5 changes: 5 additions & 0 deletions buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ public interface ByteBufAllocator {
* Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
*/
CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

/**
* Returns {@code true} if direct {@link ByteBuf}'s are pooled
*/
boolean isDirectBufferPooled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
}
}

@Override
public boolean isDirectBufferPooled() {
return directArenas != null;
}

public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(heapArenas.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
return new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}

@Override
public boolean isDirectBufferPooled() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.sun.nio.sctp.NotificationHandler;
import com.sun.nio.sctp.SctpChannel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel;
Expand Down Expand Up @@ -292,21 +294,27 @@ protected int doReadMessages(List<Object> buf) throws Exception {
}

@Override
protected boolean doWriteMessage(Object msg) throws Exception {
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
SctpMessage packet = (SctpMessage) msg;
ByteBuf data = packet.content();
int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}

ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (data.nioBufferCount() == 1) {
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}

final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
Expand All @@ -315,7 +323,15 @@ protected boolean doWriteMessage(Object msg) throws Exception {

final int writtenBytes = javaChannel().send(nioData, mi);

return writtenBytes > 0;
boolean done = writtenBytes > 0;
if (needsCopy) {
if (!done) {
in.current(new SctpMessage(mi, data));
} else {
in.current(data);
}
}
return done;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.sctp.DefaultSctpServerChannelConfig;
Expand Down Expand Up @@ -216,7 +217,7 @@ protected void doDisconnect() throws Exception {
}

@Override
protected boolean doWriteMessage(Object msg) throws Exception {
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.barchart.udt.TypeUDT;
import com.barchart.udt.nio.ServerSocketChannelUDT;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.udt.DefaultUdtServerChannelConfig;
import io.netty.channel.udt.UdtServerChannel;
Expand Down Expand Up @@ -93,7 +94,7 @@ protected void doFinishConnect() throws Exception {
}

@Override
protected boolean doWriteMessage(Object msg) throws Exception {
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.udt.DefaultUdtChannelConfig;
import io.netty.channel.udt.UdtChannel;
Expand Down Expand Up @@ -166,7 +167,7 @@ protected int doReadMessages(List<Object> buf) throws Exception {
}

@Override
protected boolean doWriteMessage(Object msg) throws Exception {
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
// expects a message
final UdtMessage message = (UdtMessage) msg;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -230,6 +231,16 @@ public Object current() {
}
}

/**
* Replace the current msg with the given one.
* The replaced msg will automatically be released
*/
public void current(Object msg) {
Entry entry = buffer[flushed];
safeRelease(entry.msg);
entry.msg = msg;
}

public void progress(long amount) {
Entry e = buffer[flushed];
ChannelPromise p = e.promise;
Expand Down Expand Up @@ -309,7 +320,7 @@ public ByteBuffer[] nioBuffers() {
int nioBufferCount = 0;

final int mask = buffer.length - 1;

final ByteBufAllocator alloc = channel.alloc();
Object m;
int i = flushed;
while (i != unflushed && (m = buffer[i].msg) != null) {
Expand All @@ -327,7 +338,7 @@ public ByteBuffer[] nioBuffers() {
if (readableBytes > 0) {
nioBufferSize += readableBytes;

if (buf.isDirect()) {
if (buf.isDirect() || !alloc.isDirectBufferPooled()) {
int count = buf.nioBufferCount();
if (count == 1) {
if (nioBufferCount == nioBuffers.length) {
Expand All @@ -347,7 +358,7 @@ public ByteBuffer[] nioBuffers() {
}
}
} else {
ByteBuf directBuf = channel.alloc().directBuffer(readableBytes);
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, readerIndex, readableBytes);
buf.release();
buffer[i].msg = directBuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,21 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {

if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}

if (!buf.isDirect()) {
ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we rely on JDK's direct buffer pool.
buf = alloc.directBuffer(readableBytes).writeBytes(buf);
in.current(buf);
}
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {

boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
if (doWriteMessage(msg)) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
Expand Down Expand Up @@ -155,5 +155,5 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
*
* @return {@code true} if and only if the message has been written
*/
protected abstract boolean doWriteMessage(Object msg) throws Exception;
protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
package io.netty.channel.socket.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DatagramChannelConfig;
Expand Down Expand Up @@ -223,10 +226,10 @@ protected int doReadMessages(List<Object> buf) throws Exception {
}

@Override
protected boolean doWriteMessage(Object msg) throws Exception {
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
final Object m;
final ByteBuf data;
final SocketAddress remoteAddress;
ByteBuf data;
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) msg;
Expand All @@ -250,13 +253,19 @@ protected boolean doWriteMessage(Object msg) throws Exception {
return true;
}

ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (data.nioBufferCount() == 1) {
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}

final int writtenBytes;
Expand All @@ -266,7 +275,24 @@ protected boolean doWriteMessage(Object msg) throws Exception {
writtenBytes = javaChannel().write(nioData);
}

return writtenBytes > 0;
boolean done = writtenBytes > 0;
if (needsCopy) {
// This means we have allocated a new buffer and need to store it back so we not need to allocate it again
// later
if (remoteAddress == null) {
// remoteAddress is null which means we can handle it as ByteBuf directly
in.current(data);
} else {
if (!done) {
// store it back with all the needed informations
in.current(new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(data, remoteAddress));
} else {
// Just store back the new create buffer so it is cleaned up once in.remove() is called.
in.current(data);
}
}
}
return done;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
Expand Down Expand Up @@ -151,7 +152,7 @@ protected void doDisconnect() throws Exception {
}

@Override
protected boolean doWriteMessage(Object msg) throws Exception {
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
}

0 comments on commit a52bbd2

Please sign in to comment.