diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index 950058bd14ab4..13f8d81bdd2ee 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.contrib.streaming.state.util.MergeUtils; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.internal.InternalListState; @@ -60,6 +59,11 @@ public class RocksDBListState */ private final WriteOptions writeOptions; + /** + * Separator of StringAppendTestOperator in RocksDB. + */ + private static final byte DELIMITER = ','; + /** * Creates a new {@code RocksDBListState}. * @@ -202,13 +206,17 @@ public void addAll(List values) throws Exception { private byte[] getPreMergedValue(List values) throws IOException { DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - List bytes = new ArrayList<>(values.size()); + keySerializationStream.reset(); + boolean first = true; for (V value : values) { - keySerializationStream.reset(); + if (first) { + first = false; + } else { + keySerializationStream.write(DELIMITER); + } valueSerializer.serialize(value, out); - bytes.add(keySerializationStream.toByteArray()); } - return MergeUtils.merge(bytes); + return keySerializationStream.toByteArray(); } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java deleted file mode 100644 index 6cf27814bdb54..0000000000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state.util; - -import org.apache.flink.annotation.VisibleForTesting; - -import java.util.List; - -/** - * Utils to simulate StringAppendTestOperator's merge operations in RocksDB. - */ -public class MergeUtils { - @VisibleForTesting - protected static final byte DELIMITER = ','; - - /** - * Merge operands into a single value that can be put directly into RocksDB. - */ - public static byte[] merge(List operands) { - if (operands == null || operands.size() == 0) { - return null; - } - - if (operands.size() == 1) { - return operands.get(0); - } - - int numBytes = 0; - for (byte[] arr : operands) { - numBytes += arr.length + 1; - } - numBytes--; - - byte[] result = new byte[numBytes]; - - System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length); - - for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) { - result[arrIndex] = DELIMITER; - arrIndex += 1; - System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length); - arrIndex += operands.get(i).length; - } - - return result; - } -} diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java index 153584f4d78b9..670c355e088ae 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java @@ -19,7 +19,6 @@ package org.apache.flink.contrib.streaming.state.benchmark; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; -import org.apache.flink.contrib.streaming.state.util.MergeUtils; import org.apache.flink.testutils.junit.RetryOnFailure; import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.TestLogger; @@ -63,6 +62,8 @@ */ public class RocksDBListStatePerformanceTest extends TestLogger { + private static final byte DELIMITER = ','; + @Rule public final TemporaryFolder tmp = new TemporaryFolder(); @@ -121,7 +122,7 @@ public void testRocksDbListStateAPIs() throws Exception { for (int i = 0; i < num; i++) { list.add(valueBytes); } - byte[] premerged = MergeUtils.merge(list); + byte[] premerged = merge(list); log.info("begin update"); @@ -132,4 +133,36 @@ public void testRocksDbListStateAPIs() throws Exception { log.info("end update - duration: {} ns", (endInsert2 - beginInsert2)); } } + + /** + * Merge operands into a single value that can be put directly into RocksDB. + */ + public static byte[] merge(List operands) { + if (operands == null || operands.size() == 0) { + return null; + } + + if (operands.size() == 1) { + return operands.get(0); + } + + int numBytes = 0; + for (byte[] arr : operands) { + numBytes += arr.length + 1; + } + numBytes--; + + byte[] result = new byte[numBytes]; + + System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length); + + for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) { + result[arrIndex] = DELIMITER; + arrIndex += 1; + System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length); + arrIndex += operands.get(i).length; + } + + return result; + } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java deleted file mode 100644 index 3493f761ad159..0000000000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state.util; - -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.assertTrue; - -/** - * Tests for MergeUtils. - */ -public class MergeUtilsTest { - - @Test - public void testMergeMulti() { - List list = Arrays.asList( - new byte[]{0, 1, 2, 3}, - new byte[]{4}, - new byte[]{5, 6}); - - byte[] expected = new byte[]{0, 1, 2, 3, MergeUtils.DELIMITER, 4, MergeUtils.DELIMITER, 5, 6}; - assertTrue(Arrays.equals(expected, MergeUtils.merge(list))); - } - - @Test - public void testMergeEmptyList() { - // Empty list - assertTrue(Arrays.equals(null, MergeUtils.merge(Collections.emptyList()))); - } - - @Test - public void testMergeSingleton() { - // Singleton list - byte[] singletonData = new byte[] {0x42}; - assertTrue(Arrays.equals(singletonData, MergeUtils.merge(Arrays.asList(singletonData)))); - } -}