Skip to content

Commit

Permalink
Expose Redelivery Count By Message Properties(issue-3030) (apache#3033)
Browse files Browse the repository at this point in the history
* Expose Redelivery Count By Message Properties(issue-3030)

* Add getRedeliveryCount in Message interface(issue-3030)

* Change redeliveryCount to final.
  • Loading branch information
codelipenghui authored and merlimat committed Nov 25, 2018
1 parent 7bbcc72 commit 9d2c001
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

public class ExposeMessageRedeliveryCountTest extends ProducerConsumerBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 30000)
public void testRedeliveryCount() throws PulsarClientException {

final String topic = "persistent://my-property/my-ns/redeliveryCount";

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

producer.send("Hello Pulsar".getBytes());

do {
Message<byte[]> message = consumer.receive();
message.getProperties();
final int redeliveryCount = message.getRedeliveryCount();
if (redeliveryCount > 2) {
consumer.acknowledge(message);
Assert.assertEquals(3, redeliveryCount);
break;
}
} while (true);

producer.close();
consumer.close();
}

@Test(timeOut = 30000)
public void testRedeliveryCountWithPartitionedTopic() throws PulsarClientException, PulsarAdminException {

final String topic = "persistent://my-property/my-ns/redeliveryCount.partitioned";

admin.topics().createPartitionedTopic(topic, 3);

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

producer.send("Hello Pulsar".getBytes());

do {
Message<byte[]> message = consumer.receive();
message.getProperties();
final int redeliveryCount = message.getRedeliveryCount();
if (redeliveryCount > 2) {
consumer.acknowledge(message);
Assert.assertEquals(3, redeliveryCount);
break;
}
} while (true);

producer.close();
consumer.close();

admin.topics().deletePartitionedTopic(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,16 @@ public interface Message<T> {
* @return
*/
Optional<EncryptionContext> getEncryptionCtx();

/**
* Get message redelivery count, redelivery count maintain in pulsar broker. When client acknowledge message
* timeout, broker will dispatch message again with message redelivery count in CommandMessage defined.
*
* Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker
* redelivery count will be recalculate.
*
* @since 2.3.0
* @return message redelivery count
*/
int getRedeliveryCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), msgId,
msgMetadata, uncompressedPayload,
createEncryptionContext(msgMetadata), cnx, schema);
createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
uncompressedPayload.release();
msgMetadata.recycle();

Expand Down Expand Up @@ -1000,7 +1000,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
messageId.getEntryId(), getPartitionIndex(), i, acker);
final MessageImpl<T> message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload,
createEncryptionContext(msgMetadata), cnx, schema);
createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class MessageImpl<T> implements Message<T> {

private String topic; // only set for incoming messages
transient private Map<String, String> properties;
private final int redeliveryCount;

// Constructor for out-going message
static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema<T> schema) {
Expand All @@ -81,10 +82,16 @@ static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, Byt

MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0);
}

MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = messageId;
this.topic = topic;
this.cnx = cnx;
this.redeliveryCount = redeliveryCount;

// Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
// release, since the Message is passed to the user. Also, the passed ByteBuf is coming from network and is
Expand All @@ -104,10 +111,17 @@ static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuilder, Byt
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0);
}

MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = batchMessageIdImpl;
this.topic = topic;
this.cnx = cnx;
this.redeliveryCount = redeliveryCount;

this.payload = Unpooled.copiedBuffer(payload);
this.encryptionCtx = encryptionCtx;
Expand Down Expand Up @@ -149,6 +163,7 @@ public MessageImpl(String topic, String msgId, Map<String, String> properties,
this.payload = payload;
this.properties = Collections.unmodifiableMap(properties);
this.schema = schema;
this.redeliveryCount = 0;
}

public static MessageImpl<byte[]> deserialize(ByteBuf headersAndPayload) throws IOException {
Expand Down Expand Up @@ -323,6 +338,7 @@ public void recycle() {

private MessageImpl(Handle<MessageImpl<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
this.redeliveryCount = 0;
}

private Handle<MessageImpl<?>> recyclerHandle;
Expand Down Expand Up @@ -352,4 +368,9 @@ void setMessageId(MessageIdImpl messageId) {
public Optional<EncryptionContext> getEncryptionCtx() {
return encryptionCtx;
}

@Override
public int getRedeliveryCount() {
return redeliveryCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ public Optional<EncryptionContext> getEncryptionCtx() {
return msg.getEncryptionCtx();
}

@Override
public int getRedeliveryCount() {
return msg.getRedeliveryCount();
}

public Message<T> getMessage() {
return msg;
}
Expand Down

0 comments on commit 9d2c001

Please sign in to comment.