Skip to content

Commit

Permalink
[FLINK-14032][state] Make the cache size of RocksDBPriorityQueueSetFa…
Browse files Browse the repository at this point in the history
…ctory configurable
  • Loading branch information
ljz2051 authored and masteryhx committed Sep 13, 2023
1 parent be509e6 commit 3671960
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 32 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/rocksdb_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
<td>String</td>
<td>The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the RocksDBOptionsFactory are applied on top of these predefined ones.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.cache-size</h5></td>
<td style="word-wrap: break-word;">128</td>
<td>Integer</td>
<td>The cache size per keyGroup of rocksdb timer service factory. This option only has an effect when 'state.backend.rocksdb.timer-service.factory' is configured to 'ROCKSDB'. Increasing this value can improve the performance of rocksdb timer service, but consumes more heap memory at the same time.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
<td style="word-wrap: break-word;">ROCKSDB</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<td>Double</td>
<td>The maximum amount of memory that write buffers may take, as a fraction of the total shared memory. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.cache-size</h5></td>
<td style="word-wrap: break-word;">128</td>
<td>Integer</td>
<td>The cache size per keyGroup of rocksdb timer service factory. This option only has an effect when 'state.backend.rocksdb.timer-service.factory' is configured to 'ROCKSDB'. Increasing this value can improve the performance of rocksdb timer service, but consumes more heap memory at the same time.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
<td style="word-wrap: break-word;">ROCKSDB</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -146,8 +145,10 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
/** The configuration for memory settings (pool sizes, etc.). */
private final RocksDBMemoryConfiguration memoryConfiguration;

/** This determines the type of priority queue state. */
@Nullable private PriorityQueueStateType priorityQueueStateType;
/**
* The configuration for rocksdb priorityQueue state settings (priorityQueue state type, etc.).
*/
private final RocksDBPriorityQueueConfig priorityQueueConfig;

/** The default rocksdb property-based metrics options. */
private final RocksDBNativeMetricOptions nativeMetricOptions;
Expand Down Expand Up @@ -209,6 +210,7 @@ public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing
this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT;
this.priorityQueueConfig = new RocksDBPriorityQueueConfig();
}

/**
Expand Down Expand Up @@ -242,11 +244,9 @@ private EmbeddedRocksDBStateBackend(
original.memoryConfiguration, config);
this.memoryConfiguration.validate();

if (null == original.priorityQueueStateType) {
this.priorityQueueStateType = config.get(TIMER_SERVICE_FACTORY);
} else {
this.priorityQueueStateType = original.priorityQueueStateType;
}
this.priorityQueueConfig =
RocksDBPriorityQueueConfig.fromOtherAndConfiguration(
original.priorityQueueConfig, config);

// configure local directories
if (original.localRocksDbDirectories != null) {
Expand Down Expand Up @@ -497,7 +497,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
keyGroupRange,
executionConfig,
localRecoveryConfig,
getPriorityQueueStateType(),
priorityQueueConfig,
ttlTimeProvider,
latencyTrackingStateConfig,
metricGroup,
Expand Down Expand Up @@ -727,17 +727,15 @@ public boolean isIncrementalCheckpointsEnabled() {
* @return The type of the priority queue state.
*/
public PriorityQueueStateType getPriorityQueueStateType() {
return priorityQueueStateType == null
? TIMER_SERVICE_FACTORY.defaultValue()
: priorityQueueStateType;
return priorityQueueConfig.getPriorityQueueStateType();
}

