Skip to content

Commit f1ecb9e

Browse files
authored
[FLINK-35030][runtime] Introduce Epoch Manager for under async execution (apache#24748)
1 parent cc21eec commit f1ecb9e

File tree

11 files changed

+484
-12
lines changed

11 files changed

+484
-12
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java

+36-5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.state.v2.State;
2424
import org.apache.flink.core.state.InternalStateFuture;
2525
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
26+
import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
2627
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
2728
import org.apache.flink.util.Preconditions;
2829
import org.apache.flink.util.function.ThrowingRunnable;
@@ -76,6 +77,9 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
7677
*/
7778
private final MailboxExecutor mailboxExecutor;
7879

80+
/** Exception handler to handle the exception thrown by asynchronous framework. */
81+
private final AsyncFrameworkExceptionHandler exceptionHandler;
82+
7983
/** The key accounting unit which is used to detect the key conflict. */
8084
final KeyAccountingUnit<K> keyAccountingUnit;
8185

@@ -86,7 +90,7 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
8690
private final StateFutureFactory<K> stateFutureFactory;
8791

8892
/** The state executor where the {@link StateRequest} is actually executed. */
89-
StateExecutor stateExecutor;
93+
final StateExecutor stateExecutor;
9094

9195
/** The corresponding context that currently runs in task thread. */
9296
RecordContext<K> currentContext;
@@ -102,6 +106,15 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
102106
/** Max parallelism of the job. */
103107
private final int maxParallelism;
104108

109+
/** The reference of epoch manager. */
110+
final EpochManager epochManager;
111+
112+
/**
113+
* The parallel mode of epoch execution. Keep this field internal for now, until we could see
114+
* the concrete need for {@link ParallelMode#PARALLEL_BETWEEN_EPOCH} from average users.
115+
*/
116+
final ParallelMode epochParallelMode = ParallelMode.SERIAL_BETWEEN_EPOCH;
117+
105118
public AsyncExecutionController(
106119
MailboxExecutor mailboxExecutor,
107120
AsyncFrameworkExceptionHandler exceptionHandler,
@@ -112,6 +125,7 @@ public AsyncExecutionController(
112125
int maxInFlightRecords) {
113126
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
114127
this.mailboxExecutor = mailboxExecutor;
128+
this.exceptionHandler = exceptionHandler;
115129
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor, exceptionHandler);
116130
this.stateExecutor = stateExecutor;
117131
this.batchSize = batchSize;
@@ -131,11 +145,13 @@ public AsyncExecutionController(
131145
},
132146
"AEC-buffer-timeout"));
133147

148+
this.epochManager = new EpochManager(this);
134149
LOG.info(
135-
"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}",
150+
"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}",
136151
this.batchSize,
137152
this.bufferTimeout,
138-
this.maxInFlightRecordNum);
153+
this.maxInFlightRecordNum,
154+
this.epochParallelMode);
139155
}
140156

141157
/**
@@ -152,13 +168,15 @@ public RecordContext<K> buildContext(Object record, K key) {
152168
RecordContext.EMPTY_RECORD,
153169
key,
154170
this::disposeContext,
155-
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
171+
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
172+
epochManager.onRecord());
156173
}
157174
return new RecordContext<>(
158175
record,
159176
key,
160177
this::disposeContext,
161-
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
178+
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
179+
epochManager.onRecord());
162180
}
163181

164182
/**
@@ -177,6 +195,7 @@ public void setCurrentContext(RecordContext<K> switchingContext) {
177195
* @param toDispose the context to dispose.
178196
*/
179197
void disposeContext(RecordContext<K> toDispose) {
198+
epochManager.completeOneRecord(toDispose.getEpoch());
180199
keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
181200
inFlightRecordNum.decrementAndGet();
182201
RecordContext<K> nextRecordCtx =
@@ -311,6 +330,18 @@ public void drainInflightRecords(int targetNum) {
311330
}
312331
}
313332

