Skip to content

Commit

Permalink
[FLINK-35858][State/ForSt] Support state namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Jul 25, 2024
1 parent 055e11e commit d4294c5
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.state.forst;

import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.state.v2.InternalPartitionedState;
import org.apache.flink.util.function.FunctionWithException;

import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -32,7 +33,7 @@
* @param <K> The type of the raw key.
*/
@ThreadSafe
public class ContextKey<K> {
public class ContextKey<K, N> {

private final RecordContext<K> recordContext;

Expand All @@ -48,6 +49,10 @@ public int getKeyGroup() {
return recordContext.getKeyGroup();
}

public N getNamespace(InternalPartitionedState<N> state) {
return recordContext.getNamespace(state);
}

/**
* Get the serialized key. If the cached serialized key within {@code RecordContext#payload} is
* null, the provided serialization function will be called, and the serialization result will
Expand All @@ -57,7 +62,7 @@ public int getKeyGroup() {
* @return the serialized bytes.
*/
public byte[] getOrCreateSerializedKey(
FunctionWithException<ContextKey<K>, byte[], IOException> serializeKeyFunc)
FunctionWithException<ContextKey<K, N>, byte[], IOException> serializeKeyFunc)
throws IOException {
if (recordContext.getExtra() != null) {
return (byte[]) recordContext.getExtra();
Expand All @@ -84,7 +89,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ContextKey<?> that = (ContextKey<?>) o;
ContextKey<?, ?> that = (ContextKey<?, ?>) o;
return Objects.equals(recordContext, that.recordContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
* @param <K> The type of key in get access request.
* @param <V> The type of value returned by get request.
*/
public class ForStDBGetRequest<K, V> {
public class ForStDBGetRequest<K, N, V> {

private final K key;
private final ForStInnerTable<K, V> table;
private final ContextKey<K, N> key;
private final ForStInnerTable<K, N, V> table;
private final InternalStateFuture<V> future;

private ForStDBGetRequest(K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) {
private ForStDBGetRequest(
ContextKey<K, N> key, ForStInnerTable<K, N, V> table, InternalStateFuture<V> future) {
this.key = key;
this.table = table;
this.future = future;
Expand All @@ -63,8 +64,8 @@ public void completeStateFutureExceptionally(String message, Throwable ex) {
future.completeExceptionally(message, ex);
}

static <K, V> ForStDBGetRequest<K, V> of(
K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) {
static <K, N, V> ForStDBGetRequest<K, N, V> of(
ContextKey<K, N> key, ForStInnerTable<K, N, V> table, InternalStateFuture<V> future) {
return new ForStDBGetRequest<>(key, table, future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@
* @param <K> The type of key in put access request.
* @param <V> The type of value in put access request.
*/
public class ForStDBPutRequest<K, V> {
public class ForStDBPutRequest<K, N, V> {

private final K key;
private final ContextKey<K, N> key;
@Nullable private final V value;
private final ForStInnerTable<K, V> table;
private final ForStInnerTable<K, N, V> table;
private final InternalStateFuture<Void> future;

private ForStDBPutRequest(
K key, V value, ForStInnerTable<K, V> table, InternalStateFuture<Void> future) {
ContextKey<K, N> key,
V value,
ForStInnerTable<K, N, V> table,
InternalStateFuture<Void> future) {
this.key = key;
this.value = value;
this.table = table;
Expand Down Expand Up @@ -76,10 +79,10 @@ public void completeStateFutureExceptionally(String message, Throwable ex) {
* If the value of the ForStDBPutRequest is null, then the request will signify the deletion of
* the data associated with that key.
*/
static <K, V> ForStDBPutRequest<K, V> of(
K key,
static <K, N, V> ForStDBPutRequest<K, N, V> of(
ContextKey<K, N> key,
@Nullable V value,
ForStInnerTable<K, V> table,
ForStInnerTable<K, N, V> table,
InternalStateFuture<Void> future) {
return new ForStDBPutRequest<>(key, value, table, future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation {

private final RocksDB db;

private final List<ForStDBGetRequest<?, ?>> batchRequest;
private final List<ForStDBGetRequest<?, ?, ?>> batchRequest;

private final Executor executor;

ForStGeneralMultiGetOperation(
RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor executor) {
RocksDB db, List<ForStDBGetRequest<?, ?, ?>> batchRequest, Executor executor) {
this.db = db;
this.batchRequest = batchRequest;
this.executor = executor;
Expand All @@ -58,7 +58,7 @@ public CompletableFuture<Void> process() {
AtomicReference<Exception> error = new AtomicReference<>();
AtomicInteger counter = new AtomicInteger(batchRequest.size());
for (int i = 0; i < batchRequest.size(); i++) {
ForStDBGetRequest<?, ?> request = batchRequest.get(i);
ForStDBGetRequest<?, ?, ?> request = batchRequest.get(i);
executor.execute(
() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* @param <K> The key type of the table.
* @param <V> The value type of the table.
*/
public interface ForStInnerTable<K, V> {
public interface ForStInnerTable<K, N, V> {

/** Get the columnFamily handle corresponding to table. */
ColumnFamilyHandle getColumnFamilyHandle();
Expand All @@ -46,7 +46,7 @@ public interface ForStInnerTable<K, V> {
* @return the key bytes
* @throws IOException Thrown if the serialization encountered an I/O related error.
*/
byte[] serializeKey(K key) throws IOException;
byte[] serializeKey(ContextKey<K, N> key) throws IOException;

/**
* Serialize the given value to the outputView.
Expand All @@ -73,7 +73,7 @@ public interface ForStInnerTable<K, V> {
* @param stateRequest The given stateRequest.
* @return The corresponding ForSt GetRequest.
*/
ForStDBGetRequest<K, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest);
ForStDBGetRequest<K, N, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest);

/**
* Build a {@link ForStDBPutRequest} that belong to {@code ForStInnerTable} with the given
Expand All @@ -82,5 +82,5 @@ public interface ForStInnerTable<K, V> {
* @param stateRequest The given stateRequest.
* @return The corresponding ForSt PutRequest.
*/
ForStDBPutRequest<K, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest);
ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public <N, S extends State, SV> S createState(
columnFamilyHandle,
(ValueStateDescriptor<SV>) stateDesc,
serializedKeyBuilder,
namespaceSerializer::duplicate,
valueSerializerView,
valueDeserializerView);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public CompletableFuture<Void> executeBatchRequests(
() -> {
long startTime = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
List<ForStDBPutRequest<?, ?>> putRequests =
List<ForStDBPutRequest<?, ?, ?>> putRequests =
stateRequestClassifier.pollDbPutRequests();
if (!putRequests.isEmpty()) {
ForStWriteBatchOperation writeOperations =
Expand All @@ -92,7 +92,7 @@ public CompletableFuture<Void> executeBatchRequests(
futures.add(writeOperations.process());
}

List<ForStDBGetRequest<?, ?>> getRequests =
List<ForStDBGetRequest<?, ?, ?>> getRequests =
stateRequestClassifier.pollDbGetRequests();
if (!getRequests.isEmpty()) {
ForStGeneralMultiGetOperation getOperations =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
*/
public class ForStStateRequestClassifier implements StateRequestContainer {

private final List<ForStDBGetRequest<?, ?>> dbGetRequests;
private final List<ForStDBGetRequest<?, ?, ?>> dbGetRequests;

private final List<ForStDBPutRequest<?, ?>> dbPutRequests;
private final List<ForStDBPutRequest<?, ?, ?>> dbPutRequests;

public ForStStateRequestClassifier() {
this.dbGetRequests = new ArrayList<>();
Expand Down Expand Up @@ -88,11 +88,11 @@ private void convertStateRequestsToForStDBRequests(StateRequest<?, ?, ?> stateRe
}
}

public List<ForStDBGetRequest<?, ?>> pollDbGetRequests() {
public List<ForStDBGetRequest<?, ?, ?>> pollDbGetRequests() {
return dbGetRequests;
}

public List<ForStDBPutRequest<?, ?>> pollDbPutRequests() {
public List<ForStDBPutRequest<?, ?, ?>> pollDbPutRequests() {
return dbPutRequests;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.state.forst;

import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.state.InternalStateFuture;
Expand All @@ -43,14 +44,16 @@
* @param <V> The type of the value.
*/
public class ForStValueState<K, N, V> extends InternalValueState<K, N, V>
implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
implements ValueState<V>, ForStInnerTable<K, N, V> {

/** The column family which this internal value state belongs to. */
private final ColumnFamilyHandle columnFamilyHandle;

/** The serialized key builder which should be thread-safe. */
private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;

private final ThreadLocal<TypeSerializer<N>> namespaceSerializer;

/** The data outputStream used for value serializer, which should be thread-safe. */
private final ThreadLocal<DataOutputSerializer> valueSerializerView;

Expand All @@ -62,11 +65,13 @@ public ForStValueState(
ColumnFamilyHandle columnFamily,
ValueStateDescriptor<V> valueStateDescriptor,
Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilderInitializer,
Supplier<TypeSerializer<N>> namespaceSerializerInitializer,
Supplier<DataOutputSerializer> valueSerializerViewInitializer,
Supplier<DataInputDeserializer> valueDeserializerViewInitializer) {
super(stateRequestHandler, valueStateDescriptor);
this.columnFamilyHandle = columnFamily;
this.serializedKeyBuilder = ThreadLocal.withInitial(serializedKeyBuilderInitializer);
this.namespaceSerializer = ThreadLocal.withInitial(namespaceSerializerInitializer);
this.valueSerializerView = ThreadLocal.withInitial(valueSerializerViewInitializer);
this.valueDeserializerView = ThreadLocal.withInitial(valueDeserializerViewInitializer);
}
Expand All @@ -77,12 +82,13 @@ public ColumnFamilyHandle getColumnFamilyHandle() {
}

@Override
public byte[] serializeKey(ContextKey<K> contextKey) throws IOException {
public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException {
return contextKey.getOrCreateSerializedKey(
ctxKey -> {
SerializedCompositeKeyBuilder<K> builder = serializedKeyBuilder.get();
builder.setKeyAndKeyGroup(ctxKey.getRawKey(), ctxKey.getKeyGroup());
return builder.build();
return builder.buildCompositeKeyNamespace(
contextKey.getNamespace(this), namespaceSerializer.get());
});
}

Expand All @@ -103,23 +109,21 @@ public V deserializeValue(byte[] valueBytes) throws IOException {

@SuppressWarnings("unchecked")
@Override
public ForStDBGetRequest<ContextKey<K>, V> buildDBGetRequest(
StateRequest<?, ?, ?> stateRequest) {
public ForStDBGetRequest<K, N, V> buildDBGetRequest(StateRequest<?, ?, ?> stateRequest) {
Preconditions.checkArgument(stateRequest.getRequestType() == StateRequestType.VALUE_GET);
ContextKey<K> contextKey =
ContextKey<K, N> contextKey =
new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext());
return ForStDBGetRequest.of(
contextKey, this, (InternalStateFuture<V>) stateRequest.getFuture());
}

@SuppressWarnings("unchecked")
@Override
public ForStDBPutRequest<ContextKey<K>, V> buildDBPutRequest(
StateRequest<?, ?, ?> stateRequest) {
public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?> stateRequest) {
Preconditions.checkArgument(
stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE
|| stateRequest.getRequestType() == StateRequestType.CLEAR);
ContextKey<K> contextKey =
ContextKey<K, N> contextKey =
new ContextKey<>((RecordContext<K>) stateRequest.getRecordContext());
V value =
(stateRequest.getRequestType() == StateRequestType.CLEAR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public class ForStWriteBatchOperation implements ForStDBOperation {

private final RocksDB db;

private final List<ForStDBPutRequest<?, ?>> batchRequest;
private final List<ForStDBPutRequest<?, ?, ?>> batchRequest;

private final WriteOptions writeOptions;

private final Executor executor;

ForStWriteBatchOperation(
RocksDB db,
List<ForStDBPutRequest<?, ?>> batchRequest,
List<ForStDBPutRequest<?, ?, ?>> batchRequest,
WriteOptions writeOptions,
Executor executor) {
this.db = db;
Expand All @@ -57,7 +57,7 @@ public CompletableFuture<Void> process() {
() -> {
try (WriteBatch writeBatch =
new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
for (ForStDBPutRequest<?, ?> request : batchRequest) {
for (ForStDBPutRequest<?, ?, ?> request : batchRequest) {
if (request.valueIsNull()) {
// put(key, null) == delete(key)
writeBatch.delete(
Expand All @@ -71,12 +71,12 @@ public CompletableFuture<Void> process() {
}
}
db.write(writeOptions, writeBatch);
for (ForStDBPutRequest<?, ?> request : batchRequest) {
for (ForStDBPutRequest<?, ?, ?> request : batchRequest) {
request.completeStateFuture();
}
} catch (Exception e) {
String msg = "Error while write batch data to ForStDB.";
for (ForStDBPutRequest<?, ?> request : batchRequest) {
for (ForStDBPutRequest<?, ?, ?> request : batchRequest) {
// fail every state request in this batch
request.completeStateFutureExceptionally(msg, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
Expand Down Expand Up @@ -95,7 +96,7 @@ public <N> void setCurrentNamespaceForState(
};
}

protected ContextKey<Integer> buildContextKey(int i) {
protected ContextKey<Integer, Void> buildContextKey(int i) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, 128);
RecordContext<Integer> recordContext =
new RecordContext<>(i, i, t -> {}, keyGroup, new Epoch(0));
Expand All @@ -117,6 +118,7 @@ protected ForStValueState<Integer, Void, String> buildForStValueState(String sta
cf,
valueStateDescriptor,
serializedKeyBuilder,
() -> VoidSerializer.INSTANCE,
valueSerializerView,
valueDeserializerView);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ public void testValueStateMultiGet() throws Exception {
buildForStValueState("test-multiGet-1");
ForStValueState<Integer, Void, String> valueState2 =
buildForStValueState("test-multiGet-2");
List<ForStDBGetRequest<?, ?>> batchGetRequest = new ArrayList<>();
List<ForStDBGetRequest<?, ?, ?>> batchGetRequest = new ArrayList<>();
List<Tuple2<String, TestStateFuture<String>>> resultCheckList = new ArrayList<>();

int keyNum = 1000;
for (int i = 0; i < keyNum; i++) {
TestStateFuture<String> future = new TestStateFuture<>();
ForStValueState<Integer, Void, String> table =
((i % 2 == 0) ? valueState1 : valueState2);
ForStDBGetRequest<ContextKey<Integer>, String> request =
ForStDBGetRequest<Integer, Void, String> request =
ForStDBGetRequest.of(buildContextKey(i), table, future);
batchGetRequest.add(request);

Expand Down
Loading

0 comments on commit d4294c5

Please sign in to comment.