From 8ff0c3f887da789a599a5b0ef64e954f37261d42 Mon Sep 17 00:00:00 2001
From: Wencong Liu <liuwenclever@163.com>
Date: Tue, 26 Jul 2022 17:20:02 +0800
Subject: [PATCH] [FLINK-27769][sql-gateway]Adjust the RestAPIVersion and
 RestfulGateway to adapt a variety of endpoints

---
 .../program/rest/RestClusterClientTest.java   |   4 +-
 .../flink/docs/rest/OpenApiSpecGenerator.java |  10 +-
 .../flink/docs/rest/RestAPIDocGenerator.java  |   8 +-
 .../docs/rest/OpenApiSpecGeneratorTest.java   |   6 +-
 .../docs/rest/RestAPIDocGeneratorTest.java    |   4 +-
 .../rest/data/TestEmptyMessageHeaders.java    |  10 +-
 .../rest/data/TestExcludeMessageHeaders.java  |  11 +-
 .../handlers/AbstractJarPlanHeaders.java      |   5 +-
 .../webmonitor/handlers/JarDeleteHeaders.java |   5 +-
 .../webmonitor/handlers/JarListHeaders.java   |   4 +-
 .../webmonitor/handlers/JarRunHeaders.java    |   4 +-
 .../webmonitor/handlers/JarUploadHeaders.java |   4 +-
 .../compatibility/RestAPIStabilityTest.java   |  10 +-
 .../apache/flink/runtime/rest/RestClient.java |   6 +-
 .../runtime/rest/RestServerEndpoint.java      |  20 ++--
 .../handler/RestHandlerSpecification.java     |   5 +-
 ...chronousOperationStatusMessageHeaders.java |   4 +-
 ...hronousOperationTriggerMessageHeaders.java |   4 +-
 .../files/WebContentHandlerSpecification.java |   9 ++
 .../ClusterConfigurationInfoHeaders.java      |   3 +-
 .../rest/messages/ClusterOverviewHeaders.java |   2 +-
 .../DashboardConfigurationHeaders.java        |   2 +-
 .../rest/messages/JobAccumulatorsHeaders.java |   2 +-
 .../rest/messages/JobCancellationHeaders.java |   2 +-
 .../rest/messages/JobConfigHeaders.java       |   2 +-
 .../rest/messages/JobExceptionsHeaders.java   |   2 +-
 .../JobIdsWithStatusesOverviewHeaders.java    |   2 +-
 .../JobManagerEnvironmentHeaders.java         |   3 +-
 .../messages/JobManagerLogUrlHeaders.java     |   2 +-
 .../runtime/rest/messages/JobPlanHeaders.java |   2 +-
 .../JobVertexAccumulatorsHeaders.java         |   2 +-
 .../JobVertexBackPressureHeaders.java         |   2 +-
 .../messages/JobVertexDetailsHeaders.java     |   2 +-
 .../messages/JobVertexFlameGraphHeaders.java  |   2 +-
 .../JobVertexTaskManagersHeaders.java         |   2 +-
 .../rest/messages/JobsOverviewHeaders.java    |   3 +-
 .../rest/messages/RuntimeMessageHeaders.java  |  45 ++++++++
 .../RuntimeUntypedResponseMessageHeaders.java |  39 +++++++
 .../SubtasksAllAccumulatorsHeaders.java       |   2 +-
 .../rest/messages/SubtasksTimesHeaders.java   |   3 +-
 .../messages/TaskManagerLogUrlHeaders.java    |   2 +-
 .../UntypedResponseMessageHeaders.java        |   2 +-
 .../YarnCancelJobTerminationHeaders.java      |   9 ++
 .../YarnStopJobTerminationHeaders.java        |   9 ++
 .../checkpoints/CheckpointConfigHeaders.java  |   5 +-
 .../CheckpointStatisticDetailsHeaders.java    |   4 +-
 .../CheckpointingStatisticsHeaders.java       |   5 +-
 .../TaskCheckpointStatisticsHeaders.java      |   4 +-
 .../cluster/JobManagerCustomLogHeaders.java   |   4 +-
 .../cluster/JobManagerLogFileHeader.java      |   4 +-
 .../cluster/JobManagerLogListHeaders.java     |   4 +-
 .../cluster/JobManagerStdoutFileHeader.java   |   4 +-
 .../cluster/JobManagerThreadDumpHeaders.java  |   4 +-
 .../messages/cluster/ShutdownHeaders.java     |   5 +-
 .../dataset/ClusterDataSetListHeaders.java    |   4 +-
 .../rest/messages/job/JobDetailsHeaders.java  |   4 +-
 .../job/JobExecutionResultHeaders.java        |   3 +-
 .../JobManagerJobConfigurationHeaders.java    |   5 +-
 .../job/JobManagerJobEnvironmentHeaders.java  |   4 +-
 .../messages/job/JobStatusInfoHeaders.java    |   3 +-
 .../rest/messages/job/JobSubmitHeaders.java   |   4 +-
 .../SubtaskCurrentAttemptDetailsHeaders.java  |   4 +-
 ...skExecutionAttemptAccumulatorsHeaders.java |   4 +-
 ...SubtaskExecutionAttemptDetailsHeaders.java |   4 +-
 .../ClientCoordinationHeaders.java            |   4 +-
 .../AbstractAggregatedMetricsHeaders.java     |   3 +-
 .../job/metrics/AbstractMetricsHeaders.java   |   3 +-
 .../metrics/JobVertexWatermarksHeaders.java   |   3 +-
 .../TaskManagerCustomLogHeaders.java          |   4 +-
 .../TaskManagerDetailsHeaders.java            |   4 +-
 .../TaskManagerLogFileHeaders.java            |   5 +-
 .../taskmanager/TaskManagerLogsHeaders.java   |   5 +-
 .../TaskManagerStdoutFileHeaders.java         |   5 +-
 .../TaskManagerThreadDumpHeaders.java         |   5 +-
 .../taskmanager/TaskManagersHeaders.java      |   5 +-
 .../rest/versioning/RestAPIVersion.java       |  72 ++-----------
 .../versioning/RuntimeRestAPIVersion.java     |  74 +++++++++++++
 .../NonLeaderRetrievalRestfulGateway.java     | 102 ++++++++++++++++++
 .../runtime/rest/MultipartUploadResource.java |   7 +-
 .../flink/runtime/rest/RestClientTest.java    |   9 +-
 .../rest/RestExternalHandlersITCase.java      |   4 +-
 .../rest/RestServerEndpointITCase.java        |  35 +++---
 .../rest/handler/AbstractHandlerTest.java     |   7 ++
 .../AbstractTaskManagerFileHandlerTest.java   |   7 ++
 .../runtime/rest/util/TestMessageHeaders.java |   4 +-
 ...st.java => RuntimeRestAPIVersionTest.java} |  15 +--
 86 files changed, 516 insertions(+), 229 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeMessageHeaders.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeUntypedResponseMessageHeaders.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RuntimeRestAPIVersion.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
 rename flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/{RestAPIVersionTest.java => RuntimeRestAPIVersionTest.java} (72%)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 1a862599fe282..7624bb37363c6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -70,6 +70,7 @@
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
@@ -856,7 +857,8 @@ protected CompletableFuture<EmptyResponseBody> handleRequest(
     }
 
     private static final class PingRestHandlerHeaders
-            implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+            implements RuntimeMessageHeaders<
+                    EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
         static final PingRestHandlerHeaders INSTANCE = new PingRestHandlerHeaders();
 
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
index 4629304195a82..0265ff2c9570b 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
@@ -39,7 +39,7 @@
 import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer;
 import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.util.ConfigurationException;
