Skip to content

Commit

Permalink
Avoid enable DLQ on Key_Shared subscription. (apache#9163)
Browse files Browse the repository at this point in the history
Fixes apache#9156

### Motivation

In Pulsar, DLQ only supports non-ordered subscriptions, But now the Key_Shared subscription also allowed enable DLQ.
Should avoid enable DLQ on the Key_Shared subscription tpye.

### Modifications

Only send message to DLQ if not Key_Shared subscription type.
  • Loading branch information
MarvinCai authored Feb 2, 2021
1 parent 69ae2df commit 9af8577
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ public interface RawReader {

static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
RawReader r = null;
try {
r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
} catch (PulsarClientException.InvalidConfigurationException e) {
// Shouldn't happen, as exception was thrown if DLQ enabled for Key_Shared sub type, RawReader use
// Exclusive sub type so should be fine.
}
RawReader finalR = r;
return future.thenCompose((consumer) -> finalR.seekAsync(MessageId.earliest)).thenApply((ignore) -> finalR);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -49,7 +50,8 @@ public class RawReaderImpl implements RawReader {
private RawConsumerImpl consumer;

public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
CompletableFuture<Consumer<byte[]>> consumerFuture)
throws PulsarClientException.InvalidConfigurationException {
consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(topic);
consumerConfiguration.setSubscriptionName(subscription);
Expand Down Expand Up @@ -107,7 +109,8 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
final Queue<CompletableFuture<RawMessage>> pendingRawReceives;

RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
CompletableFuture<Consumer<byte[]>> consumerFuture)
throws PulsarClientException.InvalidConfigurationException {
super(client,
conf.getSingleTopic(),
conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,63 @@ public void testDeadLetterTopic() throws Exception {
newPulsarClient.close();
}

@Test(timeOut = 10000)
public void testDLQDisabledForKeySharedSubtype() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";

final int maxRedeliveryCount = 2;

final int sendMessages = 100;

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

for (int i = 0; i < sendMessages; i++) {
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}

producer.close();

int totalReceived = 0;
Message<byte[]> message;
do {
message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
assertNotNull(message);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
// make sure message not acked sent to retry topic.
assertEquals(totalReceived, sendMessages * (maxRedeliveryCount + 1));

// make sure no message sent to dead letter topic.
Message dlqMessage = deadLetterConsumer.receive(5, TimeUnit.SECONDS);
assertNull(dlqMessage);

deadLetterConsumer.close();
consumer.close();
newPulsarClient.close();
}

@Test(timeOut = 30000)
public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
Expand All @@ -37,6 +41,8 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
Expand All @@ -50,6 +56,7 @@
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -64,6 +71,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -477,6 +486,46 @@ public void testTopicNameValid() throws Exception{
}).get();
}

@Test
public void testSubscribeKeySharedWithDLQ() throws Exception{
final String topicName = "persistent://prop/use/ns-abc/testTopicNameValid";
TenantInfo tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName, 3);
// Through builder
pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("subscriptionName")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic("DLQ").build())
.receiverQueueSize(0)
.subscribeAsync().handle((res, exception) -> {
assertTrue(exception instanceof PulsarClientException.InvalidConfigurationException);
assertEquals(((PulsarClientException.InvalidConfigurationException) exception).getMessage(), "DeadLetterQueue is not supported for Key_Shared subscription type since DLQ can't guarantee message ordering.");
return null;
}).get();

// Through constructor
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
ConsumerConfigurationData<Byte[]> consumerConfig = new ConsumerConfigurationData<>();
consumerConfig.setDeadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic("DLQ").build());
consumerConfig.setSubscriptionType(SubscriptionType.Key_Shared);
when(mockClient.newConsumerId()).thenReturn(1l);
when(mockClient.getConfiguration()).thenReturn(new ClientConfigurationData());
when(mockClient.timer()).thenReturn(new HashedWheelTimer());
when(mockClient.eventLoopGroup()).thenReturn(new NioEventLoopGroup());
try {
ConsumerImpl consumer = new ConsumerImpl(mockClient, "my-topic", consumerConfig,
Executors.newSingleThreadExecutor(), 0, false, new CompletableFuture<>(),
MessageId.earliest, 100, Schema.BYTES,
new ConsumerInterceptors(Collections.emptyList()), false);
} catch (Exception exception) {
assertTrue(exception instanceof PulsarClientException.InvalidConfigurationException);
assertEquals(exception.getMessage(), "Deadletter topic on Key_Shared subscription type is not supported.");
}
}

@Test
public void testSubscribeUnsubscribeSingleTopic() throws Exception {
String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,36 @@ public CompletableFuture<Consumer<T>> subscribeAsync() {
return FutureUtil.failedFuture(
new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
}
if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
if (conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
String retryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String deadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
if(conf.getDeadLetterPolicy() == null) {
conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
.retryLetterTopic(retryLetterTopic)
.deadLetterTopic(deadLetterTopic)
.build());
DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder()
.maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
.retryLetterTopic(retryLetterTopic);
// Don't set DLQ for key shared subType since it requires msg to be ordered for key.
if (conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
dlpBuilder.deadLetterTopic(deadLetterTopic);
}
conf.setDeadLetterPolicy(dlpBuilder.build());
} else {
if (StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
}
if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())
&& conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
}
}
conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
}
if (conf.getDeadLetterPolicy() != null && StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())
&& conf.getSubscriptionType() == SubscriptionType.Key_Shared) {
return FutureUtil
.failedFuture(new InvalidConfigurationException("DeadLetterQueue is not supported for" +
" Key_Shared subscription type since DLQ can't guarantee message ordering."));
}
return interceptorList == null || interceptorList.size() == 0 ?
client.subscribeAsync(conf, schema, null) :
client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
Expand Down
Loading

0 comments on commit 9af8577

Please sign in to comment.