Skip to content

Commit

Permalink
Update logic for picking active consumer for failover subscription on…
Browse files Browse the repository at this point in the history
… non-partitioned topic. (apache#4604)

Instead of sorting the consumers based on priority level and consumer name then pick a active consumer, which could cause subscription getting into a flaky state, where the "active" consumer joins and leaves, no consumer is actually elected as "active" and consuming the messages.
Fix logic to always pick the first consumer in the consumer list without sorting consumers. So consumers will be picked as acive consumer based on the order of their subscription.
  • Loading branch information
MarvinCai authored and merlimat committed Jun 27, 2019
1 parent 9db3a78 commit 22f1d7d
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,45 @@ protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
}

/**
* @return the previous active consumer if the consumer is changed, otherwise null.
* Pick active consumer for a topic for {@link SubType#Failover} subscription.
* If it's a non-partitioned topic then it'll pick consumer based on order they subscribe to the topic.
* If is's a partitioned topic, first sort consumers based on their priority level and consumer name then
* distributed partitions evenly across consumers with highest priority level.
*
* @return the true consumer if the consumer is changed, otherwise false.
*/
protected boolean pickAndScheduleActiveConsumer() {
checkArgument(!consumers.isEmpty());

AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
consumers.sort((c1, c2) -> {
int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
if (priority != 0) {
hasPriorityConsumer.set(true);
return priority;
}
return c1.consumerName().compareTo(c2.consumerName());
});

int consumersSize = consumers.size();
// find number of consumers which are having the highest priorities. so partitioned-topic assignment happens
// evenly across highest priority consumers
if (hasPriorityConsumer.get()) {
int highestPriorityLevel = consumers.get(0).getPriorityLevel();
for (int i = 0; i < consumers.size(); i++) {
if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
consumersSize = i;
break;
// By default always pick the first connected consumer for non partitioned topic.
int index = 0;

// If it's a partitioned topic, sort consumers based on priority level then consumer name.
if (partitionIndex >= 0) {
AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
consumers.sort((c1, c2) -> {
int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
if (priority != 0) {
hasPriorityConsumer.set(true);
return priority;
}
return c1.consumerName().compareTo(c2.consumerName());
});

int consumersSize = consumers.size();
// find number of consumers which are having the highest priorities. so partitioned-topic assignment happens
// evenly across highest priority consumers
if (hasPriorityConsumer.get()) {
int highestPriorityLevel = consumers.get(0).getPriorityLevel();
for (int i = 0; i < consumers.size(); i++) {
if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
consumersSize = i;
break;
}
}
}
index = partitionIndex % consumersSize;
}
int index = partitionIndex % consumersSize;

Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));

Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, assume index 0 to pick a predictable consumer
partitionIndex = 0;
// For non partition topics, use a negative index so dispatcher won't sort consumers before picking
// an active consumer for the topic.
partitionIndex = -1;
}

if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void testAddRemoveConsumer() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);

int partitionIndex = 0;
int partitionIndex = 4;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic, sub);

Expand Down Expand Up @@ -376,7 +376,7 @@ public void testAddRemoveConsumer() throws Exception {
// 7. Remove last consumer
pdfc.removeConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(3, consumers.size());
// not consumer group changes
assertNull(consumerChanges.poll());
Expand Down Expand Up @@ -417,6 +417,67 @@ public void testAddRemoveConsumer() throws Exception {
assertTrue(pdfc.canUnsubscribe(consumer1));
}

@Test
public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);

// Non partitioned topic.
int partitionIndex = -1;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic, sub);

// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());

// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertEquals(1, consumers.size());
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());

// 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1,
"Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
false /* read compacted */, InitialPosition.Latest));
pdfc.addConsumer(consumer2);

// 4. Verify active consumer doesn't change
consumers = pdfc.getConsumers();
assertEquals(2, consumers.size());
CommandActiveConsumerChange change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, false);
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));

// 5. Add another consumer which has higher priority level
Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest));
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
change = consumerChanges.take();
verifyActiveConsumerChange(change, 3, false);
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer1));

// 7. Remove first consumer and active consumer should change to consumer2 since it's added before consumer3
// though consumer 3 has higher priority level
pdfc.removeConsumer(consumer1);
consumers = pdfc.getConsumers();
assertEquals(2, consumers.size());
change = consumerChanges.take();
verifyActiveConsumerChange(change, 2, true);
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer2.consumerName());
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer2));
verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer2));
}

@Test
public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,67 +232,6 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);

for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
futures.add(producer.sendAsync(message.getBytes()));
}
FutureUtil.waitForAll(futures).get();
futures.clear();

// 6. consumer subscription should send messages to the new consumer if its name is highest in the list
for (int i = 0; i < 5; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer2.acknowledge(msg);
}
consumer1 = consumerBulder1.subscribe();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
for (int i = 5; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}
Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS));

rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);

for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
futures.add(producer.sendAsync(message.getBytes()));
}
FutureUtil.waitForAll(futures).get();
futures.clear();

// 7. consumer subscription should not send messages to the new consumer if its name is not highest in the list
for (int i = 0; i < 5; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}
TestConsumerStateEventListener listener3 = new TestConsumerStateEventListener();
Consumer<byte[]> consumer3 = consumerBuilder.clone().consumerName("3").consumerEventListener(listener3)
.subscribe();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);

verifyConsumerInactive(listener3, -1);

Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS));
for (int i = 5; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}

rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);

// 8. unsubscribe not allowed if multiple consumers connected
try {
consumer1.unsubscribe();
Expand All @@ -303,10 +242,9 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {

// 9. unsubscribe allowed if there is a lone consumer
consumer1.close();
consumer2.close();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
try {
consumer3.unsubscribe();
consumer2.unsubscribe();
} catch (PulsarClientException e) {
fail("Should not fail", e);
}
Expand All @@ -316,7 +254,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
assertNull(subRef);

producer.close();
consumer3.close();
consumer2.close();

admin.topics().delete(topicName);
}
Expand Down

0 comments on commit 22f1d7d

Please sign in to comment.