@@ -114,8 +114,8 @@ public class OpenApiSpecGenerator {
     public static void main(String[] args) throws IOException, ConfigurationException {
         String outputDirectory = args[0];
 
-        for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
-            if (apiVersion == RestAPIVersion.V0) {
+        for (final RuntimeRestAPIVersion apiVersion : RuntimeRestAPIVersion.values()) {
+            if (apiVersion == RuntimeRestAPIVersion.V0) {
                 // this version exists only for testing purposes
                 continue;
             }
@@ -130,7 +130,7 @@ public static void main(String[] args) throws IOException, ConfigurationExceptio
 
     @VisibleForTesting
     static void createDocumentationFile(
-            DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile)
+            DocumentingRestEndpoint restEndpoint, RuntimeRestAPIVersion apiVersion, Path outputFile)
             throws IOException {
         final OpenAPI openApi = new OpenAPI();
 
@@ -167,7 +167,7 @@ private static boolean shouldBeDocumented(MessageHeaders spec) {
         return spec.getClass().getAnnotation(Documentation.ExcludeFromDocumentation.class) == null;
     }
 
-    private static void setInfo(final OpenAPI openApi, final RestAPIVersion apiVersion) {
+    private static void setInfo(final OpenAPI openApi, final RuntimeRestAPIVersion apiVersion) {
         openApi.info(
                 new Info()
                         .title("Flink JobManager REST API")
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 5307bcf2362d7..489bc6415ed54 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -30,7 +30,7 @@
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.ConfigurationException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -114,8 +114,8 @@ public class RestAPIDocGenerator {
     public static void main(String[] args) throws IOException, ConfigurationException {
         String outputDirectory = args[0];
 
-        for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
-            if (apiVersion == RestAPIVersion.V0) {
+        for (final RuntimeRestAPIVersion apiVersion : RuntimeRestAPIVersion.values()) {
+            if (apiVersion == RuntimeRestAPIVersion.V0) {
                 // this version exists only for testing purposes
                 continue;
             }
@@ -130,7 +130,7 @@ public static void main(String[] args) throws IOException, ConfigurationExceptio
 
     @VisibleForTesting
     static void createHtmlFile(
-            DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile)
+            DocumentingRestEndpoint restEndpoint, RuntimeRestAPIVersion apiVersion, Path outputFile)
             throws IOException {
         StringBuilder html = new StringBuilder();
 
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/rest/OpenApiSpecGeneratorTest.java b/flink-docs/src/test/java/org/apache/flink/docs/rest/OpenApiSpecGeneratorTest.java
index e2d6912323e54..baa8aea319680 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/rest/OpenApiSpecGeneratorTest.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/rest/OpenApiSpecGeneratorTest.java
@@ -23,7 +23,7 @@
 import org.apache.flink.docs.rest.data.TestExcludeMessageHeaders;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
@@ -45,7 +45,7 @@ class OpenApiSpecGeneratorTest {
     void testExcludeFromDocumentation() throws Exception {
         File file = File.createTempFile("rest_v0_", ".html");
         OpenApiSpecGenerator.createDocumentationFile(
-                new TestExcludeDocumentingRestEndpoint(), RestAPIVersion.V0, file.toPath());
+                new TestExcludeDocumentingRestEndpoint(), RuntimeRestAPIVersion.V0, file.toPath());
         String actual = FileUtils.readFile(file, "UTF-8");
 
         assertThat(actual).contains("/test/empty1");
@@ -95,7 +95,7 @@ void testDuplicateOperationIdsAreRejected() throws Exception {
                         () ->
                                 OpenApiSpecGenerator.createDocumentationFile(
                                         new TestDuplicateOperationIdDocumentingRestEndpoint(),
-                                        RestAPIVersion.V0,
+                                        RuntimeRestAPIVersion.V0,
                                         file.toPath()))
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("Duplicate OperationId");
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/rest/RestAPIDocGeneratorTest.java b/flink-docs/src/test/java/org/apache/flink/docs/rest/RestAPIDocGeneratorTest.java
index 6b6e4b5db62cf..d3fc475d23c48 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/rest/RestAPIDocGeneratorTest.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/rest/RestAPIDocGeneratorTest.java
@@ -23,7 +23,7 @@
 import org.apache.flink.docs.rest.data.TestExcludeMessageHeaders;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
@@ -44,7 +44,7 @@ class RestAPIDocGeneratorTest {
     void testExcludeFromDocumentation() throws Exception {
         File file = File.createTempFile("rest_v0_", ".html");
         RestAPIDocGenerator.createHtmlFile(
-                new TestExcludeDocumentingRestEndpoint(), RestAPIVersion.V0, file.toPath());
+                new TestExcludeDocumentingRestEndpoint(), RuntimeRestAPIVersion.V0, file.toPath());
         String actual = FileUtils.readFile(file, "UTF-8");
 
         assertThat(actual).containsSequence("/test/empty1");
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestEmptyMessageHeaders.java b/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestEmptyMessageHeaders.java
index 228a36fc34e24..26d23f5f34788 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestEmptyMessageHeaders.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestEmptyMessageHeaders.java
@@ -23,7 +23,8 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -36,7 +37,8 @@
  * parameters are all empty.
  */
 public class TestEmptyMessageHeaders
-        implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
     private static final String URL = "/test/empty";
     private static final String DESCRIPTION = "This is an empty testing REST API.";
@@ -104,7 +106,7 @@ public String getTargetRestEndpointURL() {
     }
 
     @Override
-    public Collection<RestAPIVersion> getSupportedAPIVersions() {
-        return Collections.singleton(RestAPIVersion.V0);
+    public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+        return Collections.singleton(RuntimeRestAPIVersion.V0);
     }
 }
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestExcludeMessageHeaders.java b/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestExcludeMessageHeaders.java
index dc8b29b5c6c42..3b5b848208001 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestExcludeMessageHeaders.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/rest/data/TestExcludeMessageHeaders.java
@@ -23,8 +23,8 @@
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -37,7 +37,8 @@
  */
 @Documentation.ExcludeFromDocumentation()
 public class TestExcludeMessageHeaders
-        implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
     private static final String URL = "/test/excluded";
     private static final String DESCRIPTION =
@@ -92,7 +93,7 @@ public String getTargetRestEndpointURL() {
     }
 
     @Override
-    public Collection<RestAPIVersion> getSupportedAPIVersions() {
-        return Collections.singleton(RestAPIVersion.V0);
+    public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+        return Collections.singleton(RuntimeRestAPIVersion.V0);
     }
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJarPlanHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJarPlanHeaders.java
index 324b42ec19905..9ab2612cc8d77 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJarPlanHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJarPlanHeaders.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for {@link JarPlanHandler}. */
 public abstract class AbstractJarPlanHeaders
-        implements MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
+        implements RuntimeMessageHeaders<
+                JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
 
     @Override
     public Class<JobPlanInfo> getResponseClass() {
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java
index e9fa0a3fbe099..634ae832c747a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java
@@ -21,13 +21,14 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for {@link JarDeleteHandler}. */
 public class JarDeleteHeaders
-        implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> {
 
     private static final JarDeleteHeaders INSTANCE = new JarDeleteHeaders();
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
index 11dddc81b4ad5..8c8968b3d47c5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
@@ -21,13 +21,13 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link JarListHandler}. */
 public class JarListHeaders
-        implements MessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
 
     public static final String URL = "/jars";
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
index 011188e780f60..02dc0d30ef172 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
@@ -20,12 +20,14 @@
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** {@link MessageHeaders} for {@link JarRunHandler}. */
 public class JarRunHeaders
-        implements MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+        implements RuntimeMessageHeaders<
+                JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
 
     private static final JarRunHeaders INSTANCE = new JarRunHeaders();
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java
index e18b15c0c9236..5876433b8dc41 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java
@@ -22,12 +22,14 @@
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** {@link MessageHeaders} for uploading jars. */
 public final class JarUploadHeaders
-        implements MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
 
     public static final String URL = "/jars/upload";
     private static final JarUploadHeaders INSTANCE = new JarUploadHeaders();
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
index 9d83cbfb74598..0dd5704b1dd8e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
@@ -21,7 +21,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.ConfigurationException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -63,15 +63,15 @@ private static class StableRestApiVersionProvider implements ArgumentsProvider {
         @Override
         public Stream<? extends Arguments> provideArguments(ExtensionContext context)
                 throws Exception {
-            return Arrays.stream(RestAPIVersion.values())
-                    .filter(RestAPIVersion::isStableVersion)
+            return Arrays.stream(RuntimeRestAPIVersion.values())
+                    .filter(RuntimeRestAPIVersion::isStableVersion)
                     .map(Arguments::of);
         }
     }
 
     @ParameterizedTest
     @ArgumentsSource(StableRestApiVersionProvider.class)
-    void testDispatcherRestAPIStability(RestAPIVersion apiVersion)
+    void testDispatcherRestAPIStability(RuntimeRestAPIVersion apiVersion)
             throws IOException, ConfigurationException {
         final String versionedSnapshotFileName =
                 String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
@@ -113,7 +113,7 @@ private static void writeSnapshot(
     }
 
     private RestAPISnapshot createSnapshot(
-            final DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion) {
+            final DocumentingRestEndpoint restEndpoint, RuntimeRestAPIVersion apiVersion) {
         final List<JsonNode> calls =
                 restEndpoint.getSpecs().stream()
                         // we only compare compatibility within the given version
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 4cb1eedf20557..a82d9d0880166 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -301,6 +301,8 @@ CompletableFuture<P> sendRequest(
                     R request,
                     Collection<FileUpload> fileUploads)
                     throws IOException {
+        Collection<? extends RestAPIVersion> supportedAPIVersions =
+                messageHeaders.getSupportedAPIVersions();
         return sendRequest(
                 targetAddress,
                 targetPort,
@@ -308,7 +310,7 @@ CompletableFuture<P> sendRequest(
                 messageParameters,
                 request,
                 fileUploads,
-                RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
+                RestAPIVersion.getLatestVersion(supportedAPIVersions));
     }
 
     public <
@@ -323,7 +325,7 @@ CompletableFuture<P> sendRequest(
                     U messageParameters,
                     R request,
                     Collection<FileUpload> fileUploads,
-                    RestAPIVersion apiVersion)
+                    RestAPIVersion<? extends RestAPIVersion<?>> apiVersion)
                     throws IOException {
         Preconditions.checkNotNull(targetAddress);
         Preconditions.checkArgument(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 158ef35210a9e..d1f35161a60fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -67,6 +67,7 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -623,9 +624,9 @@ private static void checkAllEndpointsAndHandlersAreUnique(
             }
 
             final RestHandlerSpecification headers = handler.f0;
-            for (RestAPIVersion supportedAPIVersion : headers.getSupportedAPIVersions()) {
+            for (RestAPIVersion supportedRestAPIVersion : headers.getSupportedAPIVersions()) {
                 final String parameterizedEndpoint =
-                        supportedAPIVersion.toString()
+                        supportedRestAPIVersion.toString()
                                 + headers.getHttpMethod()
                                 + headers.getTargetRestEndpointURL();
                 // normalize path parameters; distinct path parameters still clash at runtime
@@ -636,7 +637,7 @@ private static void checkAllEndpointsAndHandlersAreUnique(
                     throw new FlinkRuntimeException(
                             String.format(
                                     "REST handler registration overlaps with another registration for: version=%s, method=%s, url=%s.",
-                                    supportedAPIVersion,
+                                    supportedRestAPIVersion,
                                     headers.getHttpMethod(),
                                     headers.getTargetRestEndpointURL()));
                 }
@@ -662,9 +663,6 @@ public static final class RestHandlerUrlComparator
         private static final Comparator<String> CASE_INSENSITIVE_ORDER =
                 new CaseInsensitiveOrderComparator();
 
-        private static final Comparator<RestAPIVersion> API_VERSION_ORDER =
-                new RestAPIVersion.RestAPIVersionComparator();
-
         static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();
 
         @Override
@@ -677,9 +675,13 @@ public int compare(
             if (urlComparisonResult != 0) {
                 return urlComparisonResult;
             } else {
-                return API_VERSION_ORDER.compare(
-                        Collections.min(o1.f0.getSupportedAPIVersions()),
-                        Collections.min(o2.f0.getSupportedAPIVersions()));
+                Collection<? extends RestAPIVersion> o1APIVersions =
+                        o1.f0.getSupportedAPIVersions();
+                RestAPIVersion o1Version = Collections.min(o1APIVersions);
+                Collection<? extends RestAPIVersion> o2APIVersions =
+                        o2.f0.getSupportedAPIVersions();
+                RestAPIVersion o2Version = Collections.min(o2APIVersions);
+                return o1Version.compareTo(o2Version);
             }
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
index c55b3c3886b07..dc8f7e487fc5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -22,7 +22,6 @@
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 
 import java.util.Collection;
-import java.util.Collections;
 
 /** Rest handler interface which all rest handler implementation have to implement. */
 public interface RestHandlerSpecification {
@@ -47,7 +46,5 @@ public interface RestHandlerSpecification {
      *
      * @return Collection of supported API versions
      */
-    default Collection<RestAPIVersion> getSupportedAPIVersions() {
-        return Collections.singleton(RestAPIVersion.V1);
-    }
+    Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationStatusMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationStatusMessageHeaders.java
index 63f4402eb306f..a2a841f28429a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationStatusMessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationStatusMessageHeaders.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.async;
 
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -32,7 +32,7 @@
  * @param <M> type of the message parameters
  */
 public abstract class AsynchronousOperationStatusMessageHeaders<V, M extends MessageParameters>
-        implements MessageHeaders<EmptyRequestBody, AsynchronousOperationResult<V>, M> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, AsynchronousOperationResult<V>, M> {
 
     /**
      * Returns the class of the value wrapped in the {@link AsynchronousOperationResult}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
index c08d37f8121f2..6f8b66c80618f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AsynchronousOperationTriggerMessageHeaders.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.rest.handler.async;
 
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 /**
  * Message headers for the triggering of an asynchronous operation.
@@ -30,7 +30,7 @@
  */
 public abstract class AsynchronousOperationTriggerMessageHeaders<
                 R extends RequestBody, M extends MessageParameters>
-        implements MessageHeaders<R, TriggerResponse, M> {
+        implements RuntimeMessageHeaders<R, TriggerResponse, M> {
 
     @Override
     public Class<TriggerResponse> getResponseClass() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
index 36f0c0ce46831..96df694b34618 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
@@ -20,6 +20,10 @@
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /** Rest handler specification for the web content handler. */
 public final class WebContentHandlerSpecification implements RestHandlerSpecification {
@@ -42,4 +46,9 @@ public String getTargetRestEndpointURL() {
     public static WebContentHandlerSpecification getInstance() {
         return INSTANCE;
     }
+
+    @Override
+    public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+        return Collections.singleton(RuntimeRestAPIVersion.V1);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
index 1d9e80a5de6f7..27d2c8b6fa52d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoHeaders.java
@@ -25,7 +25,8 @@
 
 /** Message headers for the {@link ClusterConfigHandler}. */
 public final class ClusterConfigurationInfoHeaders
-        implements MessageHeaders<EmptyRequestBody, ConfigurationInfo, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, ConfigurationInfo, EmptyMessageParameters> {
 
     private static final ClusterConfigurationInfoHeaders INSTANCE =
             new ClusterConfigurationInfoHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
index 95feadff8ce47..fe3dfc6aead83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -26,7 +26,7 @@
 
 /** Message headers for the {@link ClusterOverviewHandler}. */
 public final class ClusterOverviewHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, ClusterOverviewWithVersion, EmptyMessageParameters> {
 
     private static final ClusterOverviewHeaders INSTANCE = new ClusterOverviewHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
index 76f45b4150b6c..cbe8a5917fe80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link DashboardConfigHandler}. */
 public final class DashboardConfigurationHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> {
 
     public static final DashboardConfigurationHeaders INSTANCE =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
index 59c9e310b0fa6..4d751f98a04fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobAccumulatorsHandler}. */
 public class JobAccumulatorsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
 
     private static final JobAccumulatorsHeaders INSTANCE = new JobAccumulatorsHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
index 260f39eb6cc1a..46f5e6d66b6ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobCancellationHandler}. */
 public class JobCancellationHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, EmptyResponseBody, JobCancellationMessageParameters> {
 
     public static final String URL = "/jobs/:jobid";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java
index bcc2fb544c2d4..65ebc0f3c9b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobConfigHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobConfigHandler}. */
 public class JobConfigHeaders
-        implements MessageHeaders<EmptyRequestBody, JobConfigInfo, JobMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, JobConfigInfo, JobMessageParameters> {
 
     private static final JobConfigHeaders INSTANCE = new JobConfigHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
index a72582e4f2562..a6e441e2c3d1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
@@ -27,7 +27,7 @@
 
 /** Message headers for the {@link JobExceptionsHandler}. */
 public class JobExceptionsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobExceptionsInfoWithHistory, JobExceptionsMessageParameters> {
 
     private static final JobExceptionsHeaders INSTANCE = new JobExceptionsHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
index 750cb09d41bcf..2ea6ec8e9d492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobIdsWithStatusOverview}. */
 public class JobIdsWithStatusesOverviewHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
 
     public static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java
index f6b0aca14d0dd..b795fcb214a0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerEnvironmentHeaders.java
@@ -25,7 +25,8 @@
 
 /** Message headers for the {@link JobManagerEnvironmentHandler}. */
 public class JobManagerEnvironmentHeaders
-        implements MessageHeaders<EmptyRequestBody, EnvironmentInfo, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, EnvironmentInfo, EmptyMessageParameters> {
     private static final JobManagerEnvironmentHeaders INSTANCE = new JobManagerEnvironmentHeaders();
 
     public static final String JOB_MANAGER_ENV_REST_PATH = "/jobmanager/environment";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java
index c55efeb5b36b9..1e3d7b8398bb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java
@@ -24,7 +24,7 @@
 
 /** Headers for the log url retriever of JobManager. */
 public class JobManagerLogUrlHeaders
-        implements MessageHeaders<EmptyRequestBody, LogUrlResponse, JobMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, LogUrlResponse, JobMessageParameters> {
     private static final JobManagerLogUrlHeaders INSTANCE = new JobManagerLogUrlHeaders();
 
     private static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/jobmanager/log-url";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
index 500028fa1c8bd..437e0c863adb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobPlanHandler}. */
 public class JobPlanHeaders
-        implements MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> {
 
     private static final JobPlanHeaders INSTANCE = new JobPlanHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
index bd5feb8ac0615..757bbb1ad41a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexAccumulatorsHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobVertexAccumulatorsHandler}. */
 public class JobVertexAccumulatorsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobVertexAccumulatorsInfo, JobVertexMessageParameters> {
 
     private static final JobVertexAccumulatorsHeaders INSTANCE = new JobVertexAccumulatorsHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java
index cd5b578796092..c0a30e21d1d3d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobVertexBackPressureHandler}. */
 public class JobVertexBackPressureHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> {
 
     private static final JobVertexBackPressureHeaders INSTANCE = new JobVertexBackPressureHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsHeaders.java
index f9002f3b4c7d4..805bf2cba945d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobVertexDetailsHandler}. */
 public class JobVertexDetailsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobVertexDetailsInfo, JobVertexMessageParameters> {
 
     private static final JobVertexDetailsHeaders INSTANCE = new JobVertexDetailsHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
index 118104e661857..401c2202a03af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexFlameGraphHeaders.java
@@ -26,7 +26,7 @@
 
 /** Message headers for the {@link JobVertexFlameGraphHandler}. */
 public class JobVertexFlameGraphHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobVertexFlameGraph, JobVertexFlameGraphParameters> {
 
     private static final JobVertexFlameGraphHeaders INSTANCE = new JobVertexFlameGraphHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
index 0be755286a081..9cbdc4ca06983 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -25,7 +25,7 @@
 
 /** Message headers for the {@link JobVertexTaskManagersHandler}. */
 public class JobVertexTaskManagersHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> {
 
     private static final JobVertexTaskManagersHeaders INSTANCE = new JobVertexTaskManagersHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
index 6b52a39bdd407..5119c89fa4954 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
@@ -26,7 +26,8 @@
 
 /** Message headers for {@link JobsOverviewHandler}. */
 public final class JobsOverviewHeaders
-        implements MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
 
     private static final JobsOverviewHeaders INSTANCE = new JobsOverviewHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeMessageHeaders.java
new file mode 100644
index 0000000000000..934feb906dbbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeMessageHeaders.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * This class links {@link RequestBody}s to {@link ResponseBody}s types and contains meta-data
+ * required for their http headers in runtime module.
+ *
+ * <p>Implementations must be state-less.
+ *
+ * @param <R> request message type
+ * @param <P> response message type
+ * @param <M> message parameters type
+ */
+public interface RuntimeMessageHeaders<
+                R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
+        extends MessageHeaders<R, P, M> {
+
+    @Override
+    default Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+        return Collections.singleton(RuntimeRestAPIVersion.V1);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeUntypedResponseMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeUntypedResponseMessageHeaders.java
new file mode 100644
index 0000000000000..6572e30c32699
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RuntimeUntypedResponseMessageHeaders.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Message headers for a web handler request that belongs to runtime module.
+ *
+ * @param <R> type of the request
+ * @param <M> type of the message parameters
+ */
+public interface RuntimeUntypedResponseMessageHeaders<
+                R extends RequestBody, M extends MessageParameters>
+        extends UntypedResponseMessageHeaders<R, M> {
+    @Override
+    default Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+        return Collections.singleton(RuntimeRestAPIVersion.V1);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHeaders.java
index ce33132df11d6..cb1daa84e7188 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHeaders.java
@@ -26,7 +26,7 @@
 
 /** Message headers for the {@link SubtasksAllAccumulatorsHandler}. */
 public class SubtasksAllAccumulatorsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
 
     private static final SubtasksAllAccumulatorsHeaders INSTANCE =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
index 85abb93670de2..a931b72302761 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesHeaders.java
@@ -25,7 +25,8 @@
 
 /** Message headers for the {@link SubtasksTimesHandler}. */
 public class SubtasksTimesHeaders
-        implements MessageHeaders<EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, SubtasksTimesInfo, JobVertexMessageParameters> {
 
     private static final SubtasksTimesHeaders INSTANCE = new SubtasksTimesHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java
index a3ca0750b665e..90e783c979dc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java
@@ -25,7 +25,7 @@
 
 /** Headers for the log url retriever of TaskManager. */
 public class TaskManagerLogUrlHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, LogUrlResponse, JobTaskManagerMessageParameters> {
     private static final TaskManagerLogUrlHeaders INSTANCE = new TaskManagerLogUrlHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java
index 79a08183ad73d..21ad6fbafca46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/UntypedResponseMessageHeaders.java
@@ -24,7 +24,7 @@
  * Message headers for a web handler request.
  *
  * @param <R> type of the request
- * @param <M> type of the message message parameters
+ * @param <M> type of the message parameters
  */
 public interface UntypedResponseMessageHeaders<R extends RequestBody, M extends MessageParameters>
         extends RestHandlerSpecification {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java
index 83b69be8e6bf6..1c33d03bab8aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnCancelJobTerminationHeaders.java
@@ -21,6 +21,10 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * {@link RestHandlerSpecification} for the {@link JobCancellationHandler} which is registered for
@@ -55,4 +59,9 @@ public String getTargetRestEndpointURL() {
     public static YarnCancelJobTerminationHeaders getInstance() {
         return INSTANCE;
     }
+
+    @Override
+    public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+        return Collections.singleton(RuntimeRestAPIVersion.V1);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java
index 98c7064ac6680..9738dccab608f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/YarnStopJobTerminationHeaders.java
@@ -21,6 +21,10 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * {@link RestHandlerSpecification} for the {@link JobCancellationHandler} which is registered for
@@ -54,4 +58,9 @@ public String getTargetRestEndpointURL() {
     public static YarnStopJobTerminationHeaders getInstance() {
         return INSTANCE;
     }
+
+    @Override
+    public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+        return Collections.singleton(RuntimeRestAPIVersion.V1);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
index 1cd070ac45ca4..4612c623268c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
@@ -22,13 +22,14 @@
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link CheckpointConfigHandler}. */
 public class CheckpointConfigHeaders
-        implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
 
     private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
index abc51dd93368d..d70357fcd2838 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
@@ -21,13 +21,13 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Headers for the {@link CheckpointStatisticDetailsHandler}. */
 public class CheckpointStatisticDetailsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> {
 
     private static final CheckpointStatisticDetailsHeaders INSTANCE =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
index a8617d76a0751..dcba380b6211e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
@@ -22,13 +22,14 @@
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link CheckpointingStatisticsHandler}. */
 public class CheckpointingStatisticsHeaders
-        implements MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> {
 
     private static final CheckpointingStatisticsHeaders INSTANCE =
             new CheckpointingStatisticsHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java
index bc7bea2128027..51d7f9ca13aa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java
@@ -21,13 +21,13 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Headers for the {@link TaskCheckpointStatisticDetailsHandler}. */
 public class TaskCheckpointStatisticsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody,
                 TaskCheckpointStatisticsWithSubtaskDetails,
                 TaskCheckpointMessageParameters> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerCustomLogHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerCustomLogHeaders.java
index a2875b252c397..606de011706bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerCustomLogHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerCustomLogHeaders.java
@@ -22,11 +22,11 @@
 import org.apache.flink.runtime.rest.handler.cluster.JobManagerCustomLogHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.LogFileNamePathParameter;
-import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders;
 
 /** Headers for the {@link JobManagerCustomLogHandler}. */
 public class JobManagerCustomLogHeaders
-        implements UntypedResponseMessageHeaders<EmptyRequestBody, FileMessageParameters> {
+        implements RuntimeUntypedResponseMessageHeaders<EmptyRequestBody, FileMessageParameters> {
 
     private static final JobManagerCustomLogHeaders INSTANCE = new JobManagerCustomLogHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogFileHeader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogFileHeader.java
index da55ff4309140..3ce923aed34cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogFileHeader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogFileHeader.java
@@ -22,11 +22,11 @@
 import org.apache.flink.runtime.rest.handler.cluster.JobManagerLogFileHandler;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders;
 
 /** Headers for the {@link JobManagerLogFileHandler}. */
 public class JobManagerLogFileHeader
-        implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
+        implements RuntimeUntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
 
     private static final JobManagerLogFileHeader INSTANCE = new JobManagerLogFileHeader();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogListHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogListHeaders.java
index 8cec7cd54002e..c55037351bde9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogListHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerLogListHeaders.java
@@ -23,13 +23,13 @@
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.LogListInfo;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Headers for the {@link JobManagerLogListHandler}. */
 public class JobManagerLogListHeaders
-        implements MessageHeaders<EmptyRequestBody, LogListInfo, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, LogListInfo, EmptyMessageParameters> {
 
     private static final JobManagerLogListHeaders INSTANCE = new JobManagerLogListHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerStdoutFileHeader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerStdoutFileHeader.java
index 31358148b3042..5488e609b483f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerStdoutFileHeader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerStdoutFileHeader.java
@@ -21,11 +21,11 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders;
 
 /** Headers for the {@link JobManagerStdoutFileHandler}. */
 public class JobManagerStdoutFileHeader
-        implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
+        implements RuntimeUntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
 
     private static final JobManagerStdoutFileHeader INSTANCE = new JobManagerStdoutFileHeader();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerThreadDumpHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerThreadDumpHeaders.java
index 88dee82ab513a..f0dfba9439a74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerThreadDumpHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/JobManagerThreadDumpHeaders.java
@@ -22,14 +22,14 @@
 import org.apache.flink.runtime.rest.handler.cluster.JobManagerThreadDumpHandler;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Headers for the {@link JobManagerThreadDumpHandler}. */
 public class JobManagerThreadDumpHeaders
-        implements MessageHeaders<EmptyRequestBody, ThreadDumpInfo, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, ThreadDumpInfo, EmptyMessageParameters> {
 
     private static final JobManagerThreadDumpHeaders INSTANCE = new JobManagerThreadDumpHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
index a6179f6667817..b4475271e1b0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
@@ -22,13 +22,14 @@
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for {@link org.apache.flink.runtime.rest.handler.cluster.ShutdownHandler}. */
 public class ShutdownHeaders
-        implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
     private static final ShutdownHeaders INSTANCE = new ShutdownHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/dataset/ClusterDataSetListHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/dataset/ClusterDataSetListHeaders.java
index 1fc2f6148aeb5..25e8cfe64e696 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/dataset/ClusterDataSetListHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/dataset/ClusterDataSetListHeaders.java
@@ -20,13 +20,13 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Specification for retrieving an overview over all available cluster partitions. */
 public class ClusterDataSetListHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, ClusterDataSetListResponseBody, EmptyMessageParameters> {
 
     public static final ClusterDataSetListHeaders INSTANCE = new ClusterDataSetListHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
index ec327e175fa00..9c14e304feba9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
@@ -23,13 +23,13 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link JobDetailsHandler}. */
 public class JobDetailsHeaders
-        implements MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {
 
     private static final JobDetailsHeaders INSTANCE = new JobDetailsHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java
index 0b5e1d2306f2d..799939b7af37f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java
@@ -23,12 +23,13 @@
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** {@link MessageHeaders} for {@link JobExecutionResultHeaders}. */
 public class JobExecutionResultHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
 
     private static final JobExecutionResultHeaders INSTANCE = new JobExecutionResultHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java
index ec8297e2b8bbf..684e7f98bbfdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java
@@ -22,7 +22,7 @@
 import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -31,7 +31,8 @@
  * org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler}.
  */
 public class JobManagerJobConfigurationHeaders
-        implements MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, ConfigurationInfo, JobMessageParameters> {
     private static final JobManagerJobConfigurationHeaders INSTANCE =
             new JobManagerJobConfigurationHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobEnvironmentHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobEnvironmentHeaders.java
index 628b3ceb818d3..256c1f6149ce8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobEnvironmentHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobEnvironmentHeaders.java
@@ -22,7 +22,7 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EnvironmentInfo;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -31,7 +31,7 @@
  * org.apache.flink.runtime.rest.handler.job.JobManagerJobEnvironmentHandler}.
  */
 public class JobManagerJobEnvironmentHeaders
-        implements MessageHeaders<EmptyRequestBody, EnvironmentInfo, JobMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, EnvironmentInfo, JobMessageParameters> {
     private static final JobManagerJobEnvironmentHeaders INSTANCE =
             new JobManagerJobEnvironmentHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java
index 22b76d7d78c5e..5057c1a95cc3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java
@@ -24,6 +24,7 @@
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -31,7 +32,7 @@
  * {@link MessageHeaders} for {@link org.apache.flink.runtime.rest.handler.job.JobStatusHandler}.
  */
 public class JobStatusInfoHeaders
-        implements MessageHeaders<EmptyRequestBody, JobStatusInfo, JobMessageParameters> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, JobStatusInfo, JobMessageParameters> {
     private static final JobStatusInfoHeaders INSTANCE = new JobStatusInfoHeaders();
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
index 3cb8215d4b373..4e02e292ec061 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -21,13 +21,13 @@
 import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** These headers define the protocol for submitting a job to a flink cluster. */
 public class JobSubmitHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
 
     private static final String URL = "/jobs";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
index 59224c255975f..405f33348f3f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskCurrentAttemptDetailsHeaders.java
@@ -23,14 +23,14 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link SubtaskCurrentAttemptDetailsHandler}. */
 public class SubtaskCurrentAttemptDetailsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskMessageParameters> {
 
     private static final SubtaskCurrentAttemptDetailsHeaders INSTANCE =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
index 52473acf7bfbf..1175172f0a5b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
@@ -23,14 +23,14 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link SubtaskExecutionAttemptAccumulatorsHandler}. */
 public class SubtaskExecutionAttemptAccumulatorsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody,
                 SubtaskExecutionAttemptAccumulatorsInfo,
                 SubtaskAttemptMessageParameters> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
index ee2e31b7eb733..af8acf070e189 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
@@ -23,14 +23,14 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link SubtaskExecutionAttemptDetailsHandler}. */
 public class SubtaskExecutionAttemptDetailsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody,
                 SubtaskExecutionAttemptDetailsInfo,
                 SubtaskAttemptMessageParameters> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
index 0bbc0c7be51ea..e55ca8998cba9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/coordination/ClientCoordinationHeaders.java
@@ -21,7 +21,7 @@
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -29,7 +29,7 @@
 @Documentation.ExcludeFromDocumentation(
         "This API is not exposed to the users, as coordinators are used only internally.")
 public class ClientCoordinationHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 ClientCoordinationRequestBody,
                 ClientCoordinationResponseBody,
                 ClientCoordinationMessageParameters> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
index 6af5355415ead..994b9199429d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
@@ -21,13 +21,14 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Based {@link MessageHeaders} class for aggregating metrics. */
 public abstract class AbstractAggregatedMetricsHeaders<
                 P extends AbstractAggregatedMetricsParameters<?>>
-        implements MessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, AggregatedMetricsResponseBody, P> {
     @Override
     public Class<AggregatedMetricsResponseBody> getResponseClass() {
         return AggregatedMetricsResponseBody.class;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
index 2c2fae66225e1..8b8ddf0d89950 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeaders.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -30,7 +31,7 @@
  * org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}.
  */
 public abstract class AbstractMetricsHeaders<M extends MessageParameters>
-        implements MessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> {
+        implements RuntimeMessageHeaders<EmptyRequestBody, MetricCollectionResponseBody, M> {
 
     @Override
     public Class<EmptyRequestBody> getRequestClass() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexWatermarksHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexWatermarksHeaders.java
index 1e3c3a4a0589f..fc564105900d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexWatermarksHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexWatermarksHeaders.java
@@ -24,12 +24,13 @@
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** {@link MessageHeaders} for retrieving watermarks. */
 public final class JobVertexWatermarksHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, MetricCollectionResponseBody, JobVertexMessageParameters> {
 
     public static final JobVertexWatermarksHeaders INSTANCE = new JobVertexWatermarksHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerCustomLogHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerCustomLogHeaders.java
index a8bec07ba0f44..a0b52ad74e513 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerCustomLogHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerCustomLogHeaders.java
@@ -22,11 +22,11 @@
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerCustomLogHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.LogFileNamePathParameter;
-import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders;
 
 /** Headers for the {@link TaskManagerCustomLogHandler}. */
 public class TaskManagerCustomLogHeaders
-        implements UntypedResponseMessageHeaders<
+        implements RuntimeUntypedResponseMessageHeaders<
                 EmptyRequestBody, TaskManagerFileMessageParameters> {
 
     private static final TaskManagerCustomLogHeaders INSTANCE = new TaskManagerCustomLogHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
index 9886520066334..dc8ceb2f340d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsHeaders.java
@@ -21,13 +21,13 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Headers for the {@link TaskManagerDetailsHandler} which serves the TaskManager details. */
 public class TaskManagerDetailsHeaders
-        implements MessageHeaders<
+        implements RuntimeMessageHeaders<
                 EmptyRequestBody, TaskManagerDetailsInfo, TaskManagerMessageParameters> {
 
     private static final TaskManagerDetailsHeaders INSTANCE = new TaskManagerDetailsHeaders();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogFileHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogFileHeaders.java
index d5b0ab7ece0f7..aece6d414cc47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogFileHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogFileHeaders.java
@@ -21,11 +21,12 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders;
 
 /** Headers for the {@link TaskManagerLogFileHandler}. */
 public class TaskManagerLogFileHeaders
-        implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {
+        implements RuntimeUntypedResponseMessageHeaders<
+                EmptyRequestBody, TaskManagerMessageParameters> {
 
     private static final TaskManagerLogFileHeaders INSTANCE = new TaskManagerLogFileHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogsHeaders.java
index 8ae9256d4e210..733d2889735a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerLogsHeaders.java
@@ -22,13 +22,14 @@
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogListHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.LogListInfo;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Headers for the {@link TaskManagerLogListHandler}. */
 public class TaskManagerLogsHeaders
-        implements MessageHeaders<EmptyRequestBody, LogListInfo, TaskManagerMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, LogListInfo, TaskManagerMessageParameters> {
 
     private static final TaskManagerLogsHeaders INSTANCE = new TaskManagerLogsHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerStdoutFileHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerStdoutFileHeaders.java
index 986091887ee45..87cab851c09bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerStdoutFileHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerStdoutFileHeaders.java
@@ -21,11 +21,12 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeUntypedResponseMessageHeaders;
 
 /** Headers for the {@link TaskManagerStdoutFileHandler}. */
 public class TaskManagerStdoutFileHeaders
-        implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {
+        implements RuntimeUntypedResponseMessageHeaders<
+                EmptyRequestBody, TaskManagerMessageParameters> {
 
     private static final TaskManagerStdoutFileHeaders INSTANCE = new TaskManagerStdoutFileHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpHeaders.java
index 5a5760e900c02..8695abb56fe40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpHeaders.java
@@ -21,14 +21,15 @@
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpHandler;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Headers for the {@link TaskManagerThreadDumpHandler}. */
 public class TaskManagerThreadDumpHeaders
-        implements MessageHeaders<EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> {
 
     private static final TaskManagerThreadDumpHeaders INSTANCE = new TaskManagerThreadDumpHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java
index 0d1d4b48ba1d9..23349c8184816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagersHeaders.java
@@ -22,13 +22,14 @@
 import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Message headers for the {@link TaskManagersHandler}. */
 public class TaskManagersHeaders
-        implements MessageHeaders<EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, TaskManagersInfo, EmptyMessageParameters> {
 
     private static final TaskManagersHeaders INSTANCE = new TaskManagersHeaders();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
index 7c714604d8716..6193ef8a1f802 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RestAPIVersion.java
@@ -20,90 +20,36 @@
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
-
-/**
- * An enum for all versions of the REST API.
- *
- * <p>REST API versions are global and thus apply to every REST component.
- *
- * <p>Changes that must result in an API version increment include but are not limited to: -
- * modification of a handler url - addition of new mandatory parameters - removal of a
- * handler/request - modifications to request/response bodies (excluding additions)
- */
-public enum RestAPIVersion {
-    V0(0, false, false), // strictly for testing purposes
-    V1(1, true, true);
-
-    private final int versionNumber;
-
-    private final boolean isDefaultVersion;
-
-    private final boolean isStable;
-
-    RestAPIVersion(int versionNumber, boolean isDefaultVersion, boolean isStable) {
-        this.versionNumber = versionNumber;
-        this.isDefaultVersion = isDefaultVersion;
-        this.isStable = isStable;
-    }
 
+/** Interface for all versions of the REST API. */
+public interface RestAPIVersion<T extends RestAPIVersion<T>> extends Comparable<T> {
     /**
      * Returns the URL version prefix (e.g. "v1") for this version.
      *
      * @return URL version prefix
      */
-    public String getURLVersionPrefix() {
-        return name().toLowerCase();
-    }
+    String getURLVersionPrefix();
 
     /**
      * Returns whether this version is the default REST API version.
      *
      * @return whether this version is the default
      */
-    public boolean isDefaultVersion() {
-        return isDefaultVersion;
-    }
+    boolean isDefaultVersion();
 
     /**
      * Returns whether this version is considered stable.
      *
      * @return whether this version is stable
      */
-    public boolean isStableVersion() {
-        return isStable;
-    }
-
-    /**
-     * Converts the given URL version prefix (e.g "v1") to a {@link RestAPIVersion}.
-     *
-     * @param prefix prefix to converted
-     * @return REST API version matching the prefix
-     * @throws IllegalArgumentException if the prefix doesn't match any version
-     */
-    public static RestAPIVersion fromURLVersionPrefix(String prefix) {
-        return valueOf(prefix.toUpperCase());
-    }
+    boolean isStableVersion();
 
     /**
-     * Returns the latest version from the given collection.
+     * Accept versions and one of them as a comparator, and get the latest one.
      *
-     * @param versions possible candidates
-     * @return latest version
+     * @return latest version that implement RestAPIVersion interface>
      */
-    public static RestAPIVersion getLatestVersion(Collection<RestAPIVersion> versions) {
-        return Collections.max(versions, new RestAPIVersionComparator());
-    }
-
-    /**
-     * Comparator for {@link RestAPIVersion} that sorts versions based on their version number, i.e.
-     * oldest to latest.
-     */
-    public static class RestAPIVersionComparator implements Comparator<RestAPIVersion> {
-
-        @Override
-        public int compare(RestAPIVersion o1, RestAPIVersion o2) {
-            return Integer.compare(o1.versionNumber, o2.versionNumber);
-        }
+    static <E extends RestAPIVersion<E>> E getLatestVersion(Collection<E> versions) {
+        return Collections.max(versions);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RuntimeRestAPIVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RuntimeRestAPIVersion.java
new file mode 100644
index 0000000000000..6df91d6a5a3a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/versioning/RuntimeRestAPIVersion.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.versioning;
+
+/**
+ * An enum for all versions of the REST API.
+ *
+ * <p>REST API versions are global and thus apply to every REST component.
+ *
+ * <p>Changes that must result in an API version increment include but are not limited to: -
+ * modification of a handler url - addition of new mandatory parameters - removal of a
+ * handler/request - modifications to request/response bodies (excluding additions)
+ */
+public enum RuntimeRestAPIVersion implements RestAPIVersion<RuntimeRestAPIVersion> {
+    // The bigger the ordinal(its position in enum declaration), the higher the level of the
+    // version.
+    V0(false, false), // strictly for testing purposes
+    V1(true, true);
+
+    private final boolean isDefaultVersion;
+
+    private final boolean isStable;
+
+    RuntimeRestAPIVersion(boolean isDefaultVersion, boolean isStable) {
+        this.isDefaultVersion = isDefaultVersion;
+        this.isStable = isStable;
+    }
+
+    /**
+     * Returns the URL version prefix (e.g. "v1") for this version.
+     *
+     * @return URL version prefix
+     */
+    @Override
+    public String getURLVersionPrefix() {
+        return name().toLowerCase();
+    }
+
+    /**
+     * Returns whether this version is the default REST API version.
+     *
+     * @return whether this version is the default
+     */
+    @Override
+    public boolean isDefaultVersion() {
+        return isDefaultVersion;
+    }
+
+    /**
+     * Returns whether this version is considered stable.
+     *
+     * @return whether this version is stable
+     */
+    @Override
+    public boolean isStableVersion() {
+        return isStable;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
new file mode 100644
index 0000000000000..d0be629e0e4b1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/NonLeaderRetrievalRestfulGateway.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * * Gateway for restful endpoints without leader retrieval logic. * *
+ *
+ * <p>Gateways which implement this method run a REST endpoint which is reachable under the returned
+ * address, and can be used in {@link org.apache.flink.runtime.rest.handler.AbstractHandler} without
+ * leader retrieval logic.
+ */
+public class NonLeaderRetrievalRestfulGateway implements RestfulGateway {
+    private static final String MESSAGE =
+            "NonLeaderRetrievalRestfulGateway doesn't support the operation.";
+
+    public static final NonLeaderRetrievalRestfulGateway INSTANCE =
+            new NonLeaderRetrievalRestfulGateway();
+
+    private NonLeaderRetrievalRestfulGateway() {}
+
+    @Override
+    public String getAddress() {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public String getHostname() {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(
+            JobID jobId, Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Tuple2<ResourceID, String>>>
+            requestTaskManagerMetricQueryServiceAddresses(Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+
+    @Override
+    public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time timeout) {
+        throw new UnsupportedOperationException(MESSAGE);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index c83ffa0cfd50f..19939b2dce9b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -28,8 +28,8 @@
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -272,7 +272,8 @@ protected CompletableFuture<EmptyResponseBody> handleRequest(
     }
 
     private static final class MultipartMixedHeaders
-            implements MessageHeaders<TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+            implements RuntimeMessageHeaders<
+                    TestRequestBody, EmptyResponseBody, EmptyMessageParameters> {
         private static final MultipartMixedHeaders INSTANCE = new MultipartMixedHeaders();
 
         private MultipartMixedHeaders() {}
@@ -418,7 +419,7 @@ public boolean acceptsFileUploads() {
     }
 
     private abstract static class TestHeadersBase<R extends RequestBody>
-            implements MessageHeaders<R, EmptyResponseBody, EmptyMessageParameters> {
+            implements RuntimeMessageHeaders<R, EmptyResponseBody, EmptyMessageParameters> {
 
         @Override
         public Class<EmptyResponseBody> getResponseClass() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index a29b535968525..cc99eeeca2cad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -23,8 +23,8 @@
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.ExceptionUtils;
@@ -96,7 +96,7 @@ public void testInvalidVersionRejection() throws Exception {
                             EmptyMessageParameters.getInstance(),
                             EmptyRequestBody.getInstance(),
                             Collections.emptyList(),
-                            RestAPIVersion.V0);
+                            RuntimeRestAPIVersion.V0);
             Assert.fail("The request should have been rejected due to a version mismatch.");
         } catch (IllegalArgumentException e) {
             // expected
@@ -208,7 +208,8 @@ public void testRestClientClosedHandling() throws Exception {
     }
 
     private static class TestMessageHeaders
-            implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+            implements RuntimeMessageHeaders<
+                    EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
         @Override
         public Class<EmptyRequestBody> getRequestClass() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
index ae57a12c7fc6e..f950b5c157320 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
@@ -26,9 +26,9 @@
 import org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory;
 import org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
@@ -168,7 +168,7 @@ private static class TestRequest implements RequestBody {}
     private static class TestResponse implements ResponseBody {}
 
     private static class TestHeaders
-            implements MessageHeaders<TestRequest, TestResponse, EmptyMessageParameters> {
+            implements RuntimeMessageHeaders<TestRequest, TestResponse, EmptyMessageParameters> {
 
         @Override
         public HttpMethodWrapper getHttpMethod() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 81c37a0efe12c..8d3c77ac28412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -43,10 +43,11 @@
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.TestRestHandler;
 import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
-import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -481,7 +482,7 @@ public void testVersioning() throws Exception {
                         EmptyMessageParameters.getInstance(),
                         EmptyRequestBody.getInstance(),
                         Collections.emptyList(),
-                        RestAPIVersion.V1);
+                        RuntimeRestAPIVersion.V1);
 
         specifiedVersionResponse.get(5, TimeUnit.SECONDS);
     }
@@ -496,7 +497,7 @@ public void testVersionSelection() throws Exception {
                         EmptyMessageParameters.getInstance(),
                         EmptyRequestBody.getInstance(),
                         Collections.emptyList(),
-                        RestAPIVersion.V0);
+                        RuntimeRestAPIVersion.V0);
 
         try {
             version1Response.get(5, TimeUnit.SECONDS);
@@ -514,7 +515,7 @@ public void testVersionSelection() throws Exception {
                         EmptyMessageParameters.getInstance(),
                         EmptyRequestBody.getInstance(),
                         Collections.emptyList(),
-                        RestAPIVersion.V1);
+                        RuntimeRestAPIVersion.V1);
 
         try {
             version2Response.get(5, TimeUnit.SECONDS);
@@ -840,7 +841,7 @@ public TestResponse(@JsonProperty("id") int id, @JsonProperty("content") String
     }
 
     private static class TestHeaders
-            implements MessageHeaders<TestRequest, TestResponse, TestParameters> {
+            implements RuntimeMessageHeaders<TestRequest, TestResponse, TestParameters> {
 
         @Override
         public HttpMethodWrapper getHttpMethod() {
@@ -1037,7 +1038,9 @@ protected CompletableFuture<EmptyResponseBody> handleRequest(
     }
 
     enum TestVersionHeaders
-            implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+            implements
+                    RuntimeMessageHeaders<
+                            EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
         INSTANCE;
 
         @Override
@@ -1076,8 +1079,8 @@ public EmptyMessageParameters getUnresolvedMessageParameters() {
         }
 
         @Override
-        public Collection<RestAPIVersion> getSupportedAPIVersions() {
-            return Collections.singleton(RestAPIVersion.V1);
+        public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+            return Collections.singleton(RuntimeRestAPIVersion.V1);
         }
     }
 
@@ -1124,8 +1127,8 @@ private enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeaders
         INSTANCE;
 
         @Override
-        public Collection<RestAPIVersion> getSupportedAPIVersions() {
-            return Collections.singleton(RestAPIVersion.V0);
+        public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+            return Collections.singleton(RuntimeRestAPIVersion.V0);
         }
     }
 
@@ -1133,13 +1136,15 @@ private enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeaders
         INSTANCE;
 
         @Override
-        public Collection<RestAPIVersion> getSupportedAPIVersions() {
-            return Collections.singleton(RestAPIVersion.V1);
+        public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+            return Collections.singleton(RuntimeRestAPIVersion.V1);
         }
     }
 
     private enum TestUploadHeaders
-            implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+            implements
+                    RuntimeMessageHeaders<
+                            EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
         INSTANCE;
 
         @Override
@@ -1184,7 +1189,9 @@ public boolean acceptsFileUploads() {
     }
 
     private enum TestUnavailableHeaders
-            implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+            implements
+                    RuntimeMessageHeaders<
+                            EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
         INSTANCE;
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
index 07a8cae6914ce..36bdc38f71cd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
@@ -31,6 +31,7 @@
 import org.apache.flink.runtime.rest.util.TestMessageHeaders;
 import org.apache.flink.runtime.rest.util.TestRestHandler;
 import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
@@ -61,6 +62,7 @@
 import java.net.InetAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -287,6 +289,11 @@ public String getTargetRestEndpointURL() {
             public boolean acceptsFileUploads() {
                 return true;
             }
+
+            @Override
+            public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+                return Collections.singleton(RuntimeRestAPIVersion.V1);
+            }
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
index 812f0e0bc1125..1ec46093a9154 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
@@ -36,6 +36,7 @@
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessageParameters;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.testutils.TestingUtils;
@@ -79,6 +80,7 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Queue;
@@ -567,5 +569,10 @@ public HttpMethodWrapper getHttpMethod() {
         public String getTargetRestEndpointURL() {
             return URL;
         }
+
+        @Override
+        public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
+            return Collections.singleton(RuntimeRestAPIVersion.V1);
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestMessageHeaders.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestMessageHeaders.java
index e76738a08393c..1ce3466ed4a02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestMessageHeaders.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestMessageHeaders.java
@@ -22,10 +22,10 @@
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -39,7 +39,7 @@
  */
 public class TestMessageHeaders<
                 REQ extends RequestBody, RES extends ResponseBody, M extends MessageParameters>
-        implements MessageHeaders<REQ, RES, M> {
+        implements RuntimeMessageHeaders<REQ, RES, M> {
 
     public static TestMessageHeaders.Builder<
                     EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters>
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RuntimeRestAPIVersionTest.java
similarity index 72%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RuntimeRestAPIVersionTest.java
index 24baa62e3458c..80df563618cd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RestAPIVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/versioning/RuntimeRestAPIVersionTest.java
@@ -28,19 +28,20 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-/** Tests for {@link RestAPIVersion}. */
-public class RestAPIVersionTest extends TestLogger {
+/** Tests for {@link RuntimeRestAPIVersion}. */
+public class RuntimeRestAPIVersionTest extends TestLogger {
     @Test
     public void testGetLatest() {
-        Collection<RestAPIVersion> candidates = Arrays.asList(RestAPIVersion.V0, RestAPIVersion.V1);
-        Assert.assertEquals(RestAPIVersion.V1, RestAPIVersion.getLatestVersion(candidates));
+        Collection<RuntimeRestAPIVersion> candidates =
+                Arrays.asList(RuntimeRestAPIVersion.V0, RuntimeRestAPIVersion.V1);
+        Assert.assertEquals(RuntimeRestAPIVersion.V1, RestAPIVersion.getLatestVersion(candidates));
     }
 
     @Test
     public void testSingleDefaultVersion() {
-        final List<RestAPIVersion> defaultVersions =
-                Arrays.stream(RestAPIVersion.values())
-                        .filter(RestAPIVersion::isDefaultVersion)
+        final List<RuntimeRestAPIVersion> defaultVersions =
+                Arrays.stream(RuntimeRestAPIVersion.values())
+                        .filter(RuntimeRestAPIVersion::isDefaultVersion)
                         .collect(Collectors.toList());
 
         Assert.assertEquals(