333+
public void processNonRecord(ThrowingRunnable<? extends Exception> action) {
334+
Runnable wrappedAction =
335+
() -> {
336+
try {
337+
action.run();
338+
} catch (Exception e) {
339+
exceptionHandler.handleException("Failed to process non-record.", e);
340+
}
341+
};
342+
epochManager.onNonRecord(wrappedAction, epochParallelMode);
343+
}
344+
314345
@VisibleForTesting
315346
public StateExecutor getStateExecutor() {
316347
return stateExecutor;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.asyncprocessing;
20+
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
import javax.annotation.Nullable;
25+
26+
import java.util.LinkedList;
27+
28+
/**
29+
* Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records(e.g.
30+
* watermark, record attributes). Records are assigned to a unique epoch based on their arrival,
31+
* records within an epoch are allowed to be parallelized, while the non-record of an epoch can only
32+
* be executed when all records in this epoch have finished.
33+
*
34+
* <p>For more details please refer to FLIP-425.
35+
*/
36+
public class EpochManager {
37+
private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);
38+
39+
/**
40+
* This enum defines whether parallel execution between epochs is allowed. We should keep this
41+
* internal and away from API module for now, until we could see the concrete need for {@link
42+
* #PARALLEL_BETWEEN_EPOCH} from average users.
43+
*/
44+
public enum ParallelMode {
45+
/**
46+
* Subsequent epochs must wait until the previous epoch is completed before they can start.
47+
*/
48+
SERIAL_BETWEEN_EPOCH,
49+
/**
50+
* Subsequent epochs can begin execution even if the previous epoch has not yet completed.
51+
* Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
52+
*/
53+
PARALLEL_BETWEEN_EPOCH
54+
}
55+
56+
/**
57+
* The reference to the {@link AsyncExecutionController}, used for {@link
58+
* ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing.
59+
*/
60+
final AsyncExecutionController<?> asyncExecutionController;
61+
62+
/** The number of epochs that have arrived. */
63+
long epochNum;
64+
65+
/** The output queue to hold ongoing epochs. */
66+
LinkedList<Epoch> outputQueue;
67+
68+
/** Current active epoch, only one active epoch at the same time. */
69+
Epoch activeEpoch;
70+
71+
public EpochManager(AsyncExecutionController<?> aec) {
72+
this.epochNum = 0;
73+
this.outputQueue = new LinkedList<>();
74+
this.asyncExecutionController = aec;
75+
// init an empty epoch, the epoch action will be updated when non-record is received.
76+
this.activeEpoch = new Epoch(epochNum++);
77+
}
78+
79+
/**
80+
* Add a record to the current epoch and return the current open epoch, the epoch will be
81+
* associated with the {@link RecordContext} of this record. Must be invoked within task thread.
82+
*
83+
* @return the current open epoch.
84+
*/
85+
public Epoch onRecord() {
86+
activeEpoch.ongoingRecordCount++;
87+
return activeEpoch;
88+
}
89+
90+
/**
91+
* Add a non-record to the current epoch, close current epoch and open a new epoch. Must be
92+
* invoked within task thread.
93+
*
94+
* @param action the action associated with this non-record.
95+
* @param parallelMode the parallel mode for this epoch.
96+
*/
97+
public void onNonRecord(Runnable action, ParallelMode parallelMode) {
98+
LOG.trace(
99+
"on NonRecord, old epoch: {}, outputQueue size: {}",
100+
activeEpoch,
101+
outputQueue.size());
102+
switchActiveEpoch(action);
103+
if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
104+
asyncExecutionController.drainInflightRecords(0);
105+
}
106+
}
107+
108+
/**
109+
* Complete one record in the specific epoch. Must be invoked within task thread.
110+
*
111+
* @param epoch the specific epoch
112+
*/
113+
public void completeOneRecord(Epoch epoch) {
114+
if (--epoch.ongoingRecordCount == 0) {
115+
tryFinishInQueue();
116+
}
117+
}
118+
119+
private void tryFinishInQueue() {
120+
// If one epoch has been closed before and all records in
121+
// this epoch have finished, the epoch will be removed from the output queue.
122+
while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
123+
LOG.trace(
124+
"Finish epoch: {}, outputQueue size: {}",
125+
outputQueue.peek(),
126+
outputQueue.size());
127+
outputQueue.pop();
128+
}
129+
}
130+
131+
private void switchActiveEpoch(Runnable action) {
132+
activeEpoch.close(action);
133+
outputQueue.offer(activeEpoch);
134+
this.activeEpoch = new Epoch(epochNum++);
135+
tryFinishInQueue();
136+
}
137+
138+
/** The status of an epoch, see Fig.6 in FLIP-425 for details. */
139+
enum EpochStatus {
140+
/**
141+
* The subsequent non-record input has not arrived. So arriving records will be collected
142+
* into current epoch.
143+
*/
144+
OPEN,
145+
/**
146+
* The records belong to this epoch is settled since the following non-record input has
147+
* arrived, the newly arriving records would be collected into the next epoch.
148+
*/
149+
CLOSED,
150+
/**
151+
* One epoch can only be finished when it meets the following three conditions. 1. The
152+
* records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in
153+
* the front of outputQueue.
154+
*/
155+
FINISHED
156+
}
157+
158+
/**
159+
* All inputs are segment into distinct epochs, marked by the arrival of non-record inputs.
160+
* Records are assigned to a unique epoch based on their arrival.
161+
*/
162+
public static class Epoch {
163+
/** The id of this epoch for easy debugging. */
164+
long id;
165+
/** The number of records that are still ongoing in this epoch. */
166+
int ongoingRecordCount;
167+
168+
/** The action associated with non-record of this epoch(e.g. advance watermark). */
169+
@Nullable Runnable action;
170+
171+
EpochStatus status;
172+
173+
public Epoch(long id) {
174+
this.id = id;
175+
this.ongoingRecordCount = 0;
176+
this.status = EpochStatus.OPEN;
177+
this.action = null;
178+
}
179+
180+
/**
181+
* Try to finish this epoch.
182+
*
183+
* @return whether this epoch has been finished.
184+
*/
185+
boolean tryFinish() {
186+
if (this.status == EpochStatus.FINISHED) {
187+
return true;
188+
}
189+
if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) {
190+
this.status = EpochStatus.FINISHED;
191+
if (action != null) {
192+
action.run();
193+
}
194+
return true;
195+
}
196+
return false;
197+
}
198+
199+
/** Close this epoch. */
200+
void close(Runnable action) {
201+
this.action = action;
202+
this.status = EpochStatus.CLOSED;
203+
}
204+
205+
public String toString() {
206+
return String.format(
207+
"Epoch{id=%d, ongoingRecord=%d, status=%s}", id, ongoingRecordCount, status);
208+
}
209+
}
210+
}

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.runtime.asyncprocessing;
2020

