Skip to content

Commit

Permalink
[FLINK-28311][rest] Introduce JobManagerJobConfigurationHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
KarmaGYZ committed Jul 6, 2022
1 parent 1449e8d commit 4ab2536
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 0 deletions.
76 changes: 76 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -3021,6 +3021,82 @@
</tr>
</tbody>
</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
<td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/jobmanager/config</strong></h5></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
<td class="text-left">Response code: <code>200 OK</code></td>
</tr>
<tr>
<td colspan="2">Returns the jobmanager's configuration of a specific job.</td>
</tr>
<tr>
<td colspan="2">Path parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
<div class="book-expand">
<label>
<div class="book-expand-head flex justify-between">
<span>Request</span>
&nbsp; <span></span>
</div>
<input type="checkbox" class="hidden">
<div class="book-expand-content markdown-inner">
<pre>
<code>
{} </code>
</pre>
</div>
</label>
</div>
</td>
</tr>
<tr>
<td colspan="2">
<div class="book-expand">
<label>
<div class="book-expand-head flex justify-between">
<span>Response</span>
&nbsp; <span></span>
</div>
<input type="checkbox" class="hidden">
<div class="book-expand-content markdown-inner">
<pre>
<code>
{
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
"properties" : {
"key" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
} </code>
</pre>
</div>
</label>
</div>
</td>
</tr>
</tbody>
</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
Expand Down
23 changes: 23 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,29 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/JobExecutionResultResponseBody'
/jobs/{jobid}/jobmanager/config:
get:
description: Returns the jobmanager's configuration of a specific job.
operationId: getJobManagerJobConfiguration
parameters:
- name: jobid
in: path
description: 32-character hexadecimal string value that identifies a job.
required: true
schema:
$ref: '#/components/schemas/JobID'
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
type: array
properties:
empty:
type: boolean
items:
$ref: '#/components/schemas/ConfigurationInfoEntry'
/jobs/{jobid}/metrics:
get:
description: Provides access to job metrics.
Expand Down
31 changes: 31 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,37 @@
}
}
}
}, {
"url" : "/jobs/:jobid/jobmanager/config",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "jobid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
"properties" : {
"key" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
}
}, {
"url" : "/jobs/:jobid/metrics",
"method" : "GET",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Handler which serves the jobmanager's configuration of a specific job. */
public class JobManagerJobConfigurationHandler
extends AbstractRestHandler<
RestfulGateway, EmptyRequestBody, ConfigurationInfo, JobMessageParameters>
implements JsonArchivist {
private final ConfigurationInfo jobConfig;

public JobManagerJobConfigurationHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters>
messageHeaders,
Configuration configuration) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);

Preconditions.checkNotNull(configuration);
this.jobConfig = ConfigurationInfo.from(configuration);
}

@Override
protected CompletableFuture<ConfigurationInfo> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
throws RestHandlerException {
return CompletableFuture.completedFuture(jobConfig);
}

@Override
public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
throws IOException {
return Collections.singletonList(
new ArchivedJson(
JobManagerJobConfigurationHeaders.getInstance()
.getTargetRestEndpointURL()
.replace(
':' + JobIDPathParameter.KEY,
executionGraphInfo.getJobId().toString()),
jobConfig));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.job;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Message headers for the {@link
* org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler}.
*/
public class JobManagerJobConfigurationHeaders
implements MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters> {
private static final JobManagerJobConfigurationHeaders INSTANCE =
new JobManagerJobConfigurationHeaders();

public static final String JOBMANAGER_JOB_CONFIG_REST_PATH = "/jobs/:jobid/jobmanager/config";

private JobManagerJobConfigurationHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return JOBMANAGER_JOB_CONFIG_REST_PATH;
}

@Override
public Class<ConfigurationInfo> getResponseClass() {
return ConfigurationInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public JobMessageParameters getUnresolvedMessageParameters() {
return new JobMessageParameters();
}

public static JobManagerJobConfigurationHeaders getInstance() {
return INSTANCE;
}

@Override
public String getDescription() {
return "Returns the jobmanager's configuration of a specific job.";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
import org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobStatusHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
Expand Down Expand Up @@ -123,6 +124,7 @@
import org.apache.flink.runtime.rest.messages.cluster.JobManagerThreadDumpHeaders;
import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
Expand Down Expand Up @@ -335,6 +337,14 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
executionGraphCache,
executor);

JobManagerJobConfigurationHandler jobManagerJobConfigurationHandler =
new JobManagerJobConfigurationHandler(
leaderRetriever,
timeout,
responseHeaders,
JobManagerJobConfigurationHeaders.getInstance(),
clusterConfiguration);

CheckpointConfigHandler checkpointConfigHandler =
new CheckpointConfigHandler(
leaderRetriever,
Expand Down Expand Up @@ -762,6 +772,10 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
Tuple2.of(
jobVertexBackPressureHandler.getMessageHeaders(),
jobVertexBackPressureHandler));
handlers.add(
Tuple2.of(
jobManagerJobConfigurationHandler.getMessageHeaders(),
jobManagerJobConfigurationHandler));

final AbstractRestHandler<?, ?, ?, ?> jobVertexFlameGraphHandler;
if (clusterConfiguration.get(RestOptions.ENABLE_FLAMEGRAPH)) {
Expand Down
Loading

0 comments on commit 4ab2536

Please sign in to comment.