Skip to content

Commit

Permalink
[FLINK-14484][state-backend] Control memory usage of RocksDB via Cach…
Browse files Browse the repository at this point in the history
…e and WriteBufferManager

This closes apache#10416
  • Loading branch information
Myasuka authored and StephanEwen committed Dec 6, 2019
1 parent 9da970f commit 112e865
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
return sharedRocksKeyBuilder;
}

/**
 * Exposes the {@link DBOptions} held by this backend so that tests can inspect them.
 *
 * @return the {@code dbOptions} field of this instance
 */
@VisibleForTesting
DBOptions getDbOptions() {
    return this.dbOptions;
}

@VisibleForTesting
boolean isDisposed() {
return this.disposed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,24 @@ public class RocksDBOptions {
"The default options factory is %s, and it would read the configured options which provided in 'RocksDBConfigurableOptions'.",
DefaultConfigurableOptionsFactory.class.getName()));

/**
 * Total memory shared among all RocksDB instances of one slot, given as a memory-size string.
 * The option has no default value; leaving it unset means no memory bound is applied.
 */
public static final ConfigOption<String> BOUNDED_MEMORY_SIZE =
    ConfigOptions.key("state.backend.rocksdb.per-slot.total.memory")
        .stringType()
        .noDefaultValue()
        .withDescription(
            "The total memory size shared among all RocksDB instances per slot. "
                + "This option has no default value which means no bounded memory limit.");

/**
 * Ratio of the shared cache that the write buffer manager may use (default 0.5).
 * Only relevant when {@link #BOUNDED_MEMORY_SIZE} is configured.
 */
public static final ConfigOption<Double> WRITE_BUFFER_RATIO = ConfigOptions
    .key("state.backend.rocksdb.write-buffer.ratio")
    .doubleType()
    .defaultValue(0.5)
    // Interpolate the option's key: formatting the ConfigOption object itself would embed its
    // toString() ("Key: '...' , default: ...") into the user-facing description.
    .withDescription(String.format("This option would only take effect when %s is configured, " +
        "all RocksDB instances would share the same write buffer manager with the ratio of a LRUCache.", BOUNDED_MEMORY_SIZE.key()));

/**
 * Ratio of the shared cache reserved as the high-priority pool (default 0.1).
 * Only relevant when {@link #BOUNDED_MEMORY_SIZE} is configured.
 */
public static final ConfigOption<Double> HIGH_PRI_POOL_RATIO = ConfigOptions
    .key("state.backend.rocksdb.high-pri-pool.ratio")
    .doubleType()
    .defaultValue(0.1)
    // Interpolate the option's key: formatting the ConfigOption object itself would embed its
    // toString() ("Key: '...' , default: ...") into the user-facing description.
    .withDescription(String.format("This option would only take effect when %s is configured, " +
        "all RocksDB instances would share the high priority ratio for index, filter, and compression dictionary blocks in a LRUCache.", BOUNDED_MEMORY_SIZE.key()));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.rocksdb.Cache;
import org.rocksdb.WriteBufferManager;

/**
 * Holder for the objects shared among RocksDB instances per slot: the block {@link Cache}
 * and the {@link WriteBufferManager}.
 *
 * <p>Closing this holder closes both held objects.
 */
public class RocksDBSharedObjects implements AutoCloseable {

    private final Cache cache;
    private final WriteBufferManager writeBufferManager;

    /**
     * Creates a holder for the given shared objects.
     *
     * @param cache the cache to share
     * @param writeBufferManager the write buffer manager to share
     */
    RocksDBSharedObjects(Cache cache, WriteBufferManager writeBufferManager) {
        this.cache = cache;
        this.writeBufferManager = writeBufferManager;
    }

    /**
     * Closes the write buffer manager first and then the cache.
     *
     * <p>The cache is closed in a {@code finally} block so that it is still released
     * when closing the write buffer manager fails.
     */
    @Override
    public void close() {
        try {
            writeBufferManager.close();
        } finally {
            cache.close();
        }
    }

    /**
     * @return the shared cache passed to the constructor
     */
    public Cache getCache() {
        return cache;
    }

