Skip to content

Commit

Permalink
Make ChannelOutboundBuffer recycled
Browse files Browse the repository at this point in the history
  • Loading branch information
trustin committed Jul 18, 2013
1 parent 46ea0d4 commit 4cd7e62
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final long hashCode = ThreadLocalRandom.current().nextLong();
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(this);
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
Expand All @@ -62,6 +61,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile EventLoop eventLoop;
private volatile boolean registered;

private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(this);
private boolean inFlush0;

/** Cache for the string representation of this channel */
Expand Down Expand Up @@ -517,8 +517,11 @@ public void run() {
}

// fail all queued messages
ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer;
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.failUnflushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.recycle();
AbstractChannel.this.outboundBuffer = null;

if (wasActive && !isActive()) {
invokeLater(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand All @@ -34,7 +36,21 @@ public final class ChannelOutboundBuffer {

private static final int MIN_INITIAL_CAPACITY = 8;

private final AbstractChannel channel;
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
@Override
protected ChannelOutboundBuffer newObject(Handle handle) {
return new ChannelOutboundBuffer(handle);
}
};

static ChannelOutboundBuffer newInstance(AbstractChannel channel) {
ChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}

private final Handle handle;
private AbstractChannel channel;

// Flushed messages are stored in a circulas queue.
private Object[] flushed;
Expand Down Expand Up @@ -63,12 +79,11 @@ public final class ChannelOutboundBuffer {
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int writable = 1;

ChannelOutboundBuffer(AbstractChannel channel) {
this(channel, MIN_INITIAL_CAPACITY << 1);
private ChannelOutboundBuffer(Handle handle) {
this(handle, MIN_INITIAL_CAPACITY << 1);
}

@SuppressWarnings("unchecked")
ChannelOutboundBuffer(AbstractChannel channel, int initialCapacity) {
private ChannelOutboundBuffer(Handle handle, int initialCapacity) {
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: >= 0)");
}
Expand All @@ -89,7 +104,7 @@ public final class ChannelOutboundBuffer {
initialCapacity = MIN_INITIAL_CAPACITY;
}

this.channel = channel;
this.handle = handle;

flushed = new Object[initialCapacity];
flushedPromises = new ChannelPromise[initialCapacity];
Expand All @@ -103,6 +118,20 @@ public final class ChannelOutboundBuffer {
unflushedTotals = new long[initialCapacity];
}

void recycle() {
if (head != tail) {
throw new IllegalStateException();
}
if (unflushedCount != 0) {
throw new IllegalStateException();
}
if (pendingOutboundBytes != 0) {
throw new IllegalStateException();
}

RECYCLER.recycle(this, handle);
}

void addMessage(Object msg, ChannelPromise promise) {
Object[] unflushed = this.unflushed;
int unflushedCount = this.unflushedCount;
Expand Down Expand Up @@ -276,6 +305,7 @@ public boolean remove() {
flushedPromises[head] = null;

decrementPendingOutboundBytes(flushedTotals[head]);
flushedTotals[head] = 0;

this.head = head + 1 & flushed.length - 1;
return true;
Expand All @@ -296,6 +326,7 @@ public boolean remove(Throwable cause) {
flushedPromises[head] = null;

decrementPendingOutboundBytes(flushedTotals[head]);
flushedTotals[head] = 0;

this.head = head + 1 & flushed.length - 1;
return true;
Expand Down Expand Up @@ -412,10 +443,14 @@ void failUnflushed(Throwable cause) {
try {
for (int i = 0; i < unflushedCount; i++) {
safeRelease(unflushed[i]);
unflushed[i] = null;
safeFail(unflushedPromises[i], cause);
unflushedPromises[i] = null;
decrementPendingOutboundBytes(unflushedTotals[i]);
unflushedTotals[i] = 0;
}
} finally {
this.unflushedCount = 0;
inFail = false;
}
}
Expand Down

0 comments on commit 4cd7e62

Please sign in to comment.