
Commit

Fixed increasing consumer permits after ack dedup operation (apache#3787)

merlimat authored and massakam committed Mar 8, 2019
1 parent 32a244d commit ded732c
Showing 3 changed files with 139 additions and 19 deletions.
ConsumerDedupPermitsUpdate.java
@@ -0,0 +1,124 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.util.concurrent.TimeUnit;

import lombok.Cleanup;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ConsumerDedupPermitsUpdate extends ProducerConsumerBase {
    @BeforeClass
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        producerBaseSetup();
    }

    @AfterClass
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name = "combinations")
    public Object[][] combinations() {
        return new Object[][] {
                // batching-enabled - queue-size
                { false, 0 },
                { false, 1 },
                { false, 10 },
                { false, 100 },
                { true, 1 },
                { true, 10 },
                { true, 100 },
        };
    }

    @Test(timeOut = 30000, dataProvider = "combinations")
    public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) throws Exception {
        String topic = "persistent://my-property/my-ns/my-topic-" + System.nanoTime();

        @Cleanup
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("test")
                // Use high ack delay to simulate a message being tracked as dup
                .acknowledgmentGroupTime(1, TimeUnit.HOURS)
                .receiverQueueSize(receiverQueueSize)
                .subscribe();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .enableBatching(batchingEnabled)
                .batchingMaxMessages(10)
                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
                .create();

        for (int i = 0; i < 30; i++) {
            producer.sendAsync("hello-" + i);
        }
        producer.flush();

        // Consumer receives and acks all the messages, though the acks
        // are still cached in client lib
        for (int i = 0; i < 30; i++) {
            Message<String> msg = consumer.receive();
            assertEquals(msg.getValue(), "hello-" + i);
            consumer.acknowledge(msg);
        }

        // Trigger redelivery by unloading the topic.
        admin.topics().unload(topic);

        // Consumer dedup logic will detect the dups and not bubble them up to the application
        // (With zero-queue we cannot use receive with timeout)
        if (receiverQueueSize > 0) {
            Message<String> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
            assertNull(msg);
        }

        // The flow permits in consumer shouldn't have been messed up by the deduping
        // and we should be able to get new messages through
        for (int i = 0; i < 30; i++) {
            producer.sendAsync("new-message-" + i);
        }
        producer.flush();

        for (int i = 0; i < 30; i++) {
            Message<String> msg = consumer.receive();
            assertEquals(msg.getValue(), "new-message-" + i);
            consumer.acknowledge(msg);
        }
    }

}
ConsumerBase.java
@@ -73,11 +73,8 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         this.subscribeFuture = subscribeFuture;
         this.listener = conf.getMessageListener();
         this.consumerEventListener = conf.getConsumerEventListener();
-        if (receiverQueueSize <= 1) {
-            this.incomingMessages = Queues.newArrayBlockingQueue(1);
-        } else {
-            this.incomingMessages = new GrowableArrayBlockingQueue<>();
-        }
+        // Always use growable queue since items can exceed the advertised size
+        this.incomingMessages = new GrowableArrayBlockingQueue<>();
 
         this.listenerExecutor = listenerExecutor;
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
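As context for the new comment above, here is a minimal standalone sketch (plain JDK code, not part of this commit; the class name and numbers are made up) of why a fixed capacity-1 queue cannot absorb a batch entry that expands into more messages than the advertised receiver queue size, whereas a growable queue can:

import java.util.concurrent.ArrayBlockingQueue;

// Illustrative only: a capacity-1 bounded queue rejects the extra messages
// produced when one batch entry expands into several messages, which is why
// ConsumerBase now always uses a growable queue for incoming messages.
public class BoundedQueueSketch {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> incoming = new ArrayBlockingQueue<>(1);
        int accepted = 0;
        for (int i = 0; i < 10; i++) {        // pretend a 10-message batch arrives
            if (incoming.offer("msg-" + i)) { // offer() returns false once the queue is full
                accepted++;
            }
        }
        System.out.println("accepted " + accepted + " of 10 batch messages"); // prints 1
    }
}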
ConsumerImpl.java
@@ -723,18 +723,6 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
                     messageId.getEntryId());
         }
 
-        MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
-        if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
-                        topic, subscription, msgId);
-            }
-            if (conf.getReceiverQueueSize() == 0) {
-                increaseAvailablePermits(cnx);
-            }
-            return;
-        }
-
         MessageMetadata msgMetadata = null;
         ByteBuf payload = headersAndPayload;
 
@@ -751,6 +739,19 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
             return;
         }
 
+        final int numMessages = msgMetadata.getNumMessagesInBatch();
+
+        MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
+        if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
+                        topic, subscription, msgId);
+            }
+
+            increaseAvailablePermits(cnx, numMessages);
+            return;
+        }
+
         ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, payload, cnx);
 
         boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
@@ -769,8 +770,6 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
             return;
         }
 
-        final int numMessages = msgMetadata.getNumMessagesInBatch();
-
         // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
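The core of the fix is visible in the hunks above: the duplicate check now runs after the message metadata has been parsed, so the batch size is known and the consumer hands back one flow permit per skipped message instead of at most one. A rough standalone model of that accounting (not Pulsar code; the class, the Set-based tracker, and the numbers are illustrative stand-ins for the real ack grouping tracker):

import java.util.HashSet;
import java.util.Set;

// Rough model of the permit accounting this commit fixes; names are stand-ins.
public class PermitAccountingSketch {
    private int availablePermits = 0;
    private final Set<Long> pendingAcks = new HashSet<>(); // plays the role of the ack grouping tracker

    void messageReceived(long entryId, int numMessagesInBatch) {
        if (pendingAcks.contains(entryId)) {
            // Duplicate of an entry whose ack is still cached client-side:
            // drop it, but give back one permit per message in the batch so
            // the broker keeps sending new data (the fixed behavior).
            availablePermits += numMessagesInBatch;
            return;
        }
        pendingAcks.add(entryId); // simulate the ack being cached by the tracker
    }

    public static void main(String[] args) {
        PermitAccountingSketch consumer = new PermitAccountingSketch();
        consumer.messageReceived(1L, 10); // first delivery of a 10-message batch
        consumer.messageReceived(1L, 10); // redelivery after e.g. a topic unload
        System.out.println("permits restored on dedup: " + consumer.availablePermits); // prints 10
    }
}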
