Skip to content

Commit

Permalink
[FLINK-34975][state/forst] Support multiget for forstdb (apache#25363)
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia authored Sep 25, 2024
1 parent 733a566 commit f33abff
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

package org.apache.flink.state.forst;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand All @@ -33,39 +37,167 @@
public class ForStGeneralMultiGetOperation implements ForStDBOperation {

private final RocksDB db;

private final List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest;

List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests;
List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests;

private final Executor executor;

private final Runnable subProcessFinished;

private final int readIoParallelism;

ForStGeneralMultiGetOperation(
RocksDB db, List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest, Executor executor) {
this(db, batchRequest, executor, null);
this(db, batchRequest, executor, 1, null);
}

ForStGeneralMultiGetOperation(
RocksDB db,
List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest,
Executor executor,
int readIoParallelism,
Runnable subProcessFinished) {
this.db = db;
this.batchRequest = batchRequest;
this.executor = executor;
this.subProcessFinished = subProcessFinished;
this.readIoParallelism = readIoParallelism;
this.splitRequests = new ArrayList<>();
this.mapCheckRequests = new ArrayList<>();
classifyAndSplitRequests(splitRequests, mapCheckRequests);
}

@Override
public CompletableFuture<Void> process() {
// TODO: Use MultiGet to optimize this implement

CompletableFuture<Void> future = new CompletableFuture<>();

AtomicReference<Exception> error = new AtomicReference<>();
AtomicInteger counter = new AtomicInteger(batchRequest.size());

processOneByOne(mapCheckRequests, error, counter, future);
for (List<ForStDBGetRequest<?, ?, ?, ?>> getRequests : splitRequests) {
executor.execute(
() -> {
try {
ReadOptions readOptions = new ReadOptions();
readOptions.setReadaheadSize(0);
List<byte[]> keys = new ArrayList<>(getRequests.size());
List<ColumnFamilyHandle> columnFamilyHandles =
new ArrayList<>(getRequests.size());

for (int i = 0; i < getRequests.size(); i++) {
ForStDBGetRequest<?, ?, ?, ?> request = getRequests.get(i);
try {
if (error.get() == null) {
byte[] key = request.buildSerializedKey();
keys.add(key);
columnFamilyHandles.add(request.getColumnFamilyHandle());
} else {
completeExceptionallyRequest(
request,
"Error already occurred in other state request of the same group, failed the state request directly",
error.get());
}
} catch (IOException e) {
error.set(e);
completeExceptionallyRequest(
request,
"Error when execute ForStDb serialized get key",
e);
future.completeExceptionally(e);
}
}
if (error.get() != null) {
return;
}
List<byte[]> values = null;
try {
values = db.multiGetAsList(readOptions, columnFamilyHandles, keys);
} catch (Exception e) {
error.set(e);
future.completeExceptionally(e);
for (int i = 0; i < getRequests.size(); i++) {
completeExceptionallyRequest(
getRequests.get(i), "Error occurred when multiGet", e);
}
}
if (error.get() != null) {
return;
}
for (int i = 0; i < getRequests.size(); i++) {
ForStDBGetRequest<?, ?, ?, ?> request = getRequests.get(i);
try {
if (error.get() == null) {
request.completeStateFuture(values.get(i));
} else {
completeExceptionallyRequest(
request,
"Error already occurred in other state request of the same "
+ "group, failed the state request directly",
error.get());
}
} catch (Exception e) {
error.set(e);
completeExceptionallyRequest(
request, "Error when complete get future.", e);
future.completeExceptionally(e);
}
}

if (counter.addAndGet(-getRequests.size()) == 0
&& !future.isCompletedExceptionally()) {
future.complete(null);
}

} finally {
if (subProcessFinished != null) {
subProcessFinished.run();
}
}
});
}
return future;
}

private void completeExceptionallyRequest(
ForStDBGetRequest<?, ?, ?, ?> request, String message, Exception e) {
request.completeStateFutureExceptionally(message, e);
}

private void classifyAndSplitRequests(
List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests,
List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests) {
List<ForStDBGetRequest<?, ?, ?, ?>> getRequests = new ArrayList<>();
for (int i = 0; i < batchRequest.size(); i++) {
ForStDBGetRequest<?, ?, ?, ?> request = batchRequest.get(i);
if (request instanceof ForStDBMapCheckRequest) {
mapCheckRequests.add(request);
} else {
getRequests.add(request);
}
}

for (int p = 0; p < readIoParallelism; p++) {
int startIndex = getRequests.size() * p / readIoParallelism;
int endIndex = getRequests.size() * (p + 1) / readIoParallelism;
if (startIndex < endIndex) {
splitRequests.add(new ArrayList<>());
}
for (int i = startIndex; i < endIndex; i++) {
splitRequests.get(splitRequests.size() - 1).add(getRequests.get(i));
}
}
}

private void processOneByOne(
List<ForStDBGetRequest<?, ?, ?, ?>> requests,
AtomicReference<Exception> error,
AtomicInteger counter,
CompletableFuture<Void> future) {
for (int i = 0; i < requests.size(); i++) {
ForStDBGetRequest<?, ?, ?, ?> request = requests.get(i);
executor.execute(
() -> {
try {
Expand All @@ -84,7 +216,8 @@ public CompletableFuture<Void> process() {
future.completeExceptionally(e);
} finally {
if (counter.decrementAndGet() == 0
&& !future.isCompletedExceptionally()) {
&& !future.isCompletedExceptionally()
&& !future.isDone()) {
future.complete(null);
}
if (subProcessFinished != null) {
Expand All @@ -93,11 +226,10 @@ public CompletableFuture<Void> process() {
}
});
}
return future;
}

@Override
public int subProcessCount() {
return batchRequest.size();
return mapCheckRequests.size() + splitRequests.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,11 @@ public CompletableFuture<Void> executeBatchRequests(
if (!getRequests.isEmpty()) {
ForStGeneralMultiGetOperation getOperations =
new ForStGeneralMultiGetOperation(
db, getRequests, readThreads, ongoing::decrementAndGet);
db,
getRequests,
readThreads,
readThreadCount,
ongoing::decrementAndGet);
// sub process count should -1, since we have added 1 on top.
ongoing.addAndGet(getOperations.subProcessCount() - 1);
futures.add(getOperations.process());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public void testValueStateMultiGet() throws Exception {
db.put(request.getColumnFamilyHandle(), keyBytes, valueBytes);
}

ExecutorService executor = Executors.newFixedThreadPool(4);
ExecutorService executor = Executors.newFixedThreadPool(3);
ForStGeneralMultiGetOperation generalMultiGetOperation =
new ForStGeneralMultiGetOperation(db, batchGetRequest, executor);
new ForStGeneralMultiGetOperation(db, batchGetRequest, executor, 3, null);
generalMultiGetOperation.process().get();

for (Tuple2<String, TestStateFuture<String>> tuple : resultCheckList) {
Expand Down

0 comments on commit f33abff

Please sign in to comment.