From 1449e8da48b5cf798fad32a71d9bbf7c927c5acf Mon Sep 17 00:00:00 2001 From: Yangze Guo Date: Wed, 1 Jun 2022 16:23:29 +0800 Subject: [PATCH] [FLINK-28311][rest] Introduce JobManagerEnvironmentHandler --- .../generated/rest_v1_dispatcher.html | 96 +++++++++ docs/static/generated/rest_v1_dispatcher.yml | 42 ++++ .../src/test/resources/rest_api_v1.snapshot | 59 ++++++ .../cluster/JobManagerEnvironmentHandler.java | 60 ++++++ .../rest/messages/EnvironmentInfo.java | 184 ++++++++++++++++++ .../JobManagerEnvironmentHeaders.java | 71 +++++++ .../webmonitor/WebMonitorEndpoint.java | 13 ++ .../rest/messages/EnvironmentInfoTest.java | 32 +++ 8 files changed, 557 insertions(+) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerEnvironmentHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/EnvironmentInfoTest.java diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 3683cbca3d670..0846f9d1c1d48 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -876,6 +876,102 @@ + + + + + + + + + + + + + + + + + + + +
/jobmanager/environment
Verb: GETResponse code: 200 OK
Returns the jobmanager environment.
+
+ +
+
+
+ +
+
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index d8c0cb3b8e830..3f9d37dace058 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -273,6 +273,17 @@ paths: type: boolean items: $ref: '#/components/schemas/ConfigurationInfoEntry' + /jobmanager/environment: + get: + description: Returns the jobmanager environment. + operationId: getJobManagerEnvironment + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/EnvironmentInfo' /jobmanager/logs: get: description: Returns the list of log files on the JobManager. @@ -2021,6 +2032,17 @@ components: type: object allOf: - $ref: '#/components/schemas/SubtaskCheckpointStatistics' + JVMInfo: + type: object + properties: + version: + type: string + arch: + type: string + options: + type: array + items: + type: string LogInfo: type: object properties: @@ -2891,6 +2913,19 @@ components: - FAILED - CANCELED - UNKNOWN + EnvironmentInfo: + type: object + properties: + environment: + type: array + items: + $ref: '#/components/schemas/EnvironmentVariableItem' + jvm: + $ref: '#/components/schemas/JVMInfo' + classpath: + type: array + items: + type: string Summary: type: object properties: @@ -2918,6 +2953,13 @@ components: type: string value: type: string + EnvironmentVariableItem: + type: object + properties: + key: + type: string + value: + type: string TriggerResponse: type: object properties: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index c390c7f600b06..37a2482703cb3 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -483,6 +483,65 @@ } } } + }, { + "url" : "/jobmanager/environment", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "any" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo", + "properties" : { + "environment" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:EnvironmentVariableItem", + "properties" : { + "key" : { + "type" : "string" + }, + "value" : { + "type" : "string" + } + } + } + }, + "jvm" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EnvironmentInfo:JVMInfo", + "properties" : { + "version" : { + "type" : "string" + }, + "arch" : { + "type" : "string" + }, + "options" : { + "type" : "array", + "items" : { + "type" : "string" + } + } + } + }, + "classpath" : { + "type" : "array", + "items" : { + "type" : "string" + } + } + } + } }, { "url" : "/jobmanager/logs", "method" : "GET", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerEnvironmentHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerEnvironmentHandler.java new file mode 100644 index 0000000000000..362b89fc14f68 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/JobManagerEnvironmentHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.cluster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EnvironmentInfo; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Handler which serves the jobmanager's environment variables. */ +public class JobManagerEnvironmentHandler + extends AbstractRestHandler< + RestfulGateway, EmptyRequestBody, EnvironmentInfo, EmptyMessageParameters> { + private final EnvironmentInfo environmentInfo; + + public JobManagerEnvironmentHandler( + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MessageHeaders + messageHeaders) { + super(leaderRetriever, timeout, responseHeaders, messageHeaders); + + this.environmentInfo = EnvironmentInfo.create(); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway) + throws RestHandlerException { + return CompletableFuture.completedFuture(environmentInfo); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java new file mode 100644 index 0000000000000..44c80388ebe9e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EnvironmentInfo.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** The response of environment info. */ +public class EnvironmentInfo implements ResponseBody { + + private static final String FIELD_NAME_ENVIRONMENT_INFO = "environment"; + + private static final String FIELD_NAME_JVM_INFO = "jvm"; + + private static final String FIELD_NAME_CLASSPATH = "classpath"; + + @JsonProperty(FIELD_NAME_ENVIRONMENT_INFO) + private final List environmentVariables; + + @JsonProperty(FIELD_NAME_JVM_INFO) + private final JVMInfo jvmInfo; + + @JsonProperty(FIELD_NAME_CLASSPATH) + private final List classpath; + + @JsonCreator + public EnvironmentInfo( + @JsonProperty(FIELD_NAME_ENVIRONMENT_INFO) + List environmentVariables, + @JsonProperty(FIELD_NAME_JVM_INFO) JVMInfo jvmInfo, + @JsonProperty(FIELD_NAME_CLASSPATH) List classpath) { + this.environmentVariables = environmentVariables; + this.jvmInfo = jvmInfo; + this.classpath = classpath; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EnvironmentInfo that = (EnvironmentInfo) o; + return environmentVariables.equals(that.environmentVariables) + && jvmInfo.equals(that.jvmInfo) + && classpath.equals(that.classpath); + } + + @Override + public int hashCode() { + return Objects.hash(environmentVariables, jvmInfo, classpath); + } + + public static EnvironmentInfo create() { + List environmentVariableItems = new ArrayList<>(); + System.getenv() + .forEach( + (key, value) -> + environmentVariableItems.add( + new EnvironmentVariableItem(key, value))); + + return new EnvironmentInfo( + environmentVariableItems, + JVMInfo.create(), + Arrays.asList(System.getProperty("java.class.path").split(":"))); + } + + /** A single key-value pair entry in the {@link EnvironmentInfo} response. */ + private static class EnvironmentVariableItem { + private static final String FIELD_NAME_KEY = "key"; + + private static final String FIELD_NAME_VALUE = "value"; + + @JsonProperty(FIELD_NAME_KEY) + private final String key; + + @JsonProperty(FIELD_NAME_VALUE) + private final String value; + + @JsonCreator + public EnvironmentVariableItem( + @JsonProperty(FIELD_NAME_KEY) String key, + @JsonProperty(FIELD_NAME_VALUE) String value) { + this.key = key; + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EnvironmentVariableItem that = (EnvironmentVariableItem) o; + return key.equals(that.key) && value.equals(that.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } + } + + /** JVM information. */ + private static class JVMInfo { + private static final String FIELD_NAME_VERSION = "version"; + + private static final String FIELD_NAME_ARCH = "arch"; + + private static final String FIELD_NAME_OPTIONS = "options"; + + @JsonProperty(FIELD_NAME_VERSION) + private final String version; + + @JsonProperty(FIELD_NAME_ARCH) + private final String arch; + + @JsonProperty(FIELD_NAME_OPTIONS) + private final List options; + + @JsonCreator + public JVMInfo( + @JsonProperty(FIELD_NAME_VERSION) String version, + @JsonProperty(FIELD_NAME_ARCH) String arch, + @JsonProperty(FIELD_NAME_OPTIONS) List options) { + this.version = version; + this.arch = arch; + this.options = options; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JVMInfo that = (JVMInfo) o; + return version.equals(that.version) + && arch.equals(that.arch) + && options.equals(that.options); + } + + @Override + public int hashCode() { + return Objects.hash(version, arch, options); + } + + private static JVMInfo create() { + return new JVMInfo( + EnvironmentInformation.getJvmVersion(), + System.getProperty("os.arch"), + Arrays.asList(EnvironmentInformation.getJvmStartupOptionsArray())); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java new file mode 100644 index 0000000000000..f6b0aca14d0dd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.cluster.JobManagerEnvironmentHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Message headers for the {@link JobManagerEnvironmentHandler}. */ +public class JobManagerEnvironmentHeaders + implements MessageHeaders { + private static final JobManagerEnvironmentHeaders INSTANCE = new JobManagerEnvironmentHeaders(); + + public static final String JOB_MANAGER_ENV_REST_PATH = "/jobmanager/environment"; + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return JOB_MANAGER_ENV_REST_PATH; + } + + @Override + public Class getResponseClass() { + return EnvironmentInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Returns the jobmanager environment."; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + public static JobManagerEnvironmentHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 3b21be96973b8..a77c3d5217a62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler; import org.apache.flink.runtime.rest.handler.cluster.JobManagerCustomLogHandler; +import org.apache.flink.runtime.rest.handler.cluster.JobManagerEnvironmentHandler; import org.apache.flink.runtime.rest.handler.cluster.JobManagerLogFileHandler; import org.apache.flink.runtime.rest.handler.cluster.JobManagerLogListHandler; import org.apache.flink.runtime.rest.handler.cluster.JobManagerThreadDumpHandler; @@ -99,6 +100,7 @@ import org.apache.flink.runtime.rest.messages.JobConfigHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders; +import org.apache.flink.runtime.rest.messages.JobManagerEnvironmentHeaders; import org.apache.flink.runtime.rest.messages.JobPlanHeaders; import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders; @@ -317,6 +319,13 @@ protected List> initiali ClusterConfigurationInfoHeaders.getInstance(), clusterConfiguration); + JobManagerEnvironmentHandler jobManagerEnvironmentHandler = + new JobManagerEnvironmentHandler( + leaderRetriever, + timeout, + responseHeaders, + JobManagerEnvironmentHeaders.getInstance()); + JobConfigHandler jobConfigHandler = new JobConfigHandler( leaderRetriever, @@ -660,6 +669,10 @@ protected List> initiali Tuple2.of( clusterConfigurationHandler.getMessageHeaders(), clusterConfigurationHandler)); + handlers.add( + Tuple2.of( + jobManagerEnvironmentHandler.getMessageHeaders(), + jobManagerEnvironmentHandler)); handlers.add(Tuple2.of(dashboardConfigHandler.getMessageHeaders(), dashboardConfigHandler)); handlers.add(Tuple2.of(jobIdsHandler.getMessageHeaders(), jobIdsHandler)); handlers.add(Tuple2.of(jobStatusHandler.getMessageHeaders(), jobStatusHandler)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/EnvironmentInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/EnvironmentInfoTest.java new file mode 100644 index 0000000000000..6b8696c06ac7c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/EnvironmentInfoTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** Tests for the {@link EnvironmentInfo}. */ +public class EnvironmentInfoTest extends RestResponseMarshallingTestBase { + @Override + protected Class getTestResponseClass() { + return EnvironmentInfo.class; + } + + @Override + protected EnvironmentInfo getTestResponseInstance() { + return EnvironmentInfo.create(); + } +}