Skip to content

Commit

Permalink
PIP-75: Perform serialization/deserialization with LightProto (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jan 6, 2021
1 parent c2a4e66 commit c12765a
Show file tree
Hide file tree
Showing 209 changed files with 3,037 additions and 55,747 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;

/**
* A ManagedLedger it's a superset of a BookKeeper ledger concept.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ public ManagedLedgerConfig setAddEntryTimeoutSeconds(long addEntryTimeoutSeconds
/**
* Managed-ledger can setup different custom EnsemblePlacementPolicy (eg: affinity to write ledgers to only setup of
* group of bookies).
*
*
* @return
*/
public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPolicyClassName() {
Expand All @@ -595,7 +595,7 @@ public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPo

/**
* Returns EnsemblePlacementPolicy configured for the Managed-ledger.
*
*
* @param bookKeeperEnsemblePlacementPolicyClassName
*/
public void setBookKeeperEnsemblePlacementPolicyClassName(
Expand All @@ -605,7 +605,7 @@ public void setBookKeeperEnsemblePlacementPolicyClassName(

/**
* Returns properties required by configured bookKeeperEnsemblePlacementPolicy.
*
*
* @return
*/
public Map<String, Object> getBookKeeperEnsemblePlacementPolicyProperties() {
Expand All @@ -615,7 +615,7 @@ public Map<String, Object> getBookKeeperEnsemblePlacementPolicyProperties() {
/**
* Managed-ledger can setup different custom EnsemblePlacementPolicy which needs
* bookKeeperEnsemblePlacementPolicy-Properties.
*
*
* @param bookKeeperEnsemblePlacementPolicyProperties
*/
public void setBookKeeperEnsemblePlacementPolicyProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonDurableCursorImpl extends ManagedCursorImpl {

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
super(bookkeeper, config, ledger, cursorName);

// Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.testng.annotations.Test;

public class ManagedCursorContainerTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.testng.annotations.Test;

public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.mockito.invocation.InvocationOnMock;
Expand Down Expand Up @@ -3070,44 +3070,44 @@ public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedExc
positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
}
assertEquals(cursor.getNumberOfEntries(), totalEntries);
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(2).setEnd(4).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(2).setEnd(4)));
List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(1, deletedIndexes.size());
Assert.assertEquals(2, deletedIndexes.get(0).getStart());
Assert.assertEquals(4, deletedIndexes.get(0).getEnd());

deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(8).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(3).setEnd(8)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(1, deletedIndexes.size());
Assert.assertEquals(2, deletedIndexes.get(0).getStart());
Assert.assertEquals(8, deletedIndexes.get(0).getEnd());

deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(0).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(0)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertEquals(2, deletedIndexes.size());
Assert.assertEquals(0, deletedIndexes.get(0).getStart());
Assert.assertEquals(0, deletedIndexes.get(0).getEnd());
Assert.assertEquals(2, deletedIndexes.get(1).getStart());
Assert.assertEquals(8, deletedIndexes.get(1).getEnd());

deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(1).setEnd(1).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(9).setEnd(9).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(1).setEnd(1)));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(9).setEnd(9)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
Assert.assertNull(deletedIndexes);
Assert.assertEquals(positions[0], cursor.getMarkDeletedPosition());

deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(5)));
cursor.delete(positions[1]);
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(6).setEnd(8).build()));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(new IntRange().setStart(6).setEnd(8)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[1]), 10);
Assert.assertNull(deletedIndexes);

deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(5)));
cursor.markDelete(positions[3]);
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[2]), 10);
Assert.assertNull(deletedIndexes);

deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build()));
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(5)));
cursor.resetCursor(positions[0]);
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[3]), 10);
Assert.assertNull(deletedIndexes);
Expand All @@ -3125,12 +3125,12 @@ public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerExce
positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
}
assertEquals(cursor.getNumberOfEntries(), totalEntries);
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(6).build()));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(new IntRange().setStart(3).setEnd(6)));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));

ledger = factory.open("test_batch_indexes_deletion_persistent");
cursor = ledger.openCursor("c1");
Expand All @@ -3139,7 +3139,7 @@ public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerExce
Assert.assertEquals(deletedIndexes.get(0).getStart(), 3);
Assert.assertEquals(deletedIndexes.get(0).getEnd(), 6);
Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[4]);
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
Assert.assertNull(deletedIndexes);
Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]);
Expand Down Expand Up @@ -3179,13 +3179,12 @@ private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize)
List<IntRange> result = new ArrayList<>();
BitSet bitSet = BitSet.valueOf(bitSetLongArray);
int nextClearBit = bitSet.nextClearBit(0);
IntRange.Builder builder = IntRange.newBuilder();
while (nextClearBit != -1 && nextClearBit <= batchSize) {
int nextSetBit = bitSet.nextSetBit(nextClearBit);
if (nextSetBit == -1) {
break;
}
result.add(builder.setStart(nextClearBit).setEnd(nextSetBit - 1).build());
result.add(new IntRange().setStart(nextClearBit).setEnd(nextSetBit - 1));
nextClearBit = bitSet.nextClearBit(nextSetBit);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
Expand Down
15 changes: 1 addition & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ flexible messaging model and an intuitive client API.</description>
<testRetryCount>1</testRetryCount>
<docker.organization>apachepulsar</docker.organization>

<!-- pin the protobuf-shaded version to make the pulsar build friendly to intellij -->
<pulsar.protobuf.shaded.version>2.1.0-incubating</pulsar.protobuf.shaded.version>

<!-- apache commons -->
<commons-compress.version>1.19</commons-compress.version>

Expand All @@ -121,7 +118,6 @@ flexible messaging model and an intuitive client API.</description>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
<typetools.version>0.5.0</typetools.version>
<protobuf2.version>2.4.1</protobuf2.version>
<protobuf3.version>3.11.4</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.18.0</grpc.version>
Expand Down Expand Up @@ -226,7 +222,7 @@ flexible messaging model and an intuitive client API.</description>
<errorprone.version>2.4.0</errorprone.version>
<errorprone.javac.version>9+181-r4173-1</errorprone.javac.version>
<errorprone-slf4j.version>0.1.4</errorprone-slf4j.version>

<lightproto-maven-plugin.version>0.2</lightproto-maven-plugin.version>

<!-- Used to configure rename.netty.native. Libs -->
<rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
Expand Down Expand Up @@ -1246,10 +1242,6 @@ flexible messaging model and an intuitive client API.</description>
<!-- These files are generated automatically by the Protobuf compiler
and are included in source tree for convenience -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>bin/proto/MLDataFormats_pb2.py</exclude>

Expand All @@ -1267,10 +1259,6 @@ flexible messaging model and an intuitive client API.</description>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude>

<!-- These files are BSD licensed files -->
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java</exclude>

<!-- Imported from Netty - Apache License v2 -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/util/AbstractCASReferenceCounted.java</exclude>

Expand Down Expand Up @@ -1871,4 +1859,3 @@ flexible messaging model and an intuitive client API.</description>
</repositories>

</project>

76 changes: 0 additions & 76 deletions protobuf-shaded/pom.xml

This file was deleted.

Loading

0 comments on commit c12765a

Please sign in to comment.