Skip to content

Commit

Permalink
[pulsar-test] enable pooled netty allocator for unit-tests (apache#11753
Browse files Browse the repository at this point in the history
)

Fixes apache#11750

* [pulsar-test] enable pooled netty allocator for unit-tests

* Make TransactionMarkerDeleteTest work with pooled allocator

* Revisit fix for making TransactionMarkerDeleteTest work with pooled allocator

Co-authored-by: Lari Hotari <[email protected]>
  • Loading branch information
rdhabalia and lhotari authored Aug 24, 2021
1 parent e9292b3 commit 4b77a84
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine> -Xmx1G -XX:+UseG1GC
-Dpulsar.allocator.pooled=false
-Dpulsar.allocator.pooled=true
-Dpulsar.allocator.leak_detection=Advanced
-Dpulsar.allocator.exit_on_oom=false
-Dio.netty.tryReflectionSetAccessible=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
Expand All @@ -55,7 +53,7 @@
import org.testng.collections.Sets;

@Test(groups = "broker")
public class TransactionMarkerDeleteTest extends BrokerTestBase{
public class TransactionMarkerDeleteTest extends BrokerTestBase {

@BeforeMethod
@Override
Expand Down Expand Up @@ -96,7 +94,6 @@ public void testMarkerDeleteTimes() throws Exception {
}



@Test
public void testMarkerDelete() throws Exception {

Expand All @@ -108,7 +105,7 @@ public void testMarkerDelete() throws Exception {
ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);

payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
msgMetadata, payload);
msgMetadata, payload);

ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
PersistentTopic topic = mock(PersistentTopic.class);
Expand All @@ -119,15 +116,16 @@ public void testMarkerDelete() throws Exception {
PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
managedLedger.openCursor("test"), false);

Position position1 = managedLedger.addEntry(payload.array());
Position markerPosition1 = managedLedger.addEntry(Markers
.newTxnCommitMarker(1, 1, 1).array());
byte[] payloadBytes = toBytes(payload);
Position position1 = managedLedger.addEntry(payloadBytes);
Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
.newTxnCommitMarker(1, 1, 1)));

Position position2 = managedLedger.addEntry(payload.array());
Position markerPosition2 = managedLedger.addEntry(Markers
.newTxnAbortMarker(1, 1, 1).array());
Position position2 = managedLedger.addEntry(payloadBytes);
Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
.newTxnAbortMarker(1, 1, 1)));

Position position3 = managedLedger.addEntry(payload.array());
Position position3 = managedLedger.addEntry(payloadBytes);

assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
Expand All @@ -150,14 +148,14 @@ public void testMarkerDelete() throws Exception {
.compareTo((PositionImpl) markerPosition2) == 0);

// add consequent marker
managedLedger.addEntry(Markers
.newTxnCommitMarker(1, 1, 1).array());
managedLedger.addEntry(toBytes(Markers
.newTxnCommitMarker(1, 1, 1)));

managedLedger.addEntry(Markers
.newTxnAbortMarker(1, 1, 1).array());
managedLedger.addEntry(toBytes(Markers
.newTxnAbortMarker(1, 1, 1)));

Position markerPosition3 = managedLedger.addEntry(Markers
.newTxnAbortMarker(1, 1, 1).array());
Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
.newTxnAbortMarker(1, 1, 1)));

// ack with transaction, then commit this transaction
persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
Expand All @@ -171,4 +169,11 @@ public void testMarkerDelete() throws Exception {
.compareTo((PositionImpl) markerPosition3) == 0);

}

static byte[] toBytes(ByteBuf byteBuf) {
byte[] buf = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(buf);
byteBuf.release();
return buf;
}
}

0 comments on commit 4b77a84

Please sign in to comment.