Skip to content

Commit

Permalink
Ack response implementation (apache#8996)
Browse files Browse the repository at this point in the history
## Motivation
in order to handle ack response implementation. When this PR commit, I will handle apache#8997.

## implement
1. we implement a new PersistentAcknowledgmentsWithResponseGroupingTracker.
2. we will add two ackRequests struct for async and sync flush.
3. add a timer to handle timeout and the timeout task don't need to lock, because the timeout is sequential.
### Verifying this change
Add the tests for it
  • Loading branch information
congbobo184 authored Jan 21, 2021
1 parent 9d92f68 commit 93dac25
Show file tree
Hide file tree
Showing 21 changed files with 724 additions and 598 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ public void testAckCommand() throws Exception {
PositionImpl pos = new PositionImpl(0, 0);

clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual,
null, Collections.emptyMap());
null, Collections.emptyMap(), -1);
channel.writeInbound(clientCommand);

// verify nothing is sent out on the wire after ack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
Expand All @@ -46,15 +47,20 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 30000)
public void testBatchListAck() throws Exception {
ackListMessage(true,true);
ackListMessage(true,false);
ackListMessage(false,false);
ackListMessage(false,true);
@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}

public void ackListMessage(boolean isBatch, boolean isPartitioned) throws Exception {
@Test(timeOut = 30000, dataProvider = "ackReceiptEnabled")
public void testBatchListAck(boolean ackReceiptEnabled) throws Exception {
ackListMessage(true,true, ackReceiptEnabled);
ackListMessage(true,false, ackReceiptEnabled);
ackListMessage(false,false, ackReceiptEnabled);
ackListMessage(false,true, ackReceiptEnabled);
}

public void ackListMessage(boolean isBatch, boolean isPartitioned, boolean ackReceiptEnabled) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID();
final String subName = "testBatchAck-sub" + UUID.randomUUID();
final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
Expand All @@ -72,6 +78,8 @@ public void ackListMessage(boolean isBatch, boolean isPartitioned) throws Except
.topic(topic)
.negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(ackReceiptEnabled)
.isAckReceiptEnabled(ackReceiptEnabled)
.subscribe();
sendMessagesAsyncAndWait(producer, messageNum);
List<MessageId> messages = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.UUID;
Expand All @@ -43,12 +44,19 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testAllTimerTaskShouldCanceledAfterConsumerClosed() throws PulsarClientException, InterruptedException {
@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "ackReceiptEnabled")
public void testAllTimerTaskShouldCanceledAfterConsumerClosed(boolean ackReceiptEnabled)
throws PulsarClientException, InterruptedException {
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/" + UUID.randomUUID().toString())
.subscriptionName("test")
.isAckReceiptEnabled(ackReceiptEnabled)
.subscribe();
consumer.close();
Thread.sleep(2000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@

import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;
Expand All @@ -54,6 +56,11 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}

/**
* It verifies that redelivered messages are sorted based on the ledger-ids.
* <pre>
Expand All @@ -64,8 +71,8 @@ protected void cleanup() throws Exception {
* </pre>
* @throws Exception
*/
@Test
public void testOrderedRedelivery() throws Exception {
@Test(dataProvider = "ackReceiptEnabled")
public void testOrderedRedelivery(boolean ackReceiptEnabled) throws Exception {
String topic = "persistent://my-property/my-ns/redelivery-" + System.currentTimeMillis();

conf.setManagedLedgerMaxEntriesPerLedger(2);
Expand All @@ -77,7 +84,8 @@ public void testOrderedRedelivery() throws Exception {
.producerName("my-producer-name")
.create();
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
.subscriptionType(SubscriptionType.Shared);
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(ackReceiptEnabled);
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe();

final int totalMsgs = 100;
Expand Down Expand Up @@ -130,12 +138,14 @@ public void testOrderedRedelivery() throws Exception {
}
}

@Test
public void testUnAckMessageRedeliveryWithReceiveAsync() throws PulsarClientException, ExecutionException, InterruptedException {
@Test(dataProvider = "ackReceiptEnabled")
public void testUnAckMessageRedeliveryWithReceiveAsync(boolean ackReceiptEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
String topic = "persistent://my-property/my-ns/async-unack-redelivery";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s1")
.isAckReceiptEnabled(ackReceiptEnabled)
.enableBatchIndexAcknowledgment(ackReceiptEnabled)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();

Expand Down Expand Up @@ -165,7 +175,6 @@ public void testUnAckMessageRedeliveryWithReceiveAsync() throws PulsarClientExce
}

assertEquals(10, messageReceived);

for (int i = 0; i < messages; i++) {
Message<String> message = consumer.receive();
assertNotNull(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public static Object[][] variationsForExpectedPos() {
};
}

@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
Expand Down Expand Up @@ -269,6 +274,11 @@ public long millis() {
}
}

@DataProvider(name = "batchAndAckReceipt")
public Object[][] codecProviderWithAckReceipt() {
return new Object[][] { { 0, true}, { 1000, false }, { 0, true }, { 1000, false }};
}

@DataProvider(name = "batch")
public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
Expand Down Expand Up @@ -311,10 +321,11 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exceptio
log.info("-- Exiting {} test --", methodName);
}

@Test(dataProvider = "batch")
public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Exception {
@Test(dataProvider = "batchAndAckReceipt")
public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic2")
.isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionName("my-subscriber-name").subscribe();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
Expand Down Expand Up @@ -1033,8 +1044,8 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testDeactivatingBacklogConsumer() throws Exception {
@Test(dataProvider = "ackReceiptEnabled")
public void testDeactivatingBacklogConsumer(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

final long batchMessageDelayMs = 100;
Expand All @@ -1047,10 +1058,12 @@ public void testDeactivatingBacklogConsumer() throws Exception {
// 1. Subscriber Faster subscriber: let it consume all messages immediately
Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub1)
.isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();
// 1.b. Subscriber Slow subscriber:
Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/" + topicName).subscriptionName(sub2)
.isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(receiverSize).subscribe();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic);
Expand All @@ -1077,7 +1090,7 @@ public void testDeactivatingBacklogConsumer() throws Exception {
// 3. Consume messages: at Faster subscriber
for (int i = 0; i < totalMsgs; i++) {
msg = subscriber1.receive(100, TimeUnit.MILLISECONDS);
subscriber1.acknowledge(msg);
subscriber1.acknowledgeAsync(msg);
}

// wait : so message can be eligible to to be evict from cache
Expand All @@ -1096,7 +1109,7 @@ public void testDeactivatingBacklogConsumer() throws Exception {
// 6. consume messages : at slower subscriber
for (int i = 0; i < totalMsgs; i++) {
msg = subscriber2.receive(100, TimeUnit.MILLISECONDS);
subscriber2.acknowledge(msg);
subscriber2.acknowledgeAsync(msg);
}

topicRef.checkBackloggedCursors();
Expand Down Expand Up @@ -1258,13 +1271,14 @@ public void testSendCallBack() throws Exception {
*
* @throws Exception
*/
@Test(timeOut = 30000)
public void testSharedConsumerAckDifferentConsumer() throws Exception {
@Test(dataProvider = "ackReceiptEnabled", timeOut = 30000)
public void testSharedConsumerAckDifferentConsumer(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Expand Down Expand Up @@ -1355,8 +1369,8 @@ private void receiveAsync(Consumer<byte[]> consumer, int totalMessage, int curre
*
* @throws Exception
*/
@Test
public void testConsumerBlockingWithUnAckedMessages() throws Exception {
@Test(dataProvider = "ackReceiptEnabled")
public void testConsumerBlockingWithUnAckedMessages(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
Expand All @@ -1368,6 +1382,7 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -1395,13 +1410,7 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
assertEquals(messages.size(), unAckedMessagesBufferSize);

// start acknowledging messages
messages.forEach(m -> {
try {
consumer.acknowledge(m);
} catch (PulsarClientException e) {
fail("ack failed", e);
}
});
messages.forEach(consumer::acknowledgeAsync);

// try to consume remaining messages
int remainingMessages = totalProducedMsgs - messages.size();
Expand Down Expand Up @@ -1434,8 +1443,8 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
*
* @throws Exception
*/
@Test
public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Exception {
@Test(dataProvider = "ackReceiptEnabled")
public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
Expand All @@ -1450,6 +1459,7 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -1508,8 +1518,8 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
*
* @throws Exception
*/
@Test
public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Exception {
@Test(dataProvider = "ackReceiptEnabled")
public void testMutlipleSharedConsumerBlockingWithUnAckedMessages(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
Expand All @@ -1522,11 +1532,13 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();

PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -1663,8 +1675,8 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
}
}

@Test
public void testUnackBlockRedeliverMessages() throws Exception {
@Test(dataProvider = "ackReceiptEnabled")
public void testUnackBlockRedeliverMessages(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
Expand All @@ -1677,6 +1689,7 @@ public void testUnackBlockRedeliverMessages() throws Exception {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -1730,8 +1743,8 @@ public void testUnackBlockRedeliverMessages() throws Exception {
}
}

@Test(dataProvider = "batch")
public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
@Test(dataProvider = "batchAndAckReceipt")
public void testUnackedBlockAtBatch(int batchMessageDelayMs, boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
Expand All @@ -1744,6 +1757,7 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnackedMessages);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.isAckReceiptEnabled(ackReceiptEnabled)
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared).subscribe();

ProducerBuilder<byte[]> producerBuidler = pulsarClient.newProducer()
Expand Down Expand Up @@ -1800,7 +1814,7 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
if (msg != null) {
messages.add(msg);
totalReceiveMessages++;
consumer1.acknowledge(msg);
consumer1.acknowledgeAsync(msg);
log.info("Received message: " + new String(msg.getData()));
} else {
break;
Expand Down Expand Up @@ -2326,8 +2340,8 @@ public void testSharedSamePriorityConsumer() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testRedeliveryFailOverConsumer() throws Exception {
@Test(dataProvider = "ackReceiptEnabled")
public void testRedeliveryFailOverConsumer(boolean ackReceiptEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

final int receiverQueueSize = 10;
Expand All @@ -2336,6 +2350,7 @@ public void testRedeliveryFailOverConsumer() throws Exception {
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
Expand Down
Loading

0 comments on commit 93dac25

Please sign in to comment.