Skip to content

Commit

Permalink
Fix bug: pattern consumer's receive() will cause deadlock for topics …
Browse files Browse the repository at this point in the history
…auto discovery (apache#7206)

Fixes apache#7168 

### Motivation

When a pattern consumer is blocked by `receive()`, the `mutex_` will be held until new messages arrived. If the auto discovery timer task found new topics and tried to subscribe them, `mutex_` must be acquired first, then the deadlock happened.

### Modifications

- Release the `mutex_` after the consumer's state was verified.
- Change unit tests to verify that new topics could be subscribed when the consumer's blocked by `receive(Message&)` or `receive(Message&, int)` methods.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as *BasicEndToEndTest.testPatternMultiTopicsConsumerAutoDiscovery*.
  • Loading branch information
BewareMyPower authored Jun 9, 2020
1 parent f26bab3 commit c2759be
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 81 deletions.
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
LOG_ERROR("Can not receive when a listener has been set");
return ResultInvalidConfiguration;
}
messages_.pop(msg);
lock.unlock();
messages_.pop(msg);

unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
Expand All @@ -508,8 +508,8 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
return ResultInvalidConfiguration;
}

lock.unlock();
if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
lock.unlock();
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
Expand Down
144 changes: 65 additions & 79 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2245,102 +2245,88 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) {
ASSERT_EQ(consumer.getSubscriptionName(), subName);
LOG_INFO("created pattern consumer with not match topics at beginning");

auto createProducer = [&client](Producer &producer, const std::string &topic, int numPartitions) {
if (numPartitions > 0) {
const std::string url = adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions";
int res = makePutRequest(url, std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409);
}

const std::string fullTopicName = "persistent://public/default/" + topic;
Result result = client.createProducer(fullTopicName, producer);
ASSERT_EQ(ResultOk, result);
};

// 2. create 4 topics, in which 3 match the pattern.
std::string topicName1 = "persistent://public/default/patternTopicsAutoConsumerPubSub1";
std::string topicName2 = "persistent://public/default/patternTopicsAutoConsumerPubSub2";
std::string topicName3 = "persistent://public/default/patternTopicsAutoConsumerPubSub3";
std::vector<Producer> producers(4);
createProducer(producers[0], "patternTopicsAutoConsumerPubSub1", 2);
createProducer(producers[1], "patternTopicsAutoConsumerPubSub2", 3);
createProducer(producers[2], "patternTopicsAutoConsumerPubSub3", 4);
// This will not match pattern
std::string topicName4 = "persistent://public/default/notMatchPatternTopicsAutoConsumerPubSub4";
createProducer(producers[3], "notMatchPatternTopicsAutoConsumerPubSub4", 4);

// call admin api to make topics partitioned
std::string url1 =
adminUrl + "admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub1/partitions";
std::string url2 =
adminUrl + "admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub2/partitions";
std::string url3 =
adminUrl + "admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub3/partitions";
std::string url4 =
adminUrl + "admin/v2/persistent/public/default/notMatchPatternTopicsAutoConsumerPubSub4/partitions";
constexpr int messageNumber = 100;

int res = makePutRequest(url1, "2");
ASSERT_FALSE(res != 204 && res != 409);
res = makePutRequest(url2, "3");
ASSERT_FALSE(res != 204 && res != 409);
res = makePutRequest(url3, "4");
ASSERT_FALSE(res != 204 && res != 409);
res = makePutRequest(url4, "4");
ASSERT_FALSE(res != 204 && res != 409);
std::thread consumeThread([&consumer] {
LOG_INFO("Consuming and acking 300 messages by pattern topics consumer");
for (int i = 0; i < 3 * messageNumber; i++) {
Message m;
// Ensure new topics can be discovered when the consumer is blocked by receive(Message&, int)
ASSERT_EQ(ResultOk, consumer.receive(m, 30000));
ASSERT_EQ(ResultOk, consumer.acknowledge(m));
}
// 5. pattern consumer already subscribed 3 topics
LOG_INFO("Consumed and acked 300 messages by pattern topics consumer");

Producer producer1;
result = client.createProducer(topicName1, producer1);
ASSERT_EQ(ResultOk, result);
Producer producer2;
result = client.createProducer(topicName2, producer2);
ASSERT_EQ(ResultOk, result);
Producer producer3;
result = client.createProducer(topicName3, producer3);
ASSERT_EQ(ResultOk, result);
Producer producer4;
result = client.createProducer(topicName4, producer4);
ASSERT_EQ(ResultOk, result);
LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match");
// verify no more to receive, because producers[3] not match pattern
Message m;
ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
});

// 3. wait enough time to trigger auto discovery
std::this_thread::sleep_for(std::chrono::microseconds(2 * 1000 * 1000));
std::this_thread::sleep_for(std::chrono::seconds(2));

// 4. produce data.
int messageNumber = 100;
std::string msgContent = "msg-content";
LOG_INFO("Publishing 100 messages by producer 1 synchronously");
for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer1.send(msg));
}

