Skip to content

Commit

Permalink
[FLINK-24786][state] Introduce and expose RocksDB statistics as metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Myasuka committed Jul 14, 2022
1 parent 4f7ebbb commit 46e0014
Show file tree
Hide file tree
Showing 12 changed files with 417 additions and 75 deletions.
4 changes: 3 additions & 1 deletion docs/content.zh/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >}
### RocksDB Native Metrics

Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend.
The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs.
The metrics here are scoped to the operators with unsigned longs and have two kinds of types:
1. RocksDB property-based metrics, which is broken down by column family, e.g. number of currently running compactions of one specific column family.
2. RocksDB statistics-based metrics, which holds at the database level, e.g. total block cache hit count within the DB.

{{< hint warning >}}
Enabling RocksDB's native metrics may cause degraded performance and should be set carefully.
Expand Down
4 changes: 3 additions & 1 deletion docs/content/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ Please refer to the [metrics system documentation]({{< ref "docs/ops/metrics" >}
### RocksDB Native Metrics

Flink can report metrics from RocksDB's native code, for applications using the RocksDB state backend.
The metrics here are scoped to the operators and then further broken down by column family; values are reported as unsigned longs.
The metrics here are scoped to the operators with unsigned longs and have two kinds of types:
1. RocksDB property-based metrics, which is broken down by column family, e.g. number of currently running compactions of one specific column family.
2. RocksDB statistics-based metrics, which holds at the database level, e.g. total block cache hit count within the DB.

{{< hint warning >}}
Enabling RocksDB's native metrics may cause degraded performance and should be set carefully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@
<td>Boolean</td>
<td>Monitor block cache capacity.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.block-cache-hit</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT).</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.block-cache-miss</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS).</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.block-cache-pinned-usage</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand All @@ -38,18 +50,42 @@
<td>Boolean</td>
<td>Monitor the memory size for the entries residing in block cache.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.bytes-read</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.bytes-written</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.column-family-as-variable</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to expose the column family as a variable.</td>
<td>Whether to expose the column family as a variable for RocksDB property based metrics.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.compaction-pending</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.compaction-read-bytes</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the bytes read during compaction in RocksDB.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.compaction-write-bytes</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the bytes written during compaction in RocksDB.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.cur-size-active-mem-table</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -92,6 +128,12 @@
<td>Boolean</td>
<td>Track whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.iter-bytes-read</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.live-sst-files-size</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -164,6 +206,12 @@
<td>Boolean</td>
<td>Monitor the approximate size of the active, unflushed immutable, and pinned immutable memtables in bytes.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.stall-micros</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.metrics.total-sst-files-size</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
/** This determines the type of priority queue state. */
@Nullable private PriorityQueueStateType priorityQueueStateType;

/** The default rocksdb metrics options. */
private final RocksDBNativeMetricOptions defaultMetricOptions;
/** The default rocksdb property-based metrics options. */
private final RocksDBNativeMetricOptions nativeMetricOptions;

// -- runtime values, set on TaskManager when initializing / using the backend

Expand Down Expand Up @@ -199,7 +199,7 @@ public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing) {
public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing) {
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.nativeMetricOptions = new RocksDBNativeMetricOptions();
this.memoryConfiguration = new RocksDBMemoryConfiguration();
this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
Expand Down Expand Up @@ -263,7 +263,7 @@ private EmbeddedRocksDBStateBackend(
}

// configure metric options
this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);
this.nativeMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);

// configure RocksDB predefined options
this.predefinedOptions =
Expand Down Expand Up @@ -465,7 +465,8 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());
}
final RocksDBResourceContainer resourceContainer =
createOptionsAndResourceContainer(sharedResources);
createOptionsAndResourceContainer(
sharedResources, nativeMetricOptions.isStatisticsEnabled());

