Skip to content

Commit

Permalink
Add key-shared consumer range to internal topic stats. (apache#8567)
Browse files Browse the repository at this point in the history
Expose consumer range information in topic internal stats
  • Loading branch information
MarvinCai authored Nov 17, 2020
1 parent 8da9422 commit 8d7dd11
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand Down Expand Up @@ -126,6 +128,25 @@ public Consumer select(byte[] stickyKey) {
}
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new LinkedHashMap<>();
rwLock.readLock().lock();
try {
int start = 0;
for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
for (Consumer consumer: entry.getValue()) {
result.computeIfAbsent(consumer.consumerName(), key -> new ArrayList<>())
.add("[" + start + ", " + entry.getKey() + "]");
}
start = entry.getKey() + 1;
}
} finally {
rwLock.readLock().unlock();
}
return result;
}

Map<Integer, List<Consumer>> getRangeConsumer() {
return Collections.unmodifiableMap(hashRing);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -112,6 +114,18 @@ public Consumer select(byte[] stickyKey) {
}
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
int start = 0;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
.add("[" + start + ", " + entry.getKey() + "]");
start = entry.getKey() + 1;
}
return result;
}

private int findBiggestRange() {
int slots = 0;
int busiestRange = rangeSize;
Expand Down Expand Up @@ -147,10 +161,6 @@ private boolean is2Power(int num) {
return (num & num - 1) == 0;
}

Map<Consumer, Integer> getConsumerRange() {
return Collections.unmodifiableMap(consumerRange);
}

Map<Integer, Consumer> getRangeConsumer() {
return Collections.unmodifiableMap(rangeMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/**
* This is a sticky-key consumer selector based user provided range.
* User is responsible for making sure provided range for all consumers cover the rangeSize
* else there'll be chance that a key fall in a `whole` that not handled by any consumer.
*/
public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyConsumerSelector {

private final int rangeSize;
Expand Down Expand Up @@ -63,6 +70,24 @@ public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
Map.Entry<Integer, Consumer> prev = null;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
if (prev == null) {
prev = entry;
} else {
if (prev.getValue().equals(entry.getValue())) {
result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>())
.add("[" + prev.getKey() + ", " + entry.getKey() + "]");
}
prev = null;
}
}
return result;
}

Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;

import java.util.List;
import java.util.Map;

public interface StickyKeyConsumerSelector {

int DEFAULT_RANGE_SIZE = 2 << 15;
Expand All @@ -43,4 +46,10 @@ public interface StickyKeyConsumerSelector {
* @return consumer
*/
Consumer select(byte[] stickyKey);

/**
* Get key hash ranges handled by each consumer
* @return A map where key is a consumer name and value is list of hash range it receiving message for.
*/
Map<String, List<String>> getConsumerKeyHashRanges();
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}

public Map<String, List<String>> getConsumerKeyHashRanges() {
return selector.getConsumerKeyHashRanges();
}

private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ public String getTypeString() {
return "Failover";
case Shared:
return "Shared";
case Key_Shared:
return "Key_Shared";
}

return "Null";
Expand Down Expand Up @@ -899,6 +901,8 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared?
((PersistentStickyKeyDispatcherMultipleConsumers)dispatcher).getConsumerKeyHashRanges(): null;
dispatcher.getConsumers().forEach(consumer -> {
ConsumerStats consumerStats = consumer.getStats();
subStats.consumers.add(consumerStats);
Expand All @@ -911,6 +915,9 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer.consumerName())) {
consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer.consumerName());
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableSet;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -134,4 +139,28 @@ public void testConsumerSelect() throws ConsumerAssignException {
Assert.assertEquals(selectionMap.get("c4").intValue(), N);
}


@Test
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3");
List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}, new int[] {15, 20});
for (int index = 0; index < consumerName.size(); index++) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(consumerName.get(index));
selector.addConsumer(consumer);
}

Map<String, Set<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableSet.of("[0, 330121749]", "[330121750, 618146114]", "[1797637922, 1976098885]"));
expectedResult.put("consumer2", ImmutableSet.of("[938427576, 1094135919]", "[1138613629, 1342907082]", "[1342907083, 1797637921]"));
expectedResult.put("consumer3", ImmutableSet.of("[618146115, 772640562]", "[772640563, 938427575]", "[1094135920, 1138613628]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(entry.getValue().stream().collect(Collectors.toSet()), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableList;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class HashRangeAutoSplitStickyKeyConsumerSelectorTest {

@Test
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
for (int index = 0; index < consumerName.size(); index++) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(consumerName.get(index));
selector.addConsumer(consumer);
}

Map<String, List<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableList.of("[49, 64]"));
expectedResult.put("consumer4", ImmutableList.of("[33, 48]"));
expectedResult.put("consumer2", ImmutableList.of("[17, 32]"));
expectedResult.put("consumer3", ImmutableList.of("[0, 16]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
*/
package org.apache.pulsar.broker.service;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -107,6 +111,36 @@ public void testInvalidRangeTotal() {
new HashRangeExclusiveStickyKeyConsumerSelector(0);
}

@Test
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}, new int[] {15, 20});
for (int index = 0; index < consumerName.size(); index++) {
Consumer consumer = mock(Consumer.class);
PulsarApi.KeySharedMeta keySharedMeta = PulsarApi.KeySharedMeta.newBuilder()
.setKeySharedMode(PulsarApi.KeySharedMode.STICKY)
.addHashRanges(PulsarApi.IntRange.newBuilder().setStart(range.get(index)[0])
.setEnd(range.get(index)[1]).build())
.build();
when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta);
when(consumer.consumerName()).thenReturn(consumerName.get(index));
Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
selector.addConsumer(consumer);
}

Map<String, List<String>> expectedResult = new HashMap<>();
expectedResult.put("consumer1", ImmutableList.of("[0, 2]"));
expectedResult.put("consumer2", ImmutableList.of("[3, 7]"));
expectedResult.put("consumer3", ImmutableList.of("[9, 12]"));
expectedResult.put("consumer4", ImmutableList.of("[15, 20]"));
for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
}

@Test
public void testSingleRangeConflict() throws BrokerServiceException.ConsumerAssignException {
HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -77,6 +78,9 @@ public class ConsumerStats {
public long lastAckedTimestamp;
public long lastConsumedTimestamp;

/** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
public List<String> keyHashRanges;

/** Metadata (key/value strings) associated with this consumer. */
public Map<String, String> metadata;

Expand Down

0 comments on commit 8d7dd11

Please sign in to comment.