Skip to content

Commit

Permalink
[FLINK-23381][state] Back-pressure on reaching state change to upload…
Browse files Browse the repository at this point in the history
… limit
  • Loading branch information
rkhachatryan committed Nov 19, 2021
1 parent 7bbe5f9 commit f925481
Show file tree
Hide file tree
Showing 11 changed files with 318 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.flink.changelog.fs;

import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.AvailabilityProvider.AvailabilityHelper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,14 +33,15 @@
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.LongAdder;

import static java.lang.Thread.holdsLock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;

/**
Expand All @@ -55,26 +59,40 @@ class BatchingStateChangeUploader implements StateChangeUploader {
private final long scheduleDelayMs;
private final long sizeThresholdBytes;

@GuardedBy("scheduled")
/**
* The lock is used to synchronize concurrent accesses:
*
* <ul>
* <li>task thread and {@link #scheduler} thread to {@link #scheduled} tasks
* <li>task thread and {@link #retryingExecutor uploader} thread to {@link #uploadThrottle}
* </ul>
*
* <p>These code paths are independent, but a single lock is used for simplicity.
*/
private final Object lock = new Object();

@GuardedBy("lock")
private final Queue<UploadTask> scheduled;

@GuardedBy("scheduled")
@GuardedBy("lock")
private long scheduledBytesCounter;

private final AvailabilityHelper availabilityHelper;

/**
* There should be at most one scheduled future, so that changes are batched according to
* settings.
*/
@Nullable
@GuardedBy("scheduled")
@GuardedBy("lock")
private ScheduledFuture<?> scheduledFuture;

@Nullable
@GuardedBy("this")
private Throwable errorUnsafe;

private final long maxBytesInFlight;
private final LongAdder inFlightBytesCounter = new LongAdder();
@GuardedBy("lock")
private final UploadThrottle uploadThrottle;

BatchingStateChangeUploader(
long persistDelayMs,
Expand All @@ -86,33 +104,40 @@ class BatchingStateChangeUploader implements StateChangeUploader {
this(
persistDelayMs,
sizeThresholdBytes,
maxBytesInFlight,
retryPolicy,
delegate,
SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG),
new RetryingExecutor(numUploadThreads),
maxBytesInFlight);
new RetryingExecutor(numUploadThreads));
}

BatchingStateChangeUploader(
long persistDelayMs,
long sizeThresholdBytes,
long maxBytesInFlight,
RetryPolicy retryPolicy,
StateChangeUploader delegate,
ScheduledExecutorService scheduler,
RetryingExecutor retryingExecutor,
long maxBytesInFlight) {
RetryingExecutor retryingExecutor) {
checkArgument(
sizeThresholdBytes <= maxBytesInFlight,
"sizeThresholdBytes (%s) must not exceed maxBytesInFlight (%s)",
sizeThresholdBytes,
maxBytesInFlight);
this.scheduleDelayMs = persistDelayMs;
this.scheduled = new LinkedList<>();
this.scheduler = scheduler;
this.retryPolicy = retryPolicy;
this.retryingExecutor = retryingExecutor;
this.sizeThresholdBytes = sizeThresholdBytes;
this.maxBytesInFlight = maxBytesInFlight;
this.delegate = delegate;
this.uploadThrottle = new UploadThrottle(maxBytesInFlight);
this.availabilityHelper = new AvailabilityHelper();
this.availabilityHelper.resetAvailable();
}

