Skip to content

Commit

Permalink
[hotfix] Add BiConsumerWithException#unchecked to convert into BiCons…
Browse files Browse the repository at this point in the history
…umer
  • Loading branch information
tillrohrmann committed Sep 14, 2018
1 parent 3e5d07c commit 4942b95
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @param <E> type of the thrown exception
*/
@FunctionalInterface
public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
public interface BiConsumerWithException<T, U, E extends Throwable> {

/**
* Performs this operation on the given arguments.
Expand All @@ -39,14 +39,23 @@ public interface BiConsumerWithException<T, U, E extends Throwable> extends BiCo
* @param u the second input argument
* @throws E in case of an error
*/
void acceptWithException(T t, U u) throws E;
void accept(T t, U u) throws E;

@Override
default void accept(T t, U u) {
try {
acceptWithException(t, u);
} catch (Throwable e) {
ExceptionUtils.rethrow(e);
}
/**
* Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
*
* @param biConsumerWithException BiConsumer with exception to convert into a {@link BiConsumer}.
* @param <A> first input type
* @param <B> second input type
* @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
*/
static <A, B> BiConsumer<A, B> unchecked(BiConsumerWithException<A, B, ?> biConsumerWithException) {
return (A a, B b) -> {
try {
biConsumerWithException.accept(a, b);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ private void rescaleJobGraph(Collection<JobVertexID> operators, int newParalleli
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
}

rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
rescalingBehaviour.accept(jobVertex, newParallelism);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Int
// rescaling is only executed if the operator can be set to the given parallelism
STRICT {
@Override
public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
public void accept(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
if (jobVertex.getMaxParallelism() < newParallelism) {
throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
" because its maximum parallelism " + jobVertex.getMaxParallelism() +
Expand All @@ -42,7 +42,7 @@ public void acceptWithException(JobVertex jobVertex, Integer newParallelism) thr
// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
RELAXED {
@Override
public void acceptWithException(JobVertex jobVertex, Integer newParallelism) {
public void accept(JobVertex jobVertex, Integer newParallelism) {
jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism));
}
}
Expand Down

0 comments on commit 4942b95

Please sign in to comment.