Skip to content

Commit

Permalink
[FLINK-35024][Runtime/State] Implement the record buffer of AsyncExec…
Browse files Browse the repository at this point in the history
…utionController (apache#24633)
  • Loading branch information
fredia authored Apr 15, 2024
1 parent c0d889f commit 6d139a1
Show file tree
Hide file tree
Showing 3 changed files with 385 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
* The Async Execution Controller (AEC) receives processing requests from operators, and put them
* into execution according to some strategies.
Expand All @@ -45,11 +49,26 @@ public class AsyncExecutionController<R, K> {

private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);

public static final int DEFAULT_BATCH_SIZE = 1000;
public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;

/**
* The batch size. When the number of state requests in the active buffer exceeds the batch
* size, a batched state execution would be triggered.
*/
private final int batchSize;

/** The max allowed number of in-flight records. */
private final int maxInFlightRecordNum;

/**
* The mailbox executor borrowed from {@code StreamTask}. Keeping the reference of
* mailboxExecutor here is to restrict the number of in-flight records, when the number of
* in-flight records > {@link #maxInFlightRecordNum}, the newly entering records would be
* blocked.
*/
private final MailboxExecutor mailboxExecutor;

/** The key accounting unit which is used to detect the key conflict. */
final KeyAccountingUnit<R, K> keyAccountingUnit;

Expand All @@ -65,17 +84,35 @@ public class AsyncExecutionController<R, K> {
/** The corresponding context that currently runs in task thread. */
RecordContext<R, K> currentContext;

/** The buffer to store the state requests to execute in batch. */
StateRequestBuffer<R, K> stateRequestsBuffer;

/**
* The number of in-flight records. Including the records in active buffer and blocking buffer.
*/
final AtomicInteger inFlightRecordNum;

public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
}

public AsyncExecutionController(
MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
MailboxExecutor mailboxExecutor,
StateExecutor stateExecutor,
int batchSize,
int maxInFlightRecords) {
this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
this.mailboxExecutor = mailboxExecutor;
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
this.stateExecutor = stateExecutor;
this.batchSize = batchSize;
this.maxInFlightRecordNum = maxInFlightRecords;
LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords);
this.stateRequestsBuffer = new StateRequestBuffer<>();
this.inFlightRecordNum = new AtomicInteger(0);
LOG.info(
"Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
batchSize,
maxInFlightRecords);
}

/**
Expand Down Expand Up @@ -107,6 +144,14 @@ public void setCurrentContext(RecordContext<R, K> switchingContext) {
*/
public void disposeContext(RecordContext<R, K> toDispose) {
keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
inFlightRecordNum.decrementAndGet();
RecordContext<R, K> nextRecordCtx =
stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey());
if (nextRecordCtx != null) {
Preconditions.checkState(
tryOccupyKey(nextRecordCtx),
String.format("key(%s) is already occupied.", nextRecordCtx.getKey()));
}
}

/**
Expand Down Expand Up @@ -140,23 +185,28 @@ public <IN, OUT> InternalStateFuture<OUT> handleRequest(
InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext);
StateRequest<K, IN, OUT> request =
new StateRequest<>(state, type, payload, stateFuture, currentContext);
// Step 2: try to occupy the key and place it into right buffer.

// Step 2: try to seize the capacity, if the current in-flight records exceeds the limit,
// block the current state request from entering until some buffered requests are processed.
seizeCapacity();

// Step 3: try to occupy the key and place it into right buffer.
if (tryOccupyKey(currentContext)) {
insertActiveBuffer(request);
} else {
insertBlockingBuffer(request);
}
// Step 3: trigger the (active) buffer if needed.
// Step 4: trigger the (active) buffer if needed.
triggerIfNeeded(false);
return stateFuture;
}

<IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) {
// TODO: implement the active buffer.
stateRequestsBuffer.enqueueToActive(request);
}

<IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
// TODO: implement the blocking buffer.
stateRequestsBuffer.enqueueToBlocking(request);
}

/**
Expand All @@ -165,6 +215,38 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
* @param force whether to trigger requests in force.
*/
void triggerIfNeeded(boolean force) {
// TODO: implement the trigger logic.
// TODO: introduce a timeout mechanism for triggering.
if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
return;
}
List<StateRequest<?, ?, ?>> toRun = stateRequestsBuffer.popActive(batchSize);
stateExecutor.executeBatchRequests(toRun);
}

