Skip to content

Commit

Permalink
Feature / Interceptor for negative ack redelivery (apache#3962)
Browse files Browse the repository at this point in the history
* Feature / Interceptor for negative ack redelivery

*Motivation*

In some scenarios is it helpful to be able to set interceptor for redeliveries
being happening due to negative acknowledge.

*Modifications*

  - Add onNegativeAcksSend() method in ConsumerInterceptor interface.
  - Add handler for onNegativeAcksSend() interceptor in ConsumerBase.
  - Favor forEach on ConsumerInterceptor instead of classic for loop by index.
  - Optimization for each by index to avoid compute size() every iteration.
  - Add call method to onNegativeAckRedelivery() from NegativeAcksTracker.

* Add test case for onNegativeAcksSend interceptor
  • Loading branch information
lovelle authored and merlimat committed Apr 22, 2019
1 parent 981983f commit 7890750
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterceptorsTest extends ProducerConsumerBase {

Expand Down Expand Up @@ -138,6 +141,11 @@ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwa
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
Expand Down Expand Up @@ -192,6 +200,11 @@ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwa
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
Expand Down Expand Up @@ -254,6 +267,11 @@ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwa
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
Expand Down Expand Up @@ -321,6 +339,11 @@ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId message
ackHolder.clear();
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
Expand Down Expand Up @@ -356,4 +379,69 @@ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId message
producer.close();
consumer.close();
}

@Test(timeOut = 5000)
public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
public void close() {

}

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {

}

@Override
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {

}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
messageIds.forEach(messageId -> latch.countDown());
}
};

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.subscriptionType(SubscriptionType.Failover)
.intercept(interceptor)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscriptionName("my-subscription")
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.create();

for (int i = 0; i < totalNumOfMessages; i++) {
producer.send("Mock message");
}

for (int i = 0; i < totalNumOfMessages; i++) {
Message<String> message = consumer.receive();

if (i % 2 == 0) {
consumer.negativeAcknowledge(message);
} else {
consumer.acknowledge(message);
}
}

latch.await();
Assert.assertEquals(latch.getCount(), 0);

producer.close();
consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;

import java.util.Set;

/**
* A plugin interface that allows you to intercept (and possibly mutate)
* messages received by the consumer.
Expand Down Expand Up @@ -94,4 +96,15 @@ public interface ConsumerInterceptor<T> extends AutoCloseable {
* @param exception the exception on acknowledge.
*/
void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception);

/**
*
* This method will be called when a redelivery from a negative acknowledge occurs.
*
* <p>Any exception thrown by this method will be ignored by the caller.
*
* @param consumer the consumer which contains the interceptor
* @param messageIds message to ack, null if acknowledge fail.
*/
void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception)
}
}

protected void onNegativeAcksSend(Set<MessageId> messageIds) {
if (interceptors != null) {
interceptors.onNegativeAcksSend(this, messageIds);
}
}

protected synchronized void incrRefCount() {
++refCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
* A container that hold the list {@link org.apache.pulsar.client.api.ConsumerInterceptor} and wraps calls to the chain
Expand Down Expand Up @@ -62,7 +63,7 @@ public ConsumerInterceptors(List<ConsumerInterceptor<T>> interceptors) {
*/
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
Message<T> interceptorMessage = message;
for (int i = 0; i < interceptors.size(); i++) {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
} catch (Exception e) {
Expand All @@ -88,7 +89,7 @@ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
* @param exception exception returned by broker.
*/
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
for (int i = 0; i < interceptors.size(); i++) {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledge(consumer, messageId, exception);
} catch (Exception e) {
Expand All @@ -109,7 +110,7 @@ public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable e
* @param exception exception returned by broker.
*/
public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
for (int i = 0; i < interceptors.size(); i++) {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception);
} catch (Exception e) {
Expand All @@ -118,9 +119,30 @@ public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, T
}
}

/**
* This is called when a redelivery from a negative acknowledge occurs.
* <p>
* This method calls {@link ConsumerInterceptor#onNegativeAcksSend(Consumer, Set)
* onNegativeAcksSend(Consumer, Set&lt;MessageId&gt;)} method for each interceptor.
* <p>
* This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated.
*
* @param consumer the consumer which contains the interceptors.
* @param messageIds set of message IDs being redelivery due a negative acknowledge.
*/
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds) {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onNegativeAcksSend(consumer, messageIds);
} catch (Exception e) {
log.warn("Error executing interceptor onNegativeAcksSend callback", e);
}
}
}

@Override
public void close() throws IOException {
for (int i = 0; i < interceptors.size(); i++) {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).close();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private synchronized void triggerRedelivery(Timeout t) {
});

messagesToRedeliver.forEach(nackedMessages::remove);
consumer.onNegativeAcksSend(messagesToRedeliver);
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);

this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
Expand Down

0 comments on commit 7890750

Please sign in to comment.