Skip to content

Commit

Permalink
[FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint
Browse files Browse the repository at this point in the history
This closes apache#4930.
  • Loading branch information
tillrohrmann committed Nov 7, 2017
1 parent 430fa7b commit 712d4cf
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
Expand Down Expand Up @@ -307,6 +309,15 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executionGraphCache,
executor);

SubtasksTimesHandler subtasksTimesHandler = new SubtasksTimesHandler(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
SubtasksTimesHeaders.getInstance(),
executionGraphCache,
executor);

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Request handler for the subtasks times info.
*/
public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<SubtasksTimesInfo, JobVertexMessageParameters> {
public SubtasksTimesHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
super(
localRestAddress,
leaderRetriever,
timeout,
responseHeaders,
messageHeaders,
executionGraphCache,
executor);
}

@Override
protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) {
JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);

final String id = jobVertex.getJobVertexId().toString();
final String name = jobVertex.getName();
final long now = System.currentTimeMillis();
final List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();

int num = 0;
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {

long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
ExecutionState status = vertex.getExecutionState();

long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()];

long start = scheduledTime > 0 ? scheduledTime : -1;
long end = status.isTerminal() ? timestamps[status.ordinal()] : now;
long duration = start >= 0 ? end - start : -1L;

TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();

Map<String, Long> timestampMap = new HashMap<>();
for (ExecutionState state : ExecutionState.values()) {
timestampMap.put(state.name(), timestamps[state.ordinal()]);
}

subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(
num++,
locationString,
duration,
timestampMap));
}
return new SubtasksTimesInfo(id, name, now, subtasks);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Message headers for the {@link SubtasksTimesHandler}.
*/
public class SubtasksTimesHeaders implements MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> {

private static final SubtasksTimesHeaders INSTANCE = new SubtasksTimesHeaders();

public static final String URL = "/jobs/:" + JobIDPathParameter.KEY +
"/vertices/:" + JobVertexIdPathParameter.KEY + "/subtasktimes";

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public Class<SubtasksTimesInfo> getResponseClass() {
return SubtasksTimesInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public JobVertexMessageParameters getUnresolvedMessageParameters() {
return new JobVertexMessageParameters();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

public static SubtasksTimesHeaders getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Response type of the {@link SubtasksTimesHandler}.
*/
public class SubtasksTimesInfo implements ResponseBody {

public static final String FIELD_NAME_ID = "id";
public static final String FIELD_NAME_NAME = "name";
public static final String FIELD_NAME_NOW = "now";
public static final String FIELD_NAME_SUBTASKS = "subtasks";

@JsonProperty(FIELD_NAME_ID)
private final String id;

@JsonProperty(FIELD_NAME_NAME)
private final String name;

@JsonProperty(FIELD_NAME_NOW)
private final long now;

@JsonProperty(FIELD_NAME_SUBTASKS)
private final List<SubtaskTimeInfo> subtasks;

@JsonCreator
public SubtasksTimesInfo(
@JsonProperty(FIELD_NAME_ID) String id,
@JsonProperty(FIELD_NAME_NAME) String name,
@JsonProperty(FIELD_NAME_NOW) long now,
@JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskTimeInfo> subtasks) {
this.id = checkNotNull(id);
this.name = checkNotNull(name);
this.now = now;
this.subtasks = checkNotNull(subtasks);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || this.getClass() != o.getClass()) {
return false;
}

SubtasksTimesInfo that = (SubtasksTimesInfo) o;
return Objects.equals(id, that.id) &&
Objects.equals(name, that.name) &&
now == that.now &&
Objects.equals(subtasks, that.subtasks);
}

@Override
public int hashCode() {
return Objects.hash(id, name, now, subtasks);
}

//---------------------------------------------------------------------------------
// Static helper classes
//---------------------------------------------------------------------------------

/**
* Nested class to encapsulate the sub task times info.
*/
public static final class SubtaskTimeInfo {

public static final String FIELD_NAME_SUBTASK = "subtask";
public static final String FIELD_NAME_HOST = "host";
public static final String FIELD_NAME_DURATION = "duration";
public static final String FIELD_NAME_TIMESTAMPS = "timestamps";

@JsonProperty(FIELD_NAME_SUBTASK)
private final int subtask;

@JsonProperty(FIELD_NAME_HOST)
private final String host;

@JsonProperty(FIELD_NAME_DURATION)
private final long duration;

@JsonProperty(FIELD_NAME_TIMESTAMPS)
private final Map<String, Long> timestamps;

public SubtaskTimeInfo(
@JsonProperty(FIELD_NAME_SUBTASK) int subtask,
@JsonProperty(FIELD_NAME_HOST) String host,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) {
this.subtask = subtask;
this.host = checkNotNull(host);
this.duration = duration;
this.timestamps = checkNotNull(timestamps);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (null == o || this.getClass() != o.getClass()) {
return false;
}

SubtaskTimeInfo that = (SubtaskTimeInfo) o;
return subtask == that.subtask &&
Objects.equals(host, that.host) &&
duration == that.duration &&
Objects.equals(timestamps, that.timestamps);
}

@Override
public int hashCode() {
return Objects.hash(subtask, host, duration, timestamps);
}
}
}
Loading

0 comments on commit 712d4cf

Please sign in to comment.