Skip to content

Commit

Permalink
Allow to obtain RecvByteBufAllocator.Handle to allow more flexible im…
Browse files Browse the repository at this point in the history
…plementations

Motivation:

At the moment it's only possible for a user to set the RecvByteBufAllocator for a Channel but not access the Handle once it is assigned. This makes it hard to write more flexible implementations.

Modifications:

Add a new method to the Channel.Unsafe to allow access the the used Handle for the Channel. The RecvByteBufAllocator.Handle is created lazily.

Result:

It's possible to write more flexible implementatons that allow to adjust stuff on the fly for a Handle that is used by a Channel
  • Loading branch information
Norman Maurer committed Aug 12, 2014
1 parent 02e7e53 commit 286b899
Show file tree
Hide file tree
Showing 10 changed files with 24 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ protected void doDisconnect() throws Exception {
}

final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;

@Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
Expand Down Expand Up @@ -419,10 +418,7 @@ public void connect(SocketAddress remote, SocketAddress local, ChannelPromise ch
@Override
void epollInReady() {
DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,10 +693,7 @@ void epollInReady() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

ByteBuf byteBuf = null;
boolean close = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett

private final NotificationHandler<?> notificationHandler;

private RecvByteBufAllocator.Handle allocHandle;

private static SctpChannel newSctpChannel() {
try {
return SctpChannel.open();
Expand Down Expand Up @@ -265,10 +263,7 @@ protected void doClose() throws Exception {
protected int doReadMessages(List<Object> buf) throws Exception {
SctpChannel ch = javaChannel();

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public class OioSctpChannel extends AbstractOioMessageChannel

private final NotificationHandler<?> notificationHandler;

private RecvByteBufAllocator.Handle allocHandle;

private static SctpChannel openChannel() {
try {
return SctpChannel.open();
Expand Down Expand Up @@ -187,10 +185,7 @@ protected int doReadMessages(List<Object> msgs) throws Exception {
Set<SelectionKey> reableKeys = readSelector.selectedKeys();
try {
for (SelectionKey ignored : reableKeys) {
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config().getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
ByteBuf buffer = allocHandle.allocate(config().getAllocator());
boolean free = true;

Expand Down
9 changes: 9 additions & 0 deletions transport/src/main/java/io/netty/channel/AbstractChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,17 @@ final MessageSizeEstimator.Handle estimatorHandle() {
protected abstract class AbstractUnsafe implements Unsafe {

private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private RecvByteBufAllocator.Handle recvHandle;
private boolean inFlush0;

@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}

@Override
public final ChannelHandlerInvoker invoker() {
return eventLoop().asInvoker();
Expand Down
6 changes: 6 additions & 0 deletions transport/src/main/java/io/netty/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
*/
interface Unsafe {

/**
* Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
* receiving data.
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();

/**
* Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ protected AbstractNioUnsafe newUnsafe() {
}

private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle;

private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey();
Expand Down Expand Up @@ -102,10 +101,7 @@ public void read() {
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

ByteBuf byteBuf = null;
int messages = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';

private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown;

/**
Expand Down Expand Up @@ -82,10 +81,7 @@ protected void doRead() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

ByteBuf byteBuf = allocHandle.allocate(alloc());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public final class NioDatagramChannel
private final DatagramChannelConfig config;

private Map<InetAddress, List<MembershipKey>> memberships;
private RecvByteBufAllocator.Handle allocHandle;

private static DatagramChannel newSocket(SelectorProvider provider) {
try {
Expand Down Expand Up @@ -230,10 +229,8 @@ protected void doClose() throws Exception {
protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannel ch = javaChannel();
DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

ByteBuf data = allocHandle.allocate(config.getAllocator());
boolean free = true;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
private final DatagramChannelConfig config;
private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EmptyArrays.EMPTY_BYTES, 0);

private RecvByteBufAllocator.Handle allocHandle;

private static MulticastSocket newSocket() {
try {
return new MulticastSocket(null);
Expand Down Expand Up @@ -202,10 +200,7 @@ protected void doClose() throws Exception {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

ByteBuf data = config.getAllocator().heapBuffer(allocHandle.guess());
boolean free = true;
Expand Down

0 comments on commit 286b899

Please sign in to comment.