Skip to content

Commit

Permalink
[FLINK-8024] Let ClusterOverviewHandler directly extend from Abstract…
Browse files Browse the repository at this point in the history
…RestHandler

This closes apache#4982.
  • Loading branch information
tillrohrmann committed Nov 9, 2017
1 parent 34fdf56 commit f03393e
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
Expand All @@ -44,12 +45,10 @@
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
Expand Down Expand Up @@ -143,15 +142,12 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
final Time timeout = restConfiguration.getTimeout();
final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();

LegacyRestHandlerAdapter<DispatcherGateway, ClusterOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>(
ClusterOverviewHandler<DispatcherGateway> clusterOverviewHandler = new ClusterOverviewHandler<>(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
ClusterOverviewHeaders.getInstance(),
new ClusterOverviewHandler(
executor,
timeout));
ClusterOverviewHeaders.getInstance());

LegacyRestHandlerAdapter<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> dashboardConfigurationHandler = new LegacyRestHandlerAdapter<>(
restAddressFuture,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.cluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Handler which returns the cluster overview information with version.
*
* @param <T> type of the leader gateway
*/
public class ClusterOverviewHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> {

private static final String version = EnvironmentInformation.getVersion();

private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;

public ClusterOverviewHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<T> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> messageHeaders) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
}

@Override
public CompletableFuture<ClusterOverviewWithVersion> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) {
CompletableFuture<ClusterOverview> overviewFuture = gateway.requestClusterOverview(timeout);

return overviewFuture.thenApply(
statusOverview -> ClusterOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.FlinkException;

Expand All @@ -48,7 +43,7 @@
* Responder that returns the status of the Flink cluster, such as how many
* TaskManagers are currently connected, and how many jobs are running.
*/
public class ClusterOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, ClusterOverviewWithVersion, EmptyMessageParameters> {
public class ClusterOverviewHandler extends AbstractJsonRequestHandler {

private static final String version = EnvironmentInformation.getVersion();

Expand Down Expand Up @@ -108,12 +103,4 @@ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParam
return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
}
}

@Override
public CompletableFuture<ClusterOverviewWithVersion> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
CompletableFuture<ClusterOverview> overviewFuture = gateway.requestClusterOverview(timeout);

return overviewFuture.thenApply(
statusOverview -> ClusterOverviewWithVersion.fromStatusOverview(statusOverview, version, commitID));
}
}

0 comments on commit f03393e

Please sign in to comment.