Skip to content

Commit

Permalink
Fixed typo in ConsumerBuilder acknowledgmentGroupTime option (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Apr 23, 2018
1 parent 2ac2b5c commit 9947f74
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,7 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

Expand Down Expand Up @@ -1472,7 +1472,7 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

Expand Down Expand Up @@ -1543,7 +1543,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

List<String> topics = admin.persistentTopics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

Expand Down Expand Up @@ -1492,7 +1492,7 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

Expand Down Expand Up @@ -1563,7 +1563,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

List<String> topics = admin.persistentTopics().getList("prop-xyz/use/ns1");
assertEquals(topics.size(), 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
SubscriptionStats subStats;

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
Expand Down Expand Up @@ -219,7 +219,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
SubscriptionStats subStats;

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener();
TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener();
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover);
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover);


// 1. two consumers on the same subscription
ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1")
.consumerEventListener(listener1).acknowledmentGroupTime(0, TimeUnit.SECONDS);
.consumerEventListener(listener1).acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public void testUnackedCountWithRedeliveries() throws Exception {

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(10).subscriptionType(SubscriptionType.Shared)
.acknowledmentGroupTime(0, TimeUnit.SECONDS);
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe();

for (int i = 0; i < numMsgs; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void testFailoverSingleAckedNormalTopic() throws Exception {
// 2. Create consumer
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover)
.acknowledmentGroupTime(0, TimeUnit.SECONDS);
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw
final int totalProducedMsgs = 500;

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriberName)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
.enableBatching(false)
Expand Down Expand Up @@ -703,15 +703,15 @@ public void testBlockBrokerDispatching() throws Exception {

ConsumerImpl<byte[]> consumer1Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
// create subscription-2 and 3
ConsumerImpl<byte[]> consumer1Sub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
consumer1Sub2.close();
ConsumerImpl<byte[]> consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
consumer1Sub3.close();

Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down Expand Up @@ -751,7 +751,7 @@ public void testBlockBrokerDispatching() throws Exception {
// (1.b) consumer2 with same sub should not receive any more messages as subscription is blocked
ConsumerImpl<byte[]> consumer2Sub1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName1).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumer2Msgs = 0;
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS);
Expand All @@ -776,7 +776,7 @@ public void testBlockBrokerDispatching() throws Exception {
**/
ConsumerImpl<byte[]> consumerSub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Set<MessageId> messages2 = Sets.newHashSet();
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS);
Expand All @@ -793,7 +793,7 @@ public void testBlockBrokerDispatching() throws Exception {
/** (3) if Subscription3 is acking then it shouldn't be blocked **/
consumer1Sub3 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriberName3).receiverQueueSize(receiverQueueSize)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumedMsgsSub3 = 0;
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ public void testSharedConsumerAckDifferentConsumer() throws Exception {
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic1").subscriptionName("my-subscriber-name")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
.acknowledmentGroupTime(0, TimeUnit.SECONDS);
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();

Expand Down Expand Up @@ -1143,7 +1143,7 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Shared)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic").create();
Expand Down Expand Up @@ -1307,7 +1307,7 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/unacked-topic")
Expand Down Expand Up @@ -1885,7 +1885,7 @@ public void testSharedSamePriorityConsumer() throws Exception {
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic2").subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(queueSize)
.acknowledmentGroupTime(0, TimeUnit.SECONDS);
.acknowledgmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> c1 = consumerBuilder.subscribe();
Consumer<byte[]> c2 = consumerBuilder.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2")
Expand Down Expand Up @@ -1988,7 +1988,7 @@ public void testRedeliveryFailOverConsumer() throws Exception {
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/unacked-topic").subscriptionName("subscriber-1")
.receiverQueueSize(receiverQueueSize).subscriptionType(SubscriptionType.Failover)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic")
.create();
Expand Down Expand Up @@ -2284,7 +2284,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
Set<String> messageSet = Sets.newHashSet();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

// 1. Invalid key name
try {
Expand Down Expand Up @@ -2318,7 +2318,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
consumer.close();
consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

int msgNum = 0;
try {
Expand All @@ -2339,7 +2339,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
// Set keyreader
consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
.cryptoKeyReader(new EncKeyReader()).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

for (int i = msgNum; i < totalMsg - 1; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
Expand All @@ -2356,7 +2356,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
consumer.close();
consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

// Receive should proceed and discard encrypted messages
msg = consumer.receive(5, TimeUnit.SECONDS);
Expand All @@ -2382,12 +2382,12 @@ public void testConsumerSubscriptionInitialize() throws Exception {

// 2, create consumer
Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-default").subscribe();
Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-latest")
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe();
Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionName("test-subscription-earliest")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

// 3, produce 5 messages more
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void testTopicInternalStats() throws Exception {
final String topicName = "persistent://my-property/my-ns/my-topic1";
final String subscriptionName = "my-subscriber-name";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
final int numberOfMsgs = 1000;
for (int i = 0; i < numberOfMsgs; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Pr
}

if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) {
consumerBuilder.acknowledmentGroupTime(
consumerBuilder.acknowledgmentGroupTime(
Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
}

Expand Down
Loading

0 comments on commit 9947f74

Please sign in to comment.