21+
import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
22+
2123
import javax.annotation.Nullable;
2224

2325
import java.util.Objects;
@@ -46,7 +48,7 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
4648

4749
/**
4850
* The disposer for disposing this context. This should be invoked in {@link
49-
* #referenceCountReachedZero()}, which may be called once the ref count reaches zero in any
51+
* #referenceCountReachedZero}, which may be called once the ref count reaches zero in any
5052
* thread.
5153
*/
5254
private final Consumer<RecordContext<K>> disposer;
@@ -61,13 +63,18 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
6163
*/
6264
private @Nullable volatile Object extra;
6365

64-
public RecordContext(Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup) {
66+
/** The epoch of this context. */
67+
private final Epoch epoch;
68+
69+
public RecordContext(
70+
Object record, K key, Consumer<RecordContext<K>> disposer, int keyGroup, Epoch epoch) {
6571
super(0);
6672
this.record = record;
6773
this.key = key;
6874
this.keyOccupied = false;
6975
this.disposer = disposer;
7076
this.keyGroup = keyGroup;
77+
this.epoch = epoch;
7178
}
7279

7380
public Object getRecord() {
@@ -112,6 +119,10 @@ public Object getExtra() {
112119
return extra;
113120
}
114121

122+
public Epoch getEpoch() {
123+
return epoch;
124+
}
125+
115126
@Override
116127
public int hashCode() {
117128
return Objects.hash(record, key);
@@ -129,6 +140,12 @@ public boolean equals(Object o) {
129140
if (!Objects.equals(record, that.record)) {
130141
return false;
131142
}
143+
if (!Objects.equals(keyGroup, that.keyGroup)) {
144+
return false;
145+
}
146+
if (!Objects.equals(epoch, that.epoch)) {
147+
return false;
148+
}
132149
return Objects.equals(key, that.key);
133150
}
134151

@@ -143,6 +160,8 @@ public String toString() {
143160
+ keyOccupied
144161
+ ", ref="
145162
+ getReferenceCount()
163+
+ ", epoch="
164+
+ epoch.id
146165
+ "}";
147166
}
148167

0 commit comments

Comments
 (0)