Skip to content

Commit

Permalink
Improve Key_Shared subscription message dispatching performance. (apa…
Browse files Browse the repository at this point in the history
…che#6647)

* Improve Key_Shared subscription message dispatching performance.

* Fix unit tests.

* Remove system.out.println
  • Loading branch information
codelipenghui authored Apr 5, 2020
1 parent 52ae182 commit c555eb7
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ public Consumer select(int hash) {
}
}

@Override
public Consumer selectByIndex(int index) {
if (rangeMap.size() > 0) {
return rangeMap.ceilingEntry(index).getValue();
} else {
return null;
}
}

private int findBiggestRange() {
int slots = 0;
int busiestRange = rangeSize;
Expand Down Expand Up @@ -150,6 +159,11 @@ private boolean is2Power(int num) {
return (num & num - 1) == 0;
}

@Override
public int getRangeSize() {
return rangeSize;
}

Map<Consumer, Integer> getConsumerRange() {
return Collections.unmodifiableMap(consumerRange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,28 @@ public Consumer select(int hash) {
}
}

@Override
public Consumer selectByIndex(int index) {
if (rangeMap.size() > 0) {
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(index);
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(index);
Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
return ceilingConsumer;
} else {
return null;
}
} else {
return null;
}
}

@Override
public int getRangeSize() {
return rangeSize;
}

private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
if (consumer.getKeySharedMeta() == null) {
throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,13 @@ public interface StickyKeyConsumerSelector {
* @return
*/
Consumer select(int keyHash);

/**
* Select a consumer by key hash range index.
* @param index index of the key hash range
* @return
*/
Consumer selectByIndex(int index);

int getRangeSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ public void sendMessages(List<Entry> entries) {
if (entries.size() > 0) {
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.select(entriesWithSameKey.getKey());
Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
if (consumer != null) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
Expand All @@ -82,7 +82,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.select(entriesWithSameKey.getKey());
Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,16 +753,17 @@ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientExc
}
FutureUtil.waitForAll(sendFutureList).get();

String receivedKey = "";
int receivedMessageIndex = 0;
for (int i = 0; i < 30; i++) {
Message<byte[]> received = consumer.receive();
if (i < 10) {
assertEquals(received.getKey(), "key-1");
} else if (i < 20) {
assertEquals(received.getKey(), "key-2");
} else {
assertEquals(received.getKey(), "key-3");
if (!received.getKey().equals(receivedKey)) {
receivedKey = received.getKey();
receivedMessageIndex = 0;
}
assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10);
consumer.acknowledge(received);
receivedMessageIndex++;
}

for (int i = 0; i < 10; i++) {
Expand All @@ -777,16 +778,17 @@ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientExc
}
FutureUtil.waitForAll(sendFutureList).get();

receivedKey = "";
receivedMessageIndex = 0;
for (int i = 0; i < 30; i++) {
Message<byte[]> received = consumer.receive();
if (i < 10) {
assertEquals(new String(received.getOrderingKey()), "key-1");
} else if (i < 20) {
assertEquals(new String(received.getOrderingKey()), "key-2");
} else {
assertEquals(new String(received.getOrderingKey()), "key-3");
if (!new String(received.getOrderingKey()).equals(receivedKey)) {
receivedKey = new String(received.getOrderingKey());
receivedMessageIndex = 0;
}
assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10);
consumer.acknowledge(received);
receivedMessageIndex++;
}

consumer.close();
Expand Down

0 comments on commit c555eb7

Please sign in to comment.