/**
* Sets the type of the priority queue state. It will fallback to the default value, if it is
* not explicitly set.
*/
public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType) {
this.priorityQueueStateType = checkNotNull(priorityQueueStateType);
this.priorityQueueConfig.setPriorityQueueStateType(priorityQueueStateType);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
/** String that identifies the operator that owns this backend. */
private final String operatorIdentifier;

private final EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType;
/** The configuration of rocksDB priorityQueue state. */
private final RocksDBPriorityQueueConfig priorityQueueConfig;

/** The configuration of local recovery. */
private final LocalRecoveryConfig localRecoveryConfig;
Expand Down Expand Up @@ -137,7 +138,7 @@ public RocksDBKeyedStateBackendBuilder(
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
LocalRecoveryConfig localRecoveryConfig,
EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
RocksDBPriorityQueueConfig priorityQueueConfig,
TtlTimeProvider ttlTimeProvider,
LatencyTrackingStateConfig latencyTrackingStateConfig,
MetricGroup metricGroup,
Expand All @@ -159,7 +160,7 @@ public RocksDBKeyedStateBackendBuilder(
cancelStreamRegistry);

this.operatorIdentifier = operatorIdentifier;
this.priorityQueueStateType = priorityQueueStateType;
this.priorityQueueConfig = priorityQueueConfig;
this.localRecoveryConfig = localRecoveryConfig;
// ensure that we use the right merge operator, because other code relies on this
this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory);
Expand All @@ -186,7 +187,7 @@ public RocksDBKeyedStateBackendBuilder(
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
LocalRecoveryConfig localRecoveryConfig,
EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
RocksDBPriorityQueueConfig rocksDBPriorityQueueConfig,
TtlTimeProvider ttlTimeProvider,
LatencyTrackingStateConfig latencyTrackingStateConfig,
MetricGroup metricGroup,
Expand All @@ -207,7 +208,7 @@ public RocksDBKeyedStateBackendBuilder(
keyGroupRange,
executionConfig,
localRecoveryConfig,
priorityQueueStateType,
rocksDBPriorityQueueConfig,
ttlTimeProvider,
latencyTrackingStateConfig,
metricGroup,
Expand Down Expand Up @@ -478,7 +479,7 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
writeBatchSize,
optionsContainer.getWriteBufferManagerCapacity(),
overlapFractionThreshold);
} else if (priorityQueueStateType
} else if (priorityQueueConfig.getPriorityQueueStateType()
== EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
return new RocksDBHeapTimersFullRestoreOperation<>(
keyGroupRange,
Expand Down Expand Up @@ -570,7 +571,7 @@ private PriorityQueueSetFactory initPriorityQueueFactory(
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor) {
PriorityQueueSetFactory priorityQueueFactory;
switch (priorityQueueStateType) {
switch (priorityQueueConfig.getPriorityQueueStateType()) {
case HEAP:
priorityQueueFactory = createHeapQueueFactory();
break;
Expand All @@ -586,11 +587,13 @@ private PriorityQueueSetFactory initPriorityQueueFactory(
writeBatchWrapper,
nativeMetricMonitor,
columnFamilyOptionsFactory,
optionsContainer.getWriteBufferManagerCapacity());
optionsContainer.getWriteBufferManagerCapacity(),
priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize());
break;
default:
throw new IllegalArgumentException(
"Unknown priority queue state type: " + priorityQueueStateType);
"Unknown priority queue state type: "
+ priorityQueueConfig.getPriorityQueueStateType());
}
return priorityQueueFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ public class RocksDBOptions {
.withDescription(
"This determines the factory for timer service state implementation.");

/** The cache size per key-group for ROCKSDB timer service factory implementation. */
@Documentation.Section(Documentation.Sections.STATE_BACKEND_ROCKSDB)
public static final ConfigOption<Integer> ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE =
ConfigOptions.key("state.backend.rocksdb.timer-service.cache-size")
.intType()
.defaultValue(128)
.withDescription(
String.format(
"The cache size per keyGroup of rocksdb timer service factory. This option only has an effect "
+ "when '%s' is configured to '%s'. Increasing this value can improve the performance "
+ "of rocksdb timer service, but consumes more heap memory at the same time.",
TIMER_SERVICE_FACTORY.key(), ROCKSDB.name()));

/**
* The number of threads used to transfer (download and upload) files in RocksDBStateBackend.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.PriorityQueueStateType;

import javax.annotation.Nullable;

import java.io.Serializable;

import static org.apache.flink.contrib.streaming.state.RocksDBOptions.ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** The configuration of rocksDB priority queue state implementation. */
public class RocksDBPriorityQueueConfig implements Serializable {

private static final long serialVersionUID = 1L;

private static final int UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE = -1;

/** This determines the type of priority queue state. */
private @Nullable PriorityQueueStateType priorityQueueStateType;

/** cache size per keyGroup for rocksDB priority queue state. */
private int rocksDBPriorityQueueSetCacheSize;

public RocksDBPriorityQueueConfig() {
this(null, UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE);
}

public RocksDBPriorityQueueConfig(
PriorityQueueStateType priorityQueueStateType, int rocksDBPriorityQueueSetCacheSize) {
this.priorityQueueStateType = priorityQueueStateType;
this.rocksDBPriorityQueueSetCacheSize = rocksDBPriorityQueueSetCacheSize;
}

/**
* Gets the type of the priority queue state. It will fall back to the default value if it is
* not explicitly set.
*/
public PriorityQueueStateType getPriorityQueueStateType() {
return priorityQueueStateType == null
? TIMER_SERVICE_FACTORY.defaultValue()
: priorityQueueStateType;
}

public void setPriorityQueueStateType(PriorityQueueStateType type) {
this.priorityQueueStateType = checkNotNull(type);
}

/**
* Gets the cache size of rocksDB priority queue set. It will fall back to the default value if
* it is not explicitly set.
*/
public int getRocksDBPriorityQueueSetCacheSize() {
return rocksDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
? ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue()
: rocksDBPriorityQueueSetCacheSize;
}

public static RocksDBPriorityQueueConfig fromOtherAndConfiguration(
RocksDBPriorityQueueConfig other, ReadableConfig config) {
PriorityQueueStateType priorityQueueType =
(null == other.priorityQueueStateType)
? config.get(TIMER_SERVICE_FACTORY)
: other.priorityQueueStateType;
int cacheSize =
(other.rocksDBPriorityQueueSetCacheSize
== UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE)
? config.get(ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE)
: other.rocksDBPriorityQueueSetCacheSize;
return new RocksDBPriorityQueueConfig(priorityQueueType, cacheSize);
}

public static RocksDBPriorityQueueConfig buildWithPriorityQueueType(
PriorityQueueStateType type) {
return new RocksDBPriorityQueueConfig(
type, ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;

import org.rocksdb.ColumnFamilyHandle;
Expand All @@ -51,8 +52,8 @@
*/
public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {

/** Default cache size per key-group. */
@VisibleForTesting static final int DEFAULT_CACHES_SIZE = 128; // TODO make this configurable
/** The priorityQueue cache size per key-group. */
private final int cacheSize;

/** A shared buffer to serialize elements for the priority queue. */
@Nonnull private final DataOutputSerializer sharedElementOutView;
Expand Down Expand Up @@ -81,7 +82,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
Long writeBufferManagerCapacity) {
Long writeBufferManagerCapacity,
int cacheSize) {
this.keyGroupRange = keyGroupRange;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.numberOfKeyGroups = numberOfKeyGroups;
Expand All @@ -94,6 +96,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
this.sharedElementOutView = new DataOutputSerializer(128);
this.sharedElementInView = new DataInputDeserializer();
this.writeBufferManagerCapacity = writeBufferManagerCapacity;
Preconditions.checkArgument(cacheSize > 0);
this.cacheSize = cacheSize;
}

@Nonnull
Expand Down Expand Up @@ -131,8 +135,7 @@ public RocksDBCachingPriorityQueueSet<T> create(
int numKeyGroups,
@Nonnull KeyExtractorFunction<T> keyExtractor,
@Nonnull PriorityComparator<T> elementPriorityComparator) {
TreeOrderedSetCache orderedSetCache =
new TreeOrderedSetCache(DEFAULT_CACHES_SIZE);
TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(cacheSize);
return new RocksDBCachingPriorityQueueSet<>(
keyGroupId,
keyGroupPrefixBytes,
Expand Down Expand Up @@ -225,4 +228,9 @@ private <T> RocksDBKeyedStateBackend.RocksDbKvStateInfo tryRegisterPriorityQueue

return stateInfo;
}

@VisibleForTesting
public int getCacheSize() {
return cacheSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,34 @@ public void testConfigureTimerService() throws Exception {
env.close();
}

@Test
public void testConfigureRocksDBPriorityQueueFactoryCacheSize() throws Exception {
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
int cacheSize = 512;
Configuration conf = new Configuration();
conf.set(
RocksDBOptions.TIMER_SERVICE_FACTORY,
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
conf.set(RocksDBOptions.ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE, cacheSize);

rocksDbBackend =
rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());

RocksDBKeyedStateBackend<Integer> keyedBackend =
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);

Assert.assertEquals(
RocksDBPriorityQueueSetFactory.class,
keyedBackend.getPriorityQueueFactory().getClass());
Assert.assertEquals(
cacheSize,
((RocksDBPriorityQueueSetFactory) keyedBackend.getPriorityQueueFactory())
.getCacheSize());
keyedBackend.dispose();
env.close();
}

/** Validates that user custom configuration from code should override the flink-conf.yaml. */
@Test
public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
Expand Down
Loading

0 comments on commit 3671960

Please sign in to comment.