From bf0cfa606487c742a38cfccfee780a613894a8d8 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 23 Apr 2020 10:19:50 +0200 Subject: [PATCH] [FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint --- .../runtime/dispatcher/DispatcherRestEndpoint.java | 4 ++-- ...faultDispatcherResourceManagerComponentFactory.java | 4 ++-- .../runtime/jobmaster/MiniDispatcherRestEndpoint.java | 4 ++-- .../flink/runtime/rest/JobRestEndpointFactory.java | 4 ++-- .../apache/flink/runtime/rest/RestEndpointFactory.java | 4 ++-- .../flink/runtime/rest/SessionRestEndpointFactory.java | 4 ++-- .../flink/runtime/webmonitor/WebMonitorEndpoint.java | 10 +++++----- .../rest/util/DocumentingDispatcherRestEndpoint.java | 2 +- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 0bbb10f32b8a6..ba6f6781f22e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -42,7 +42,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * REST endpoint for the {@link Dispatcher} component. @@ -58,7 +58,7 @@ public DispatcherRestEndpoint( RestHandlerConfiguration restConfiguration, GatewayRetriever resourceManagerRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java index e1d1296a789a3..84b94a9bf4788 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java @@ -71,7 +71,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components. @@ -137,7 +137,7 @@ public DispatcherResourceManagerComponent create( 10, Time.milliseconds(50L)); - final ExecutorService executor = WebMonitorEndpoint.createExecutorService( + final ScheduledExecutorService executor = WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java index f11b6694f210f..e9f7d61aa8aaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import java.io.IOException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * REST endpoint for the {@link JobClusterEntrypoint}. @@ -46,7 +46,7 @@ public MiniDispatcherRestEndpoint( RestHandlerConfiguration restConfiguration, GatewayRetriever resourceManagerRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java index 21df8218af8fd..fecf83e1c34d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * {@link RestEndpointFactory} which creates a {@link MiniDispatcherRestEndpoint}. @@ -45,7 +45,7 @@ public WebMonitorEndpoint createRestEndpoint( LeaderGatewayRetriever dispatcherGatewayRetriever, LeaderGatewayRetriever resourceManagerGatewayRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java index 840a4beaf0e51..db16318cb029c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * {@link WebMonitorEndpoint} factory. @@ -43,7 +43,7 @@ WebMonitorEndpoint createRestEndpoint( LeaderGatewayRetriever dispatcherGatewayRetriever, LeaderGatewayRetriever resourceManagerGatewayRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java index 15d5fe29b03ad..b0911978846fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. @@ -44,7 +44,7 @@ public WebMonitorEndpoint createRestEndpoint( LeaderGatewayRetriever dispatcherGatewayRetriever, LeaderGatewayRetriever resourceManagerGatewayRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index fc1887b130a1a..6cf61cfbcfffe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -144,8 +144,8 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -160,7 +160,7 @@ public class WebMonitorEndpoint extends RestServerEndp protected final RestHandlerConfiguration restConfiguration; private final GatewayRetriever resourceManagerRetriever; private final TransientBlobService transientBlobService; - protected final ExecutorService executor; + protected final ScheduledExecutorService executor; private final ExecutionGraphCache executionGraphCache; private final CheckpointStatsCache checkpointStatsCache; @@ -182,7 +182,7 @@ public WebMonitorEndpoint( RestHandlerConfiguration restConfiguration, GatewayRetriever resourceManagerRetriever, TransientBlobService transientBlobService, - ExecutorService executor, + ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler) throws IOException { @@ -799,7 +799,7 @@ public Collection archiveJsonWithPath(AccessExecutionGraph graph) return archivedJson; } - public static ExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) { + public static ScheduledExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) { if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( String.format( @@ -809,7 +809,7 @@ public static ExecutorService createExecutorService(int numThreads, int threadPr threadPriority)); } - return Executors.newFixedThreadPool( + return Executors.newScheduledThreadPool( numThreads, new ExecutorThreadFactory.Builder() .setThreadPriority(threadPriority) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java index e727ab2519816..c3db3c3c90ab4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java @@ -82,7 +82,7 @@ public DocumentingDispatcherRestEndpoint() throws IOException { handlerConfig, resourceManagerGatewayRetriever, NoOpTransientBlobService.INSTANCE, - Executors.newFixedThreadPool(1), + Executors.newScheduledThreadPool(1), VoidMetricFetcher.INSTANCE, NoOpElectionService.INSTANCE, NoOpFatalErrorHandler.INSTANCE);