Use consistent hashing in KeyShared distribution (apache#6791)
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide the hash space across the available consumers. It is based on splitting the currently assigned hash ranges when a consumer joins or leaves.

There are a few problems with the current approach:

 1. When adding a new consumer, the biggest range is split to make space for the new consumer. That means that after adding 3 consumers, 1 of them will "own" a hash range that is double the size of the other 2 consumers' ranges, and it will consequently receive twice the traffic. This is not terrible, but not ideal either.

 2. When removing a consumer, its range is always assigned to the next consumer. The resulting hash distribution therefore depends entirely on the sequence in which consumers are removed. With bad luck, the traffic becomes very heavily skewed, leading to situations where 1 consumer receives >90% of the traffic.

This is an example of removing consumers in sequence, showing the size of each remaining consumer's hash range:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages, and a backlog will build up.
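To make the fix concrete, here is a minimal, self-contained sketch of the consistent-hashing idea (not the broker code itself; the `Ring` class, the CRC32 hash and the consumer names are illustrative choices). Each consumer is mapped to many points on a ring, a key is routed to the first point at or after its hash, and removing a consumer only reassigns the keys that landed on that consumer's own points, so the remaining load stays balanced regardless of the removal order.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Illustrative consistent-hashing ring (not the broker implementation):
// the class name, CRC32 hash and consumer names are arbitrary choices.
public class Ring {
    private final NavigableMap<Integer, String> ring = new TreeMap<>();
    private final int pointsPerConsumer;

    public Ring(int pointsPerConsumer) {
        this.pointsPerConsumer = pointsPerConsumer;
    }

    private static int hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes());
        return (int) (crc.getValue() & 0x7fffffff); // non-negative ring position
    }

    public void addConsumer(String consumer) {
        // Each consumer owns many points, so its share of the ring stays close to 1/N
        for (int i = 0; i < pointsPerConsumer; i++) {
            ring.put(hash(consumer + i), consumer);
        }
    }

    public void removeConsumer(String consumer) {
        // Only the removed consumer's points disappear; other assignments are untouched
        for (int i = 0; i < pointsPerConsumer; i++) {
            ring.remove(hash(consumer + i), consumer);
        }
    }

    public String select(String key) {
        if (ring.isEmpty()) {
            return null;
        }
        // Route to the first point at or after the key's hash, wrapping around the ring
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(key));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    public static void main(String[] args) {
        Ring ring = new Ring(100);
        for (int c = 1; c <= 10; c++) {
            ring.addConsumer("c" + c);
        }
        System.out.println("key-42 -> " + ring.select("key-42"));
        ring.removeConsumer("c7");
        // Only keys that previously mapped to one of c7's points change consumer
        System.out.println("key-42 -> " + ring.select("key-42"));
    }
}
```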


### Modifications

 * No functional difference from the user's perspective.
 * Use a consistent hashing mechanism to assign keys to consumers. This ensures an even distribution without the degradation seen in the corner cases above.
 * The number of points in the ring is configurable, default=100 (see the example `broker.conf` snippet after this list).
 * Refactored the current unit tests. The existing tests duplicate the logic of the implementation and check that a message is placed on the bucket of one consumer. Of course that works, since it's the same code executed on both sides. Instead, the tests should focus on the contract of the feature: messages should arrive in order, and there should be a "decent" sharing of load across consumers.
  * @codelipenghui I've removed `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance, as I've also explained on apache#6647 (comment). I'm happy to discuss this further.
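For reference, enabling the new behavior only requires setting the two options introduced by this PR in `broker.conf` (or `standalone.conf`); the values below are just an example:

```
# Switch AUTO_SPLIT Key_Shared subscriptions to consistent hashing,
# with 100 points per consumer on the ring
subscriptionKeySharedUseConsistentHashing=true
subscriptionKeySharedConsistentHashingReplicaPoints=100
```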
merlimat authored Jun 1, 2020
1 parent cf99e90 commit 4bf8268
Showing 16 changed files with 451 additions and 840 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
@@ -143,6 +143,14 @@ subscriptionExpiryCheckIntervalInMinutes=5
# Enable Key_Shared subscription (default is enabled)
subscriptionKeySharedEnable=true

# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
# consistent hashing to reassign keys to new consumers
subscriptionKeySharedUseConsistentHashing=false

# On KeyShared subscriptions, number of points in the consistent-hashing ring.
# The higher the number, the more equal the assignment of keys to consumers
subscriptionKeySharedConsistentHashingReplicaPoints=100

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
8 changes: 8 additions & 0 deletions conf/standalone.conf
@@ -98,6 +98,14 @@ subscriptionExpirationTimeMinutes=0
# Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled)
subscriptionRedeliveryTrackerEnabled=true

# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
# consistent hashing to reassign keys to new consumers
subscriptionKeySharedUseConsistentHashing=false

# On KeyShared subscriptions, number of points in the consistent-hashing ring.
# The higher the number, the more equal the assignment of keys to consumers
subscriptionKeySharedConsistentHashingReplicaPoints=100

# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5

@@ -336,6 +336,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean subscriptionKeySharedEnable = true;

@FieldContext(category = CATEGORY_POLICIES,
doc = "On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or " +
"consistent hashing to reassign keys to new consumers")
private boolean subscriptionKeySharedUseConsistentHashing = false;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "On KeyShared subscriptions, number of points in the consistent-hashing ring. "
+ "The higher the number, the more equal the assignment of keys to consumers")
private int subscriptionKeySharedConsistentHashingReplicaPoints = 100;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Set the default behavior for message deduplication in the broker.\n\n"
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.common.util.Murmur3_32Hash;

/**
* This is a consumer selector based on a consistent hashing ring.
*
* The implementation uses consistent hashing to evenly split the
* number of keys assigned to each consumer.
*/
public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {

private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

// Consistent-Hash ring
private final NavigableMap<Integer, Consumer> hashRing;

private final int numberOfPoints;

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
this.hashRing = new TreeMap<>();
this.numberOfPoints = numberOfPoints;
}

@Override
public void addConsumer(Consumer consumer) throws ConsumerAssignException {
rwLock.writeLock().lock();
try {
// Insert multiple points on the hash ring for every consumer
// The points are deterministically added based on the hash of the consumer name
for (int i = 0; i < numberOfPoints; i++) {
String key = consumer.consumerName() + i;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
hashRing.put(hash, consumer);
}
} finally {
rwLock.writeLock().unlock();
}
}

@Override
public void removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
// Remove all the points that were added for this consumer
for (int i = 0; i < numberOfPoints; i++) {
String key = consumer.consumerName() + i;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
hashRing.remove(hash, consumer);
}
} finally {
rwLock.writeLock().unlock();
}
}

@Override
public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}

@Override
public Consumer select(int hash) {
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
return null;
}

Map.Entry<Integer, Consumer> ceilingEntry = hashRing.ceilingEntry(hash);
if (ceilingEntry != null) {
return ceilingEntry.getValue();
} else {
return hashRing.firstEntry().getValue();
}
} finally {
rwLock.readLock().unlock();
}
}

Map<Integer, Consumer> getRangeConsumer() {
return Collections.unmodifiableMap(hashRing);
}
}
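As a rough illustration of how this selector could be exercised (a hypothetical test-style sketch, not part of this commit: it assumes Mockito is on the classpath and that `Consumer` can be stubbed to return a consumer name, as the broker's unit tests do), one can count how many random keys each consumer receives and check that the shares are close to even:

```java
package org.apache.pulsar.broker.service;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not this commit's test code): check that keys spread
// roughly evenly across consumers for a given number of replica points.
public class ConsistentHashingSelectorSketch {

    public static void main(String[] args) throws Exception {
        ConsistentHashingStickyKeyConsumerSelector selector =
                new ConsistentHashingStickyKeyConsumerSelector(100);

        // Mock consumers that only need to expose a consumer name,
        // similar to how the broker's unit tests stub Consumer
        for (int i = 1; i <= 10; i++) {
            Consumer consumer = mock(Consumer.class);
            when(consumer.consumerName()).thenReturn("c" + i);
            selector.addConsumer(consumer);
        }

        // Count how many random keys each consumer receives
        Map<String, Integer> counts = new HashMap<>();
        for (int k = 0; k < 100_000; k++) {
            Consumer selected = selector.select(("key-" + k).getBytes());
            counts.merge(selected.consumerName(), 1, Integer::sum);
        }

        // With 100 points per consumer, each of the 10 shares should be roughly 10%
        counts.forEach((name, count) -> System.out.println(name + " -> " + count));
    }
}
```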
@@ -115,15 +115,6 @@ public Consumer select(int hash) {
}
}

@Override
public Consumer selectByIndex(int index) {
if (rangeMap.size() > 0) {
return rangeMap.ceilingEntry(index).getValue();
} else {
return null;
}
}

private int findBiggestRange() {
int slots = 0;
int busiestRange = rangeSize;
@@ -159,11 +150,6 @@ private boolean is2Power(int num) {
return (num & num - 1) == 0;
}

@Override
public int getRangeSize() {
return rangeSize;
}

Map<Consumer, Integer> getConsumerRange() {
return Collections.unmodifiableMap(consumerRange);
}
@@ -80,28 +80,6 @@ public Consumer select(int hash) {
}
}

@Override
public Consumer selectByIndex(int index) {
if (rangeMap.size() > 0) {
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(index);
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(index);
Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
return ceilingConsumer;
} else {
return null;
}
} else {
return null;
}
}

@Override
public int getRangeSize() {
return rangeSize;
}

private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
if (consumer.getKeySharedMeta() == null) {
throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
@@ -50,13 +50,4 @@ public interface StickyKeyConsumerSelector {
* @return
*/
Consumer select(int keyHash);

/**
* Select a consumer by key hash range index.
* @param index index of the key hash range
* @return
*/
Consumer selectByIndex(int index);

int getRangeSize();
}
@@ -67,15 +67,15 @@ public void sendMessages(List<Entry> entries) {
if (entries.size() > 0) {
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer != null) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
@@ -28,16 +28,20 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
@@ -122,24 +126,27 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
previousDispatcher = dispatcher;
if (consumer.getKeySharedMeta() != null) {
switch (consumer.getKeySharedMeta().getKeySharedMode()) {
case STICKY:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeExclusiveStickyKeyConsumerSelector());
break;
case AUTO_SPLIT:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeAutoSplitStickyKeyConsumerSelector());
break;
default:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeAutoSplitStickyKeyConsumerSelector());
break;
}
} else {
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeAutoSplitStickyKeyConsumerSelector());
KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : KeySharedMeta.getDefaultInstance();

switch (ksm.getKeySharedMode()) {
case STICKY:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeExclusiveStickyKeyConsumerSelector());
break;

case AUTO_SPLIT:
default:
StickyKeyConsumerSelector selector;
ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
} else {
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
}

dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
break;
}
}
break;
@@ -73,7 +73,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
@@ -82,7 +82,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
