From 3671960ec519d3eea6a78e498d61c860db1a2d6b Mon Sep 17 00:00:00 2001 From: Jinzhong Li Date: Fri, 8 Sep 2023 11:01:25 +0800 Subject: [PATCH] [FLINK-14032][state] Make the cache size of RocksDBPriorityQueueSetFactory configurable --- .../generated/rocksdb_configuration.html | 6 ++ .../state_backend_rocksdb_section.html | 6 ++ .../state/EmbeddedRocksDBStateBackend.java | 24 +++-- .../RocksDBKeyedStateBackendBuilder.java | 21 ++-- .../streaming/state/RocksDBOptions.java | 13 +++ .../state/RocksDBPriorityQueueConfig.java | 97 +++++++++++++++++++ .../state/RocksDBPriorityQueueSetFactory.java | 18 +++- .../state/RocksDBStateBackendConfigTest.java | 28 ++++++ .../state/RocksDBStateOptionTest.java | 8 +- .../streaming/state/RocksDBTestUtils.java | 5 +- .../benchmark/StateBackendBenchmarkUtils.java | 4 +- .../flink/test/state/BackendSwitchSpecs.java | 3 +- 12 files changed, 201 insertions(+), 32 deletions(-) create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueConfig.java diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html index 977de4392f023..5cd25cda3f884 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html @@ -68,6 +68,12 @@ String The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. Current supported candidate predefined-options are DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM or FLASH_SSD_OPTIMIZED. Note that user customized options and options from the RocksDBOptionsFactory are applied on top of these predefined ones. + +
state.backend.rocksdb.timer-service.cache-size
+ 128 + Integer + The cache size per keyGroup of rocksdb timer service factory. This option only has an effect when 'state.backend.rocksdb.timer-service.factory' is configured to 'ROCKSDB'. Increasing this value can improve the performance of rocksdb timer service, but consumes more heap memory at the same time. +
state.backend.rocksdb.timer-service.factory
ROCKSDB diff --git a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html index fa92a7783cfcb..2dd92d7c0e058 100644 --- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html +++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html @@ -44,6 +44,12 @@ Double The maximum amount of memory that write buffers may take, as a fraction of the total shared memory. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured. + +
state.backend.rocksdb.timer-service.cache-size
+ 128 + Integer + The cache size per keyGroup of rocksdb timer service factory. This option only has an effect when 'state.backend.rocksdb.timer-service.factory' is configured to 'ROCKSDB'. Increasing this value can improve the performance of rocksdb timer service, but consumes more heap memory at the same time. +
state.backend.rocksdb.timer-service.factory
ROCKSDB diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index db077eb00afe5..f236a051ccf69 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -82,7 +82,6 @@ import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM; -import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -146,8 +145,10 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke /** The configuration for memory settings (pool sizes, etc.). */ private final RocksDBMemoryConfiguration memoryConfiguration; - /** This determines the type of priority queue state. */ - @Nullable private PriorityQueueStateType priorityQueueStateType; + /** + * The configuration for rocksdb priorityQueue state settings (priorityQueue state type, etc.). + */ + private final RocksDBPriorityQueueConfig priorityQueueConfig; /** The default rocksdb property-based metrics options. */ private final RocksDBNativeMetricOptions nativeMetricOptions; @@ -209,6 +210,7 @@ public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE; this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT; + this.priorityQueueConfig = new RocksDBPriorityQueueConfig(); } /** @@ -242,11 +244,9 @@ private EmbeddedRocksDBStateBackend( original.memoryConfiguration, config); this.memoryConfiguration.validate(); - if (null == original.priorityQueueStateType) { - this.priorityQueueStateType = config.get(TIMER_SERVICE_FACTORY); - } else { - this.priorityQueueStateType = original.priorityQueueStateType; - } + this.priorityQueueConfig = + RocksDBPriorityQueueConfig.fromOtherAndConfiguration( + original.priorityQueueConfig, config); // configure local directories if (original.localRocksDbDirectories != null) { @@ -497,7 +497,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( keyGroupRange, executionConfig, localRecoveryConfig, - getPriorityQueueStateType(), + priorityQueueConfig, ttlTimeProvider, latencyTrackingStateConfig, metricGroup, @@ -727,9 +727,7 @@ public boolean isIncrementalCheckpointsEnabled() { * @return The type of the priority queue state. */ public PriorityQueueStateType getPriorityQueueStateType() { - return priorityQueueStateType == null - ? TIMER_SERVICE_FACTORY.defaultValue() - : priorityQueueStateType; + return priorityQueueConfig.getPriorityQueueStateType(); } /** @@ -737,7 +735,7 @@ public PriorityQueueStateType getPriorityQueueStateType() { * not explicitly set. */ public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType) { - this.priorityQueueStateType = checkNotNull(priorityQueueStateType); + this.priorityQueueConfig.setPriorityQueueStateType(priorityQueueStateType); } // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 6b3eaee04ea7a..e7959add21192 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -91,7 +91,8 @@ public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBacken /** String that identifies the operator that owns this backend. */ private final String operatorIdentifier; - private final EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType; + /** The configuration of rocksDB priorityQueue state. */ + private final RocksDBPriorityQueueConfig priorityQueueConfig; /** The configuration of local recovery. */ private final LocalRecoveryConfig localRecoveryConfig; @@ -137,7 +138,7 @@ public RocksDBKeyedStateBackendBuilder( KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, - EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, + RocksDBPriorityQueueConfig priorityQueueConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, MetricGroup metricGroup, @@ -159,7 +160,7 @@ public RocksDBKeyedStateBackendBuilder( cancelStreamRegistry); this.operatorIdentifier = operatorIdentifier; - this.priorityQueueStateType = priorityQueueStateType; + this.priorityQueueConfig = priorityQueueConfig; this.localRecoveryConfig = localRecoveryConfig; // ensure that we use the right merge operator, because other code relies on this this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory); @@ -186,7 +187,7 @@ public RocksDBKeyedStateBackendBuilder( KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, - EmbeddedRocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, + RocksDBPriorityQueueConfig rocksDBPriorityQueueConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, MetricGroup metricGroup, @@ -207,7 +208,7 @@ public RocksDBKeyedStateBackendBuilder( keyGroupRange, executionConfig, localRecoveryConfig, - priorityQueueStateType, + rocksDBPriorityQueueConfig, ttlTimeProvider, latencyTrackingStateConfig, metricGroup, @@ -478,7 +479,7 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation( writeBatchSize, optionsContainer.getWriteBufferManagerCapacity(), overlapFractionThreshold); - } else if (priorityQueueStateType + } else if (priorityQueueConfig.getPriorityQueueStateType() == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) { return new RocksDBHeapTimersFullRestoreOperation<>( keyGroupRange, @@ -570,7 +571,7 @@ private PriorityQueueSetFactory initPriorityQueueFactory( RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor) { PriorityQueueSetFactory priorityQueueFactory; - switch (priorityQueueStateType) { + switch (priorityQueueConfig.getPriorityQueueStateType()) { case HEAP: priorityQueueFactory = createHeapQueueFactory(); break; @@ -586,11 +587,13 @@ private PriorityQueueSetFactory initPriorityQueueFactory( writeBatchWrapper, nativeMetricMonitor, columnFamilyOptionsFactory, - optionsContainer.getWriteBufferManagerCapacity()); + optionsContainer.getWriteBufferManagerCapacity(), + priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize()); break; default: throw new IllegalArgumentException( - "Unknown priority queue state type: " + priorityQueueStateType); + "Unknown priority queue state type: " + + priorityQueueConfig.getPriorityQueueStateType()); } return priorityQueueFactory; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java index 44ad83e2dd80e..69576520fc387 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java @@ -62,6 +62,19 @@ public class RocksDBOptions { .withDescription( "This determines the factory for timer service state implementation."); + /** The cache size per key-group for ROCKSDB timer service factory implementation. */ + @Documentation.Section(Documentation.Sections.STATE_BACKEND_ROCKSDB) + public static final ConfigOption ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE = + ConfigOptions.key("state.backend.rocksdb.timer-service.cache-size") + .intType() + .defaultValue(128) + .withDescription( + String.format( + "The cache size per keyGroup of rocksdb timer service factory. This option only has an effect " + + "when '%s' is configured to '%s'. Increasing this value can improve the performance " + + "of rocksdb timer service, but consumes more heap memory at the same time.", + TIMER_SERVICE_FACTORY.key(), ROCKSDB.name())); + /** * The number of threads used to transfer (download and upload) files in RocksDBStateBackend. */ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueConfig.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueConfig.java new file mode 100644 index 0000000000000..a0a2cbee8ca28 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueConfig.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.PriorityQueueStateType; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The configuration of rocksDB priority queue state implementation. */ +public class RocksDBPriorityQueueConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final int UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE = -1; + + /** This determines the type of priority queue state. */ + private @Nullable PriorityQueueStateType priorityQueueStateType; + + /** cache size per keyGroup for rocksDB priority queue state. */ + private int rocksDBPriorityQueueSetCacheSize; + + public RocksDBPriorityQueueConfig() { + this(null, UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE); + } + + public RocksDBPriorityQueueConfig( + PriorityQueueStateType priorityQueueStateType, int rocksDBPriorityQueueSetCacheSize) { + this.priorityQueueStateType = priorityQueueStateType; + this.rocksDBPriorityQueueSetCacheSize = rocksDBPriorityQueueSetCacheSize; + } + + /** + * Gets the type of the priority queue state. It will fall back to the default value if it is + * not explicitly set. + */ + public PriorityQueueStateType getPriorityQueueStateType() { + return priorityQueueStateType == null + ? TIMER_SERVICE_FACTORY.defaultValue() + : priorityQueueStateType; + } + + public void setPriorityQueueStateType(PriorityQueueStateType type) { + this.priorityQueueStateType = checkNotNull(type); + } + + /** + * Gets the cache size of rocksDB priority queue set. It will fall back to the default value if + * it is not explicitly set. + */ + public int getRocksDBPriorityQueueSetCacheSize() { + return rocksDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE + ? ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue() + : rocksDBPriorityQueueSetCacheSize; + } + + public static RocksDBPriorityQueueConfig fromOtherAndConfiguration( + RocksDBPriorityQueueConfig other, ReadableConfig config) { + PriorityQueueStateType priorityQueueType = + (null == other.priorityQueueStateType) + ? config.get(TIMER_SERVICE_FACTORY) + : other.priorityQueueStateType; + int cacheSize = + (other.rocksDBPriorityQueueSetCacheSize + == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE) + ? config.get(ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE) + : other.rocksDBPriorityQueueSetCacheSize; + return new RocksDBPriorityQueueConfig(priorityQueueType, cacheSize); + } + + public static RocksDBPriorityQueueConfig buildWithPriorityQueueType( + PriorityQueueStateType type) { + return new RocksDBPriorityQueueConfig( + type, ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue()); + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java index 41dcd29208531..23a1a6913f636 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; import org.rocksdb.ColumnFamilyHandle; @@ -51,8 +52,8 @@ */ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { - /** Default cache size per key-group. */ - @VisibleForTesting static final int DEFAULT_CACHES_SIZE = 128; // TODO make this configurable + /** The priorityQueue cache size per key-group. */ + private final int cacheSize; /** A shared buffer to serialize elements for the priority queue. */ @Nonnull private final DataOutputSerializer sharedElementOutView; @@ -81,7 +82,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { RocksDBWriteBatchWrapper writeBatchWrapper, RocksDBNativeMetricMonitor nativeMetricMonitor, Function columnFamilyOptionsFactory, - Long writeBufferManagerCapacity) { + Long writeBufferManagerCapacity, + int cacheSize) { this.keyGroupRange = keyGroupRange; this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.numberOfKeyGroups = numberOfKeyGroups; @@ -94,6 +96,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { this.sharedElementOutView = new DataOutputSerializer(128); this.sharedElementInView = new DataInputDeserializer(); this.writeBufferManagerCapacity = writeBufferManagerCapacity; + Preconditions.checkArgument(cacheSize > 0); + this.cacheSize = cacheSize; } @Nonnull @@ -131,8 +135,7 @@ public RocksDBCachingPriorityQueueSet create( int numKeyGroups, @Nonnull KeyExtractorFunction keyExtractor, @Nonnull PriorityComparator elementPriorityComparator) { - TreeOrderedSetCache orderedSetCache = - new TreeOrderedSetCache(DEFAULT_CACHES_SIZE); + TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(cacheSize); return new RocksDBCachingPriorityQueueSet<>( keyGroupId, keyGroupPrefixBytes, @@ -225,4 +228,9 @@ private RocksDBKeyedStateBackend.RocksDbKvStateInfo tryRegisterPriorityQueue return stateInfo; } + + @VisibleForTesting + public int getCacheSize() { + return cacheSize; + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 36e0d3c2bce9e..6934111cd7b6b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -213,6 +213,34 @@ public void testConfigureTimerService() throws Exception { env.close(); } + @Test + public void testConfigureRocksDBPriorityQueueFactoryCacheSize() throws Exception { + final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); + int cacheSize = 512; + Configuration conf = new Configuration(); + conf.set( + RocksDBOptions.TIMER_SERVICE_FACTORY, + EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); + conf.set(RocksDBOptions.ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE, cacheSize); + + rocksDbBackend = + rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader()); + + RocksDBKeyedStateBackend keyedBackend = + createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); + + Assert.assertEquals( + RocksDBPriorityQueueSetFactory.class, + keyedBackend.getPriorityQueueFactory().getClass()); + Assert.assertEquals( + cacheSize, + ((RocksDBPriorityQueueSetFactory) keyedBackend.getPriorityQueueFactory()) + .getCacheSize()); + keyedBackend.dispose(); + env.close(); + } + /** Validates that user custom configuration from code should override the flink-conf.yaml. */ @Test public void testConfigureTimerServiceLoadingFromApplication() throws Exception { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java index 4f88dabfc3063..39f5c31d6e9f5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateOptionTest.java @@ -131,7 +131,13 @@ public void testUseOptimizePointLookupWithPriorityQueue() throws IOException { PriorityQueue> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp())); // ensure we insert timers more than cache capacity. - int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42; + assertTrue( + keyedStateBackend.getPriorityQueueFactory() + instanceof RocksDBPriorityQueueSetFactory); + int queueSize = + ((RocksDBPriorityQueueSetFactory) keyedStateBackend.getPriorityQueueFactory()) + .getCacheSize() + + 42; List timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList()); Collections.shuffle(timeStamps); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java index 049591211dbc6..8fd9a72be4b8c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java @@ -71,7 +71,7 @@ public static RocksDBKeyedStateBackendBuilder builderForTestDefaults( new KeyGroupRange(0, 1), new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - queueStateType, + RocksDBPriorityQueueConfig.buildWithPriorityQueueType(queueStateType), TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), @@ -101,7 +101,8 @@ public static RocksDBKeyedStateBackendBuilder builderForTestDB( new KeyGroupRange(0, 1), new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP, + RocksDBPriorityQueueConfig.buildWithPriorityQueueType( + EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP), TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java index ebec2b5c6fce8..09c15a7eb14ed 100644 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/state/benchmark/StateBackendBenchmarkUtils.java @@ -35,6 +35,7 @@ import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder; +import org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueConfig; import org.apache.flink.contrib.streaming.state.RocksDBResourceContainer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; @@ -172,7 +173,8 @@ private static RocksDBKeyedStateBackend createRocksDBKeyedStateBackend(Fil new KeyGroupRange(0, 1), executionConfig, new LocalRecoveryConfig(null), - EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB, + RocksDBPriorityQueueConfig.buildWithPriorityQueueType( + EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB), TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java index 9e825b682d258..5b28b51e1e348 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java @@ -25,6 +25,7 @@ import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.PriorityQueueStateType; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder; +import org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueConfig; import org.apache.flink.contrib.streaming.state.RocksDBResourceContainer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -107,7 +108,7 @@ public CheckpointableKeyedStateBackend createBackend( keyGroupRange, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - queueStateType, + RocksDBPriorityQueueConfig.buildWithPriorityQueueType(queueStateType), TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(),