private void seizeCapacity() {
// 1. Check if the record is already in buffer. If yes, this indicates that it is a state
// request resulting from a callback statement, otherwise, it signifies the initial state
// request for a newly entered record.
if (currentContext.isKeyOccupied()) {
return;
}
RecordContext<R, K> storedContext = currentContext;
// 2. If the state request is for a newly entered record, the in-flight record number should
// be less than the max in-flight record number.
// Note: the currentContext may be updated by {@code StateFutureFactory#build}.
try {
while (inFlightRecordNum.get() > maxInFlightRecordNum) {
if (!mailboxExecutor.tryYield()) {
triggerIfNeeded(true);
Thread.sleep(1);
}
}
} catch (InterruptedException ignored) {
// ignore the interrupted exception to avoid throwing fatal error when the task cancel
// or exit.
}
// 3. Ensure the currentContext is restored.
setCurrentContext(storedContext);
inFlightRecordNum.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* A buffer to hold state requests to execute state requests in batch, which can only be manipulated
* within task thread.
*
* @param <R> the type of the record
* @param <K> the type of the key
*/
@NotThreadSafe
public class StateRequestBuffer<R, K> {
/**
* The state requests in this buffer could be executed when the buffer is full or configured
* batch size is reached. All operations on this buffer must be invoked in task thread.
*/
final LinkedList<StateRequest<K, ?, ?>> activeQueue;

/**
* The requests in that should wait until all preceding records with identical key finishing its
* execution. After which the queueing requests will move into the active buffer. All operations
* on this buffer must be invoked in task thread.
*/
final Map<K, Deque<StateRequest<K, ?, ?>>> blockingQueue;

/** The number of state requests in blocking queue. */
int blockingQueueSize;

public StateRequestBuffer() {
this.activeQueue = new LinkedList<>();
this.blockingQueue = new HashMap<>();
this.blockingQueueSize = 0;
}

void enqueueToActive(StateRequest<K, ?, ?> request) {
activeQueue.add(request);
}

void enqueueToBlocking(StateRequest<K, ?, ?> request) {
blockingQueue
.computeIfAbsent(request.getRecordContext().getKey(), k -> new LinkedList<>())
.add(request);
blockingQueueSize++;
}

/**
* Try to pull one state request with specific key from blocking queue to active queue.
*
* @param key The key to release, the other records with this key is no longer blocking.
* @return The first record context with the same key in blocking queue, null if no such record.
*/
@Nullable
@SuppressWarnings("rawtypes")
RecordContext<R, K> tryActivateOneByKey(K key) {
if (!blockingQueue.containsKey(key)) {
return null;
}

StateRequest<K, ?, ?> stateRequest = blockingQueue.get(key).removeFirst();
activeQueue.add(stateRequest);
if (blockingQueue.get(key).isEmpty()) {
blockingQueue.remove(key);
}
blockingQueueSize--;
return (RecordContext<R, K>) stateRequest.getRecordContext();
}

/**
* Get the number of state requests of blocking queue in constant-time.
*
* @return the number of state requests of blocking queue.
*/
int blockingQueueSize() {
return blockingQueueSize;
}

/**
* Get the number of state requests of active queue in constant-time.
*
* @return the number of state requests of active queue.
*/
int activeQueueSize() {
return activeQueue.size();
}

/**
* Try to pop state requests from active queue, if the size of active queue is less than N,
* return all the requests in active queue.
*
* @param n the number of state requests to pop.
* @return A list of state requests.
*/
List<StateRequest<?, ?, ?>> popActive(int n) {
LinkedList<StateRequest<?, ?, ?>> ret =
new LinkedList<>(activeQueue.subList(0, Math.min(activeQueue.size(), n)));
activeQueue.removeAll(ret);
return ret;
}
}
Loading

0 comments on commit 6d139a1

Please sign in to comment.