Skip to content

Commit

Permalink
[FLINK-35158][runtime] Error handling in StateFuture's callback
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia committed May 8, 2024
1 parent 1c34ca0 commit bb0f442
Show file tree
Hide file tree
Showing 19 changed files with 664 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

/**
* StateFuture is a future that act as a return value for async state interfaces. Note: All these
Expand All @@ -40,7 +39,8 @@ public interface StateFuture<T> {
* @param <U> the function's return type.
* @return the new StateFuture.
*/
<U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn);
<U> StateFuture<U> thenApply(
FunctionWithException<? super T, ? extends U, ? extends Exception> fn);

/**
* Returns a new StateFuture that, when this future completes normally, is executed with this
Expand All @@ -49,7 +49,7 @@ public interface StateFuture<T> {
* @param action the action to perform before completing the returned StateFuture.
* @return the new StateFuture.
*/
StateFuture<Void> thenAccept(Consumer<? super T> action);
StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends Exception> action);

/**
* Returns a new future that, when this future completes normally, is executed with this future
Expand All @@ -58,7 +58,8 @@ public interface StateFuture<T> {
* @param action the action to perform.
* @return the new StateFuture.
*/
<U> StateFuture<U> thenCompose(Function<? super T, ? extends StateFuture<U>> action);
<U> StateFuture<U> thenCompose(
FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception> action);

/**
* Returns a new StateFuture that, when this and the other given future both complete normally,
Expand All @@ -71,5 +72,6 @@ public interface StateFuture<T> {
* @return the new StateFuture.
*/
<U, V> StateFuture<V> thenCombine(
StateFuture<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn);
StateFuture<? extends U> other,
BiFunctionWithException<? super T, ? super U, ? extends V, ? extends Exception> fn);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.annotation.Public;

import java.util.function.Function;

/**
* A functional interface for a {@link java.util.function.Function} that may throw exceptions.
*
Expand All @@ -39,4 +41,24 @@ public interface FunctionWithException<T, R, E extends Throwable> {
* @throws E This function may throw an exception.
*/
R apply(T value) throws E;

/**
* Convert at {@link FunctionWithException} into a {@link Function}.
*
* @param functionWithException function with exception to convert into a function
* @param <A> input type
* @param <B> output type
* @return {@link Function} which throws all checked exception as an unchecked exception.
*/
static <A, B> Function<A, B> unchecked(FunctionWithException<A, B, ?> functionWithException) {
return (A a) -> {
try {
return functionWithException.apply(a);
} catch (Throwable t) {
ThrowingExceptionUtils.rethrow(t);
// we need this to appease the compiler :-(
return null;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.annotation.Public;

import java.util.function.Consumer;

/**
* This interface is basically Java's {@link java.util.function.Consumer} interface enhanced with
* the ability to throw an exception.
Expand All @@ -38,4 +40,21 @@ public interface ThrowingConsumer<T, E extends Throwable> {
* @throws E on errors during consumption
*/
void accept(T t) throws E;

/**
* Convert a {@link ThrowingConsumer} into a {@link Consumer}.
*
* @param throwingConsumer Consumer with exception to convert into a {@link Consumer}.
* @param <A> input type
* @return {@link Consumer} which rethrows all checked exceptions as unchecked.
*/
static <A> Consumer<A> unchecked(ThrowingConsumer<A, ?> throwingConsumer) {
return (A a) -> {
try {
throwingConsumer.accept(a);
} catch (Throwable t) {
ThrowingExceptionUtils.rethrow(t);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,47 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.util.FlinkRuntimeException;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

/** A {@link StateFuture} that has already been completed when it is created. */
@Internal
public class CompletedStateFuture<T> implements InternalStateFuture<T> {

T result;

// no public access
CompletedStateFuture(T result) {
this.result = result;
}

@Override
public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
return StateFutureUtils.completedFuture(fn.apply(result));
public <U> StateFuture<U> thenApply(
FunctionWithException<? super T, ? extends U, ? extends Exception> fn) {
return StateFutureUtils.completedFuture(FunctionWithException.unchecked(fn).apply(result));
}

@Override
public StateFuture<Void> thenAccept(Consumer<? super T> action) {
action.accept(result);
public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
ThrowingConsumer.unchecked(action).accept(result);
return StateFutureUtils.completedVoidFuture();
}

@Override
public <U> StateFuture<U> thenCompose(Function<? super T, ? extends StateFuture<U>> action) {
return action.apply(result);
public <U> StateFuture<U> thenCompose(
FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception>
action) {
return FunctionWithException.unchecked(action).apply(result);
}

@Override
public <U, V> StateFuture<V> thenCombine(
StateFuture<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
StateFuture<? extends U> other,
BiFunctionWithException<? super T, ? super U, ? extends V, ? extends Exception> fn) {
return other.thenCompose(
(u) -> {
try {
V v = fn.apply(result, u);
return StateFutureUtils.completedFuture(v);
} catch (Throwable e) {
throw new FlinkRuntimeException("Error binding or executing callback", e);
}
V v = fn.apply(result, u);
return StateFutureUtils.completedFuture(v);
});
}

Expand All @@ -73,7 +70,7 @@ public void complete(T result) {
}

@Override
public void thenSyncAccept(Consumer<? super T> action) {
action.accept(result);
public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
ThrowingConsumer.unchecked(action).accept(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.StateFuture;

import java.util.function.Consumer;
import org.apache.flink.util.function.ThrowingConsumer;

/**
* The Internal definition of {@link StateFuture}, add some method that will be used by framework.
Expand All @@ -38,5 +37,5 @@ public interface InternalStateFuture<T> extends StateFuture<T> {
*
* @param action the action to perform.
*/
void thenSyncAccept(Consumer<? super T> action);
void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action);
}
Loading

0 comments on commit bb0f442

Please sign in to comment.