Skip to content

Commit

Permalink
maxBytesPerRead channel configuration
Browse files Browse the repository at this point in the history
Motiviation:
The current read loops don't fascilitate reading a maximum amount of bytes. This capability is useful to have more fine grain control over how much data is injested.

Modifications:
- Add a setMaxBytesPerRead(int) and getMaxBytesPerRead() to ChannelConfig
- Add a setMaxBytesPerIndividualRead(int) and getMaxBytesPerIndividualRead to ChannelConfig
- Add methods to RecvByteBufAllocator so that a pluggable scheme can be used to control the behavior of the read loop.
- Modify read loop for all transport types to respect the new RecvByteBufAllocator API

Result:
The ability to control how many bytes are read for each read operation/loop, and a more extensible read loop.
  • Loading branch information
Scottmitch committed Aug 6, 2015
1 parent b714297 commit cf171ff
Show file tree
Hide file tree
Showing 73 changed files with 1,095 additions and 381 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
buf = new AdvancedLeakAwareByteBuf(buf, leak);
}
break;
default:
break;
}
return buf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.netty.buffer;

import io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache;
import io.netty.util.Recycler;
import io.netty.util.internal.PlatformDependent;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.netty.buffer;

import io.netty.util.IllegalReferenceCountException;
import org.junit.Test;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil;
Expand All @@ -34,7 +35,7 @@
import java.nio.channels.UnresolvedAddressException;

abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
private static final ChannelMetadata DATA = new ChannelMetadata(false);
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final int readFlag;
private final FileDescriptor fileDescriptor;
protected int flags = Native.EPOLLET;
Expand Down Expand Up @@ -93,7 +94,7 @@ public boolean isActive() {

@Override
public ChannelMetadata metadata() {
return DATA;
return METADATA;
}

@Override
Expand Down Expand Up @@ -231,6 +232,7 @@ protected static void checkResolvable(InetSocketAddress addr) {
protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
int writerIndex = byteBuf.writerIndex();
int localReadAmount;
unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
if (byteBuf.hasMemoryAddress()) {
localReadAmount = Native.readAddress(
fileDescriptor.intValue(), byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
Expand Down Expand Up @@ -295,6 +297,7 @@ protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Excepti

protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending;
private EpollRecvByteAllocatorHandle allocHandle;

/**
* Called once EPOLLIN event is ready to be processed
Expand All @@ -308,6 +311,20 @@ void epollRdHupReady() {
// NOOP
}

@Override
public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
if (allocHandle == null) {
allocHandle = newEpollHandle(super.recvBufAllocHandle());
}
return allocHandle;
}

/**
* Create a new {@EpollRecvByteAllocatorHandle} instance.
* @param handle The handle to wrap with EPOLL specific logic.
*/
protected abstract EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle);

@Override
protected void flush0() {
// Flush immediately only when there's no pending flush.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.FileDescriptor;

import java.net.InetSocketAddress;
import java.net.SocketAddress;


public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

protected AbstractEpollServerChannel(int fd) {
super(fd, Native.EPOLLIN);
Expand All @@ -38,6 +40,11 @@ protected AbstractEpollServerChannel(FileDescriptor fd) {
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
}

@Override
public ChannelMetadata metadata() {
return METADATA;
}

@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
Expand Down Expand Up @@ -77,6 +84,11 @@ public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, C
channelPromise.setFailure(new UnsupportedOperationException());
}

@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET));
}

@Override
void epollInReady() {
assert eventLoop().inEventLoop();
Expand All @@ -90,40 +102,36 @@ void epollInReady() {
}

final ChannelPipeline pipeline = pipeline();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

Throwable exception = null;
try {
try {
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0;
do {
int socketFd = Native.accept(fd().intValue(), acceptedAddress);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
readPending = false;
allocHandle.incMessagesRead(1);

try {
int len = acceptedAddress[0];
pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
if (edgeTriggered) { // We must keep reading if ET is enabled
pipeline.fireExceptionCaught(t);
} else {
throw t;
}
}
} while (++ messages < maxMessagesPerRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (exception != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.netty.channel.epoll;

import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
Expand Down Expand Up @@ -48,8 +49,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static io.netty.util.internal.ObjectUtil.checkNotNull;

public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {

private static final String EXPECTED_TYPES =
Expand Down Expand Up @@ -595,9 +594,6 @@ private void safeClosePipe(int pipe) {
}

class EpollStreamUnsafe extends AbstractEpollUnsafe {

private RecvByteBufAllocator.Handle allocHandle;

private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true;
if (isOpen()) {
Expand All @@ -619,6 +615,7 @@ private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, T
byteBuf.release();
}
}
recvBufAllocHandle().readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
Expand Down Expand Up @@ -770,14 +767,19 @@ private boolean doFinishConnect() throws Exception {
void epollRdHupReady() {
if (isActive()) {
// If it is still active, we need to call epollInReady as otherwise we may miss to
// read pending data from the underyling file descriptor.
// read pending data from the underlying file descriptor.
// See https://github.com/netty/netty/issues/3709
epollInReady();
} else {
closeOnRead(pipeline());
}
}

@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET));
}

@Override
void epollInReady() {
final ChannelConfig config = config();
Expand All @@ -791,84 +793,69 @@ void epollInReady() {

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;
try {
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
int messages = 0;
int totalReadAmount = 0;
do {
SpliceInTask spliceTask = spliceQueue.peek();
if (spliceTask != null) {
if (spliceTask.spliceIn(allocHandle)) {
// We need to check if it is still active as if not we removed all SpliceTasks in
// doClose(...)
if (isActive()) {
spliceQueue.remove();
try {
SpliceInTask spliceTask = spliceQueue.peek();
if (spliceTask != null) {
if (spliceTask.spliceIn(allocHandle)) {
// We need to check if it is still active as if not we removed all SpliceTasks in
// doClose(...)
if (isActive()) {
spliceQueue.remove();
}
continue;
} else {
break;
}
continue;
} else {
break;
}
}

// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;

if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
allocHandle.record(totalReadAmount);

// Avoid overflow.
totalReadAmount = localReadAmount;
} else {
totalReadAmount += localReadAmount;
}

if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
if (!edgeTriggered && !config.isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
readPending = false;
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} catch (Throwable t) {
if (edgeTriggered) { // We must keep reading if ET is enabled
if (byteBuf != null) {
byteBuf.release();
byteBuf = null;
}
pipeline.fireExceptionCaught(t);
} else {
// byteBuf is release in outer exception handling if necessary.
throw t;
}
}
} while (++ messages < maxMessagesPerRead);
} while (allocHandle.continueReading());

allocHandle.readComplete();
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);

if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
boolean closed = handleReadException(pipeline, byteBuf, t, close);
if (!closed) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
eventLoop().execute(new Runnable() {
eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
epollInReady();
Expand Down Expand Up @@ -919,8 +906,6 @@ protected final int spliceIn(int pipeOut, RecvByteBufAllocator.Handle handle) th
length -= localSplicedIn;
}

// record the number of bytes we spliced before
handle.record(splicedIn);
return splicedIn;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
}

@Override
@Deprecated
public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
Expand Down
Loading

0 comments on commit cf171ff

Please sign in to comment.