Skip to content

Commit

Permalink
Expose metrics of Caffeine caches (apache#5320)
Browse files Browse the repository at this point in the history
* Expose metrics of Caffeine and Guava caches

* Fixed test

* removed guava cache instrumentation

* Fixed test
  • Loading branch information
merlimat authored Oct 24, 2019
1 parent 402ecec commit 7c3f3bd
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 41 deletions.
1 change: 1 addition & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ The Apache Software License, Version 2.0
- io.prometheus-simpleclient_servlet-0.5.0.jar
- io.prometheus-simpleclient_log4j2-0.5.0.jar
- io.prometheus-simpleclient_jetty-0.5.0.jar
- io.prometheus-simpleclient_caffeine-0.5.0.jar
* Bean Validation API -- javax.validation-validation-api-1.1.0.Final.jar
* Log4J
- log4j-log4j-1.2.17.jar
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${prometheus.version}</version>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_caffeine</artifactId>
<version>${prometheus.version}</version>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
Expand Down
1 change: 0 additions & 1 deletion pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@
<artifactId>simpleclient_jetty</artifactId>
</dependency>


<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public static void setDefaultEnsemblePlacementPolicy(
REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName()));

ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
ZooKeeperCache zkc = new ZooKeeperCache("bookies-racks", zkClient,
conf.getZooKeeperOperationTimeoutSeconds()) {
};
if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
zkc.stop();
Expand All @@ -170,7 +171,8 @@ public static void setDefaultEnsemblePlacementPolicy(
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientSecondaryIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
ZooKeeperCache zkc = new ZooKeeperCache("bookies-isolation", zkClient,
conf.getZooKeeperOperationTimeoutSeconds()) {
};

if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
Expand All @@ -185,7 +187,8 @@ private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfi
Class<? extends EnsemblePlacementPolicy> policyClass) {
bkConf.setEnsemblePlacementPolicy(policyClass);
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
ZooKeeperCache zkc = new ZooKeeperCache("bookies-rackaware", zkClient,
conf.getZooKeeperOperationTimeoutSeconds()) {
};
if (!zkCache.compareAndSet(null, zkc)) {
zkc.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -159,8 +160,11 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
this.localZkCache = pulsar.getLocalZkCache();
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
// ownedBundlesCache contains all namespaces that are owned by the local broker
this.ownedBundlesCache = Caffeine.newBuilder().executor(MoreExecutors.directExecutor())
this.ownedBundlesCache = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.recordStats()
.buildAsync(new OwnedServiceUnitCacheLoader());
CacheMetricsCollector.CAFFEINE.addCache("owned-bundles", this.ownedBundlesCache);
}

/**
Expand Down Expand Up @@ -283,7 +287,7 @@ public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
}
return FutureUtil.waitForAll(allFutures);
}


/**
* Method to access the map of all <code>ServiceUnit</code> objects owned by the local broker
Expand Down Expand Up @@ -322,7 +326,7 @@ public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {

/**
* Disable bundle in local cache and on zk
*
*
* @param bundle
* @throws Exception
*/
Expand All @@ -332,10 +336,10 @@ public void disableOwnership(NamespaceBundle bundle) throws Exception {
localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfoDisabled), -1);
ownershipReadOnlyCache.invalidate(path);
}