ExecutionConfig executionConfig = env.getExecutionConfig();
StreamCompressionDecorator keyGroupCompressionDecorator =
Expand Down Expand Up @@ -496,7 +497,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
.setNativeMetricOptions(
resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
.setWriteBatchSize(getWriteBatchSize())
.setOverlapFractionThreshold(getOverlapFractionThreshold());
return builder.build();
Expand Down Expand Up @@ -863,18 +864,20 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon

@VisibleForTesting
RocksDBResourceContainer createOptionsAndResourceContainer() {
return createOptionsAndResourceContainer(null);
return createOptionsAndResourceContainer(null, false);
}

@VisibleForTesting
private RocksDBResourceContainer createOptionsAndResourceContainer(
@Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
@Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
boolean enableStatistics) {

return new RocksDBResourceContainer(
configurableOptions != null ? configurableOptions : new Configuration(),
predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT,
rocksDbOptionsFactory,
sharedResources);
sharedResources,
enableStatistics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
/** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;

/** RocksDB property-based and statistics-based native metrics options. */
private RocksDBNativeMetricOptions nativeMetricOptions;

private int numberOfTransferingThreads;
private long writeBatchSize =
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
Expand Down Expand Up @@ -309,7 +311,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
nativeMetricMonitor =
nativeMetricOptions.isEnabled()
? new RocksDBNativeMetricMonitor(
nativeMetricOptions, metricGroup, db)
nativeMetricOptions, metricGroup, db, null)
: null;
} else {
prepareDirectories();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.io.Closeable;
Expand All @@ -54,15 +57,32 @@ public class RocksDBNativeMetricMonitor implements Closeable {
@GuardedBy("lock")
private RocksDB rocksDB;

@Nullable
@GuardedBy("lock")
private Statistics statistics;

public RocksDBNativeMetricMonitor(
@Nonnull RocksDBNativeMetricOptions options,
@Nonnull MetricGroup metricGroup,
@Nonnull RocksDB rocksDB) {
@Nonnull RocksDB rocksDB,
@Nullable Statistics statistics) {
this.options = options;
this.metricGroup = metricGroup;
this.rocksDB = rocksDB;

this.statistics = statistics;
this.lock = new Object();
registerStatistics();
}

/** Register gauges to pull native metrics for the database. */
private void registerStatistics() {
if (statistics != null) {
for (TickerType tickerType : options.getMonitorTickerTypes()) {
metricGroup.gauge(
String.format("rocksdb.%s", tickerType.name().toLowerCase()),
new RocksDBNativeStatisticsMetricView(tickerType));
}
}
}

/**
Expand All @@ -80,59 +100,85 @@ void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) {
: metricGroup.addGroup(columnFamilyName);

for (String property : options.getProperties()) {
RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(handle, property);
RocksDBNativePropertyMetricView gauge =
new RocksDBNativePropertyMetricView(handle, property);
group.gauge(property, gauge);
}
}

/** Updates the value of metricView if the reference is still valid. */
private void setProperty(
ColumnFamilyHandle handle, String property, RocksDBNativeMetricView metricView) {
private void setProperty(RocksDBNativePropertyMetricView metricView) {
if (metricView.isClosed()) {
return;
}
try {
synchronized (lock) {
if (rocksDB != null) {
long value = rocksDB.getLongProperty(handle, property);
long value = rocksDB.getLongProperty(metricView.handle, metricView.property);
metricView.setValue(value);
}
}
} catch (RocksDBException e) {
metricView.close();
LOG.warn("Failed to read native metric {} from RocksDB.", property, e);
LOG.warn("Failed to read native metric {} from RocksDB.", metricView.property, e);
}
}

private void setStatistics(RocksDBNativeStatisticsMetricView metricView) {
if (metricView.isClosed()) {
return;
}
if (statistics != null) {
synchronized (lock) {
metricView.setValue(statistics.getTickerCount(metricView.tickerType));
}
}
}

@Override
public void close() {
synchronized (lock) {
rocksDB = null;
statistics = null;
}
}

abstract static class RocksDBNativeView implements View {
private boolean closed;

RocksDBNativeView() {
this.closed = false;
}

void close() {
closed = true;
}

boolean isClosed() {
return closed;
}
}

/**
* A gauge which periodically pulls a RocksDB native metric for the specified column family /
* metric pair.
* A gauge which periodically pulls a RocksDB property-based native metric for the specified
* column family / metric pair.
*
* <p><strong>Note</strong>: As the returned property is of type {@code uint64_t} on C++ side
* the returning value can be negative. Because java does not support unsigned long types, this
* gauge wraps the result in a {@link BigInteger}.
*/
class RocksDBNativeMetricView implements Gauge<BigInteger>, View {
class RocksDBNativePropertyMetricView extends RocksDBNativeView implements Gauge<BigInteger> {
private final String property;

private final ColumnFamilyHandle handle;

private BigInteger bigInteger;

private boolean closed;

private RocksDBNativeMetricView(ColumnFamilyHandle handle, @Nonnull String property) {
private RocksDBNativePropertyMetricView(
ColumnFamilyHandle handle, @Nonnull String property) {
this.handle = handle;
this.property = property;
this.bigInteger = BigInteger.ZERO;
this.closed = false;
}

public void setValue(long value) {
Expand All @@ -149,22 +195,40 @@ public void setValue(long value) {
}
}

public void close() {
closed = true;
@Override
public BigInteger getValue() {
return bigInteger;
}

public boolean isClosed() {
return closed;
@Override
public void update() {
setProperty(this);
}
}

/**
* A gauge which periodically pulls a RocksDB statistics-based native metric for the database.
*/
class RocksDBNativeStatisticsMetricView extends RocksDBNativeView implements Gauge<Long> {
private final TickerType tickerType;
private long value;

private RocksDBNativeStatisticsMetricView(TickerType tickerType) {
this.tickerType = tickerType;
}

@Override
public BigInteger getValue() {
return bigInteger;
public Long getValue() {
return value;
}

void setValue(long value) {
this.value = value;
}

@Override
public void update() {
setProperty(handle, property, this);
setStatistics(this);
}
}
}
Loading

0 comments on commit 46e0014

Please sign in to comment.