@Override
public void upload(UploadTask uploadTask) {
public void upload(UploadTask uploadTask) throws IOException {
Throwable error = getErrorSafe();
if (error != null) {
LOG.debug("don't persist {} changesets, already failed", uploadTask.changeSets.size());
Expand All @@ -121,26 +146,46 @@ public void upload(UploadTask uploadTask) {
}
LOG.debug("persist {} changeSets", uploadTask.changeSets.size());
try {
checkState(
inFlightBytesCounter.sum() <= maxBytesInFlight,
"In flight data size threshold exceeded %s > %s",
inFlightBytesCounter.sum(),
maxBytesInFlight);
synchronized (scheduled) {
long size = uploadTask.getSize();
inFlightBytesCounter.add(size);
long size = uploadTask.getSize();
synchronized (lock) {
while (!uploadThrottle.hasCapacity()) {
lock.wait();
}
uploadThrottle.seizeCapacity(size);
if (!uploadThrottle.hasCapacity()) {
availabilityHelper.resetUnavailable();
}
scheduledBytesCounter += size;
scheduled.add(wrapWithSizeUpdate(uploadTask, size, inFlightBytesCounter));
scheduled.add(wrapWithSizeUpdate(uploadTask, size));
scheduleUploadIfNeeded();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
uploadTask.fail(e);
throw new IOException(e);
} catch (Exception e) {
uploadTask.fail(e);
throw e;
}
}

private void releaseCapacity(long size) {
CompletableFuture<?> toNotify = null;
synchronized (lock) {
boolean hadCapacityBefore = uploadThrottle.hasCapacity();
uploadThrottle.releaseCapacity(size);
lock.notifyAll();
if (!hadCapacityBefore && uploadThrottle.hasCapacity()) {
toNotify = availabilityHelper.getUnavailableToResetAvailable();
}
}
if (toNotify != null) {
toNotify.complete(null);
}
}

private void scheduleUploadIfNeeded() {
checkState(holdsLock(scheduled));
checkState(holdsLock(lock));
if (scheduleDelayMs == 0 || scheduledBytesCounter >= sizeThresholdBytes) {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
Expand All @@ -154,7 +199,7 @@ private void scheduleUploadIfNeeded() {

private void drainAndSave() {
Collection<UploadTask> tasks;
synchronized (scheduled) {
synchronized (lock) {
tasks = new ArrayList<>(scheduled);
scheduled.clear();
scheduledBytesCounter = 0;
Expand Down Expand Up @@ -186,7 +231,7 @@ public void close() throws Exception {
LOG.warn("Unable to cleanly shutdown scheduler in 5s");
}
ArrayList<UploadTask> drained;
synchronized (scheduled) {
synchronized (lock) {
drained = new ArrayList<>(scheduled);
scheduled.clear();
scheduledBytesCounter = 0;
Expand All @@ -205,17 +250,32 @@ private synchronized void setErrorSafe(Throwable t) {
errorUnsafe = t;
}

private static UploadTask wrapWithSizeUpdate(
UploadTask uploadTask, long preComputedTaskSize, LongAdder inflightSize) {
private UploadTask wrapWithSizeUpdate(UploadTask uploadTask, long size) {
return new UploadTask(
uploadTask.changeSets,
result -> {
inflightSize.add(-preComputedTaskSize);
uploadTask.successCallback.accept(result);
try {
releaseCapacity(size);
} finally {
uploadTask.successCallback.accept(result);
}
},
(result, error) -> {
inflightSize.add(-preComputedTaskSize);
uploadTask.failureCallback.accept(result, error);
try {
releaseCapacity(size);
} finally {
uploadTask.failureCallback.accept(result, error);
}
});
}

@Override
public AvailabilityProvider getAvailabilityProvider() {
// This method can be called by multiple (task) threads.
// Though the field itself is final, implementation is generally not thread-safe.
// However, in case of reading stale AvailabilityHelper.availableFuture
// the task will either be notified about availability immediately;
// or back-pressured hard trying to seize capacity in upload()
return availabilityHelper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public class FsStateChangelogOptions {
+ ". Once reached, accumulated changes are persisted immediately. "
+ "This is different from "
+ PREEMPTIVE_PERSIST_THRESHOLD.key()
+ " as it happens AFTER the checkpoint and potentially for state changes of multiple operators.");
+ " as it happens AFTER the checkpoint and potentially for state changes of multiple operators. "
+ "Must not exceed in-flight data limit (see below)");

public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE =
ConfigOptions.key("dstl.dfs.upload.buffer-size")
Expand All @@ -93,7 +94,14 @@ public class FsStateChangelogOptions {
.defaultValue(MemorySize.parse("100Mb"))
.withDescription(
"Max amount of data allowed to be in-flight. "
+ "Upon reaching this limit the task will fail");
+ "Upon reaching this limit the task will be back-pressured. "
+ " I.e., snapshotting will block; normal processing will block if "
+ PREEMPTIVE_PERSIST_THRESHOLD.key()
+ " is set and reached. "
+ "The limit is applied to the total size of in-flight changes if multiple "
+ "operators/backends are using the same changelog storage. "
+ "Must be greater than or equal to "
+ PERSIST_SIZE_THRESHOLD.key());

public static final ConfigOption<String> RETRY_POLICY =
ConfigOptions.key("dstl.dfs.upload.retry-policy")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
Expand Down Expand Up @@ -92,4 +93,9 @@ public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader()
public void close() throws Exception {
uploader.close();
}

@Override
public AvailabilityProvider getAvailabilityProvider() {
return uploader.getAvailabilityProvider();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.changelog.SequenceNumber;

import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -89,6 +90,10 @@ static StateChangeUploader fromConfig(ReadableConfig config) throws IOException
return batchingStore;
}

default AvailabilityProvider getAvailabilityProvider() {
return () -> AvailabilityProvider.AVAILABLE;
}

@ThreadSafe
final class UploadTask {
final Collection<StateChangeSet> changeSets;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.changelog.fs;

import javax.annotation.concurrent.NotThreadSafe;

import static org.apache.flink.util.Preconditions.checkState;

/** Helper class to throttle upload requests when the in-flight data size limit is exceeded. */
@NotThreadSafe
class UploadThrottle {

private final long maxBytesInFlight;

private long inFlightBytesCounter = 0;

UploadThrottle(long maxBytesInFlight) {
this.maxBytesInFlight = maxBytesInFlight;
}

/**
* Seize <b>bytes</b> capacity. It is the caller responsibility to ensure at least some capacity
* {@link #hasCapacity() is available}. <strong>After</strong> this call, the caller is allowed
* to actually use the seized capacity. When the capacity is not needed anymore, the caller is
* required to {@link #releaseCapacity(long) release} it. Called by the Task thread.
*
* @throws IllegalStateException if capacity is unavailable.
*/
public void seizeCapacity(long bytes) throws IllegalStateException {
checkState(hasCapacity());
inFlightBytesCounter += bytes;
}

/**
* Release previously {@link #seizeCapacity(long) seized} capacity. Called by {@link
* BatchingStateChangeUploader} (IO thread).
*/
public void releaseCapacity(long bytes) {
inFlightBytesCounter -= bytes;
}

/** Test whether some capacity is available. */
public boolean hasCapacity() {
return inFlightBytesCounter < maxBytesInFlight;
}
}
Loading

0 comments on commit f925481

Please sign in to comment.