Skip to content

Commit

Permalink
Allow to configure KeyShared consumer with relaxed ordering (apache#7188
Browse files Browse the repository at this point in the history
)

### Motivation

In certain cases, it is useful to just use key-shared dispatcher in order to have the same key to go to same consumer, although the ordering is not required.

In this case, if we relax the ordering requirement, we can avoid new consumers getting stuck when an existing consumer is going through a prefetched queue of existing messages. This is especially relevant if the processing time is high.
  • Loading branch information
merlimat authored Jun 6, 2020
1 parent dcf8788 commit 1bdfc0d
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import com.google.common.base.Preconditions;

import io.netty.util.concurrent.FastThreadLocal;

import java.util.ArrayList;
Expand All @@ -32,19 +34,25 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {

private final boolean allowOutOfOrderDelivery;
private final StickyKeyConsumerSelector selector;

private boolean isDispatcherStuckOnReplays = false;
Expand All @@ -54,12 +62,32 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
* This means that, in order to preserve ordering, new consumers can only receive old
* messages, until the mark-delete position will move past this point.
*/
private final Map<Consumer, PositionImpl> recentlyJoinedConsumers = new HashMap<>();
private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, StickyKeyConsumerSelector selector) {
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription);
this.selector = selector;

this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();

switch (ksm.getKeySharedMode()) {
case AUTO_SPLIT:
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
} else {
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
}
break;

case STICKY:
this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
break;

default:
throw new IllegalArgumentException("Invalid key-shared mode: " + ksm.getKeySharedMode());
}
}

@Override
Expand All @@ -69,7 +97,9 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

// If this was the 1st consumer, or if all the messages are already acked, then we
// don't need to do anything special
if (consumerList.size() > 1 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
if (allowOutOfOrderDelivery == false
&& consumerList.size() > 1
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
recentlyJoinedConsumers.put(consumer, (PositionImpl) cursor.getReadPosition());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -33,8 +34,6 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand All @@ -50,18 +49,13 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -221,28 +215,10 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
previousDispatcher = dispatcher;
KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : KeySharedMeta.getDefaultInstance();

switch (ksm.getKeySharedMode()) {
case STICKY:
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
new HashRangeExclusiveStickyKeyConsumerSelector());
break;

case AUTO_SPLIT:
default:
StickyKeyConsumerSelector selector;
ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
} else {
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
}

dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, selector);
break;
}
KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
: KeySharedMeta.getDefaultInstance();
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.*;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void setup() throws Exception {
).thenReturn(false);

persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, new ConsistentHashingStickyKeyConsumerSelector(100));
topicMock, cursorMock, subscriptionMock, configMock, KeySharedMeta.getDefaultInstance());
persistentDispatcher.addConsumer(consumerMock);
persistentDispatcher.consumerFlow(consumerMock, 1000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public abstract class KeySharedPolicy {

protected KeySharedMode keySharedMode;

protected boolean allowOutOfOrderDelivery = false;

public static final int DEFAULT_HASH_RANGE_SIZE = 2 << 15;

public static KeySharedPolicyAutoSplit autoSplitHashRange() {
Expand All @@ -41,6 +43,25 @@ public static KeySharedPolicySticky stickyHashRange() {

public abstract void validate();

/**
* If enabled, it will relax the ordering requirement, allowing the broker to send out-of-order messages in case of
* failures. This will make it faster for new consumers to join without being stalled by an existing slow consumer.
*
* <p>In this case, a single consumer will still receive all the keys, but they may be coming in different orders.
*
* @param allowOutOfOrderDelivery
* whether to allow for out of order delivery
* @return KeySharedPolicy instance
*/
public KeySharedPolicy setAllowOutOfOrderDelivery(boolean allowOutOfOrderDelivery) {
this.allowOutOfOrderDelivery = allowOutOfOrderDelivery;
return this;
}

public boolean isAllowOutOfOrderDelivery() {
return allowOutOfOrderDelivery;
}

public KeySharedMode getKeySharedMode() {
return this.keySharedMode;
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMode;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -567,24 +569,21 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
subscribeBuilder.setForceTopicCreation(createTopicIfDoesNotExist);

if (keySharedPolicy != null) {
switch (keySharedPolicy.getKeySharedMode()) {
case AUTO_SPLIT:
subscribeBuilder.setKeySharedMeta(PulsarApi.KeySharedMeta.newBuilder()
.setKeySharedMode(PulsarApi.KeySharedMode.AUTO_SPLIT));
break;
case STICKY:
PulsarApi.KeySharedMeta.Builder builder = PulsarApi.KeySharedMeta.newBuilder()
.setKeySharedMode(PulsarApi.KeySharedMode.STICKY);
List<Range> ranges = ((KeySharedPolicy.KeySharedPolicySticky) keySharedPolicy)
.getRanges();
for (Range range : ranges) {
builder.addHashRanges(PulsarApi.IntRange.newBuilder()
.setStart(range.getStart())
.setEnd(range.getEnd()));
}
subscribeBuilder.setKeySharedMeta(builder);
break;
KeySharedMeta.Builder keySharedMetaBuilder = PulsarApi.KeySharedMeta.newBuilder();
keySharedMetaBuilder.setAllowOutOfOrderDelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
keySharedMetaBuilder.setKeySharedMode(convertKeySharedMode(keySharedPolicy.getKeySharedMode()));

if (keySharedPolicy instanceof KeySharedPolicy.KeySharedPolicySticky) {
List<Range> ranges = ((KeySharedPolicy.KeySharedPolicySticky) keySharedPolicy)
.getRanges();
for (Range range : ranges) {
keySharedMetaBuilder.addHashRanges(PulsarApi.IntRange.newBuilder()
.setStart(range.getStart())
.setEnd(range.getEnd()));
}
}

subscribeBuilder.setKeySharedMeta(keySharedMetaBuilder.build());
}

if (startMessageId != null) {
Expand All @@ -611,6 +610,16 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
return res;
}


private static KeySharedMode convertKeySharedMode(org.apache.pulsar.client.api.KeySharedMode mode) {
switch (mode) {
case AUTO_SPLIT: return KeySharedMode.AUTO_SPLIT;
case STICKY: return KeySharedMode.STICKY;
default:
throw new IllegalArgumentException("Unexpected key shared mode: " + mode);
}
}

public static ByteBuf newUnsubscribe(long consumerId, long requestId) {
CommandUnsubscribe.Builder unsubscribeBuilder = CommandUnsubscribe.newBuilder();
unsubscribeBuilder.setConsumerId(consumerId);
Expand Down
1 change: 1 addition & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ enum KeySharedMode {
message KeySharedMeta {
required KeySharedMode keySharedMode = 1;
repeated IntRange hashRanges = 3;
optional bool allowOutOfOrderDelivery = 4 [default = false];
}

message CommandSubscribe {
Expand Down

0 comments on commit 1bdfc0d

Please sign in to comment.