diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index 912ac8bb1d859..4c6a29da631f3 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -264,7 +264,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >} ### RocksDB Native Metrics Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend. -The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs. +The metrics here are scoped to the operators with unsigned longs and have two kinds of types: +1. RocksDB property-based metrics, which is broken down by column family, e.g. number of currently running compactions of one specific column family. +2. RocksDB statistics-based metrics, which holds at the database level, e.g. total block cache hit count within the DB. {{< hint warning >}} Enabling RocksDB's native metrics may cause degraded performance and should be set carefully. diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md index a38e72d0e1828..5fc8cb1ac2903 100644 --- a/docs/content/docs/deployment/config.md +++ b/docs/content/docs/deployment/config.md @@ -266,7 +266,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >} ### RocksDB Native Metrics Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend. -The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs. +The metrics here are scoped to the operators with unsigned longs and have two kinds of types: +1. RocksDB property-based metrics, which is broken down by column family, e.g. number of currently running compactions of one specific column family. +2. RocksDB statistics-based metrics, which holds at the database level, e.g. total block cache hit count within the DB. {{< hint warning >}} Enabling RocksDB's native metrics may cause degraded performance and should be set carefully. diff --git a/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html index a7922f2547a01..2edd8387d085f 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_native_metric_configuration.html @@ -26,6 +26,18 @@ Boolean Monitor block cache capacity. + +
state.backend.rocksdb.metrics.block-cache-hit
+ false + Boolean + Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT). + + +
state.backend.rocksdb.metrics.block-cache-miss
+ false + Boolean + Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS). +
state.backend.rocksdb.metrics.block-cache-pinned-usage
false @@ -38,11 +50,23 @@ Boolean Monitor the memory size for the entries residing in block cache. + +
state.backend.rocksdb.metrics.bytes-read
+ false + Boolean + Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB. + + +
state.backend.rocksdb.metrics.bytes-written
+ false + Boolean + Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB. +
state.backend.rocksdb.metrics.column-family-as-variable
false Boolean - Whether to expose the column family as a variable. + Whether to expose the column family as a variable for RocksDB property based metrics.
state.backend.rocksdb.metrics.compaction-pending
@@ -50,6 +74,18 @@ Boolean Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise. + +
state.backend.rocksdb.metrics.compaction-read-bytes
+ false + Boolean + Monitor the bytes read during compaction in RocksDB. + + +
state.backend.rocksdb.metrics.compaction-write-bytes
+ false + Boolean + Monitor the bytes written during compaction in RocksDB. +
state.backend.rocksdb.metrics.cur-size-active-mem-table
false @@ -92,6 +128,12 @@ Boolean Track whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise. + +
state.backend.rocksdb.metrics.iter-bytes-read
+ false + Boolean + Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB. +
state.backend.rocksdb.metrics.live-sst-files-size
false @@ -164,6 +206,12 @@ Boolean Monitor the approximate size of the active, unflushed immutable, and pinned immutable memtables in bytes. + +
state.backend.rocksdb.metrics.stall-micros
+ false + Boolean + Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB. +
state.backend.rocksdb.metrics.total-sst-files-size
false diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 07829949a2566..8ce78342d3cd5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -147,8 +147,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke /** This determines the type of priority queue state. */ @Nullable private PriorityQueueStateType priorityQueueStateType; - /** The default rocksdb metrics options. */ - private final RocksDBNativeMetricOptions defaultMetricOptions; + /** The default rocksdb property-based metrics options. */ + private final RocksDBNativeMetricOptions nativeMetricOptions; // -- runtime values, set on TaskManager when initializing / using the backend @@ -199,7 +199,7 @@ public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing) { public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing) { this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS; - this.defaultMetricOptions = new RocksDBNativeMetricOptions(); + this.nativeMetricOptions = new RocksDBNativeMetricOptions(); this.memoryConfiguration = new RocksDBMemoryConfiguration(); this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE; this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; @@ -263,7 +263,7 @@ private EmbeddedRocksDBStateBackend( } // configure metric options - this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config); + this.nativeMetricOptions = RocksDBNativeMetricOptions.fromConfig(config); // configure RocksDB predefined options this.predefinedOptions = @@ -465,7 +465,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize()); } final RocksDBResourceContainer resourceContainer = - createOptionsAndResourceContainer(sharedResources); + createOptionsAndResourceContainer( + sharedResources, nativeMetricOptions.isStatisticsEnabled()); ExecutionConfig executionConfig = env.getExecutionConfig(); StreamCompressionDecorator keyGroupCompressionDecorator = @@ -496,7 +497,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled()) .setNumberOfTransferingThreads(getNumberOfTransferThreads()) .setNativeMetricOptions( - resourceContainer.getMemoryWatcherOptions(defaultMetricOptions)) + resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)) .setWriteBatchSize(getWriteBatchSize()) .setOverlapFractionThreshold(getOverlapFractionThreshold()); return builder.build(); @@ -863,18 +864,20 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon @VisibleForTesting RocksDBResourceContainer createOptionsAndResourceContainer() { - return createOptionsAndResourceContainer(null); + return createOptionsAndResourceContainer(null, false); } @VisibleForTesting private RocksDBResourceContainer createOptionsAndResourceContainer( - @Nullable OpaqueMemoryResource sharedResources) { + @Nullable OpaqueMemoryResource sharedResources, + boolean enableStatistics) { return new RocksDBResourceContainer( configurableOptions != null ? configurableOptions : new Configuration(), predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT, rocksDbOptionsFactory, - sharedResources); + sharedResources, + enableStatistics); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 8706a90c6b08c..ca1adc27303a6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -114,7 +114,9 @@ public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBacken /** True if incremental checkpointing is enabled. */ private boolean enableIncrementalCheckpointing; + /** RocksDB property-based and statistics-based native metrics options. */ private RocksDBNativeMetricOptions nativeMetricOptions; + private int numberOfTransferingThreads; private long writeBatchSize = RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes(); @@ -309,7 +311,7 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { nativeMetricMonitor = nativeMetricOptions.isEnabled() ? new RocksDBNativeMetricMonitor( - nativeMetricOptions, metricGroup, db) + nativeMetricOptions, metricGroup, db, null) : null; } else { prepareDirectories(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java index 83c44c8915d6f..0ea5a1aec5540 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java @@ -26,10 +26,13 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.Statistics; +import org.rocksdb.TickerType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; @@ -54,15 +57,32 @@ public class RocksDBNativeMetricMonitor implements Closeable { @GuardedBy("lock") private RocksDB rocksDB; + @Nullable + @GuardedBy("lock") + private Statistics statistics; + public RocksDBNativeMetricMonitor( @Nonnull RocksDBNativeMetricOptions options, @Nonnull MetricGroup metricGroup, - @Nonnull RocksDB rocksDB) { + @Nonnull RocksDB rocksDB, + @Nullable Statistics statistics) { this.options = options; this.metricGroup = metricGroup; this.rocksDB = rocksDB; - + this.statistics = statistics; this.lock = new Object(); + registerStatistics(); + } + + /** Register gauges to pull native metrics for the database. */ + private void registerStatistics() { + if (statistics != null) { + for (TickerType tickerType : options.getMonitorTickerTypes()) { + metricGroup.gauge( + String.format("rocksdb.%s", tickerType.name().toLowerCase()), + new RocksDBNativeStatisticsMetricView(tickerType)); + } + } } /** @@ -80,27 +100,38 @@ void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { : metricGroup.addGroup(columnFamilyName); for (String property : options.getProperties()) { - RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(handle, property); + RocksDBNativePropertyMetricView gauge = + new RocksDBNativePropertyMetricView(handle, property); group.gauge(property, gauge); } } /** Updates the value of metricView if the reference is still valid. */ - private void setProperty( - ColumnFamilyHandle handle, String property, RocksDBNativeMetricView metricView) { + private void setProperty(RocksDBNativePropertyMetricView metricView) { if (metricView.isClosed()) { return; } try { synchronized (lock) { if (rocksDB != null) { - long value = rocksDB.getLongProperty(handle, property); + long value = rocksDB.getLongProperty(metricView.handle, metricView.property); metricView.setValue(value); } } } catch (RocksDBException e) { metricView.close(); - LOG.warn("Failed to read native metric {} from RocksDB.", property, e); + LOG.warn("Failed to read native metric {} from RocksDB.", metricView.property, e); + } + } + + private void setStatistics(RocksDBNativeStatisticsMetricView metricView) { + if (metricView.isClosed()) { + return; + } + if (statistics != null) { + synchronized (lock) { + metricView.setValue(statistics.getTickerCount(metricView.tickerType)); + } } } @@ -108,31 +139,46 @@ private void setProperty( public void close() { synchronized (lock) { rocksDB = null; + statistics = null; + } + } + + abstract static class RocksDBNativeView implements View { + private boolean closed; + + RocksDBNativeView() { + this.closed = false; + } + + void close() { + closed = true; + } + + boolean isClosed() { + return closed; } } /** - * A gauge which periodically pulls a RocksDB native metric for the specified column family / - * metric pair. + * A gauge which periodically pulls a RocksDB property-based native metric for the specified + * column family / metric pair. * *

