From c555eb7c915dcceec4df5273eacf5b38b2261123 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Mon, 6 Apr 2020 00:06:54 +0800 Subject: [PATCH] Improve Key_Shared subscription message dispatching performance. (#6647) * Improve Key_Shared subscription message dispatching performance. * Fix unit tests. * Remove system.out.println --- ...ngeAutoSplitStickyKeyConsumerSelector.java | 14 ++++++++++ ...ngeExclusiveStickyKeyConsumerSelector.java | 22 ++++++++++++++++ .../service/StickyKeyConsumerSelector.java | 9 +++++++ ...tStickyKeyDispatcherMultipleConsumers.java | 4 +-- ...tStickyKeyDispatcherMultipleConsumers.java | 4 +-- .../broker/service/BatchMessageTest.java | 26 ++++++++++--------- 6 files changed, 63 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java index 5c3c5b5715568..a6e93c217b8ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java @@ -115,6 +115,15 @@ public Consumer select(int hash) { } } + @Override + public Consumer selectByIndex(int index) { + if (rangeMap.size() > 0) { + return rangeMap.ceilingEntry(index).getValue(); + } else { + return null; + } + } + private int findBiggestRange() { int slots = 0; int busiestRange = rangeSize; @@ -150,6 +159,11 @@ private boolean is2Power(int num) { return (num & num - 1) == 0; } + @Override + public int getRangeSize() { + return rangeSize; + } + Map getConsumerRange() { return Collections.unmodifiableMap(consumerRange); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index 21e94bac10d39..8fb99ede81922 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -80,6 +80,28 @@ public Consumer select(int hash) { } } + @Override + public Consumer selectByIndex(int index) { + if (rangeMap.size() > 0) { + Map.Entry ceilingEntry = rangeMap.ceilingEntry(index); + Map.Entry floorEntry = rangeMap.floorEntry(index); + Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null; + Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null; + if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) { + return ceilingConsumer; + } else { + return null; + } + } else { + return null; + } + } + + @Override + public int getRangeSize() { + return rangeSize; + } + private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException { if (consumer.getKeySharedMeta() == null) { throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer."); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java index 88852b5249695..545e42d6274de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java @@ -50,4 +50,13 @@ public interface StickyKeyConsumerSelector { * @return */ Consumer select(int keyHash); + + /** + * Select a consumer by key hash range index. + * @param index index of the key hash range + * @return + */ + Consumer selectByIndex(int index); + + int getRangeSize(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index c5183cd5f1b95..3fdad35a49c60 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -67,7 +67,7 @@ public void sendMessages(List entries) { if (entries.size() > 0) { final Map> groupedEntries = new HashMap<>(); for (Entry entry : entries) { - int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())); + int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize(); groupedEntries.putIfAbsent(key, new ArrayList<>()); groupedEntries.get(key).add(entry); } @@ -75,7 +75,7 @@ public void sendMessages(List entries) { while (iterator.hasNext()) { final Map.Entry> entriesWithSameKey = iterator.next(); //TODO: None key policy - Consumer consumer = selector.select(entriesWithSameKey.getKey()); + Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey()); if (consumer != null) { SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index abe0cf729b078..248f45a6c2ec3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -73,7 +73,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { } final Map> groupedEntries = new HashMap<>(); for (Entry entry : entries) { - int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())); + int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize(); groupedEntries.putIfAbsent(key, new ArrayList<>()); groupedEntries.get(key).add(entry); } @@ -82,7 +82,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { final Map.Entry> entriesWithSameKey = iterator.next(); //TODO: None key policy - Consumer consumer = selector.select(entriesWithSameKey.getKey()); + Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey()); if (consumer == null) { // Do nothing, cursor will be rewind at reconnection log.info("[{}] rewind because no available consumer found for key {} from total {}", name, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 3d5f95603b057..1ddbd413bb94b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -753,16 +753,17 @@ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientExc } FutureUtil.waitForAll(sendFutureList).get(); + String receivedKey = ""; + int receivedMessageIndex = 0; for (int i = 0; i < 30; i++) { Message received = consumer.receive(); - if (i < 10) { - assertEquals(received.getKey(), "key-1"); - } else if (i < 20) { - assertEquals(received.getKey(), "key-2"); - } else { - assertEquals(received.getKey(), "key-3"); + if (!received.getKey().equals(receivedKey)) { + receivedKey = received.getKey(); + receivedMessageIndex = 0; } + assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10); consumer.acknowledge(received); + receivedMessageIndex++; } for (int i = 0; i < 10; i++) { @@ -777,16 +778,17 @@ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientExc } FutureUtil.waitForAll(sendFutureList).get(); + receivedKey = ""; + receivedMessageIndex = 0; for (int i = 0; i < 30; i++) { Message received = consumer.receive(); - if (i < 10) { - assertEquals(new String(received.getOrderingKey()), "key-1"); - } else if (i < 20) { - assertEquals(new String(received.getOrderingKey()), "key-2"); - } else { - assertEquals(new String(received.getOrderingKey()), "key-3"); + if (!new String(received.getOrderingKey()).equals(receivedKey)) { + receivedKey = new String(received.getOrderingKey()); + receivedMessageIndex = 0; } + assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10); consumer.acknowledge(received); + receivedMessageIndex++; } consumer.close();