Skip to content

Commit 1d1bf5c

Browse files
authored
[FLINK-34975][state/forst] Execute read/write state request in different executor (apache#25360)
1 parent fe97786 commit 1d1bf5c

File tree

6 files changed

+71
-14
lines changed

6 files changed

+71
-14
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java

+5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import static org.apache.flink.configuration.ConfigOptions.key;
4040
import static org.apache.flink.configuration.description.LinkElement.link;
4141
import static org.apache.flink.configuration.description.TextElement.code;
42+
import static org.apache.flink.state.forst.ForStOptions.EXECUTOR_READ_IO_PARALLELISM;
43+
import static org.apache.flink.state.forst.ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM;
4244
import static org.rocksdb.CompactionStyle.FIFO;
4345
import static org.rocksdb.CompactionStyle.LEVEL;
4446
import static org.rocksdb.CompactionStyle.NONE;
@@ -322,6 +324,9 @@ public class ForStConfigurableOptions implements Serializable {
322324

323325
static final ConfigOption<?>[] CANDIDATE_CONFIGS =
324326
new ConfigOption<?>[] {
327+
// configurable forst executor
328+
EXECUTOR_WRITE_IO_PARALLELISM,
329+
EXECUTOR_READ_IO_PARALLELISM,
325330
// configurable DBOptions
326331
MAX_BACKGROUND_THREADS,
327332
MAX_OPEN_FILES,

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,12 @@ public StateExecutor createStateExecutor() {
274274
throw new FlinkRuntimeException(
275275
"Attempt to create StateExecutor after ForStKeyedStateBackend is disposed.");
276276
}
277-
// TODO: Make io parallelism configurable
278277
StateExecutor stateExecutor =
279-
new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
278+
new ForStStateExecutor(
279+
optionsContainer.getReadIoParallelism(),
280+
optionsContainer.getWriteIoParallelism(),
281+
db,
282+
optionsContainer.getWriteOptions());
280283
managedStateExecutors.add(stateExecutor);
281284
return stateExecutor;
282285
}

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java

+14
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,18 @@ public class ForStOptions {
156156
+ "when '%s' is configured to '%s'. Increasing this value can improve the performance "
157157
+ "of rocksdb timer service, but consumes more heap memory at the same time.",
158158
TIMER_SERVICE_FACTORY.key(), ForStDB.name()));
159+
160+
public static final ConfigOption<Integer> EXECUTOR_READ_IO_PARALLELISM =
161+
ConfigOptions.key("state.backend.forst.memory.executor-read-io-parallelism")
162+
.intType()
163+
.defaultValue(3)
164+
.withDescription(
165+
"The number of threads used for read IO operations in the executor.");
166+
167+
public static final ConfigOption<Integer> EXECUTOR_WRITE_IO_PARALLELISM =
168+
ConfigOptions.key("state.backend.forst.memory.executor-write-io-parallelism")
169+
.intType()
170+
.defaultValue(1)
171+
.withDescription(
172+
"The number of threads used for write IO operations in the executor.");
159173
}

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java

+8
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,14 @@ public Path getDbPath() {
277277
}
278278
}
279279

280+
public int getReadIoParallelism() {
281+
return configuration.get(ForStOptions.EXECUTOR_READ_IO_PARALLELISM);
282+
}
283+
284+
public int getWriteIoParallelism() {
285+
return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM);
286+
}
287+
280288
/**
281289
* Prepare local and remote directories.
282290
*

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java

+35-10
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,41 @@ public class ForStStateExecutor implements StateExecutor {
5252
*/
5353
private final ExecutorService coordinatorThread;
5454

55-
/** The worker thread that actually executes the {@link StateRequest}s. */
56-
private final ExecutorService workerThreads;
55+
/** The worker thread that actually executes the read {@link StateRequest}s. */
56+
private final ExecutorService readThreads;
57+
58+
/** The worker thread that actually executes the write {@link StateRequest}s. */
59+
private final ExecutorService writeThreads;
5760

5861
private final RocksDB db;
5962

6063
private final WriteOptions writeOptions;
6164

6265
private Throwable executionError;
6366

64-
public ForStStateExecutor(int ioParallelism, RocksDB db, WriteOptions writeOptions) {
67+
public ForStStateExecutor(
68+
int readIoParallelism, int writeIoParallelism, RocksDB db, WriteOptions writeOptions) {
69+
Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0);
6570
this.coordinatorThread =
6671
Executors.newSingleThreadScheduledExecutor(
6772
new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
68-
this.workerThreads =
69-
Executors.newFixedThreadPool(
70-
ioParallelism, new ExecutorThreadFactory("ForSt-StateExecutor-IO"));
73+
if (readIoParallelism <= 0 || writeIoParallelism <= 0) {
74+
this.readThreads =
75+
Executors.newFixedThreadPool(
76+
Math.max(readIoParallelism, writeIoParallelism),
77+
new ExecutorThreadFactory("ForSt-StateExecutor-IO"));
78+
this.writeThreads = null;
79+
} else {
80+
this.readThreads =
81+
Executors.newFixedThreadPool(
82+
readIoParallelism,
83+
new ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
84+
85+
this.writeThreads =
86+
Executors.newFixedThreadPool(
87+
writeIoParallelism,
88+
new ExecutorThreadFactory("ForSt-StateExecutor-write-IO"));
89+
}
7190
this.db = db;
7291
this.writeOptions = writeOptions;
7392
}
@@ -89,23 +108,26 @@ public CompletableFuture<Void> executeBatchRequests(
89108
if (!putRequests.isEmpty()) {
90109
ForStWriteBatchOperation writeOperations =
91110
new ForStWriteBatchOperation(
92-
db, putRequests, writeOptions, workerThreads);
111+
db,
112+
putRequests,
113+
writeOptions,
114+
writeThreads == null ? readThreads : writeThreads);
93115
futures.add(writeOperations.process());
94116
}
95117

96118
List<ForStDBGetRequest<?, ?, ?, ?>> getRequests =
97119
stateRequestClassifier.pollDbGetRequests();
98120
if (!getRequests.isEmpty()) {
99121
ForStGeneralMultiGetOperation getOperations =
100-
new ForStGeneralMultiGetOperation(db, getRequests, workerThreads);
122+
new ForStGeneralMultiGetOperation(db, getRequests, readThreads);
101123
futures.add(getOperations.process());
102124
}
103125

104126
List<ForStDBIterRequest<?, ?, ?, ?, ?>> iterRequests =
105127
stateRequestClassifier.pollDbIterRequests();
106128
if (!iterRequests.isEmpty()) {
107129
ForStIterateOperation iterOperations =
108-
new ForStIterateOperation(db, iterRequests, workerThreads);
130+
new ForStIterateOperation(db, iterRequests, readThreads);
109131
futures.add(iterOperations.process());
110132
}
111133

@@ -155,7 +177,10 @@ private void checkState() {
155177

156178
@Override
157179
public void shutdown() {
158-
workerThreads.shutdown();
180+
readThreads.shutdown();
181+
if (writeThreads != null) {
182+
writeThreads.shutdown();
183+
}
159184
coordinatorThread.shutdown();
160185
LOG.info("Shutting down the ForStStateExecutor.");
161186
}

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class ForStStateExecutorTest extends ForStDBOperationTestBase {
4646
@Test
4747
@SuppressWarnings("unchecked")
4848
void testExecuteValueStateRequest() throws Exception {
49-
ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, new WriteOptions());
49+
ForStStateExecutor forStStateExecutor =
50+
new ForStStateExecutor(3, 1, db, new WriteOptions());
5051
ForStValueState<Integer, VoidNamespace, String> state1 =
5152
buildForStValueState("value-state-1");
5253
ForStValueState<Integer, VoidNamespace, String> state2 =
@@ -129,7 +130,8 @@ void testExecuteValueStateRequest() throws Exception {
129130

130131
@Test
131132
void testExecuteMapStateRequest() throws Exception {
132-
ForStStateExecutor forStStateExecutor = new ForStStateExecutor(4, db, new WriteOptions());
133+
ForStStateExecutor forStStateExecutor =
134+
new ForStStateExecutor(3, 1, db, new WriteOptions());
133135
ForStMapState<Integer, VoidNamespace, String, String> state =
134136
buildForStMapState("map-state");
135137
StateRequestContainer stateRequestContainer =

0 commit comments

Comments
 (0)