/**
* Update bundle state in a local cache
*
*
* @param bundle
* @throws Exception
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand All @@ -68,7 +69,9 @@ public class NamespaceBundleFactory implements ZooKeeperCacheListener<LocalPolic
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;

this.bundlesCache = Caffeine.newBuilder().buildAsync((NamespaceName namespace, Executor executor) -> {
this.bundlesCache = Caffeine.newBuilder()
.recordStats()
.buildAsync((NamespaceName namespace, Executor executor) -> {
String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Loading cache with bundles for {}", namespace);
Expand Down Expand Up @@ -101,6 +104,8 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
return future;
});

CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache);

// local-policies have been changed which has contains namespace bundles
pulsar.getLocalZkCacheService().policiesCache()
.registerListener((String path, LocalPolicies data, Stat stat) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -69,7 +70,6 @@ public void testPerTopicStats() throws Exception {
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
String metricsStr = new String(statsOut.toByteArray());

Multimap<String, Metric> metrics = parseMetrics(metricsStr);

metrics.entries().forEach(e -> {
Expand Down Expand Up @@ -241,7 +241,7 @@ private static Multimap<String, Metric> parseMetrics(String metrics) {
// or
// pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
// topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");

Splitter.on("\n").split(metrics).forEach(line -> {
Expand All @@ -250,8 +250,7 @@ private static Multimap<String, Metric> parseMetrics(String metrics) {
}

Matcher matcher = pattern.matcher(line);

checkArgument(matcher.matches());
assertTrue(matcher.matches());
String name = matcher.group(1);

Metric m = new Metric();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ private static Map<String, Metric> parseMetrics(String metrics) {
// or
// pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
// topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
Arrays.asList(metrics.split("\n")).forEach(line -> {
if (line.isEmpty() || line.startsWith("#")) {
Expand Down
4 changes: 4 additions & 0 deletions pulsar-zookeeper-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
<artifactId>aspectjweaver</artifactId>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_caffeine</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.stats;

import lombok.experimental.UtilityClass;

@UtilityClass
public class CacheMetricsCollector {

public static final io.prometheus.client.cache.caffeine.CacheMetricsCollector CAFFEINE = new io.prometheus.client.cache.caffeine.CacheMetricsCollector()
.register();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
super(null, zkOperationTimeoutSeconds, orderedExecutor);
super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class);

public LocalZooKeeperCache(final ZooKeeper zk, int zkOperationTimeoutSeconds, final OrderedExecutor executor) {
super(zk, zkOperationTimeoutSeconds, executor);
super("local-zk", zk, zkOperationTimeoutSeconds, executor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ private ZooKeeperDataCache<BookiesRackConfiguration> getAndSetZkCache(Configurat
try {
ZooKeeper zkClient = ZooKeeperClient.newBuilder().connectString(zkServers)
.sessionTimeoutMs(zkTimeout).build();
zkCache = new ZooKeeperCache(zkClient, (int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
zkCache = new ZooKeeperCache("bookies-racks", zkClient,
(int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
};
conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zkCache);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ private ZooKeeperCache getAndSetZkCache(Configuration conf) {
try {
ZooKeeper zkClient = ZooKeeperClient.newBuilder().connectString(zkServers)
.sessionTimeoutMs(zkTimeout).build();
zkCache = new ZooKeeperCache(zkClient, (int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
zkCache = new ZooKeeperCache("bookies-isolation", zkClient,
(int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
};
conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zkCache);
} catch (Exception e) {
Expand Down Expand Up @@ -186,7 +187,7 @@ private Set<BookieSocketAddress> getBlacklistedBookies(int ensembleSize) {
blacklistedBookies.remove(new BookieSocketAddress(bookieAddress));
}
}
}
}
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -89,24 +90,35 @@ public static interface CacheUpdater<T> {

protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
checkNotNull(executor);
this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
this.executor = executor;
this.zkSession.set(zkSession);
this.shouldShutdownExecutor = false;

this.dataCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
this.dataCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.buildAsync((key, executor1) -> null);

this.childrenCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
this.childrenCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.buildAsync((key, executor1) -> null);
this.existsCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)

this.existsCache = Caffeine.newBuilder()
.recordStats()
.expireAfterWrite(5, TimeUnit.MINUTES)
.buildAsync((key, executor1) -> null);

CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-data", dataCache);
CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-children", childrenCache);
CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-exists", existsCache);
}

public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds) {
this(zkSession, zkOperationTimeoutSeconds,
public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds) {
this(cacheName, zkSession, zkOperationTimeoutSeconds,
OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
this.shouldShutdownExecutor = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testBasic() throws Exception {
// Case1: ZKCache is given
ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf1 = new ClientConfiguration();
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
});
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1
Expand Down Expand Up @@ -107,7 +107,7 @@ public void testBasic() throws Exception {
public void testNoBookieInfo() throws Exception {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testBookieInfoChange() throws Exception {

ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping
Expand Down
Loading

0 comments on commit 7c3f3bd

Please sign in to comment.