Skip to content

Commit

Permalink
[FLINK-7855] [flip6] Port JobVertexAccumulatorsHandler to REST endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored and tillrohrmann committed Oct 20, 2017
1 parent 142ff78 commit 1348839
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
Expand All @@ -57,6 +58,7 @@
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
Expand Down Expand Up @@ -217,6 +219,14 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executionGraphCache,
executor);

JobVertexAccumulatorsHandler jobVertexAccumulatorsHandler = new JobVertexAccumulatorsHandler(
restAddressFuture,
leaderRetriever,
timeout,
JobVertexAccumulatorsHeaders.getInstance(),
executionGraphCache,
executor);

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
Expand Down Expand Up @@ -244,6 +254,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));

BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Request handler for the job vertex accumulators.
*/
public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler<JobVertexAccumulatorsInfo, JobMessageParameters> {

public JobVertexAccumulatorsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
}

@Override
protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
if (null != jobVertex) {
StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
for (StringifiedAccumulatorResult acc : accs) {
userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
acc.getName(),
acc.getType(),
acc.getValue()));
}

return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);
} else {
throw new RestHandlerException("There is no accumulator for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Message headers for the {@link JobVertexAccumulatorsHandler}.
*/
public class JobVertexAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobMessageParameters> {

private static final JobVertexAccumulatorsHeaders INSTANCE = new JobVertexAccumulatorsHeaders();

public static final String URL = "/jobs/:jobid/vertices/:vertexid/accumulators";

private JobVertexAccumulatorsHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public Class<JobVertexAccumulatorsInfo> getResponseClass() {
return JobVertexAccumulatorsInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public JobMessageParameters getUnresolvedMessageParameters() {
return new JobMessageParameters();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

public static JobVertexAccumulatorsHeaders getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
* Response type of the {@link JobVertexAccumulatorsHandler}.
*/
public class JobVertexAccumulatorsInfo implements ResponseBody {

public static final String FIELD_NAME_ID = "id";
public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";

@JsonProperty(FIELD_NAME_ID)
private String id;

@JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
private List<UserAccumulator> userAccumulatorList;

@JsonCreator
public JobVertexAccumulatorsInfo(
@JsonProperty(FIELD_NAME_ID) String id,
@JsonProperty(FIELD_NAME_USER_ACCUMULATORS) List<UserAccumulator> userAccumulatorList) {
this.id = id;
this.userAccumulatorList = userAccumulatorList;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JobVertexAccumulatorsInfo that = (JobVertexAccumulatorsInfo) o;
return Objects.equals(id, that.id) &&
Objects.equals(userAccumulatorList, that.userAccumulatorList);
}

@Override
public int hashCode() {
return Objects.hash(id, userAccumulatorList);
}

//---------------------------------------------------------------------------------
// Static helper classes
//---------------------------------------------------------------------------------

/**
* Json serializer for the {@link JobVertexAccumulatorsInfo}.
*/
public static final class UserAccumulator {

public static final String FIELD_NAME_ACC_NAME = "name";
public static final String FIELD_NAME_ACC_TYPE = "type";
public static final String FIELD_NAME_ACC_VALUE = "value";

@JsonProperty(FIELD_NAME_ACC_NAME)
private String name;

@JsonProperty(FIELD_NAME_ACC_TYPE)
private String type;

@JsonProperty(FIELD_NAME_ACC_VALUE)
private String value;

@JsonCreator
public UserAccumulator(
@JsonProperty(FIELD_NAME_ACC_NAME) String name,
@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
this.name = name;
this.type = type;
this.value = value;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UserAccumulator that = (UserAccumulator) o;
return Objects.equals(name, that.name) &&
Objects.equals(type, that.type) &&
Objects.equals(value, that.value);
}

@Override
public int hashCode() {
return Objects.hash(name, type, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import java.util.ArrayList;
import java.util.List;

/**
* Tests that the {@link JobVertexAccumulatorsInfo} can be marshalled and unmarshalled.
*/
public class JobVertexAccumulatorsInfoTest extends RestResponseMarshallingTestBase<JobVertexAccumulatorsInfo> {
@Override
protected Class<JobVertexAccumulatorsInfo> getTestResponseClass() {
return JobVertexAccumulatorsInfo.class;
}

@Override
protected JobVertexAccumulatorsInfo getTestResponseInstance() throws Exception {
List<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>();
userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
"test name1",
"test type1",
"test value1"));
userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
"test name2",
"test type2",
"test value2"));
userAccumulatorList.add(new JobVertexAccumulatorsInfo.UserAccumulator(
"test name3",
"test type3",
"test value3"));

return new JobVertexAccumulatorsInfo("testId", userAccumulatorList);
}
}

0 comments on commit 1348839

Please sign in to comment.