Skip to content

Commit

Permalink
Support add custom properties for the reconsumeLater interface (apach…
Browse files Browse the repository at this point in the history
…e#13461)

Master Issue: apache#13410
### Motivation
For retry messages in some cases, users need to  properties

### Modifications
This change can be supported on the client side, need to add a set of interfaces to org.apache.pulsar.client.api.Consumer

```java
void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) throws PulsarClientException;
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit);
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit);
```
  • Loading branch information
liudezhi2098 authored Dec 28, 2021
1 parent b0bafb1 commit 96b3e92
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
Expand Down Expand Up @@ -120,7 +123,7 @@ public void testRetryTopic() throws Exception {
checkConsumer.close();
}

@Test
@Test(timeOut = 60000)
public void testRetryTopicProperties() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";

Expand Down Expand Up @@ -156,8 +159,6 @@ public void testRetryTopicProperties() throws Exception {
originMessageIds.add(msgId.toString());
}

producer.close();

int totalReceived = 0;
Set<String> retryMessageIds = Sets.newHashSet();
do {
Expand Down Expand Up @@ -195,7 +196,6 @@ public void testRetryTopicProperties() throws Exception {
assertEquals(deadLetterMessageIds, originMessageIds);

deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
Expand All @@ -212,6 +212,20 @@ public void testRetryTopicProperties() throws Exception {
assertNull(checkMessage);

checkConsumer.close();

// check the custom properties
producer.send(String.format("Hello Pulsar [%d]", 1).getBytes());
Message<byte[]> message = consumer.receive();
Map<String, String> customProperties = new HashMap<String, String>();
customProperties.put("custom_key", "custom_value");
consumer.reconsumeLater(message, customProperties, 1, TimeUnit.SECONDS);
message = consumer.receive();
String value = message.getProperty("custom_key");
assertEquals(value, "custom_value");
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID),
message.getProperty(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID));
producer.close();
consumer.close();
}

//Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -304,6 +305,41 @@ public interface Consumer<T> extends Closeable {
*/
void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;

/**
* reconsumeLater the consumption of {@link Messages}.
*
*<p>When a message is "reconsumeLater" it will be marked for redelivery after
* some custom delay.
*
* <p>Example of usage:
* <pre><code>
* while (true) {
* Message&lt;String&gt; msg = consumer.receive();
*
* try {
* // Process message...
*
* consumer.acknowledge(msg);
* } catch (Throwable t) {
* log.warn("Failed to process message");
* consumer.reconsumeLater(msg, 1000 , TimeUnit.MILLISECONDS);
* }
* }
* </code></pre>
*
* @param message
* the {@code Message} to be reconsumeLater
* @param customProperties
* the custom properties to be reconsumeLater
* @param delayTime
* the amount of delay before the message will be delivered
* @param unit
* the time unit for the delay
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) throws PulsarClientException;

/**
* reconsumeLater the consumption of {@link Messages}.
*
Expand Down Expand Up @@ -473,6 +509,21 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
*/
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit);

/**
* Asynchronously reconsumeLater the consumption of a single message.
*
* @param message
* The {@code Message} to be reconsumeLater
* @param customProperties
* The custom properties to be reconsumeLater
* @param delayTime
* the amount of delay before the message will be delivered
* @param unit
* the time unit for the delay
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit);

/**
* Asynchronously reconsumeLater the consumption of {@link Messages}.
*
Expand Down Expand Up @@ -526,6 +577,24 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
*/
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit);

/**
* Asynchronously ReconsumeLater the reception of all the messages in the stream up to (and including) the provided
* message.
*
* <p>Cumulative reconsumeLater cannot be used when the consumer type is set to ConsumerShared.
*
* @param message
* The {@code message} to be cumulatively reconsumeLater
* @param customProperties
* The custom properties to be cumulatively reconsumeLater
* @param delayTime
* the amount of delay before the message will be delivered
* @param unit
* the time unit for the delay
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit);

/**
* Get statistics for the consumer.
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,16 @@ public void acknowledge(Messages<?> messages) throws PulsarClientException {

@Override
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
reconsumeLater(message,null, delayTime, unit);
}

@Override
public void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) throws PulsarClientException {
if (!conf.isRetryEnable()) {
throw new PulsarClientException("reconsumeLater method not support!");
}
try {
reconsumeLaterAsync(message, delayTime, unit).get();
reconsumeLaterAsync(message, customProperties, delayTime, unit).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
Expand Down Expand Up @@ -428,6 +433,11 @@ public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {

@Override
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
return reconsumeLaterAsync(message, null, delayTime, unit);
}

@Override
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message,Map<String, String> customProperties, long delayTime, TimeUnit unit) {
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
}
Expand All @@ -436,7 +446,7 @@ public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long dela
} catch (PulsarClientException e) {
return FutureUtil.failedFuture(e);
}
return doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
return doReconsumeLater(message, AckType.Individual, customProperties, delayTime, unit);
}

@Override
Expand Down Expand Up @@ -464,14 +474,19 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {

@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) {
return reconsumeLaterCumulativeAsync(message, null, delayTime, unit);
}

@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
}
if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
"Cannot use cumulative acks on a non-exclusive subscription"));
}
return doReconsumeLater(message, AckType.Cumulative, Collections.emptyMap(), delayTime, unit);
return doReconsumeLater(message, AckType.Cumulative, customProperties, delayTime, unit);
}

@Override
Expand Down Expand Up @@ -565,7 +580,7 @@ protected abstract CompletableFuture<Void> doAcknowledge(List<MessageId> message
TransactionImpl txn);

protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String,Long> properties,
Map<String, String> customProperties,
long delayTime,
TimeUnit unit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
@SuppressWarnings("unchecked")
@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String, Long> properties,
Map<String, String> customProperties,
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
Expand Down Expand Up @@ -583,6 +583,9 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
String originTopicNameStr = getOriginTopicNameStr(message);
SortedMap<String, String> propertiesMap
= getPropertiesMap(message, originMessageIdStr, originTopicNameStr);
if(customProperties != null) {
propertiesMap.putAll(customProperties);
}
int reconsumetimes = 1;
if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
reconsumetimes = Integer.parseInt(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
Expand All @@ -600,7 +603,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
.value(retryMessage.getData())
.properties(propertiesMap);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> {
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
result.complete(null);
}).exceptionally(ex -> {
result.completeExceptionally(ex);
Expand All @@ -626,7 +629,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
typedMessageBuilderNew.key(message.getKey());
}
typedMessageBuilderNew.sendAsync()
.thenAccept(__ -> doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> result.complete(null)))
.thenAccept(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> result.complete(null)))
.exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
Expand Down Expand Up @@ -655,7 +658,9 @@ private SortedMap<String, String> getPropertiesMap(Message<?> message, String or
propertiesMap.putAll(message.getProperties());
}
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
//Compatible with the old version, will be deleted in the future
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
propertiesMap.putIfAbsent(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
return propertiesMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A

@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String,Long> properties,
Map<String, String> customProperties,
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
Expand All @@ -509,7 +509,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}
} else {
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit)
return consumer.doReconsumeLater(message, ackType, customProperties, delayTime, unit)
.thenRun(() ->unAckedMessageTracker.remove(topicMessageId));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ public class RetryMessageUtil {
public static final String SYSTEM_PROPERTY_DELAY_TIME = "DELAY_TIME";
public static final String SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
public static final String SYSTEM_PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
@Deprecated
public static final String SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_IDY_TIME";

public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
public static final int MAX_RECONSUMETIMES = 16;
public static final String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY";
public static final String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
}
}

0 comments on commit 96b3e92

Please sign in to comment.