Skip to content

Commit

Permalink
Added metrics for topic lookup operations (apache#8272)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Oct 19, 2020
1 parent 4f2726c commit ad8c5ca
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -63,6 +63,7 @@
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -751,6 +752,13 @@ public void onUpdate(final String path, final LocalBrokerData data, final Stat s
scheduler.submit(this::updateAll);
}

/**
 * Latency summary for {@link #selectBrokerForAssignment(ServiceUnitId)}. That method observes its
 * elapsed time in nanoseconds from a {@code finally} block, so every exit path is recorded.
 * Configured with the 0.50, 0.99, 0.999 and 1.0 quantiles.
 *
 * NOTE(review): the registered metric name spells "assigment" (sic). Correcting the spelling would
 * rename the metric, so the string is intentionally left untouched here.
 */
private static final Summary selectBrokerForAssignment = Summary.build("pulsar_broker_load_manager_bundle_assigment", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();

/**
* As the leader broker, find a suitable broker for the assignment of the given bundle.
*
Expand All @@ -761,81 +769,92 @@ public void onUpdate(final String path, final LocalBrokerData data, final Stat s
@Override
public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
// Use brokerCandidateCache as a lock to reduce synchronization.
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());

// distribute namespaces to domain and brokers according to anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), brokerCandidateCache,
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
// distribute bundles evenly to candidate-brokers

LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache,
brokerToNamespaceToBundleRange);
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
long startTime = System.nanoTime();

try {
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
} catch ( BrokerFilterException x ) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

if ( brokerCandidateCache.isEmpty() ) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
}
// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());

// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}
// distribute namespaces to domain and brokers according to anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(),
brokerCandidateCache,
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
// distribute bundles evenly to candidate-brokers

if (!broker.isPresent()) {
// No brokers available
return broker;
}
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache,
brokerToNamespaceToBundleRange);
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
}
// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
} catch (BrokerFilterException x) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

// Add new bundle to preallocated.
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker.get());
if (brokerCandidateCache.isEmpty()) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
.computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
.add(bundleRange);
// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}

if (!broker.isPresent()) {
// No brokers available
return broker;
}

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
}

// Add new bundle to preallocated.
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker.get());

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
.computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
.add(bundleRange);
}
return broker;
}
return broker;
} finally {
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,41 @@
*/
package org.apache.pulsar.broker.namespace;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.AdminResource.PARTITIONED_TOPIC_PATH_ZNODE;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static org.apache.pulsar.common.util.Codec.decode;

import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;

import io.netty.channel.EventLoopGroup;
import io.prometheus.client.Counter;

import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
Expand All @@ -37,6 +69,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
Expand Down Expand Up @@ -67,43 +100,12 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.AdminResource.PARTITIONED_TOPIC_PATH_ZNODE;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static org.apache.pulsar.common.util.Codec.decode;

/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
* for the <code>PulsarService</code>.
Expand Down Expand Up @@ -151,6 +153,19 @@ public static enum AddressType {

private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners;


// Lookup metrics updated by getBrokerServiceUrlAsync.
// Incremented when a lookup yields a result whose isRedirect() is true.
private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();
// Incremented on the exceptional path of a lookup.
private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();
// Incremented when a lookup yields a result that is not a redirect.
private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();

// Elapsed time of lookups, observed in nanoseconds; configured with the
// 0.50, 0.99, 0.999 and 1.0 quantiles.
private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();


/**
* Default constructor.
*
Expand All @@ -175,8 +190,26 @@ public void initialize() {
}

public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
return getBundleAsync(topic)
long startTime = System.nanoTime();

CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> findBrokerServiceUrl(bundle, options));

future.thenAccept(optResult -> {
lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (optResult.isPresent()) {
if (optResult.get().isRedirect()) {
lookupRedirects.inc();
} else {
lookupAnswers.inc();
}
}
}).exceptionally(ex -> {
lookupFailures.inc();
return null;
});

return future;
}

public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.PulsarWebResource;
Expand Down Expand Up @@ -202,6 +203,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;

// Gauge registered in the constructor as "pulsar_broker_lookup_pending_requests"; its supplier
// returns getMaxConcurrentLookupRequest() minus the available permits of lookupRequestSemaphore.
private final ObserverGauge pendingLookupRequests;
// Gauge registered in the constructor as "pulsar_broker_topic_load_pending_requests"; its supplier
// returns getMaxConcurrentTopicLoadRequest() minus the available permits of topicLoadRequestSemaphore.
private final ObserverGauge pendingTopicLoadRequests;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
Expand Down Expand Up @@ -335,6 +339,16 @@ public Map<String, String> deserialize(String key, byte[] content) throws Except
.loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration());

this.defaultServerBootstrap = defaultServerBootstrap();

this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- lookupRequestSemaphore.get().availablePermits())
.register();

this.pendingTopicLoadRequests = ObserverGauge.build("pulsar_broker_topic_load_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits())
.register();
}

// This call is used for starting additional protocol handlers
Expand Down
Loading

0 comments on commit ad8c5ca

Please sign in to comment.