From 20b356c73c0c2da4bf6690700f8b949c8fc3042a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 18 May 2017 17:48:30 +0200 Subject: [PATCH] [hotfix] Remove unnecessary job id from RocksDBKeyedStateBackend --- .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 2 -- .../contrib/streaming/state/RocksDBStateBackend.java | 3 +-- .../test/query/KVStateRequestSerializerRocksDBTest.java | 8 +++----- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index d0f73bf22264e..88a759d068eac 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -18,7 +18,6 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -180,7 +179,6 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { private static final String SST_FILE_SUFFIX = ".sst"; public RocksDBKeyedStateBackend( - JobID jobId, String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 55b8be231718e..2b70dcd0552af 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -300,10 +300,9 @@ public AbstractKeyedStateBackend createKeyedStateBackend( lazyInitializeForJob(env, operatorIdentifier); File instanceBasePath = - new File(getNextStoragePath(), "job-" + jobId.toString() + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); + new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); return new RocksDBKeyedStateBackend<>( - jobID, operatorIdentifier, env.getUserClassLoader(), instanceBasePath, diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java index 05f72c2677d6d..0f99afb35f48b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java @@ -19,7 +19,6 @@ package org.apache.flink.test.query; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -66,7 +65,6 @@ public final class KVStateRequestSerializerRocksDBTest { final static class RocksDBKeyedStateBackend2 extends RocksDBKeyedStateBackend { RocksDBKeyedStateBackend2( - final JobID jobId, final String operatorIdentifier, final ClassLoader userCodeClassLoader, final File instanceBasePath, @@ -78,7 +76,7 @@ final static class RocksDBKeyedStateBackend2 extends RocksDBKeyedStateBackend final KeyGroupRange keyGroupRange, final ExecutionConfig executionConfig) throws Exception { - super(jobId, operatorIdentifier, userCodeClassLoader, + super(operatorIdentifier, userCodeClassLoader, instanceBasePath, dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange, executionConfig, false); @@ -110,7 +108,7 @@ public void testListSerialization() throws Exception { ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); final RocksDBKeyedStateBackend2 longHeapKeyedStateBackend = new RocksDBKeyedStateBackend2<>( - new JobID(), "no-op", + "no-op", ClassLoader.getSystemClassLoader(), temporaryFolder.getRoot(), dbOptions, @@ -147,7 +145,7 @@ public void testMapSerialization() throws Exception { ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); final RocksDBKeyedStateBackend longHeapKeyedStateBackend = new RocksDBKeyedStateBackend<>( - new JobID(), "no-op", + "no-op", ClassLoader.getSystemClassLoader(), temporaryFolder.getRoot(), dbOptions,