Skip to content

Commit

Permalink
[FLINK-8441] [State Backend] [RocksDB] change RocksDBListState to ser…
Browse files Browse the repository at this point in the history
…ialize values and separators in stream to avoid extra bytes copying

This closes apache#5323.
  • Loading branch information
bowenli86 authored and StefanRRichter committed Jan 23, 2018
1 parent a2533f4 commit ce25688
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.util.MergeUtils;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalListState;
Expand Down Expand Up @@ -60,6 +59,11 @@ public class RocksDBListState<K, N, V>
*/
private final WriteOptions writeOptions;

/**
* Separator of StringAppendTestOperator in RocksDB.
*/
private static final byte DELIMITER = ',';

/**
* Creates a new {@code RocksDBListState}.
*
Expand Down Expand Up @@ -202,13 +206,17 @@ public void addAll(List<V> values) throws Exception {
private byte[] getPreMergedValue(List<V> values) throws IOException {
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

List<byte[]> bytes = new ArrayList<>(values.size());
keySerializationStream.reset();
boolean first = true;
for (V value : values) {
keySerializationStream.reset();
if (first) {
first = false;
} else {
keySerializationStream.write(DELIMITER);
}
valueSerializer.serialize(value, out);
bytes.add(keySerializationStream.toByteArray());
}

return MergeUtils.merge(bytes);
return keySerializationStream.toByteArray();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state.benchmark;

import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.util.MergeUtils;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -63,6 +62,8 @@
*/
public class RocksDBListStatePerformanceTest extends TestLogger {

private static final byte DELIMITER = ',';

@Rule
public final TemporaryFolder tmp = new TemporaryFolder();

Expand Down Expand Up @@ -121,7 +122,7 @@ public void testRocksDbListStateAPIs() throws Exception {
for (int i = 0; i < num; i++) {
list.add(valueBytes);
}
byte[] premerged = MergeUtils.merge(list);
byte[] premerged = merge(list);

log.info("begin update");

Expand All @@ -132,4 +133,36 @@ public void testRocksDbListStateAPIs() throws Exception {
log.info("end update - duration: {} ns", (endInsert2 - beginInsert2));
}
}

/**
* Merge operands into a single value that can be put directly into RocksDB.
*/
public static byte[] merge(List<byte[]> operands) {
if (operands == null || operands.size() == 0) {
return null;
}

if (operands.size() == 1) {
return operands.get(0);
}

int numBytes = 0;
for (byte[] arr : operands) {
numBytes += arr.length + 1;
}
numBytes--;

byte[] result = new byte[numBytes];

System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);

for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
result[arrIndex] = DELIMITER;
arrIndex += 1;
System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
arrIndex += operands.get(i).length;
}

return result;
}
}

This file was deleted.

0 comments on commit ce25688

Please sign in to comment.