Note: As the returned property is of type {@code uint64_t} on C++ side * the returning value can be negative. Because java does not support unsigned long types, this * gauge wraps the result in a {@link BigInteger}. */ - class RocksDBNativeMetricView implements Gauge, View { + class RocksDBNativePropertyMetricView extends RocksDBNativeView implements Gauge { private final String property; private final ColumnFamilyHandle handle; private BigInteger bigInteger; - private boolean closed; - - private RocksDBNativeMetricView(ColumnFamilyHandle handle, @Nonnull String property) { + private RocksDBNativePropertyMetricView( + ColumnFamilyHandle handle, @Nonnull String property) { this.handle = handle; this.property = property; this.bigInteger = BigInteger.ZERO; - this.closed = false; } public void setValue(long value) { @@ -149,22 +195,40 @@ public void setValue(long value) { } } - public void close() { - closed = true; + @Override + public BigInteger getValue() { + return bigInteger; } - public boolean isClosed() { - return closed; + @Override + public void update() { + setProperty(this); + } + } + + /** + * A gauge which periodically pulls a RocksDB statistics-based native metric for the database. + */ + class RocksDBNativeStatisticsMetricView extends RocksDBNativeView implements Gauge { + private final TickerType tickerType; + private long value; + + private RocksDBNativeStatisticsMetricView(TickerType tickerType) { + this.tickerType = tickerType; } @Override - public BigInteger getValue() { - return bigInteger; + public Long getValue() { + return value; + } + + void setValue(long value) { + this.value = value; } @Override public void update() { - setProperty(handle, property, this); + setStatistics(this); } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java index 1cec947e5c585..f6fc5c759aeaf 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java @@ -18,19 +18,29 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; +import org.rocksdb.TickerType; + import java.io.Serializable; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; /** - * Enable which RocksDB metrics to forward to Flink's metrics reporter. All metrics report at the - * column family level and return unsigned long values. + * Enable which RocksDB metrics to forward to Flink's metrics reporter. + * + *

Property based metrics would report at the column family level and return unsigned long + * values. + * + *

Statistics based metrics would report at the database level, it can return ticker or histogram + * kind results. * *

Properties and doc comments are taken from RocksDB documentation. See @@ -39,6 +49,10 @@ public class RocksDBNativeMetricOptions implements Serializable { private static final long serialVersionUID = 1L; + // -------------------------------------------------------------------------------------------- + // RocksDB property based metrics, report at column family level + // -------------------------------------------------------------------------------------------- + public static final String METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY = "state.backend.rocksdb.metrics" + ".column-family-as-variable"; @@ -227,11 +241,77 @@ public class RocksDBNativeMetricOptions implements Serializable { ConfigOptions.key(METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY) .booleanType() .defaultValue(false) - .withDescription("Whether to expose the column family as a variable."); + .withDescription( + "Whether to expose the column family as a variable for RocksDB property based metrics."); + + // -------------------------------------------------------------------------------------------- + // RocksDB statistics based metrics, report at database level + // -------------------------------------------------------------------------------------------- + + public static final ConfigOption MONITOR_BLOCK_CACHE_HIT = + ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-hit") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT)."); + + public static final ConfigOption MONITOR_BLOCK_CACHE_MISS = + ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-miss") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS)."); + + public static final ConfigOption MONITOR_BYTES_READ = + ConfigOptions.key("state.backend.rocksdb.metrics.bytes-read") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB."); + + public static final ConfigOption MONITOR_ITER_BYTES_READ = + ConfigOptions.key("state.backend.rocksdb.metrics.iter-bytes-read") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB."); + + public static final ConfigOption MONITOR_BYTES_WRITTEN = + ConfigOptions.key("state.backend.rocksdb.metrics.bytes-written") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB."); + + public static final ConfigOption MONITOR_COMPACTION_READ_BYTES = + ConfigOptions.key("state.backend.rocksdb.metrics.compaction-read-bytes") + .booleanType() + .defaultValue(false) + .withDescription("Monitor the bytes read during compaction in RocksDB."); + + public static final ConfigOption MONITOR_COMPACTION_WRITE_BYTES = + ConfigOptions.key("state.backend.rocksdb.metrics.compaction-write-bytes") + .booleanType() + .defaultValue(false) + .withDescription("Monitor the bytes written during compaction in RocksDB."); + + public static final ConfigOption MONITOR_STALL_MICROS = + ConfigOptions.key("state.backend.rocksdb.metrics.stall-micros") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB."); /** Creates a {@link RocksDBNativeMetricOptions} based on an external configuration. */ public static RocksDBNativeMetricOptions fromConfig(ReadableConfig config) { RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions(); + configurePropertyMetrics(options, config); + configureStatisticsMetrics(options, config); + return options; + } + + private static void configurePropertyMetrics( + RocksDBNativeMetricOptions options, ReadableConfig config) { if (config.get(MONITOR_NUM_IMMUTABLE_MEM_TABLES)) { options.enableNumImmutableMemTable(); } @@ -337,15 +417,51 @@ public static RocksDBNativeMetricOptions fromConfig(ReadableConfig config) { } options.setColumnFamilyAsVariable(config.get(COLUMN_FAMILY_AS_VARIABLE)); + } - return options; + private static void configureStatisticsMetrics( + RocksDBNativeMetricOptions options, ReadableConfig config) { + for (Map.Entry, TickerType> entry : tickerTypeMapping.entrySet()) { + if (config.get(entry.getKey())) { + options.monitorTickerTypes.add(entry.getValue()); + } + } } + private static final Map, TickerType> tickerTypeMapping = + new HashMap, TickerType>() { + private static final long serialVersionUID = 1L; + + { + put(MONITOR_BLOCK_CACHE_HIT, TickerType.BLOCK_CACHE_HIT); + put(MONITOR_BLOCK_CACHE_MISS, TickerType.BLOCK_CACHE_MISS); + put(MONITOR_BYTES_READ, TickerType.BYTES_READ); + put(MONITOR_ITER_BYTES_READ, TickerType.ITER_BYTES_READ); + put(MONITOR_BYTES_WRITTEN, TickerType.BYTES_WRITTEN); + put(MONITOR_COMPACTION_READ_BYTES, TickerType.COMPACT_READ_BYTES); + put(MONITOR_COMPACTION_WRITE_BYTES, TickerType.COMPACT_WRITE_BYTES); + put(MONITOR_STALL_MICROS, TickerType.STALL_MICROS); + } + }; + private final Set properties; + private final Set monitorTickerTypes; private boolean columnFamilyAsVariable = COLUMN_FAMILY_AS_VARIABLE.defaultValue(); public RocksDBNativeMetricOptions() { this.properties = new HashSet<>(); + this.monitorTickerTypes = new HashSet<>(); + } + + @VisibleForTesting + public void enableNativeStatistics(ConfigOption nativeStatisticsOption) { + TickerType tickerType = tickerTypeMapping.get(nativeStatisticsOption); + if (tickerType != null) { + monitorTickerTypes.add(tickerType); + } else { + throw new IllegalArgumentException( + "Unknown configurable native statistics option " + nativeStatisticsOption); + } } /** Returns number of immutable memtables that have not yet been flushed. */ @@ -501,18 +617,28 @@ public void setColumnFamilyAsVariable(boolean columnFamilyAsVariable) { this.columnFamilyAsVariable = columnFamilyAsVariable; } - /** @return the enabled RocksDB metrics */ + /** @return the enabled RocksDB property-based metrics */ public Collection getProperties() { return Collections.unmodifiableCollection(properties); } + /** @return the enabled RocksDB statistics metrics. */ + public Collection getMonitorTickerTypes() { + return Collections.unmodifiableCollection(monitorTickerTypes); + } + /** - * {{@link RocksDBNativeMetricMonitor}} is enabled is any property is set. + * {{@link RocksDBNativeMetricMonitor}} is enabled if any property or ticker type is set. * * @return true if {{RocksDBNativeMetricMonitor}} should be enabled, false otherwise. */ public boolean isEnabled() { - return !properties.isEmpty(); + return !properties.isEmpty() || isStatisticsEnabled(); + } + + /** @return true if RocksDB statistics metrics are enabled, false otherwise. */ + public boolean isStatisticsEnabled() { + return !monitorTickerTypes.isEmpty(); } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 50752a817ac53..ed652980d042a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -18,6 +18,7 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; @@ -34,6 +35,7 @@ import org.rocksdb.IndexType; import org.rocksdb.PlainTableConfig; import org.rocksdb.ReadOptions; +import org.rocksdb.Statistics; import org.rocksdb.TableFormatConfig; import org.rocksdb.WriteOptions; import org.slf4j.Logger; @@ -73,35 +75,42 @@ public final class RocksDBResourceContainer implements AutoCloseable { */ @Nullable private final OpaqueMemoryResource sharedResources; + private final boolean enableStatistics; + /** The handles to be closed when the container is closed. */ private final ArrayList handlesToClose; + @VisibleForTesting public RocksDBResourceContainer() { - this(new Configuration(), PredefinedOptions.DEFAULT, null, null); + this(new Configuration(), PredefinedOptions.DEFAULT, null, null, false); } + @VisibleForTesting public RocksDBResourceContainer( PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory) { - this(new Configuration(), predefinedOptions, optionsFactory, null); + this(new Configuration(), predefinedOptions, optionsFactory, null, false); } + @VisibleForTesting public RocksDBResourceContainer( PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory, @Nullable OpaqueMemoryResource sharedResources) { - this(new Configuration(), predefinedOptions, optionsFactory, sharedResources); + this(new Configuration(), predefinedOptions, optionsFactory, sharedResources, false); } public RocksDBResourceContainer( ReadableConfig configuration, PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory, - @Nullable OpaqueMemoryResource sharedResources) { + @Nullable OpaqueMemoryResource sharedResources, + boolean enableStatistics) { this.configuration = configuration; this.predefinedOptions = checkNotNull(predefinedOptions); this.optionsFactory = optionsFactory; this.sharedResources = sharedResources; + this.enableStatistics = enableStatistics; this.handlesToClose = new ArrayList<>(); } @@ -127,6 +136,12 @@ public DBOptions getDbOptions() { opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager()); } + if (enableStatistics) { + Statistics statistics = new Statistics(); + opt.setStatistics(statistics); + handlesToClose.add(statistics); + } + return opt; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java index 9e9a12d7760cb..6c1d4625fb65e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java @@ -143,7 +143,8 @@ private void loadDb() throws IOException { // init native metrics monitor if configured nativeMetricMonitor = nativeMetricOptions.isEnabled() - ? new RocksDBNativeMetricMonitor(nativeMetricOptions, metricGroup, db) + ? new RocksDBNativeMetricMonitor( + nativeMetricOptions, metricGroup, db, dbOptions.statistics()) : null; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java index f2b95870e19fc..a03de345e017a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java @@ -31,9 +31,11 @@ import org.junit.Rule; import org.junit.Test; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.List; /** validate native metric monitor. */ public class RocksDBNativeMetricMonitorTest { @@ -42,14 +44,14 @@ public class RocksDBNativeMetricMonitorTest { private static final String COLUMN_FAMILY_NAME = "column-family"; - @Rule public RocksDBResource rocksDBResource = new RocksDBResource(); + @Rule public RocksDBResource rocksDBResource = new RocksDBResource(true); @Test public void testMetricMonitorLifecycle() throws Throwable { // We use a local variable here to manually control the life-cycle. // This allows us to verify that metrics do not try to access // RocksDB after the monitor was closed. - RocksDBResource localRocksDBResource = new RocksDBResource(); + RocksDBResource localRocksDBResource = new RocksDBResource(true); localRocksDBResource.before(); SimpleMetricRegistry registry = new SimpleMetricRegistry(); @@ -64,24 +66,38 @@ public void testMetricMonitorLifecycle() throws Throwable { // value since empty memtables // have overhead. options.enableSizeAllMemTables(); + options.enableNativeStatistics(RocksDBNativeMetricOptions.MONITOR_BYTES_WRITTEN); RocksDBNativeMetricMonitor monitor = - new RocksDBNativeMetricMonitor(options, group, localRocksDBResource.getRocksDB()); + new RocksDBNativeMetricMonitor( + options, + group, + localRocksDBResource.getRocksDB(), + localRocksDBResource.getDbOptions().statistics()); ColumnFamilyHandle handle = localRocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME); monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle); Assert.assertEquals( - "Failed to register metrics for column family", 1, registry.metrics.size()); - - RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0); + "Failed to register metrics for column family", 1, registry.propertyMetrics.size()); - view.update(); + // write something to ensure the bytes-written is not zero. + localRocksDBResource.getRocksDB().put(new byte[4], new byte[10]); - Assert.assertNotEquals( - "Failed to pull metric from RocksDB", BigInteger.ZERO, view.getValue()); + for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view : + registry.propertyMetrics) { + view.update(); + Assert.assertNotEquals( + "Failed to pull metric from RocksDB", BigInteger.ZERO, view.getValue()); + view.setValue(0L); + } - view.setValue(0L); + for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view : + registry.statisticsMetrics) { + view.update(); + Assert.assertNotEquals(0L, (long) view.getValue()); + view.setValue(0L); + } // After the monitor is closed no metric should be accessing RocksDB anymore. // If they do, then this test will likely fail with a segmentation fault. @@ -89,10 +105,18 @@ public void testMetricMonitorLifecycle() throws Throwable { localRocksDBResource.after(); - view.update(); + for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view : + registry.propertyMetrics) { + view.update(); + Assert.assertEquals( + "Failed to release RocksDB reference", BigInteger.ZERO, view.getValue()); + } - Assert.assertEquals( - "Failed to release RocksDB reference", BigInteger.ZERO, view.getValue()); + for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view : + registry.statisticsMetrics) { + view.update(); + Assert.assertEquals(0L, (long) view.getValue()); + } } @Test @@ -111,11 +135,16 @@ public void testReturnsUnsigned() throws Throwable { options.enableSizeAllMemTables(); RocksDBNativeMetricMonitor monitor = - new RocksDBNativeMetricMonitor(options, group, localRocksDBResource.getRocksDB()); + new RocksDBNativeMetricMonitor( + options, + group, + localRocksDBResource.getRocksDB(), + localRocksDBResource.getDbOptions().statistics()); ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME); monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle); - RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0); + RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view = + registry.propertyMetrics.get(0); view.setValue(-1); BigInteger result = view.getValue(); @@ -127,7 +156,7 @@ public void testReturnsUnsigned() throws Throwable { } @Test - public void testClosedGaugesDontRead() { + public void testClosedGaugesDontRead() throws RocksDBException { SimpleMetricRegistry registry = new SimpleMetricRegistry(); GenericMetricGroup group = new GenericMetricGroup( @@ -137,23 +166,42 @@ public void testClosedGaugesDontRead() { RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions(); options.enableSizeAllMemTables(); + options.enableNativeStatistics(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT); RocksDBNativeMetricMonitor monitor = - new RocksDBNativeMetricMonitor(options, group, rocksDBResource.getRocksDB()); + new RocksDBNativeMetricMonitor( + options, + group, + rocksDBResource.getRocksDB(), + rocksDBResource.getDbOptions().statistics()); ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME); monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle); - RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0); + rocksDBResource.getRocksDB().put(new byte[4], new byte[10]); - view.close(); - view.update(); + for (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView view : + registry.propertyMetrics) { + view.close(); + view.update(); + Assert.assertEquals( + "Closed gauge still queried RocksDB", BigInteger.ZERO, view.getValue()); + } - Assert.assertEquals("Closed gauge still queried RocksDB", BigInteger.ZERO, view.getValue()); + for (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView view : + registry.statisticsMetrics) { + view.close(); + view.update(); + Assert.assertEquals("Closed gauge still queried RocksDB", 0L, (long) view.getValue()); + } } static class SimpleMetricRegistry implements MetricRegistry { - ArrayList metrics = new ArrayList<>(); + List propertyMetrics = + new ArrayList<>(); + + List statisticsMetrics = + new ArrayList<>(); @Override public char getDelimiter() { @@ -167,8 +215,13 @@ public int getNumberReporters() { @Override public void register(Metric metric, String metricName, AbstractMetricGroup group) { - if (metric instanceof RocksDBNativeMetricMonitor.RocksDBNativeMetricView) { - metrics.add((RocksDBNativeMetricMonitor.RocksDBNativeMetricView) metric); + if (metric instanceof RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView) { + propertyMetrics.add( + (RocksDBNativeMetricMonitor.RocksDBNativePropertyMetricView) metric); + } else if (metric + instanceof RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView) { + statisticsMetrics.add( + (RocksDBNativeMetricMonitor.RocksDBNativeStatisticsMetricView) metric); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java index 66b6221c179bc..238489cb8e1be 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java @@ -30,6 +30,7 @@ import org.rocksdb.InfoLogLevel; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; +import org.rocksdb.Statistics; import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,8 @@ public class RocksDBResource extends ExternalResource { /** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */ private final RocksDBOptionsFactory optionsFactory; + private final boolean enableStatistics; + /** Temporary folder that provides the working directory for the RocksDB instance. */ private TemporaryFolder temporaryFolder; @@ -78,8 +81,14 @@ public class RocksDBResource extends ExternalResource { private ArrayList handlesToClose = new ArrayList<>(); public RocksDBResource() { + this(false); + } + + public RocksDBResource(boolean enableStatistics) { this( new RocksDBOptionsFactory() { + private static final long serialVersionUID = 1L; + @Override public DBOptions createDBOptions( DBOptions currentOptions, Collection handlesToClose) { @@ -111,11 +120,14 @@ public ColumnFamilyOptions createColumnOptions( return new ColumnFamilyOptions().optimizeForPointLookup(40960); } - }); + }, + enableStatistics); } - public RocksDBResource(@Nonnull RocksDBOptionsFactory optionsFactory) { + public RocksDBResource( + @Nonnull RocksDBOptionsFactory optionsFactory, boolean enableStatistics) { this.optionsFactory = optionsFactory; + this.enableStatistics = enableStatistics; } public ColumnFamilyHandle getDefaultColumnFamily() { @@ -134,6 +146,10 @@ public ReadOptions getReadOptions() { return readOptions; } + public DBOptions getDbOptions() { + return dbOptions; + } + public RocksDBWriteBatchWrapper getBatchWrapper() { return batchWrapper; } @@ -165,6 +181,11 @@ protected void before() throws Throwable { .setStatsDumpPeriodSec(0), handlesToClose) .setCreateIfMissing(true); + if (enableStatistics) { + Statistics statistics = new Statistics(); + dbOptions.setStatistics(statistics); + handlesToClose.add(statistics); + } this.columnFamilyOptions = optionsFactory.createColumnOptions(new ColumnFamilyOptions(), handlesToClose); this.writeOptions = new WriteOptions(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 8f8dfc4093b72..e16d1504812a6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -531,7 +531,7 @@ public void testConfigurableOptionsFromConfig() throws Exception { try (RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer( - configuration, PredefinedOptions.DEFAULT, null, null)) { + configuration, PredefinedOptions.DEFAULT, null, null, false)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(-1, dbOptions.maxOpenFiles()); @@ -610,7 +610,11 @@ public void testPredefinedAndConfigurableOptions() throws Exception { configuration.set(RocksDBConfigurableOptions.COMPACTION_STYLE, CompactionStyle.UNIVERSAL); try (final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer( - configuration, PredefinedOptions.SPINNING_DISK_OPTIMIZED, null, null)) { + configuration, + PredefinedOptions.SPINNING_DISK_OPTIMIZED, + null, + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -622,7 +626,8 @@ public void testPredefinedAndConfigurableOptions() throws Exception { new Configuration(), PredefinedOptions.SPINNING_DISK_OPTIMIZED, null, - null)) { + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions);