Skip to content

Commit 713c30f

Browse files
committed
[FLINK-35026][runtime][config] Introduce async execution configurations
1 parent 714d1cb commit 713c30f

File tree

7 files changed

+180
-27
lines changed

7 files changed

+180
-27
lines changed

flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java

+38
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.api.common;
2020

21+
import org.apache.flink.annotation.Experimental;
2122
import org.apache.flink.annotation.Internal;
2223
import org.apache.flink.annotation.Public;
2324
import org.apache.flink.annotation.PublicEvolving;
@@ -1085,6 +1086,43 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
10851086
configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
10861087
}
10871088

1089+
// --------------------------------------------------------------------------------------------
1090+
// Asynchronous execution configurations
1091+
// --------------------------------------------------------------------------------------------
1092+
1093+
@Experimental
1094+
public int getAsyncInflightRecordsLimit() {
1095+
return configuration.get(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT);
1096+
}
1097+
1098+
@Experimental
1099+
public ExecutionConfig setAsyncInflightRecordsLimit(int limit) {
1100+
configuration.set(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT, limit);
1101+
return this;
1102+
}
1103+
1104+
@Experimental
1105+
public int getAsyncStateBufferSize() {
1106+
return configuration.get(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE);
1107+
}
1108+
1109+
@Experimental
1110+
public ExecutionConfig setAsyncStateBufferSize(int bufferSize) {
1111+
configuration.set(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE, bufferSize);
1112+
return this;
1113+
}
1114+
1115+
@Experimental
1116+
public long getAsyncStateBufferTimeout() {
1117+
return configuration.get(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT);
1118+
}
1119+
1120+
@Experimental
1121+
public ExecutionConfig setAsyncStateBufferTimeout(long timeout) {
1122+
configuration.set(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT, timeout);
1123+
return this;
1124+
}
1125+
10881126
@Override
10891127
public boolean equals(Object obj) {
10901128
if (obj instanceof ExecutionConfig) {

flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java

+59
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.configuration;
2020

21+
import org.apache.flink.annotation.Experimental;
2122
import org.apache.flink.annotation.PublicEvolving;
2223
import org.apache.flink.annotation.docs.Documentation;
2324
import org.apache.flink.api.common.BatchShuffleMode;
@@ -181,4 +182,62 @@ public class ExecutionOptions {
181182
+ " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
182183
+ SORT_INPUTS.key()
183184
+ " to be enabled.");
185+
186+
// ------------------------- Async State Execution --------------------------
187+
188+
/**
189+
* The max limit of in-flight records number in async state execution, 'in-flight' refers to the
190+
* records that have entered the operator but have not yet been processed and emitted to the
191+
* downstream. If the in-flight records number exceeds the limit, the newly records entering
192+
* will be blocked until the in-flight records number drops below the limit.
193+
*/
194+
@Experimental
195+
@Documentation.ExcludeFromDocumentation(
196+
"This is an experimental option, internal use only for now.")
197+
public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
198+
ConfigOptions.key("execution.async-state.in-flight-records-limit")
199+
.intType()
200+
.defaultValue(6000)
201+
.withDescription(
202+
"The max limit of in-flight records number in async state execution, 'in-flight' refers"
203+
+ " to the records that have entered the operator but have not yet been processed and"
204+
+ " emitted to the downstream. If the in-flight records number exceeds the limit,"
205+
+ " the newly records entering will be blocked until the in-flight records number drops below the limit.");
206+
207+
/**
208+
* The size of buffer under async state execution. Async state execution provides a buffer
209+
* mechanism to reduce state access. When the number of state requests in the buffer exceeds the
210+
* batch size, a batched state execution would be triggered. Larger batch sizes will bring
211+
* higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
212+
* control the frequency of triggering.
213+
*/
214+
@Experimental
215+
@Documentation.ExcludeFromDocumentation(
216+
"This is an experimental option, internal use only for now.")
217+
public static final ConfigOption<Integer> ASYNC_STATE_BUFFER_SIZE =
218+
ConfigOptions.key("execution.async-state.buffer-size")
219+
.intType()
220+
.defaultValue(1000)
221+
.withDescription(
222+
"The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access."
223+
+ " When the number of state requests in the active buffer exceeds the batch size,"
224+
+ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
225+
+ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
226+
227+
/**
228+
* The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
229+
* #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
230+
* actively.
231+
*/
232+
@Experimental
233+
@Documentation.ExcludeFromDocumentation(
234+
"This is an experimental option, internal use only for now.")
235+
public static final ConfigOption<Long> ASYNC_STATE_BUFFER_TIMEOUT =
236+
ConfigOptions.key("execution.async-state.buffer-timeout")
237+
.longType()
238+
.defaultValue(1000L)
239+
.withDescription(
240+
"The timeout of buffer triggering in milliseconds. If the buffer has not reached the"
241+
+ " 'execution.async-state.buffer-size' within 'buffer-timeout' milliseconds,"
242+
+ " a trigger will perform actively.");
184243
}

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java

+15-14
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,24 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
5252

5353
private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
5454

55-
public static final int DEFAULT_BATCH_SIZE = 1000;
56-
public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
55+
private static final int DEFAULT_BATCH_SIZE = 1000;
56+
57+
private static final int DEFAULT_BUFFER_TIMEOUT = 1000;
58+
private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
5759

5860
/**
5961
* The batch size. When the number of state requests in the active buffer exceeds the batch
6062
* size, a batched state execution would be triggered.
6163
*/
6264
private final int batchSize;
6365

66+
/**
67+
* The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the
68+
* activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a
69+
* trigger will perform actively.
70+
*/
71+
private final long bufferTimeOut;
72+
6473
/** The max allowed number of in-flight records. */
6574
private final int maxInFlightRecordNum;
6675

@@ -98,27 +107,19 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
98107
/** Max parallelism of the job. */
99108
private final int maxParallelism;
100109

101-
public AsyncExecutionController(
102-
MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxParallelism) {
103-
this(
104-
mailboxExecutor,
105-
stateExecutor,
106-
DEFAULT_BATCH_SIZE,
107-
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM,
108-
maxParallelism);
109-
}
110-
111110
public AsyncExecutionController(
112111
MailboxExecutor mailboxExecutor,
113112
StateExecutor stateExecutor,
113+
int maxParallelism,
114114
int batchSize,
115-
int maxInFlightRecords,
116-
int maxParallelism) {
115+
long bufferTimeOut,
116+
int maxInFlightRecords) {
117117
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
118118
this.mailboxExecutor = mailboxExecutor;
119119
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
120120
this.stateExecutor = stateExecutor;
121121
this.batchSize = batchSize;
122+
this.bufferTimeOut = bufferTimeOut;
122123
this.maxInFlightRecordNum = maxInFlightRecords;
123124
this.stateRequestsBuffer = new StateRequestBuffer<>();
124125
this.inFlightRecordNum = new AtomicInteger(0);

flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ class AsyncExecutionControllerTest {
6363

6464
@BeforeEach
6565
void setup() {
66-
aec = new AsyncExecutionController<>(new SyncMailboxExecutor(), createStateExecutor(), 128);
66+
aec =
67+
new AsyncExecutionController<>(
68+
new SyncMailboxExecutor(), createStateExecutor(), 128, 1000, 10000, 6000);
6769
underlyingState = new TestUnderlyingState();
6870
valueState = new TestValueState(aec, underlyingState);
6971
output = new AtomicInteger();
@@ -238,14 +240,16 @@ void testRecordsRunInOrder() {
238240
@Test
239241
void testInFlightRecordControl() {
240242
final int batchSize = 5;
243+
final int timeout = 1000;
241244
final int maxInFlight = 10;
242245
aec =
243246
new AsyncExecutionController<>(
244247
new SyncMailboxExecutor(),
245248
new TestStateExecutor(),
249+
128,
246250
batchSize,
247-
maxInFlight,
248-
128);
251+
timeout,
252+
maxInFlight);
249253
valueState = new TestValueState(aec, underlyingState);
250254

251255
AtomicInteger output = new AtomicInteger();

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java

+17-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
2727
import org.apache.flink.runtime.asyncprocessing.RecordContext;
2828
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
29+
import org.apache.flink.runtime.execution.Environment;
2930
import org.apache.flink.runtime.state.CheckpointStreamFactory;
3031
import org.apache.flink.runtime.state.KeyedStateBackend;
3132
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -65,13 +66,23 @@ public void setup(
6566
StreamConfig config,
6667
Output<StreamRecord<OUT>> output) {
6768
super.setup(containingTask, config, output);
68-
// TODO: properly read config and setup
69-
final MailboxExecutor mailboxExecutor =
70-
containingTask.getEnvironment().getMainMailboxExecutor();
71-
int maxParallelism =
72-
containingTask.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
69+
final Environment environment = containingTask.getEnvironment();
70+
final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor();
71+
final int maxParallelism = environment.getTaskInfo().getMaxNumberOfParallelSubtasks();
72+
final int inFlightRecordsLimit =
73+
environment.getExecutionConfig().getAsyncInflightRecordsLimit();
74+
final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize();
75+
final long asyncBufferTimeout =
76+
environment.getExecutionConfig().getAsyncStateBufferTimeout();
77+
// TODO: initial state executor and set state executor for aec
7378
this.asyncExecutionController =
74-
new AsyncExecutionController(mailboxExecutor, null, maxParallelism);
79+
new AsyncExecutionController(
80+
mailboxExecutor,
81+
null,
82+
maxParallelism,
83+
asyncBufferSize,
84+
asyncBufferTimeout,
85+
inFlightRecordsLimit);
7586
}
7687

7788
@Override

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
2727
import org.apache.flink.runtime.asyncprocessing.RecordContext;
2828
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
29+
import org.apache.flink.runtime.execution.Environment;
2930
import org.apache.flink.runtime.state.CheckpointStreamFactory;
3031
import org.apache.flink.runtime.state.KeyedStateBackend;
3132
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
@@ -60,19 +61,28 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
6061
public AbstractAsyncStateStreamOperatorV2(
6162
StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
6263
super(parameters, numberOfInputs);
63-
this.mailboxExecutor =
64-
parameters.getContainingTask().getEnvironment().getMainMailboxExecutor();
64+
final Environment environment = parameters.getContainingTask().getEnvironment();
65+
this.mailboxExecutor = environment.getMainMailboxExecutor();
6566
}
6667

6768
/** Initialize necessary state components for {@link AbstractStreamOperatorV2}. */
6869
@Override
6970
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
7071
throws Exception {
7172
super.initializeState(streamTaskStateManager);
72-
// TODO: Read config and properly set.
73+
74+
final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit();
75+
final int asyncBufferSize = getExecutionConfig().getAsyncStateBufferSize();
76+
final long asyncBufferTimeout = getExecutionConfig().getAsyncStateBufferTimeout();
7377
int maxParallelism = getExecutionConfig().getMaxParallelism();
7478
this.asyncExecutionController =
75-
new AsyncExecutionController(mailboxExecutor, null, maxParallelism);
79+
new AsyncExecutionController(
80+
mailboxExecutor,
81+
null,
82+
maxParallelism,
83+
asyncBufferSize,
84+
asyncBufferTimeout,
85+
inFlightRecordsLimit);
7686
}
7787

7888
@Override

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,36 @@ void testBufferTimeoutDisabled() {
462462
.isEqualTo(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
463463
}
464464

465+
@Test
466+
void testAsyncExecutionConfigurations() {
467+
Configuration config = new Configuration();
468+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
469+
env.configure(config, this.getClass().getClassLoader());
470+
471+
assertThat(env.getConfig().getAsyncInflightRecordsLimit())
472+
.isEqualTo(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT.defaultValue());
473+
assertThat(env.getConfig().getAsyncStateBufferSize())
474+
.isEqualTo(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE.defaultValue());
475+
assertThat(env.getConfig().getAsyncStateBufferTimeout())
476+
.isEqualTo(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT.defaultValue());
477+
478+
config.set(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT, 3);
479+
config.set(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE, 2);
480+
config.set(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT, 1L);
481+
env.configure(config, this.getClass().getClassLoader());
482+
assertThat(env.getConfig().getAsyncInflightRecordsLimit()).isEqualTo(3);
483+
assertThat(env.getConfig().getAsyncStateBufferSize()).isEqualTo(2);
484+
assertThat(env.getConfig().getAsyncStateBufferTimeout()).isEqualTo(1);
485+
486+
env.getConfig()
487+
.setAsyncInflightRecordsLimit(6)
488+
.setAsyncStateBufferSize(5)
489+
.setAsyncStateBufferTimeout(4);
490+
assertThat(env.getConfig().getAsyncInflightRecordsLimit()).isEqualTo(6);
491+
assertThat(env.getConfig().getAsyncStateBufferSize()).isEqualTo(5);
492+
assertThat(env.getConfig().getAsyncStateBufferTimeout()).isEqualTo(4);
493+
}
494+
465495
private void testBufferTimeout(Configuration config, StreamExecutionEnvironment env) {
466496
env.configure(config, this.getClass().getClassLoader());
467497
assertThat(env.getBufferTimeout())

0 commit comments

Comments
 (0)