Skip to content

Commit

Permalink
Netty 4.1 (apache#689)
Browse files Browse the repository at this point in the history
* Upgrade to Netty-4.1

* Updated Recycler usages

* Converted usages of ByteBuf.order()

* Upgrade AsyncHttpClient

* Upgraded BK to 4.3.1.82-yahoo

* Fixed method name that changed in async-http-client-2.1

* Upgrade to Guava 20.0 to match same BK version

* Converted usages of templatized Recycler in protobuf generated code

* Fixed capacity in DoubleByteBuf

* Fixed usages of Objects.toStringHelper

* Use same netty version as BK

* Fixed kafka compat tests

* Include AsyncHttpClient default properties file in shaded jar

* Added ref count check in DoubleByteBufTest
  • Loading branch information
merlimat authored Nov 29, 2017
1 parent 4279306 commit 75de0a9
Show file tree
Hide file tree
Showing 54 changed files with 684 additions and 1,208 deletions.
58 changes: 32 additions & 26 deletions build/docker/protobuf.patch
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git a/src/google/protobuf/compiler/java/java_message.cc b/src/google/protobuf/compiler/java/java_message.cc
index 4c087db..cd18e28 100644
index 4c087db5..658b9181 100644
--- a/src/google/protobuf/compiler/java/java_message.cc
+++ b/src/google/protobuf/compiler/java/java_message.cc
@@ -303,14 +303,14 @@ void MessageGenerator::Generate(io::Printer* printer) {
Expand All @@ -19,7 +19,7 @@ index 4c087db..cd18e28 100644
"static", is_own_file ? "" : "static",
"classname", descriptor_->name());
}
@@ -319,24 +319,56 @@ void MessageGenerator::Generate(io::Printer* printer) {
@@ -319,27 +319,61 @@ void MessageGenerator::Generate(io::Printer* printer) {
printer->Print(
"public $static$ final class $classname$ extends\n"
" com.google.protobuf.GeneratedMessage\n"
Expand Down Expand Up @@ -51,13 +51,13 @@ index 4c087db..cd18e28 100644
"// Use $classname$.newBuilder() to construct.\n"
- "private $classname$(Builder builder) {\n"
- " super(builder);\n"
+ "private io.netty.util.Recycler.Handle handle;\n"
+ "private $classname$(io.netty.util.Recycler.Handle handle) {\n"
+ "private final io.netty.util.Recycler.Handle<$classname$> handle;\n"
+ "private $classname$(io.netty.util.Recycler.Handle<$classname$> handle) {\n"
+ " this.handle = handle;\n"
"}\n"
+ "\n"
+ "\n"
+ " private static final io.netty.util.Recycler<$classname$> RECYCLER = new io.netty.util.Recycler<$classname$>() {\n"
+ " protected $classname$ newObject(Handle handle) {\n"
+ " protected $classname$ newObject(Handle<$classname$> handle) {\n"
+ " return new $classname$(handle);\n"
+ " }\n"
+ " };\n"
Expand All @@ -74,13 +74,19 @@ index 4c087db..cd18e28 100644
+
+ printer->Print(
+ " this.memoizedSerializedSize = -1;\n"
+ " if (handle != null) { RECYCLER.recycle(this, handle); }\n"
+ " handle.recycle(this);\n"
+ " }\n"
+ " \n"
// Used when constructing the default instance, which cannot be initialized
// immediately because it may cyclically refer to other default instances.
"private $classname$(boolean noInit) {}\n"
@@ -365,13 +397,6 @@ void MessageGenerator::Generate(io::Printer* printer) {
- "private $classname$(boolean noInit) {}\n"
+ "private $classname$(boolean noInit) {\n"
+ " this.handle = null;\n"
+ "}\n"
"\n"
"private static final $classname$ defaultInstance;\n"
"public static $classname$ getDefaultInstance() {\n"
@@ -365,13 +399,6 @@ void MessageGenerator::Generate(io::Printer* printer) {
messageGenerator.Generate(printer);
}

Expand All @@ -94,7 +100,7 @@ index 4c087db..cd18e28 100644
for (int i = 0; i < totalInts; i++) {
printer->Print("private int $bit_field_name$;\n",
"bit_field_name", GetBitFieldName(i));
@@ -450,7 +475,12 @@ GenerateMessageSerializationMethods(io::Printer* printer) {
@@ -450,7 +477,12 @@ GenerateMessageSerializationMethods(io::Printer* printer) {
ExtensionRangeOrdering());

printer->Print(
Expand All @@ -103,12 +109,12 @@ index 4c087db..cd18e28 100644
+ " throws java.io.IOException {\n"
+ " throw new RuntimeException(\"Cannot use CodedOutputStream\");\n"
+ "}\n\n"
+
+
+ "public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)\n"
" throws java.io.IOException {\n");
printer->Indent();
// writeTo(CodedOutputStream output) might be invoked without
@@ -567,14 +597,16 @@ GenerateParseFromMethods(io::Printer* printer) {
@@ -567,14 +599,16 @@ GenerateParseFromMethods(io::Printer* printer) {
"public static $classname$ parseFrom(\n"
" com.google.protobuf.ByteString data)\n"
" throws com.google.protobuf.InvalidProtocolBufferException {\n"
Expand All @@ -128,7 +134,7 @@ index 4c087db..cd18e28 100644
"}\n"
"public static $classname$ parseFrom(byte[] data)\n"
" throws com.google.protobuf.InvalidProtocolBufferException {\n"
@@ -674,13 +706,13 @@ void MessageGenerator::GenerateBuilder(io::Printer* printer) {
@@ -674,13 +708,13 @@ void MessageGenerator::GenerateBuilder(io::Printer* printer) {
printer->Print(
"public static final class Builder extends\n"
" com.google.protobuf.GeneratedMessage.ExtendableBuilder<\n"
Expand All @@ -144,7 +150,7 @@ index 4c087db..cd18e28 100644
"classname", ClassName(descriptor_));
}
} else {
@@ -688,14 +720,14 @@ void MessageGenerator::GenerateBuilder(io::Printer* printer) {
@@ -688,14 +722,14 @@ void MessageGenerator::GenerateBuilder(io::Printer* printer) {
printer->Print(
"public static final class Builder extends\n"
" com.google.protobuf.GeneratedMessage.Builder<Builder>\n"
Expand All @@ -161,30 +167,30 @@ index 4c087db..cd18e28 100644
"classname", ClassName(descriptor_));
}
}
@@ -760,9 +792,21 @@ void MessageGenerator::GenerateDescriptorMethods(io::Printer* printer) {
@@ -760,9 +794,21 @@ void MessageGenerator::GenerateDescriptorMethods(io::Printer* printer) {
void MessageGenerator::GenerateCommonBuilderMethods(io::Printer* printer) {
printer->Print(
"// Construct using $classname$.newBuilder()\n"
- "private Builder() {\n"
+ "private final io.netty.util.Recycler.Handle handle;\n"
+ "private Builder(io.netty.util.Recycler.Handle handle) {\n"
+ "private final io.netty.util.Recycler.Handle<Builder> handle;\n"
+ "private Builder(io.netty.util.Recycler.Handle<Builder> handle) {\n"
+ " this.handle = handle;\n"
" maybeForceBuilderInitialization();\n"
"}\n"
+ "private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {\n"
+ " protected Builder newObject(io.netty.util.Recycler.Handle handle) {\n"
+ " protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {\n"
+ " return new Builder(handle);\n"
+ " }\n"
+ " };\n"
+ "\n"
+ " public void recycle() {\n"
+ " clear();\n"
+ " if (handle != null) {RECYCLER.recycle(this, handle);}\n"
+ " }\n"
+ " handle.recycle(this);\n"
+ " }\n"
"\n",
"classname", ClassName(descriptor_));

@@ -801,7 +845,8 @@ void MessageGenerator::GenerateCommonBuilderMethods(io::Printer* printer) {
@@ -801,7 +847,8 @@ void MessageGenerator::GenerateCommonBuilderMethods(io::Printer* printer) {

printer->Print(
"private static Builder create() {\n"
Expand All @@ -194,7 +200,7 @@ index 4c087db..cd18e28 100644
"}\n"
"\n"
"public Builder clear() {\n"
@@ -864,7 +909,8 @@ void MessageGenerator::GenerateCommonBuilderMethods(io::Printer* printer) {
@@ -864,7 +911,8 @@ void MessageGenerator::GenerateCommonBuilderMethods(io::Printer* printer) {
"}\n"
"\n"
"public $classname$ buildPartial() {\n"
Expand All @@ -204,7 +210,7 @@ index 4c087db..cd18e28 100644
"classname", ClassName(descriptor_));

printer->Indent();
@@ -973,8 +1019,13 @@ void MessageGenerator::GenerateBuilderParsingMethods(io::Printer* printer) {
@@ -973,8 +1021,13 @@ void MessageGenerator::GenerateBuilderParsingMethods(io::Printer* printer) {
SortFieldsByNumber(descriptor_));

printer->Print(
Expand All @@ -219,7 +225,7 @@ index 4c087db..cd18e28 100644
" com.google.protobuf.ExtensionRegistryLite extensionRegistry)\n"
" throws java.io.IOException {\n");
printer->Indent();
@@ -1017,7 +1068,7 @@ void MessageGenerator::GenerateBuilderParsingMethods(io::Printer* printer) {
@@ -1017,7 +1070,7 @@ void MessageGenerator::GenerateBuilderParsingMethods(io::Printer* printer) {
" $on_changed$\n"
" return this;\n"
"default: {\n"
Expand All @@ -229,7 +235,7 @@ index 4c087db..cd18e28 100644
" return this;\n" // it's an endgroup tag
" }\n"
diff --git a/src/google/protobuf/compiler/java/java_message_field.cc b/src/google/protobuf/compiler/java/java_message_field.cc
index 251945a..1212be9 100644
index 251945af..1212be9e 100644
--- a/src/google/protobuf/compiler/java/java_message_field.cc
+++ b/src/google/protobuf/compiler/java/java_message_field.cc
@@ -371,7 +371,8 @@ GenerateParsingCode(io::Printer* printer) const {
Expand All @@ -243,7 +249,7 @@ index 251945a..1212be9 100644

void MessageFieldGenerator::
diff --git a/src/google/protobuf/message.cc b/src/google/protobuf/message.cc
index 91e6878..cfee383 100644
index 91e6878e..cfee3832 100644
--- a/src/google/protobuf/message.cc
+++ b/src/google/protobuf/message.cc
@@ -32,6 +32,7 @@
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.collect.ComparisonChain;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.RecyclableDuplicateByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
Expand All @@ -35,12 +34,12 @@ final class EntryImpl extends AbstractReferenceCounted implements Entry, Compara

private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
@Override
protected EntryImpl newObject(Handle handle) {
protected EntryImpl newObject(Handle<EntryImpl> handle) {
return new EntryImpl(handle);
}
};

private final Handle recyclerHandle;
private final Handle<EntryImpl> recyclerHandle;
private long ledgerId;
private long entryId;
ByteBuf data;
Expand Down Expand Up @@ -89,12 +88,12 @@ public static EntryImpl create(EntryImpl other) {
EntryImpl entry = RECYCLER.get();
entry.ledgerId = other.ledgerId;
entry.entryId = other.entryId;
entry.data = RecyclableDuplicateByteBuf.create(other.data);
entry.data = other.data.retainedDuplicate();
entry.setRefCnt(1);
return entry;
}

private EntryImpl(Recycler.Handle recyclerHandle) {
private EntryImpl(Recycler.Handle<EntryImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

Expand Down Expand Up @@ -137,20 +136,25 @@ public long getLedgerId() {
public long getEntryId() {
return entryId;
}

@Override
public int compareTo(EntryImpl other) {
return ComparisonChain.start().compare(ledgerId, other.ledgerId).compare(entryId, other.entryId).result();
}

@Override
public ReferenceCounted touch(Object hint) {
return this;
}

@Override
protected void deallocate() {
// This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
data.release();
data = null;
ledgerId = -1;
entryId = -1;
RECYCLER.recycle(this, recyclerHandle);
recyclerHandle.recycle(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -1633,7 +1633,7 @@ List<Entry> filterReadEntries(List<Entry> entries) {

@Override
public synchronized String toString() {
return Objects.toStringHelper(this).add("ledger", ledger.getName()).add("name", name)
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("name", name)
.add("ackPos", markDeletePosition).add("readPos", readPosition).toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Range;

public class NonDurableCursorImpl extends ManagedCursorImpl {
Expand Down Expand Up @@ -116,7 +116,7 @@ public void asyncDeleteCursor(final String consumerName, final DeleteCursorCallb

@Override
public synchronized String toString() {
return Objects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
.add("readPos", readPosition).toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.RecyclableDuplicateByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

Expand Down Expand Up @@ -86,7 +85,7 @@ public void setCloseWhenDone(boolean closeWhenDone) {
}

public void initiate() {
ByteBuf duplicateBuffer = RecyclableDuplicateByteBuf.create(data);
ByteBuf duplicateBuffer = data.retainedDuplicate();
// duplicatedBuffer has refCnt=1 at this point

ledger.asyncAddEntry(duplicateBuffer, this, ctx);
Expand Down Expand Up @@ -197,14 +196,14 @@ private void updateLatency() {
ml.mbean.addAddEntryLatencySample(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}

private final Handle recyclerHandle;
private final Handle<OpAddEntry> recyclerHandle;

private OpAddEntry(Handle recyclerHandle) {
private OpAddEntry(Handle<OpAddEntry> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<OpAddEntry> RECYCLER = new Recycler<OpAddEntry>() {
protected OpAddEntry newObject(Recycler.Handle recyclerHandle) {
protected OpAddEntry newObject(Recycler.Handle<OpAddEntry> recyclerHandle) {
return new OpAddEntry(recyclerHandle);
}
};
Expand All @@ -219,7 +218,7 @@ public void recycle() {
closeWhenDone = false;
entryId = -1;
startTime = -1;
RECYCLER.recycle(this, recyclerHandle);
recyclerHandle.recycle(this);
}

private static final Logger log = LoggerFactory.getLogger(OpAddEntry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ public boolean isSlowestReader() {
return cursor.ledger.getSlowestConsumer() == cursor;
}

private final Handle recyclerHandle;
private final Handle<OpReadEntry> recyclerHandle;

private OpReadEntry(Handle recyclerHandle) {
private OpReadEntry(Handle<OpReadEntry> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<OpReadEntry> RECYCLER = new Recycler<OpReadEntry>() {
protected OpReadEntry newObject(Recycler.Handle recyclerHandle) {
protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> recyclerHandle) {
return new OpReadEntry(recyclerHandle);
}
};
Expand All @@ -161,7 +161,7 @@ public void recycle() {
ctx = null;
entries = null;
nextReadPosition = null;
RECYCLER.recycle(this, recyclerHandle);
recyclerHandle.recycle(this);
}

private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ protected void deallocate() {
// no-op
}

@Override
public ReferenceCounted touch(Object hint) {
return this;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof RefString) {
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ flexible messaging model and an intuitive client API.</description>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<bookkeeper.version>4.3.1.72-yahoo</bookkeeper.version>
<bookkeeper.version>4.3.1.82-yahoo</bookkeeper.version>
<zookeeper.version>3.4.10</zookeeper.version>
<netty.version>4.0.46.Final</netty.version>
<netty.version>4.1.12.Final</netty.version>
<storm.version>1.0.5</storm.version>
<jetty.version>9.3.11.v20160721</jetty.version>
<athenz.version>1.7.17</athenz.version>
Expand All @@ -125,7 +125,7 @@ flexible messaging model and an intuitive client API.</description>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.0.31</version>
<version>2.1.0-alpha26</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
Expand Down Expand Up @@ -286,7 +286,7 @@ flexible messaging model and an intuitive client API.</description>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
<version>20.0</version>
</dependency>

<dependency>
Expand Down
Loading

0 comments on commit 75de0a9

Please sign in to comment.