[FLINK-7705] Add JobDetailsHandler
Add JobID(De)Serializer and JobVertexID(De)Serializer for Jackson

This closes apache#4884.
tillrohrmann committed Nov 7, 2017
1 parent 1c78dee commit de201a6
Showing 13 changed files with 1,003 additions and 12 deletions.
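
The JobID and JobVertexID (de)serializers named in the commit message are among the 13 changed files but are not expanded in this view. As a rough sketch of the idea, assuming the IDs are round-tripped through their hex-string form (illustrative class names; Flink's shaded Jackson packages as used by the imports further down):

import java.io.IOException;

import org.apache.flink.api.common.JobID;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;

/** Illustrative sketch only; the classes added by this commit may differ. */
public class JobIDSerializerSketch extends JsonSerializer<JobID> {

	@Override
	public void serialize(JobID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
		// JobID#toString() yields the hex string form of the ID
		gen.writeString(value.toString());
	}
}

/** Illustrative counterpart that parses the hex string back into a JobID. */
class JobIDDeserializerSketch extends JsonDeserializer<JobID> {

	@Override
	public JobID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
		return JobID.fromHexString(p.getValueAsString());
	}
}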
@@ -30,6 +30,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
@@ -67,6 +68,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -289,6 +291,17 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
			TaskManagerDetailsHeaders.getInstance(),
			resourceManagerRetriever,
			metricFetcher);

		final JobDetailsHandler jobDetailsHandler = new JobDetailsHandler(
			restAddressFuture,
			leaderRetriever,
			timeout,
			responseHeaders,
			JobDetailsHeaders.getInstance(),
			executionGraphCache,
			executor,
			metricFetcher);

		final File tmpDir = restConfiguration.getTmpDir();

		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -317,6 +330,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
		handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
@@ -0,0 +1,209 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Handler returning the details for the specified job.
*/
public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> {

	private final MetricFetcher<?> metricFetcher;

	public JobDetailsHandler(
			CompletableFuture<String> localRestAddress,
			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
			Time timeout,
			Map<String, String> responseHeaders,
			MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders,
			ExecutionGraphCache executionGraphCache,
			Executor executor,
			MetricFetcher<?> metricFetcher) {
		super(
			localRestAddress,
			leaderRetriever,
			timeout,
			responseHeaders,
			messageHeaders,
			executionGraphCache,
			executor);

		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
	}

	@Override
	protected JobDetailsInfo handleRequest(
			HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
			AccessExecutionGraph executionGraph) throws RestHandlerException {

		final long now = System.currentTimeMillis();
		final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
		// a job only has an end time once it has reached a globally terminal state
		final long endTime = executionGraph.getState().isGloballyTerminalState() ?
			executionGraph.getStatusTimestamp(executionGraph.getState()) : -1L;
		final long duration = (endTime > 0L ? endTime : now) - startTime;

		// transition timestamp for every job status (0 if the status was never reached)
		final Map<JobStatus, Long> timestamps = new HashMap<>(JobStatus.values().length);

		for (JobStatus jobStatus : JobStatus.values()) {
			timestamps.put(jobStatus, executionGraph.getStatusTimestamp(jobStatus));
		}

		// per-vertex details plus a histogram of vertices per execution state
		Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos = new ArrayList<>(executionGraph.getAllVertices().size());
		int[] jobVerticesPerState = new int[ExecutionState.values().length];

		for (AccessExecutionJobVertex accessExecutionJobVertex : executionGraph.getVerticesTopologically()) {
			final JobDetailsInfo.JobVertexDetailsInfo vertexDetailsInfo = createJobVertexDetailsInfo(
				accessExecutionJobVertex,
				now,
				executionGraph.getJobID(),
				metricFetcher);

			jobVertexInfos.add(vertexDetailsInfo);
			jobVerticesPerState[vertexDetailsInfo.getExecutionState().ordinal()]++;
		}

		Map<ExecutionState, Integer> jobVerticesPerStateMap = new HashMap<>(ExecutionState.values().length);

		for (ExecutionState executionState : ExecutionState.values()) {
			jobVerticesPerStateMap.put(executionState, jobVerticesPerState[executionState.ordinal()]);
		}

		return new JobDetailsInfo(
			executionGraph.getJobID(),
			executionGraph.getJobName(),
			executionGraph.isStoppable(),
			executionGraph.getState(),
			startTime,
			endTime,
			duration,
			now,
			timestamps,
			jobVertexInfos,
			jobVerticesPerStateMap,
			executionGraph.getJsonPlan());
	}

	public static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
			AccessExecutionJobVertex ejv,
			long now,
			JobID jobId,
			MetricFetcher<?> metricFetcher) {
		int[] tasksPerState = new int[ExecutionState.values().length];
		long startTime = Long.MAX_VALUE;
		long endTime = 0;
		boolean allFinished = true;

		for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
			final ExecutionState state = vertex.getExecutionState();
			tasksPerState[state.ordinal()]++;

			// take the earliest start time
			long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
			if (started > 0L) {
				startTime = Math.min(startTime, started);
			}

			allFinished &= state.isTerminal();
			endTime = Math.max(endTime, vertex.getStateTimestamp(state));
		}

		// startTime == Long.MAX_VALUE means that no subtask has been deployed yet
		long duration;
		if (startTime < Long.MAX_VALUE) {
			if (allFinished) {
				duration = endTime - startTime;
			}
			else {
				// still running: report -1 as end time and measure against "now"
				endTime = -1L;
				duration = now - startTime;
			}
		}
		else {
			startTime = -1L;
			endTime = -1L;
			duration = -1L;
		}

		ExecutionState jobVertexState =
			ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());

		Map<ExecutionState, Integer> tasksPerStateMap = new HashMap<>(tasksPerState.length);

		for (ExecutionState executionState : ExecutionState.values()) {
			tasksPerStateMap.put(executionState, tasksPerState[executionState.ordinal()]);
		}

		// aggregate the IO metrics over the current execution attempts of all subtasks
		MutableIOMetrics counts = new MutableIOMetrics();

		for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
			counts.addIOMetrics(
				vertex.getCurrentExecutionAttempt(),
				metricFetcher,
				jobId.toString(),
				ejv.getJobVertexId().toString());
		}

		final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
			counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
			counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
			counts.getNumBytesOut(),
			counts.isNumBytesOutComplete(),
			counts.getNumRecordsIn(),
			counts.isNumRecordsInComplete(),
			counts.getNumRecordsOut(),
			counts.isNumRecordsOutComplete());

		return new JobDetailsInfo.JobVertexDetailsInfo(
			ejv.getJobVertexId(),
			ejv.getName(),
			ejv.getParallelism(),
			jobVertexState,
			startTime,
			endTime,
			duration,
			tasksPerStateMap,
			jobVertexMetrics);
	}
}
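
For orientation, a hedged usage sketch of the endpoint this handler serves. It assumes the handler is reachable under a URL of the form /jobs/<jobid> (per JobDetailsHeaders, whose source is not expanded in this view) and a cluster REST endpoint on localhost:8081; the field names "name" and "state" are assumptions about JobDetailsInfo's JSON layout:

import java.io.InputStream;
import java.net.URL;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class JobDetailsClientSketch {

	public static void main(String[] args) throws Exception {
		// hex string of the job to inspect, e.g. taken from the jobs overview
		final String jobId = args[0];
		// assumed REST address and URL pattern; see JobDetailsHeaders for the real route
		final URL url = new URL("http://localhost:8081/jobs/" + jobId);

		final ObjectMapper mapper = new ObjectMapper();
		try (InputStream in = url.openStream()) {
			JsonNode details = mapper.readTree(in);
			// "name" and "state" are assumed field names of the JobDetailsInfo payload
			System.out.println(details.get("name") + " is in state " + details.get("state"));
		}
	}
}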
@@ -54,6 +54,26 @@ public MutableIOMetrics() {
		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
	}

	public boolean isNumBytesInLocalComplete() {
		return numBytesInLocalComplete;
	}

	public boolean isNumBytesInRemoteComplete() {
		return numBytesInRemoteComplete;
	}

	public boolean isNumBytesOutComplete() {
		return numBytesOutComplete;
	}

	public boolean isNumRecordsInComplete() {
		return numRecordsInComplete;
	}

	public boolean isNumRecordsOutComplete() {
		return numRecordsOutComplete;
	}

	/**
	 * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
	 * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is
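
The new getters expose the completeness flags alongside the summed counts, so a caller can tell "zero" apart from "not yet resolvable". A minimal sketch of that intended reading (helper name is illustrative, not from this commit):

import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;

public class IOMetricsCompletenessSketch {

	/** Renders the incoming byte count, flagging partially resolved metrics. */
	static String describeBytesIn(MutableIOMetrics metrics) {
		final long bytesIn = metrics.getNumBytesInLocal() + metrics.getNumBytesInRemote();
		final boolean complete =
			metrics.isNumBytesInLocalComplete() && metrics.isNumBytesInRemoteComplete();
		return bytesIn + (complete ? " bytes" : " bytes (incomplete)");
	}
}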
@@ -25,8 +25,8 @@
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -106,7 +106,7 @@ public class CheckpointStatistics implements ResponseBody {
	private final int numAckSubtasks;

	@JsonProperty(FIELD_NAME_TASKS)
	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
	@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;

	@JsonCreator
@@ -121,7 +121,7 @@ private CheckpointStatistics(
			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
		this.id = id;
		this.status = Preconditions.checkNotNull(status);
		this.savepoint = savepoint;
@@ -309,7 +309,7 @@ public CompletedCheckpointStatistics(
			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
			@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
			@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
		super(
@@ -388,7 +388,7 @@ public FailedCheckpointStatistics(
			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
			@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
			@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
		super(
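
The Key(De)Serializer renames above reflect a Jackson distinction: map keys are written via writeFieldName rather than as values, and are read back through a KeyDeserializer rather than a JsonDeserializer. A minimal sketch of such a pair for JobVertexID (illustrative names; the real JobVertexIDKey(De)Serializer sources are not expanded in this view):

import java.io.IOException;

import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.KeyDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;

/** Map-key serializer sketch: keys must be emitted as JSON field names. */
public class JobVertexIDKeySerializerSketch extends JsonSerializer<JobVertexID> {

	@Override
	public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
		gen.writeFieldName(value.toString());
	}
}

/** Map-key deserializer sketch: parses the field name back into a JobVertexID. */
class JobVertexIDKeyDeserializerSketch extends KeyDeserializer {

	@Override
	public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
		return JobVertexID.fromHexString(key);
	}
}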