diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index cd1bf05b2faf7..48c441ce2084a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -80,34 +80,45 @@ protected void notifyActiveConsumerChanged(Consumer activeConsumer) { } /** - * @return the previous active consumer if the consumer is changed, otherwise null. + * Pick active consumer for a topic for {@link SubType#Failover} subscription. + * If it's a non-partitioned topic then it'll pick consumer based on order they subscribe to the topic. + * If is's a partitioned topic, first sort consumers based on their priority level and consumer name then + * distributed partitions evenly across consumers with highest priority level. + * + * @return the true consumer if the consumer is changed, otherwise false. */ protected boolean pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); - - AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false); - consumers.sort((c1, c2) -> { - int priority = c1.getPriorityLevel() - c2.getPriorityLevel(); - if (priority != 0) { - hasPriorityConsumer.set(true); - return priority; - } - return c1.consumerName().compareTo(c2.consumerName()); - }); - - int consumersSize = consumers.size(); - // find number of consumers which are having the highest priorities. so partitioned-topic assignment happens - // evenly across highest priority consumers - if (hasPriorityConsumer.get()) { - int highestPriorityLevel = consumers.get(0).getPriorityLevel(); - for (int i = 0; i < consumers.size(); i++) { - if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) { - consumersSize = i; - break; + // By default always pick the first connected consumer for non partitioned topic. + int index = 0; + + // If it's a partitioned topic, sort consumers based on priority level then consumer name. + if (partitionIndex >= 0) { + AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false); + consumers.sort((c1, c2) -> { + int priority = c1.getPriorityLevel() - c2.getPriorityLevel(); + if (priority != 0) { + hasPriorityConsumer.set(true); + return priority; + } + return c1.consumerName().compareTo(c2.consumerName()); + }); + + int consumersSize = consumers.size(); + // find number of consumers which are having the highest priorities. so partitioned-topic assignment happens + // evenly across highest priority consumers + if (hasPriorityConsumer.get()) { + int highestPriorityLevel = consumers.get(0).getPriorityLevel(); + for (int i = 0; i < consumers.size(); i++) { + if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) { + consumersSize = i; + break; + } } } + index = partitionIndex % consumersSize; } - int index = partitionIndex % consumersSize; + Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index af2dde93540a1..16651ee85a7e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -195,8 +195,9 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce case Failover: int partitionIndex = TopicName.getPartitionIndex(topicName); if (partitionIndex < 0) { - // For non partition topics, assume index 0 to pick a predictable consumer - partitionIndex = 0; + // For non partition topics, use a negative index so dispatcher won't sort consumers before picking + // an active consumer for the topic. + partitionIndex = -1; } if (dispatcher == null || dispatcher.getType() != SubType.Failover) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 4b9dfdaf96905..f3d1c970b73f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -304,7 +304,7 @@ public void testAddRemoveConsumer() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); - int partitionIndex = 0; + int partitionIndex = 4; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, SubType.Failover, partitionIndex, topic, sub); @@ -376,7 +376,7 @@ public void testAddRemoveConsumer() throws Exception { // 7. Remove last consumer pdfc.removeConsumer(consumer2); consumers = pdfc.getConsumers(); - assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName()); + assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertEquals(3, consumers.size()); // not consumer group changes assertNull(consumerChanges.poll()); @@ -417,6 +417,67 @@ public void testAddRemoveConsumer() throws Exception { assertTrue(pdfc.canUnsubscribe(consumer1)); } + @Test + public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { + log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---"); + + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); + + // Non partitioned topic. + int partitionIndex = -1; + PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, + SubType.Failover, partitionIndex, topic, sub); + + // 1. Verify no consumers connected + assertFalse(pdfc.isConsumerConnected()); + + // 2. Add a consumer + Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1, + "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */, InitialPosition.Latest)); + pdfc.addConsumer(consumer1); + List consumers = pdfc.getConsumers(); + assertEquals(1, consumers.size()); + assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); + + // 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order. + Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1, + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */, InitialPosition.Latest)); + pdfc.addConsumer(consumer2); + + // 4. Verify active consumer doesn't change + consumers = pdfc.getConsumers(); + assertEquals(2, consumers.size()); + CommandActiveConsumerChange change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, false); + assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); + + // 5. Add another consumer which has higher priority level + Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest)); + pdfc.addConsumer(consumer3); + consumers = pdfc.getConsumers(); + assertEquals(3, consumers.size()); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 3, false); + assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); + verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer1)); + + // 7. Remove first consumer and active consumer should change to consumer2 since it's added before consumer3 + // though consumer 3 has higher priority level + pdfc.removeConsumer(consumer1); + consumers = pdfc.getConsumers(); + assertEquals(2, consumers.size()); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, true); + assertTrue(pdfc.getActiveConsumer().consumerName() == consumer2.consumerName()); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer2)); + verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer2)); + } + @Test public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 99566fdda279b..9389c1a88211c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -232,67 +232,6 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); - for (int i = 0; i < numMsgs; i++) { - String message = "my-message-" + i; - futures.add(producer.sendAsync(message.getBytes())); - } - FutureUtil.waitForAll(futures).get(); - futures.clear(); - - // 6. consumer subscription should send messages to the new consumer if its name is highest in the list - for (int i = 0; i < 5; i++) { - msg = consumer2.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer2.acknowledge(msg); - } - consumer1 = consumerBulder1.subscribe(); - Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); - for (int i = 5; i < numMsgs; i++) { - msg = consumer1.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer1.acknowledge(msg); - } - Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS)); - - rolloverPerIntervalStats(); - Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); - - for (int i = 0; i < numMsgs; i++) { - String message = "my-message-" + i; - futures.add(producer.sendAsync(message.getBytes())); - } - FutureUtil.waitForAll(futures).get(); - futures.clear(); - - // 7. consumer subscription should not send messages to the new consumer if its name is not highest in the list - for (int i = 0; i < 5; i++) { - msg = consumer1.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer1.acknowledge(msg); - } - TestConsumerStateEventListener listener3 = new TestConsumerStateEventListener(); - Consumer consumer3 = consumerBuilder.clone().consumerName("3").consumerEventListener(listener3) - .subscribe(); - Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); - - verifyConsumerInactive(listener3, -1); - - Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS)); - for (int i = 5; i < numMsgs; i++) { - msg = consumer1.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(msg); - Assert.assertEquals(new String(msg.getData()), "my-message-" + i); - consumer1.acknowledge(msg); - } - - rolloverPerIntervalStats(); - Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - assertEquals(subRef.getNumberOfEntriesInBacklog(), 0); - // 8. unsubscribe not allowed if multiple consumers connected try { consumer1.unsubscribe(); @@ -303,10 +242,9 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { // 9. unsubscribe allowed if there is a lone consumer consumer1.close(); - consumer2.close(); Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); try { - consumer3.unsubscribe(); + consumer2.unsubscribe(); } catch (PulsarClientException e) { fail("Should not fail", e); } @@ -316,7 +254,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { assertNull(subRef); producer.close(); - consumer3.close(); + consumer2.close(); admin.topics().delete(topicName); }