msgContent = "msg-content2";
LOG_INFO("Publishing 100 messages by producer 2 synchronously");
for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer2.send(msg));
for (size_t i = 0; i < producers.size(); i++) {
const std::string msgContent = "msg-content" + std::to_string(i);
LOG_INFO("Publishing " << messageNumber << " messages by producer " << i << " synchronously");
for (int j = 0; j < messageNumber; j++) {
Message msg = MessageBuilder().setContent(msgContent).build();
ASSERT_EQ(ResultOk, producers[i].send(msg));
}
}

msgContent = "msg-content3";
LOG_INFO("Publishing 100 messages by producer 3 synchronously");
for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer3.send(msg));
}
consumeThread.join();

msgContent = "msg-content4";
LOG_INFO("Publishing 100 messages by producer 4 synchronously");
for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
std::stringstream stream;
stream << msgContent << msgNum;
Message msg = MessageBuilder().setContent(stream.str()).build();
ASSERT_EQ(ResultOk, producer4.send(msg));
}
consumeThread = std::thread([&consumer] {
LOG_INFO("Consuming and acking 100 messages by pattern topics consumer");
for (int i = 0; i < messageNumber; i++) {
Message m;
// Ensure new topics can be discovered when the consumer is blocked by receive(Message&)
ASSERT_EQ(ResultOk, consumer.receive(m));
ASSERT_EQ(ResultOk, consumer.acknowledge(m));
}
// 9. pattern consumer subscribed a new topic
LOG_INFO("Consumed and acked 100 messages by pattern topics consumer");

// 5. pattern consumer already subscribed 3 topics
LOG_INFO("Consuming and acking 300 messages by pattern topics consumer");
for (int i = 0; i < 3 * messageNumber; i++) {
// verify no more to receive
Message m;
ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
ASSERT_EQ(ResultOk, consumer.acknowledge(m));
}
LOG_INFO("Consumed and acked 300 messages by pattern topics consumer");
ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
});
// 6. Create a producer to a new topic
createProducer(producers[0], "patternTopicsAutoConsumerPubSub5", 4);

// verify no more to receive, because producer4 not match pattern
Message m;
ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
// 7. wait enough time to trigger auto discovery
std::this_thread::sleep_for(std::chrono::seconds(2));

ASSERT_EQ(ResultOk, consumer.unsubscribe());
// 8. produce data
for (int i = 0; i < messageNumber; i++) {
Message msg = MessageBuilder().setContent("msg-content-5").build();
ASSERT_EQ(ResultOk, producers[0].send(msg));
}

consumeThread.join();
ASSERT_EQ(ResultOk, consumer.unsubscribe());
client.shutdown();
}

Expand Down

0 comments on commit c2759be

Please sign in to comment.