Skip to content

Commit

Permalink
[PIP-51] Introduce sticky consumer (apache#5388)
Browse files Browse the repository at this point in the history
### Motivation

Introduce sticky consumer, users can enable it by

```java
client.newConsumer()
         .keySharedPolicy(KeySharedPolicy.exclusiveHashRange().hashRangeTotal(10).ranges(Range.of(0, 10)))
         .subscribe();
```

### Modifications

Add a new consumer selector named HashRangeExclusiveStickyKeyConsumerSelector to support sticky consumer.

This change added tests and can be verified as follows:

Add new unit tests.
  • Loading branch information
codelipenghui authored and sijie committed Nov 8, 2019
1 parent c460f22 commit 951664c
Show file tree
Hide file tree
Showing 31 changed files with 5,688 additions and 3,583 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -103,10 +104,13 @@ public class Consumer {

private final Map<String, String> metadata;

private final PulsarApi.KeySharedMeta keySharedMeta;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) throws BrokerServiceException {
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException {

this.subscription = subscription;
this.subType = subType;
Expand All @@ -118,6 +122,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.subscriptionInitialPosition = subscriptionInitialPosition;
this.keySharedMeta = keySharedMeta;
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
Expand Down Expand Up @@ -444,6 +449,10 @@ public int getUnackedMessages() {
return unackedMessages;
}

public PulsarApi.KeySharedMeta getKeySharedMeta() {
return keySharedMeta;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,18 @@
* Select consumer will return the ceiling key of message key hashcode % range size.
*
*/
public class HashRangeStickyKeyConsumerSelector implements StickyKeyConsumerSelector {

public static final int DEFAULT_RANGE_SIZE = 2 << 15;
public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyConsumerSelector {

private final int rangeSize;

private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
private final Map<Consumer, Integer> consumerRange;

public HashRangeStickyKeyConsumerSelector() {
public HashRangeAutoSplitStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
}

public HashRangeStickyKeyConsumerSelector(int rangeSize) {
public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
if (rangeSize < 2) {
throw new IllegalArgumentException("range size must greater than 2");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyConsumerSelector {

private final int rangeSize;
private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;

public HashRangeExclusiveStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
}

public HashRangeExclusiveStickyKeyConsumerSelector(int rangeSize) {
super();
if (rangeSize < 1) {
throw new IllegalArgumentException("range size must greater than 0");
}
this.rangeSize = rangeSize;
this.rangeMap = new ConcurrentSkipListMap<>();
}

@Override
public void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
validateKeySharedMeta(consumer);
for (PulsarApi.IntRange intRange : consumer.getKeySharedMeta().getHashRangesList()) {
rangeMap.put(intRange.getStart(), consumer);
rangeMap.put(intRange.getEnd(), consumer);
}
}

@Override
public void removeConsumer(Consumer consumer) {
rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer));
}

@Override
public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}

public Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot);
Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
return ceilingConsumer;
} else {
return null;
}
} else {
return null;
}
}

private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
if (consumer.getKeySharedMeta() == null) {
throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
}
List<PulsarApi.IntRange> ranges = consumer.getKeySharedMeta().getHashRangesList();
if (ranges.isEmpty()) {
throw new BrokerServiceException.ConsumerAssignException("Ranges for KeyShared policy must not be empty.");
}
for (PulsarApi.IntRange intRange : ranges) {

if (intRange.getStart() > intRange.getEnd()) {
throw new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end");
}

Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(intRange.getStart());
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(intRange.getEnd());

if (floorEntry != null && floorEntry.getKey() >= intRange.getStart()) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + floorEntry.getValue());
}

if (ceilingEntry != null && ceilingEntry.getKey() <= intRange.getEnd()) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
}

if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) {
throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
}
}
}

Map<Integer, Consumer> getRangeConsumer() {
return Collections.unmodifiableMap(rangeMap);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
final boolean forceTopicCreation = subscribe.getForceTopicCreation();
final PulsarApi.KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null;

CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
Expand Down Expand Up @@ -706,13 +707,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated));

readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta));
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition,
startMessageRollbackDurationSec, isReplicated);
startMessageRollbackDurationSec, isReplicated, keySharedMeta);
}
})
.thenAccept(consumer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public interface StickyKeyConsumerSelector {

int DEFAULT_RANGE_SIZE = 2 << 15;

/**
* Add a new consumer
* @param consumer new consumer
Expand All @@ -41,4 +43,11 @@ public interface StickyKeyConsumerSelector {
* @return consumer
*/
Consumer select(byte[] stickyKey);

/**
* Select a consumer by hash of the sticky they
* @param keyHash hash of sticky key
* @return
*/
Consumer select(int keyHash);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -92,7 +93,7 @@ default long getOriginalSequenceId() {
CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicateSubscriptionState);
long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta);

CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
Expand All @@ -40,10 +39,10 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis

private final StickyKeyConsumerSelector selector;

public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
StickyKeyConsumerSelector selector) {
super(topic, subscription);
//TODO: Consumer selector Pluggable
selector = new HashRangeStickyKeyConsumerSelector();
this.selector = selector;
}

@Override
Expand Down Expand Up @@ -76,10 +75,7 @@ public void sendMessages(List<Entry> entries) {
while (iterator.hasNext()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = null;
if (selector instanceof HashRangeStickyKeyConsumerSelector) {
consumer = ((HashRangeStickyKeyConsumerSelector)selector).select(entriesWithSameKey.getKey());
}
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer != null) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -120,7 +122,25 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this);
if (consumer.getKeySharedMeta() != null) {
switch (consumer.getKeySharedMeta().getKeySharedMode()) {
case STICKY:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeExclusiveStickyKeyConsumerSelector());
break;
case AUTO_SPLIT:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeAutoSplitStickyKeyConsumerSelector());
break;
default:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeAutoSplitStickyKeyConsumerSelector());
break;
}
} else {
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeAutoSplitStickyKeyConsumerSelector());
}
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -246,7 +247,7 @@ public void removeProducer(Producer producer) {
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec, boolean replicateSubscriptionState) {
long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

Expand Down Expand Up @@ -297,7 +298,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole(), metadata, readCompacted, initialPosition);
cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
Expand All @@ -45,10 +44,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi

private final StickyKeyConsumerSelector selector;

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, StickyKeyConsumerSelector selector) {
super(topic, cursor, subscription);
//TODO: Consumer selector Pluggable
selector = new HashRangeStickyKeyConsumerSelector();
this.selector = selector;
}

@Override
Expand Down Expand Up @@ -79,10 +78,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = null;
if (selector instanceof HashRangeStickyKeyConsumerSelector) {
consumer = ((HashRangeStickyKeyConsumerSelector)selector).select(entriesWithSameKey.getKey());
}
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
Expand Down
Loading

0 comments on commit 951664c

Please sign in to comment.