Skip to content

Commit

Permalink
[FLINK-7505] Use lambdas in suppressed exception idiom
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Aug 24, 2017
1 parent ca87bec commit 5456cf9
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 22 deletions.
63 changes: 63 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* This class offers utility functions for Java's lambda features.
*/
public final class LambdaUtil {

private LambdaUtil() {
throw new AssertionError();
}

/**
* This method supplies all elements from the input to the consumer. Exceptions that happen on elements are
* suppressed until all elements are processed. If exceptions happened for one or more of the inputs, they are
* reported in a combining suppressed exception.
*
* @param inputs iterator for all inputs to the throwingConsumer.
* @param throwingConsumer this consumer will be called for all elements delivered by the input iterator.
* @param <T> the type of input.
* @throws Exception collected exceptions that happened during the invocation of the consumer on the input elements.
*/
public static <T> void applyToAllWhileSuppressingExceptions(
Iterable<T> inputs,
ThrowingConsumer<T> throwingConsumer) throws Exception {

if (inputs != null && throwingConsumer != null) {
Exception exception = null;

for (T input : inputs) {

if (input != null) {
try {
throwingConsumer.accept(input);
} catch (Exception ex) {
exception = ExceptionUtils.firstOrSuppressed(ex, exception);
}
}
}

if (exception != null) {
throw exception;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* This interface is basically Java's {@link java.util.function.Consumer} interface enhanced with the ability to throw
* an exception.
*
* @param <T> type of the consumed elements.
*/
@FunctionalInterface
public interface ThrowingConsumer<T> {

/**
* Performs this operation on the given argument.
*
* @param t the input argument
* @throws Exception on errors during consumption
*/
void accept(T t) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.flink.runtime.state;

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.LambdaUtil;

import java.util.concurrent.RunnableFuture;

Expand Down Expand Up @@ -49,27 +49,8 @@ public static long getStateSize(StateObject handle) {
* @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration
*/
public static void bestEffortDiscardAllStateObjects(
Iterable<? extends StateObject> handlesToDiscard) throws Exception {

if (handlesToDiscard != null) {
Exception exception = null;

for (StateObject state : handlesToDiscard) {

if (state != null) {
try {
state.discardState();
}
catch (Exception ex) {
exception = ExceptionUtils.firstOrSuppressed(ex, exception);
}
}
}

if (exception != null) {
throw exception;
}
}
Iterable<? extends StateObject> handlesToDiscard) throws Exception {
LambdaUtil.applyToAllWhileSuppressingExceptions(handlesToDiscard, StateObject::discardState);
}

/**
Expand Down

0 comments on commit 5456cf9

Please sign in to comment.