Skip to content

Commit

Permalink
Reduce unnecessary track message calls. (apache#4595)
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui authored Jun 26, 2019
1 parent 946b9af commit 553f0fd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
*/
package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.ConsumerImpl;
Expand All @@ -29,6 +33,7 @@

import com.google.common.collect.Sets;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -123,4 +128,53 @@ public void testOrderedRedelivery() throws Exception {
consumer2.close();
}

@Test
public void testUnAckMessageRedeliveryWithReceiveAsync() throws PulsarClientException, ExecutionException, InterruptedException {
String topic = "persistent://my-property/my-ns/async-unack-redelivery";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s1")
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(5)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.create();

final int messages = 10;
List<CompletableFuture<Message<String>>> futures = new ArrayList<>(10);
for (int i = 0; i < messages; i++) {
futures.add(consumer.receiveAsync());
}

for (int i = 0; i < messages; i++) {
producer.sendAsync("my-message-" + i);
}

int messageReceived = 0;
for (CompletableFuture<Message<String>> future : futures) {
Message<String> message = future.get();
assertNotNull(message);
messageReceived++;
// Don't ack message, wait for ack timeout.
}

assertEquals(10, messageReceived);

for (int i = 0; i < messages; i++) {
Message<String> message = consumer.receive();
assertNotNull(message);
messageReceived++;
consumer.acknowledge(message);
}

assertEquals(20, messageReceived);

producer.close();
consumer.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,8 @@ protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
messageProcessed(interceptMsg);
return interceptMsg;
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
Expand All @@ -341,10 +339,8 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
}

if (message != null) {
trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
messageProcessed(interceptMsg);
result.complete(interceptMsg);
messageProcessed(message);
result.complete(beforeConsume(message));
}

return result;
Expand All @@ -355,12 +351,11 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
Message<T> message;
try {
message = incomingMessages.poll(timeout, unit);
trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
if (interceptMsg != null) {
messageProcessed(interceptMsg);
if (message == null) {
return null;
}
return interceptMsg;
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
State state = getState();
if (state != State.Closing && state != State.Closed) {
Expand Down Expand Up @@ -821,7 +816,6 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
}
if (!pendingReceives.isEmpty()) {
trackMessage(message);
notifyPendingReceivedCallback(message, null);
} else if (canEnqueueMessage(message)) {
incomingMessages.add(message);
Expand Down Expand Up @@ -1044,19 +1038,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
increaseAvailablePermits(currentCnx);
stats.updateNumMsgsReceived(msg);

if (conf.getAckTimeoutMillis() != 0) {
// reset timer for messages that are received by the client
MessageIdImpl id = (MessageIdImpl) msg.getMessageId();
if (id instanceof BatchMessageIdImpl) {
id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
}
if (partitionIndex != -1) {
// we should no longer track this message, TopicsConsumer will take care from now onwards
unAckedMessageTracker.remove(id);
} else {
unAckedMessageTracker.add(id);
}
}
trackMessage(msg);
}

protected void trackMessage(Message<?> msg) {
Expand All @@ -1068,7 +1050,12 @@ protected void trackMessage(Message<?> msg) {
// do not add each item in batch message into tracker
id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
}
unAckedMessageTracker.add(id);
if (partitionIndex != -1) {
// we should no longer track this message, TopicsConsumer will take care from now onwards
unAckedMessageTracker.remove(id);
} else {
unAckedMessageTracker.add(id);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
try {
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
unAckedMessageTracker.add(topicMessage.getMessageId());

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
Expand All @@ -270,6 +269,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
if (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
unAckedMessageTracker.add(topicMessage.getMessageId());
listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
} else {
// Enqueue the message so that it can be retrieved when application calls receive()
Expand Down

0 comments on commit 553f0fd

Please sign in to comment.