From 89548bafb092c0b9b92bb6e9c3616e2644c765e8 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 15 Nov 2018 17:05:16 +0100 Subject: [PATCH] [hotfix] [s3] Minor fixes/code simplifications in S3 recoverable writer --- .../common/utils/BackPressuringExecutor.java | 5 +--- .../fs/s3/common/writer/S3ConfigOptions.java | 30 ------------------- 2 files changed, 1 insertion(+), 34 deletions(-) delete mode 100644 flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3ConfigOptions.java diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java index d0dd7c8f73440..e78e83de4252e 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/BackPressuringExecutor.java @@ -23,7 +23,6 @@ import org.apache.flink.util.FlinkRuntimeException; import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,11 +65,9 @@ public void execute(Runnable command) { final SemaphoreReleasingRunnable runnable = new SemaphoreReleasingRunnable(command, permits); try { delegate.execute(runnable); - } catch (RejectedExecutionException e) { + } catch (Throwable e) { runnable.release(); ExceptionUtils.rethrow(e, e.getMessage()); - } catch (Throwable t) { - ExceptionUtils.rethrow(t, t.getMessage()); } } diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3ConfigOptions.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3ConfigOptions.java deleted file mode 100644 index 421880554e5e8..0000000000000 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3ConfigOptions.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.fs.s3.common.writer; - -/** - * Configuration keys for the S3 file system based using Hadoop's s3a. - */ -public final class S3ConfigOptions { - - // ------------------------------------------------------------------------ - - /** Not meant to be instantiated. */ - private S3ConfigOptions() {} -}