Skip to content

Commit

Permalink
Move marking ChannelPromise for writes uncancellable to addFlush for …
Browse files Browse the repository at this point in the history
…keep things simple
  • Loading branch information
Norman Maurer committed Feb 17, 2014
1 parent 31c24bc commit b49490e
Showing 1 changed file with 47 additions and 46 deletions.
93 changes: 47 additions & 46 deletions transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ private void addCapacity() {

void addFlush() {
unflushed = tail;

final int mask = buffer.length - 1;
int i = flushed;
while (i != unflushed && buffer[i].msg != null) {
Entry entry = buffer[i];
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending);
}
i = i + 1 & mask;
}
}

/**
Expand Down Expand Up @@ -255,12 +267,6 @@ public Object current(boolean preferDirect) {
} else {
// TODO: Think of a smart way to handle ByteBufHolder messages
Entry entry = buffer[flushed];
if (!entry.cancelled && !entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending);
}

Object msg = entry.msg;
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
return msg;
Expand Down Expand Up @@ -400,52 +406,47 @@ public ByteBuffer[] nioBuffers() {
Entry entry = buffer[i];

if (!entry.cancelled) {
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending);
} else {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;

if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers =
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
}
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
ByteBuffer[] nioBufs = entry.buffers;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.buffers = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;

if (readableBytes > 0) {
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount();
}
int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers =
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
}
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount ++] = nioBuf;
} else {
nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,
readableBytes, alloc, nioBuffers, nioBufferCount);
ByteBuffer[] nioBufs = entry.buffers;
if (nioBufs == null) {
// cached ByteBuffers as they may be expensive to create in terms
// of Object allocation
entry.buffers = nioBufs = buf.nioBuffers();
}
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
}
} else {
nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,
readableBytes, alloc, nioBuffers, nioBufferCount);
}
}
}

i = i + 1 & mask;
}
this.nioBufferCount = nioBufferCount;
Expand Down

0 comments on commit b49490e

Please sign in to comment.