Skip to content

Commit

Permalink
Overall clean-up in EpollSocketChannel
Browse files Browse the repository at this point in the history
- Extract writev part from doWrite() for simplicity
- Clearer comments
  • Loading branch information
trustin committed Feb 17, 2014
1 parent 5205079 commit 2a74378
Showing 1 changed file with 48 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,45 @@ private int doWriteBytes(ByteBuf buf, int readable) throws Exception {
return localFlushedAmount;
}

private void writeBytesMultiple(
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException {

int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();

long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);

if (localWrittenBytes < expectedWrittenBytes) {
setEpollOut();

// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;

if (readableBytes < localWrittenBytes) {
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {

buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
}
}

/**
* Write a {@link DefaultFileRegion}
*
Expand All @@ -148,51 +187,24 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
clearEpollOut();
break;
}
// Do non-gathering write for a single buffer case.

// Do gathering write if:
// * the outbound buffer contains more than one messages and
// * they are all buffers rather than a file region.
if (msgCount > 1) {
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
if (nioBuffers != null) {
writeBytesMultiple(in, msgCount, nioBuffers);

int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();

long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);

if (localWrittenBytes < expectedWrittenBytes) {
setEpollOut();

// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;

if (readableBytes < localWrittenBytes) {
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {

buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
}
// try again as a ChannelFuture may be notified in the meantime and triggered another flush
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
}

// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
Expand Down

0 comments on commit 2a74378

Please sign in to comment.