Skip to content

Commit

Permalink
[FLINK-11405][rest] Add maxExceptions query parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 authored and zentol committed Oct 30, 2019
1 parent 4c87259 commit 785b7b2
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 12 deletions.
10 changes: 10 additions & 0 deletions docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1893,6 +1893,16 @@
</ul>
</td>
</tr>
<tr>
<td colspan="2">Query parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
<li><code>maxExceptions</code> (optional): Comma-separated list of integer values that specifies the upper limit of exceptions to return.</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#-1011644505">Request</button>
Expand Down
5 changes: 4 additions & 1 deletion flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,10 @@
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
"queryParameters" : [ {
"key" : "maxExceptions",
"mandatory" : false
} ]
},
"request" : {
"type" : "any"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
Expand All @@ -49,15 +50,15 @@
/**
* Handler serving the job exceptions.
*/
public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> implements JsonArchivist {
public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobExceptionsMessageParameters> implements JsonArchivist {

static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;

public JobExceptionsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobMessageParameters> messageHeaders,
MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobExceptionsMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {

Expand All @@ -71,19 +72,21 @@ public JobExceptionsHandler(
}

@Override
protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
return createJobExceptionsInfo(executionGraph);
protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> request, AccessExecutionGraph executionGraph) {
	// "maxExceptions" is declared as a list-valued query parameter; only the first value is honored.
	final List<Integer> exceptionToReportMaxSizes = request.getQueryParameter(UpperLimitExceptionParameter.class);
	// Fall back to the historical hard-coded cap when the client did not supply a limit.
	final int exceptionToReportMaxSize = exceptionToReportMaxSizes.isEmpty()
		? MAX_NUMBER_EXCEPTION_TO_REPORT
		: exceptionToReportMaxSizes.get(0);
	return createJobExceptionsInfo(executionGraph, exceptionToReportMaxSize);
}

@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
ResponseBody json = createJobExceptionsInfo(graph);
ResponseBody json = createJobExceptionsInfo(graph, MAX_NUMBER_EXCEPTION_TO_REPORT);
String path = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
return Collections.singletonList(new ArchivedJson(path, json));
}

private static JobExceptionsInfo createJobExceptionsInfo(AccessExecutionGraph executionGraph) {
private static JobExceptionsInfo createJobExceptionsInfo(AccessExecutionGraph executionGraph, int exceptionToReportMaxSize) {
ErrorInfo rootException = executionGraph.getFailureInfo();
String rootExceptionMessage = null;
Long rootTimestamp = null;
Expand All @@ -97,7 +100,7 @@ private static JobExceptionsInfo createJobExceptionsInfo(AccessExecutionGraph ex
for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
String t = task.getFailureCauseAsString();
if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
if (taskExceptionList.size() >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
if (taskExceptionList.size() >= exceptionToReportMaxSize) {
truncated = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Message headers for the {@link JobExceptionsHandler}.
*/
public class JobExceptionsHeaders implements MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobMessageParameters> {
public class JobExceptionsHeaders implements MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobExceptionsMessageParameters> {

private static final JobExceptionsHeaders INSTANCE = new JobExceptionsHeaders();

Expand All @@ -50,8 +51,8 @@ public HttpResponseStatus getResponseStatusCode() {
}

@Override
public JobMessageParameters getUnresolvedMessageParameters() {
return new JobMessageParameters();
public JobExceptionsMessageParameters getUnresolvedMessageParameters() {
return new JobExceptionsMessageParameters();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
Expand Down Expand Up @@ -81,6 +82,26 @@ public int hashCode() {
return Objects.hash(rootException, rootTimestamp, allExceptions, truncated);
}

/**
 * Returns the stringified root failure cause of the job; may be {@code null} when no
 * root failure was recorded.
 *
 * <p>{@code @JsonIgnore} keeps this accessor from being serialized as an extra JSON
 * property — the field itself already drives serialization. Added for test access.
 */
@JsonIgnore
public String getRootException() {
	return rootException;
}

/**
 * Returns the timestamp of the root failure; may be {@code null} when no root failure
 * was recorded. Not serialized ({@code @JsonIgnore}); added for test access.
 */
@JsonIgnore
public Long getRootTimestamp() {
	return rootTimestamp;
}

/**
 * Returns the per-task exceptions collected for this job. Not serialized
 * ({@code @JsonIgnore}); added for test access.
 */
@JsonIgnore
public List<ExecutionExceptionInfo> getAllExceptions() {
	return allExceptions;
}

/**
 * Returns whether the exception list was cut off because more exceptions occurred than
 * the configured upper limit allows. Not serialized ({@code @JsonIgnore}); added for
 * test access.
 */
@JsonIgnore
public boolean isTruncated() {
	return truncated;
}

//---------------------------------------------------------------------------------
// Static helper classes
//---------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.job;

import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;

import java.util.Collection;
import java.util.Collections;

/**
* {@link MessageParameters} for {@link JobExceptionsHandler}.
*/
public class JobExceptionsMessageParameters extends JobMessageParameters {

	/** Optional "maxExceptions" query parameter capping the number of reported exceptions. */
	private final UpperLimitExceptionParameter maxExceptionsParameter = new UpperLimitExceptionParameter();

	@Override
	public Collection<MessageQueryParameter<?>> getQueryParameters() {
		// The job-exceptions endpoint exposes exactly one query parameter on top of the
		// inherited job-id path parameter.
		final MessageQueryParameter<?> queryParameter = maxExceptionsParameter;
		return Collections.singletonList(queryParameter);
	}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.job;

import org.apache.flink.runtime.rest.messages.MessageParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;

/**
* Specifies the upper limit of exceptions to return for JobExceptionsHandler.
* @see org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler
*/
public class UpperLimitExceptionParameter extends MessageQueryParameter<Integer> {

	/** Query-parameter key as it appears in the request URL. */
	public static final String KEY = "maxExceptions";

	/** The parameter is optional; the handler applies a default cap when it is absent. */
	public UpperLimitExceptionParameter() {
		super(KEY, MessageParameter.MessageParameterRequisiteness.OPTIONAL);
	}

	@Override
	public Integer convertStringToValue(String value) {
		// Equivalent to Integer.valueOf(value): parse the decimal string, box the result.
		return Integer.parseInt(value);
	}

	@Override
	public String convertValueToString(Integer value) {
		// Equivalent to value.toString(): unboxing throws NPE on null, as before.
		return Integer.toString(value);
	}

	@Override
	public String getDescription() {
		return "Comma-separated list of integer values that specifies the upper limit of exceptions to return.";
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

/**
* Test for the {@link JobExceptionsHandler}.
*/
public class JobExceptionsHandlerTest extends TestLogger {

	@Test
	public void testGetJobExceptionsInfo() throws HandlerRequestException {
		final JobExceptionsHandler jobExceptionsHandler = new JobExceptionsHandler(
			() -> null,
			TestingUtils.TIMEOUT(),
			Collections.emptyMap(),
			JobExceptionsHeaders.getInstance(),
			new ExecutionGraphCache(TestingUtils.TIMEOUT(), TestingUtils.TIMEOUT()),
			TestingUtils.defaultExecutor());
		final int numExceptions = 20;
		final AccessExecutionGraph archivedExecutionGraph = createAccessExecutionGraph(numExceptions);
		// Request fewer than, exactly as many as, and more than the available exceptions.
		checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph, numExceptions, 10);
		checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph, numExceptions, numExceptions);
		checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph, numExceptions, 30);
	}

	/**
	 * Issues a request with {@code maxExceptions=numExpectedException} against a graph holding
	 * {@code maxNumExceptions} failed tasks and verifies that the number of reported exceptions
	 * is the minimum of the two.
	 */
	private static void checkExceptionLimit(JobExceptionsHandler jobExceptionsHandler, AccessExecutionGraph graph, int maxNumExceptions, int numExpectedException) throws HandlerRequestException {
		final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> handlerRequest = createRequest(graph.getJobID(), numExpectedException);
		final JobExceptionsInfo jobExceptionsInfo = jobExceptionsHandler.handleRequest(handlerRequest, graph);
		final int numReportedException = Math.min(maxNumExceptions, numExpectedException);
		// JUnit's assertEquals takes the expected value first; the original call had the
		// arguments swapped, which produces misleading failure messages.
		assertEquals(numReportedException, jobExceptionsInfo.getAllExceptions().size());
	}

	/** Builds an execution graph with {@code numTasks} vertices, each carrying a failure cause. */
	private static AccessExecutionGraph createAccessExecutionGraph(int numTasks) {
		final Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
		for (int i = 0; i < numTasks; i++) {
			final JobVertexID jobVertexId = new JobVertexID();
			tasks.put(jobVertexId, createArchivedExecutionJobVertex(jobVertexId));
		}
		return new ArchivedExecutionGraphBuilder()
			.setTasks(tasks)
			.build();
	}

	/** Creates a single-subtask job vertex whose execution failed with cause "error". */
	private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex(JobVertexID jobVertexID) {
		final StringifiedAccumulatorResult[] emptyAccumulators = new StringifiedAccumulatorResult[0];
		final long[] timestamps = new long[ExecutionState.values().length];
		final ExecutionState expectedState = ExecutionState.RUNNING;

		final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
		final AllocationID allocationID = new AllocationID();

		final int subtaskIndex = 1;
		final int attempt = 2;
		return new ArchivedExecutionJobVertex(
			new ArchivedExecutionVertex[]{
				new ArchivedExecutionVertex(
					subtaskIndex,
					"test task",
					new ArchivedExecution(
						new StringifiedAccumulatorResult[0],
						null,
						new ExecutionAttemptID(),
						attempt,
						expectedState,
						"error",
						assignedResourceLocation,
						allocationID,
						subtaskIndex,
						timestamps),
					new EvictingBoundedList<>(0)
				)
			},
			jobVertexID,
			jobVertexID.toString(),
			1,
			1,
			ResourceProfile.UNKNOWN,
			emptyAccumulators);
	}

	/** Builds a handler request for {@code jobId} with "maxExceptions" set to {@code size}. */
	private static HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> createRequest(JobID jobId, int size) throws HandlerRequestException {
		final Map<String, String> pathParameters = new HashMap<>();
		pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
		final Map<String, List<String>> queryParameters = new HashMap<>();
		queryParameters.put(UpperLimitExceptionParameter.KEY, Collections.singletonList(String.valueOf(size)));

		return new HandlerRequest<>(
			EmptyRequestBody.getInstance(),
			new JobExceptionsMessageParameters(),
			pathParameters,
			queryParameters);
	}
}

0 comments on commit 785b7b2

Please sign in to comment.