Skip to content

Commit

Permalink
[issue 4589] Fix redelivered message logic of partition topic (apache…
Browse files Browse the repository at this point in the history
…#4653)

Fixes apache#4589

Motivation
When using Partition-topic, the logic of redeliver messages will not be triggered when the time of ackTimeout arrives.

This is because the unAckedMessageTrackerPtr_->add(msg.getMessageId()) is not call in the listener handling of partitioned topic in cpp code
  • Loading branch information
wolfstudy authored and jiazhai committed Jul 19, 2019
1 parent 6f5416e commit cc5f25b
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 2 deletions.
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
}
messages_.push(msg);
if (messageListener_) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(
std::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
}
Expand Down
51 changes: 51 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,57 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeout) {
client.close();
}

static long messagesReceived = 0;

static void unackMessageListenerFunction(Consumer consumer, const Message &msg) { messagesReceived++; }

TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
Client client(lookupUrl);
long unAckedMessagesTimeoutMs = 10000;

std::string topicName = "persistent://public/default/testPartitionTopicUnAckedMessageTimeout";

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/testPartitionTopicUnAckedMessageTimeout/partitions";
int res = makePutRequest(url, "3");

LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);

std::string subName = "my-sub-name";

Producer producer;
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer;
ConsumerConfiguration consConfig;
consConfig.setMessageListener(
std::bind(unackMessageListenerFunction, std::placeholders::_1, std::placeholders::_2));
consConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
result = client.subscribe(topicName, subName, consConfig, consumer);
ASSERT_EQ(ResultOk, result);
ASSERT_EQ(consumer.getSubscriptionName(), subName);

for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
producer.sendAsync(msg, nullptr);
}

producer.flush();
long timeWaited = 0;
while (true) {
// maximum wait time
ASSERT_LE(timeWaited, unAckedMessagesTimeoutMs * 3);
if (messagesReceived >= 10 * 2) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
timeWaited += 500;
}
}

TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
Client client(lookupUrl);
std::string topicName = "testUnAckedMessageTimeoutListener";
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/c_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func subscribeAsync(client *client, options ConsumerOptions, schema Schema, call
C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
subName, conf, callbackPtr)

for idx, _ := range options.Topics {
for idx := range options.Topics {
C.free(unsafe.Pointer(a[idx]))
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/c_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Error struct {

func newError(result C.pulsar_result, msg string) error {
return &Error{
msg: fmt.Sprintf("%s: %s", msg, C.GoString(C.pulsar_result_str(result))),
msg: fmt.Sprintf("%s: %v", msg, C.GoString(C.pulsar_result_str(result))),
result: Result(result),
}
}
Expand Down
62 changes: 62 additions & 0 deletions pulsar-client-go/pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,3 +656,65 @@ func TestConsumerShared(t *testing.T) {
}
}()
}

func TestConsumer_AckTimeout(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})

assert.Nil(t, err)
defer client.Close()

topic := fmt.Sprintf("my-topic-%d", time.Now().Unix())

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})

assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
AckTimeout: 10 * time.Second,
Name: "my-consumer-name",
Type: Shared,
})

assert.Nil(t, err)
defer consumer.Close()

assert.Equal(t, consumer.Topic(), "persistent://public/default/"+topic)
assert.Equal(t, consumer.Subscription(), "my-sub")

ctx := context.Background()

// send one message
if err := producer.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-pulsar")),
}); err != nil {
t.Fatal(err)
}

// receive message but not ack
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-pulsar"))

// wait ack timeout
time.Sleep(10 * time.Second)

// receive message again
msgAgain, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, string(msgAgain.Payload()), fmt.Sprintf("hello-pulsar"))

if err := consumer.Ack(msgAgain); err != nil {
assert.Nil(t, err)
}

if err := consumer.Unsubscribe(); err != nil {
assert.Nil(t, err)
}
}

0 comments on commit cc5f25b

Please sign in to comment.