diff --git a/all/pom.xml b/all/pom.xml
index 84d4c9cb9c2d3..2fa789811d7bd 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -59,6 +59,11 @@
org.slf4j
slf4j-log4j12
+
+
+ org.apache.bookkeeper.stats
+ datasketches-metrics-provider
+
diff --git a/conf/bookkeeper.conf b/conf/bookkeeper.conf
index cd2fa28212738..ea04abe57418c 100644
--- a/conf/bookkeeper.conf
+++ b/conf/bookkeeper.conf
@@ -295,8 +295,9 @@ readBufferSizeBytes=4096
#useHostNameAsBookieID=false
# Stats Provider Class
-#statsProviderClass=org.apache.bookkeeper.stats.CodahaleMetricsProvider
-#codahaleStatsJmxEndpoint=metrics
+statsProviderClass=org.apache.bokkeeper.stats.datasketches.DataSketchesMetricsProvider
+dataSketchesMetricsJsonFileReporter=data/bookie-stats.json
+dataSketchesMetricsUpdateIntervalSeconds=60
## DB Ledger storage configuration
diff --git a/docs/ClusterSetup.md b/docs/ClusterSetup.md
index b47644bb8314f..122b3cff39eb8 100644
--- a/docs/ClusterSetup.md
+++ b/docs/ClusterSetup.md
@@ -333,7 +333,9 @@ $ bin/pulsar-admin persistent stats persistent://test/us-west/ns1/my-topic
## Monitoring
-Pulsar metrics can be collected from the brokers and are exported in JSON format.
+### Broker stats
+
+Pulsar broker metrics can be collected from the brokers and are exported in JSON format.
There are two main types of metrics:
@@ -349,3 +351,22 @@ bin/pulsar-admin broker-stats monitoring-metrics
```
All the message rates are updated every 1min.
+
+### BookKeeper stats
+
+There are several stats frameworks that works with BookKeeper and that
+can be enabled by changing the `statsProviderClass` in
+`conf/bookkeeper.conf`.
+
+By following the instructions above, the `DataSketchesMetricsProvider`
+will be enabled. It features a very efficient way to compute latency
+quantiles, along with rates and counts.
+
+The stats are dumped every interval into a JSON file that is overwritten
+each time.
+
+```properties
+statsProviderClass=org.apache.bokkeeper.stats.datasketches.DataSketchesMetricsProvider
+dataSketchesMetricsJsonFileReporter=data/bookie-stats.json
+dataSketchesMetricsUpdateIntervalSeconds=60
+```
diff --git a/pom.xml b/pom.xml
index 91e8e73adae3e..6da7f9708b312 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,7 +187,7 @@ flexible messaging model and an intuitive client API.
org.apache.bookkeeper.stats
- codahale-metrics-provider
+ datasketches-metrics-provider
${bookkeeper.version}
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 645bd89d6b3f0..1aa8342db8805 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -41,6 +41,11 @@
bookkeeper-server
+
+ org.apache.bookkeeper.stats
+ datasketches-metrics-provider
+
+
com.github.ben-manes.caffeine
caffeine
diff --git a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index a1f47b83fe784..d9737e46917d3 100644
--- a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -41,6 +41,9 @@
*/
package com.yahoo.pulsar.zookeeper;
+import static org.apache.commons.io.FileUtils.cleanDirectory;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@@ -48,19 +51,19 @@
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.commons.io.FileUtils.cleanDirectory;
-import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bokkeeper.stats.datasketches.DataSketchesMetricsProvider;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
-import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -109,6 +112,7 @@ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort,
String bkDataDirName;
BookieServer bs[];
ServerConfiguration bsConfs[];
+ StatsProvider statsProviders[];
Integer initialPort = 5000;
/**
@@ -167,13 +171,13 @@ private void initializeZookeper() throws IOException {
}
}
- private void runBookies(ServerConfiguration baseConf) throws IOException, KeeperException, InterruptedException,
- BookieException, UnavailableException, CompatibilityException {
+ private void runBookies(ServerConfiguration baseConf) throws Exception {
LOG.info("Starting Bookie(s)");
// Create Bookie Servers (B1, B2, B3)
bs = new BookieServer[numberOfBookies];
bsConfs = new ServerConfiguration[numberOfBookies];
+ statsProviders = new StatsProvider[numberOfBookies];
for (int i = 0; i < numberOfBookies; i++) {
@@ -194,7 +198,16 @@ private void runBookies(ServerConfiguration baseConf) throws IOException, Keeper
bsConfs[i].setAllowLoopback(true);
bsConfs[i].setGcWaitTime(60000);
- bs[i] = new BookieServer(bsConfs[i]);
+ String statsFilePath = FileSystems.getDefault()
+ .getPath(bkDataDir.getAbsolutePath(), "bookie-stats.json").toString();
+
+ // Initialize Stats Provider
+ statsProviders[i] = new DataSketchesMetricsProvider();
+ bsConfs[i].setProperty("dataSketchesMetricsJsonFileReporter", statsFilePath);
+ statsProviders[i].start(bsConfs[i]);
+
+ StatsLogger statsLogger = statsProviders[i].getStatsLogger("");
+ bs[i] = new BookieServer(bsConfs[i], statsLogger);
bs[i].start();
LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, initialPort + i,
bkDataDir.getAbsolutePath());
@@ -223,6 +236,10 @@ public void stop() throws Exception {
bookie.shutdown();
}
+ for (StatsProvider statsProvider : statsProviders) {
+ statsProvider.stop();
+ }
+
zkc.close();
zks.shutdown();
serverFactory.shutdown();