Skip to content

Commit

Permalink
Use thread local to allocate temp byte[] instead of allocator (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Nov 3, 2017
1 parent 7c35ce4 commit d522c45
Showing 3 changed files with 19 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -71,10 +71,6 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.RecyclableDuplicateByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledHeapByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

public class Commands {

@@ -824,32 +820,6 @@ private static int getCurrentProtocolVersion() {
return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber();
}

public static final class RecyclableHeapByteBuf extends UnpooledHeapByteBuf {
private static final Recycler<RecyclableHeapByteBuf> RECYCLER = new Recycler<RecyclableHeapByteBuf>() {
@Override
protected RecyclableHeapByteBuf newObject(Handle handle) {
return new RecyclableHeapByteBuf(handle);
}
};

private final Handle handle;

private RecyclableHeapByteBuf(Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 4096, PulsarDecoder.MaxMessageSize);
this.handle = handle;
}

public static RecyclableHeapByteBuf get() {
return RECYCLER.get();

}

public void recycle() {
clear();
RECYCLER.recycle(this, handle);
}
}

public static enum ChecksumType {
Crc32c,
None;
Original file line number Diff line number Diff line change
@@ -55,8 +55,6 @@
import java.io.IOException;
import java.nio.ByteOrder;

import org.apache.pulsar.common.api.Commands.RecyclableHeapByteBuf;

import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistryLite;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -65,6 +63,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.FastThreadLocal;

public class ByteBufCodedInputStream {
public static interface ByteBufMessageBuilder {
@@ -145,20 +144,22 @@ public void readMessage(final ByteBufMessageBuilder builder, final ExtensionRegi
buf.writerIndex(writerIdx);
}

private static final FastThreadLocal<byte[]> localByteArray = new FastThreadLocal<>();

/** Read a {@code bytes} field value from the stream. */
public ByteString readBytes() throws IOException {
final int size = readRawVarint32();
if (size == 0) {
return ByteString.EMPTY;
} else {
RecyclableHeapByteBuf heapBuf = RecyclableHeapByteBuf.get();
if (size > heapBuf.writableBytes()) {
heapBuf.capacity(size);
byte[] localBuf = localByteArray.get();
if (localBuf == null || localBuf.length < size) {
localBuf = new byte[Math.max(size, 1024)];
localByteArray.set(localBuf);
}

heapBuf.writeBytes(buf, size);
ByteString res = ByteString.copyFrom(heapBuf.array(), heapBuf.arrayOffset(), heapBuf.readableBytes());
heapBuf.recycle();
buf.readBytes(localBuf, 0, size);
ByteString res = ByteString.copyFrom(localBuf, 0, size);
return res;
}
}
Original file line number Diff line number Diff line change
@@ -55,14 +55,13 @@
import java.io.IOException;
import java.nio.ByteOrder;

import org.apache.pulsar.common.api.Commands.RecyclableHeapByteBuf;

import com.google.protobuf.ByteString;
import com.google.protobuf.WireFormat;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.FastThreadLocal;

public class ByteBufCodedOutputStream {
public static interface ByteBufGeneratedMessage {
@@ -183,17 +182,19 @@ public void writeBytesNoTag(final ByteString value) throws IOException {
writeRawBytes(value);
}


private static final FastThreadLocal<byte[]> localByteArray = new FastThreadLocal<>();

/** Write a byte string. */
public void writeRawBytes(final ByteString value) throws IOException {
RecyclableHeapByteBuf heapBuf = RecyclableHeapByteBuf.get();
if (value.size() > heapBuf.writableBytes()) {
heapBuf.capacity(value.size());
byte[] localBuf = localByteArray.get();
if (localBuf == null || localBuf.length < value.size()) {
localBuf = new byte[Math.max(value.size(), 1024)];
localByteArray.set(localBuf);
}

value.copyTo(heapBuf.array(), 0);
heapBuf.writerIndex(value.size());
buf.writeBytes(heapBuf);
heapBuf.recycle();
value.copyTo(localBuf, 0);
buf.writeBytes(localBuf, 0, value.size());
}

public void writeEnum(final int fieldNumber, final int value) throws IOException {

0 comments on commit d522c45

Please sign in to comment.