    /**
     * @return the shared write buffer manager passed to the constructor
     */
    public WriteBufferManager getWriteBufferManager() {
        return writeBufferManager;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
Expand All @@ -50,10 +52,15 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBufferManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -70,10 +77,15 @@
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.flink.contrib.streaming.state.RocksDBOptions.BOUNDED_MEMORY_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.HIGH_PRI_POOL_RATIO;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.WRITE_BUFFER_RATIO;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -111,6 +123,8 @@ public enum PriorityQueueStateType {

private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1;

static final long UNDEFINED_VALUE = -1;

// ------------------------------------------------------------------------

// -- configuration values, set in the application / configuration
Expand Down Expand Up @@ -146,6 +160,18 @@ public enum PriorityQueueStateType {
*/
private TernaryBoolean enableTtlCompactionFilter;

/**
* Total memory for all RocksDB instances in this slot.
* Currently, an LRU cache is used to account for the total memory usage.
*/
private long totalMemoryPerSlot;

/** The ratio of the shared cache that is consumed by the write buffer manager. */
private double writeBufferRatio;

/** The ratio of the shared cache that is reserved as the high-priority pool for index and filter blocks. */
private double highPriPoolRatio;

/** This determines the type of priority queue state. */
private final PriorityQueueStateType priorityQueueStateType;

Expand Down Expand Up @@ -267,6 +293,9 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean
this.priorityQueueStateType = PriorityQueueStateType.HEAP;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
this.totalMemoryPerSlot = UNDEFINED_VALUE;
this.writeBufferRatio = UNDEFINED_VALUE;
this.highPriPoolRatio = UNDEFINED_VALUE;
}

/**
Expand Down Expand Up @@ -312,6 +341,33 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config,
this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));

configureBoundedMemory(
() -> (double) original.getTotalMemoryPerSlot(),
() -> {
if (config.getString(BOUNDED_MEMORY_SIZE) != null) {
setTotalMemoryPerSlot(config.getString(BOUNDED_MEMORY_SIZE));
} else {
// leave the total memory size per slot undefined when it is not configured.
this.totalMemoryPerSlot = UNDEFINED_VALUE;
}
},
() -> setTotalMemoryPerSlot(String.valueOf(original.totalMemoryPerSlot)));

configureBoundedMemory(
original::getWriteBufferRatio,
() -> setWriteBufferRatio(config.getDouble(WRITE_BUFFER_RATIO)),
() -> setWriteBufferRatio(original.writeBufferRatio));

configureBoundedMemory(
original::getHighPriPoolRatio,
() -> setHighPriPoolRatio(config.getDouble(HIGH_PRI_POOL_RATIO)),
() -> setHighPriPoolRatio(original.highPriPoolRatio));

if (isBoundedMemoryEnabled()) {
Preconditions.checkArgument(this.writeBufferRatio + this.highPriPoolRatio < 1.0,
String.format("Illegal sum of writeBufferRatio %s and highPriPoolRatio %s, should be less than 1.0", this.writeBufferRatio, this.highPriPoolRatio));
}

final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);

this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
Expand Down Expand Up @@ -356,6 +412,14 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config,
}
}

/**
 * Runs one of two actions depending on whether the original value is still undefined.
 *
 * @param getOriginalValue supplies the value taken from the original backend
 * @param parseFromConfig action run when the supplied value equals {@code UNDEFINED_VALUE}
 * @param setFromOriginal action run otherwise
 */
private void configureBoundedMemory(Supplier<Double> getOriginalValue, Runnable parseFromConfig, Runnable setFromOriginal) {
    final boolean originalIsUndefined = getOriginalValue.get() == UNDEFINED_VALUE;
    final Runnable chosenAction = originalIsUndefined ? parseFromConfig : setFromOriginal;
    chosenAction.run();
}

// ------------------------------------------------------------------------
// Reconfiguration
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -491,14 +555,53 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
LocalRecoveryConfig localRecoveryConfig =
env.getTaskStateManager().createLocalRecoveryConfig();

DBOptions dbOptions = getDbOptions();
Function<String, ColumnFamilyOptions> createColumnOptions;

if (isBoundedMemoryEnabled()) {
RocksDBSharedObjects rocksDBSharedObjects;
MemoryManager memoryManager = env.getMemoryManager();
// Initialize the LRUCache and the write buffer manager only once.
// They are not disposed when the state backend is disposed,
// because the same objects are shared by every RocksDB instance in the slot.
while ((rocksDBSharedObjects = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject()) == null) {
if (memoryManager.getLazyInitializeSharedObjectHelper().compareAndSet(false, true)) {
Cache lruCache = new LRUCache(getTotalMemoryPerSlot(), -1, false, getHighPriPoolRatio());
WriteBufferManager writeBufferManager = new WriteBufferManager((long) (getTotalMemoryPerSlot() * getWriteBufferRatio()), lruCache);

rocksDBSharedObjects = new RocksDBSharedObjects(lruCache, writeBufferManager);
memoryManager.setStateBackendSharedObject(rocksDBSharedObjects);
}
}

Cache blockCache = checkNotNull(rocksDBSharedObjects.getCache());
dbOptions.setWriteBufferManager(checkNotNull(rocksDBSharedObjects.getWriteBufferManager()));

createColumnOptions = stateName -> {
ColumnFamilyOptions columnOptions = getColumnOptions();
TableFormatConfig tableFormatConfig = columnOptions.tableFormatConfig();
Preconditions.checkArgument(tableFormatConfig instanceof BlockBasedTableConfig,
"We currently only support BlockBasedTableConfig When bounding total memory.");
BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
blockBasedTableConfig.setBlockCache(blockCache);
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true);
columnOptions.setTableFormatConfig(blockBasedTableConfig);
return columnOptions;
};
} else {
createColumnOptions = stateName -> getColumnOptions();
}

ExecutionConfig executionConfig = env.getExecutionConfig();
StreamCompressionDecorator keyGroupCompressionDecorator = getCompressionDecorator(executionConfig);
RocksDBKeyedStateBackendBuilder<K> builder = new RocksDBKeyedStateBackendBuilder<>(
operatorIdentifier,
env.getUserClassLoader(),
instanceBasePath,
getDbOptions(),
stateName -> getColumnOptions(),
dbOptions,
createColumnOptions,
kvStateRegistry,
keySerializer,
numberOfKeyGroups,
Expand Down Expand Up @@ -704,6 +807,13 @@ public boolean isTtlCompactionFilterEnabled() {
return enableTtlCompactionFilter.getOrDefault(TTL_COMPACT_FILTER_ENABLED.defaultValue());
}

/**
 * Tells whether this state backend bounds its memory usage.
 *
 * @return {@code true} if the total memory per slot is a positive value, {@code false} otherwise
 */
public boolean isBoundedMemoryEnabled() {
    final boolean positiveBudget = this.totalMemoryPerSlot > 0;
    return positiveBudget;
}

/**
* Enable compaction filter to cleanup state with TTL is enabled.
*
Expand Down Expand Up @@ -853,6 +963,34 @@ public void setNumberOfTransferingThreads(int numberOfTransferingThreads) {
setNumberOfTransferThreads(numberOfTransferingThreads);
}

/**
 * Sets the total memory per slot from a memory-size string.
 *
 * <p>The string is parsed with {@link MemorySize#parseBytes(String)}; the parsed byte count
 * must be positive, otherwise the argument check fails.
 *
 * @param totalMemoryPerSlotStr the memory size as text
 */
public void setTotalMemoryPerSlot(String totalMemoryPerSlotStr) {
    final long parsedBytes = MemorySize.parseBytes(totalMemoryPerSlotStr);
    final boolean isPositive = parsedBytes > 0;
    Preconditions.checkArgument(
        isPositive,
        String.format("Illegal total memory per slot %s for RocksDBs.", parsedBytes));
    this.totalMemoryPerSlot = parsedBytes;
}

/**
 * Returns the total memory per slot currently stored in this backend.
 *
 * @return the value of the {@code totalMemoryPerSlot} field
 */
public long getTotalMemoryPerSlot() {
    return this.totalMemoryPerSlot;
}

/**
 * Sets the write buffer ratio.
 *
 * <p>The ratio must lie strictly between 0 and 1.0, otherwise the argument check fails.
 *
 * @param writeBufferRatio the ratio to store
 */
public void setWriteBufferRatio(double writeBufferRatio) {
    // Keep the comparisons in this form so that NaN is rejected as well.
    final boolean insideOpenUnitInterval = writeBufferRatio > 0 && writeBufferRatio < 1.0;
    Preconditions.checkArgument(
        insideOpenUnitInterval,
        String.format("Illegal write buffer ratio %s for RocksDBs.", writeBufferRatio));
    this.writeBufferRatio = writeBufferRatio;
}

/**
 * Returns the write buffer ratio currently stored in this backend.
 *
 * @return the value of the {@code writeBufferRatio} field
 */
public double getWriteBufferRatio() {
    return this.writeBufferRatio;
}

/**
 * Sets the high priority pool ratio.
 *
 * <p>The ratio must lie strictly between 0 and 1.0, otherwise the argument check fails.
 *
 * @param highPriPoolRatio the ratio to store
 */
public void setHighPriPoolRatio(double highPriPoolRatio) {
    // Keep the comparisons in this form so that NaN is rejected as well.
    final boolean insideOpenUnitInterval = highPriPoolRatio > 0 && highPriPoolRatio < 1.0;
    Preconditions.checkArgument(
        insideOpenUnitInterval,
        String.format("Illegal high priority pool ratio %s for RocksDBs.", highPriPoolRatio));
    this.highPriPoolRatio = highPriPoolRatio;
}

/**
 * Returns the high priority pool ratio currently stored in this backend.
 *
 * @return the value of the {@code highPriPoolRatio} field
 */
public double getHighPriPoolRatio() {
    return this.highPriPoolRatio;
}

// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 112e865

Please sign in to comment.