Skip to content

Commit

Permalink
Fix bug that beforeConsume() of interceptor is not called when receiv…
Browse files Browse the repository at this point in the history
…er queue size is 0 (apache#5777)
  • Loading branch information
Masahiro Sakamoto authored and codelipenghui committed Dec 2, 2019
1 parent f0df253 commit 9b415ce
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
Expand All @@ -31,14 +38,9 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterceptorsTest extends ProducerConsumerBase {

private static final Logger log = LoggerFactory.getLogger(InterceptorsTest.class);
Expand All @@ -56,6 +58,11 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "receiverQueueSize")
public Object[][] getReceiverQueueSize() {
return new Object[][] { { 0 }, { 1000 } };
}

@Test
public void testProducerInterceptor() throws PulsarClientException {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
Expand Down Expand Up @@ -256,8 +263,8 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer2.close();
}

@Test
public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException {
@Test(dataProvider = "receiverQueueSize")
public void testConsumerInterceptorWithSingleTopicSubscribe(Integer receiverQueueSize) throws Exception {
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
public void close() {
Expand Down Expand Up @@ -297,14 +304,16 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription")
.receiverQueueSize(receiverQueueSize)
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.enableBatching(false)
.create();

// Receive a message synchronously
producer.newMessage().value("Hello Pulsar!").send();

Message<String> received = consumer.receive();
MessageImpl<String> msg = (MessageImpl<String>) received;
boolean haveKey = false;
Expand All @@ -315,6 +324,50 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
}
Assert.assertTrue(haveKey);
consumer.acknowledge(received);

// Receive a message asynchronously
producer.newMessage().value("Hello Pulsar!").send();
received = consumer.receiveAsync().get();
msg = (MessageImpl<String>) received;
haveKey = false;
for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
if ("beforeConsumer".equals(keyValue.getKey())) {
haveKey = true;
}
}
Assert.assertTrue(haveKey);
consumer.acknowledge(received);
consumer.close();

final CompletableFuture<Message<String>> future = new CompletableFuture<>();
consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription")
.receiverQueueSize(receiverQueueSize)
.messageListener((c, m) -> {
try {
c.acknowledge(m);
} catch (Exception e) {
Assert.fail("Failed to acknowledge", e);
}
future.complete(m);
})
.subscribe();

// Receive a message using the message listener
producer.newMessage().value("Hello Pulsar!").send();
received = future.get();
msg = (MessageImpl<String>) received;
haveKey = false;
for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
if ("beforeConsumer".equals(keyValue.getKey())) {
haveKey = true;
}
}
Assert.assertTrue(haveKey);

producer.close();
consumer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf
protected Message<T> internalReceive() throws PulsarClientException {
zeroQueueLock.lock();
try {
return fetchSingleMessageFromBroker();
return beforeConsume(fetchSingleMessageFromBroker());
} finally {
zeroQueueLock.unlock();
}
Expand Down Expand Up @@ -155,7 +155,7 @@ private void triggerZeroQueueSizeListener(final Message<T> message) {
log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription,
message.getMessageId());
}
listener.received(ZeroQueueConsumerImpl.this, message);
listener.received(ZeroQueueConsumerImpl.this, beforeConsume(message));
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription,
message.getMessageId(), t);
Expand Down

0 comments on commit 9b415ce

Please sign in to comment.