Skip to content

Commit

Permalink
[FLINK-27769][sql-gateway]Adjust the RestAPIVersion and RestfulGatewa…
Browse files Browse the repository at this point in the history
…y to adapt a variety of endpoints
  • Loading branch information
WencongLiu authored and xintongsong committed Aug 2, 2022
1 parent 0f77b29 commit 8ff0c3f
Show file tree
Hide file tree
Showing 86 changed files with 516 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
Expand Down Expand Up @@ -856,7 +857,8 @@ protected CompletableFuture<EmptyResponseBody> handleRequest(
}

private static final class PingRestHandlerHeaders
implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
implements RuntimeMessageHeaders<
EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {

static final PingRestHandlerHeaders INSTANCE = new PingRestHandlerHeaders();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer;
import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.util.ConfigurationException;
Expand Down Expand Up @@ -114,8 +114,8 @@ public class OpenApiSpecGenerator {
public static void main(String[] args) throws IOException, ConfigurationException {
String outputDirectory = args[0];

for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
if (apiVersion == RestAPIVersion.V0) {
for (final RuntimeRestAPIVersion apiVersion : RuntimeRestAPIVersion.values()) {
if (apiVersion == RuntimeRestAPIVersion.V0) {
// this version exists only for testing purposes
continue;
}
Expand All @@ -130,7 +130,7 @@ public static void main(String[] args) throws IOException, ConfigurationExceptio

@VisibleForTesting
static void createDocumentationFile(
DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile)
DocumentingRestEndpoint restEndpoint, RuntimeRestAPIVersion apiVersion, Path outputFile)
throws IOException {
final OpenAPI openApi = new OpenAPI();

Expand Down Expand Up @@ -167,7 +167,7 @@ private static boolean shouldBeDocumented(MessageHeaders spec) {
return spec.getClass().getAnnotation(Documentation.ExcludeFromDocumentation.class) == null;
}

private static void setInfo(final OpenAPI openApi, final RestAPIVersion apiVersion) {
private static void setInfo(final OpenAPI openApi, final RuntimeRestAPIVersion apiVersion) {
openApi.info(
new Info()
.title("Flink JobManager REST API")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.util.ConfigurationException;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -114,8 +114,8 @@ public class RestAPIDocGenerator {
public static void main(String[] args) throws IOException, ConfigurationException {
String outputDirectory = args[0];

for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
if (apiVersion == RestAPIVersion.V0) {
for (final RuntimeRestAPIVersion apiVersion : RuntimeRestAPIVersion.values()) {
if (apiVersion == RuntimeRestAPIVersion.V0) {
// this version exists only for testing purposes
continue;
}
Expand All @@ -130,7 +130,7 @@ public static void main(String[] args) throws IOException, ConfigurationExceptio

@VisibleForTesting
static void createHtmlFile(
DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion, Path outputFile)
DocumentingRestEndpoint restEndpoint, RuntimeRestAPIVersion apiVersion, Path outputFile)
throws IOException {
StringBuilder html = new StringBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.docs.rest.data.TestExcludeMessageHeaders;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.util.FileUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
Expand All @@ -45,7 +45,7 @@ class OpenApiSpecGeneratorTest {
void testExcludeFromDocumentation() throws Exception {
File file = File.createTempFile("rest_v0_", ".html");
OpenApiSpecGenerator.createDocumentationFile(
new TestExcludeDocumentingRestEndpoint(), RestAPIVersion.V0, file.toPath());
new TestExcludeDocumentingRestEndpoint(), RuntimeRestAPIVersion.V0, file.toPath());
String actual = FileUtils.readFile(file, "UTF-8");

assertThat(actual).contains("/test/empty1");
Expand Down Expand Up @@ -95,7 +95,7 @@ void testDuplicateOperationIdsAreRejected() throws Exception {
() ->
OpenApiSpecGenerator.createDocumentationFile(
new TestDuplicateOperationIdDocumentingRestEndpoint(),
RestAPIVersion.V0,
RuntimeRestAPIVersion.V0,
file.toPath()))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Duplicate OperationId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.docs.rest.data.TestExcludeMessageHeaders;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.util.FileUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
Expand All @@ -44,7 +44,7 @@ class RestAPIDocGeneratorTest {
void testExcludeFromDocumentation() throws Exception {
File file = File.createTempFile("rest_v0_", ".html");
RestAPIDocGenerator.createHtmlFile(
new TestExcludeDocumentingRestEndpoint(), RestAPIVersion.V0, file.toPath());
new TestExcludeDocumentingRestEndpoint(), RuntimeRestAPIVersion.V0, file.toPath());
String actual = FileUtils.readFile(file, "UTF-8");

assertThat(actual).containsSequence("/test/empty1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

Expand All @@ -36,7 +37,8 @@
* parameters are all empty.
*/
public class TestEmptyMessageHeaders
implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
implements RuntimeMessageHeaders<
EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {

private static final String URL = "/test/empty";
private static final String DESCRIPTION = "This is an empty testing REST API.";
Expand Down Expand Up @@ -104,7 +106,7 @@ public String getTargetRestEndpointURL() {
}

@Override
public Collection<RestAPIVersion> getSupportedAPIVersions() {
return Collections.singleton(RestAPIVersion.V0);
public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
return Collections.singleton(RuntimeRestAPIVersion.V0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

Expand All @@ -37,7 +37,8 @@
*/
@Documentation.ExcludeFromDocumentation()
public class TestExcludeMessageHeaders
implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
implements RuntimeMessageHeaders<
EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {

private static final String URL = "/test/excluded";
private static final String DESCRIPTION =
Expand Down Expand Up @@ -92,7 +93,7 @@ public String getTargetRestEndpointURL() {
}

@Override
public Collection<RestAPIVersion> getSupportedAPIVersions() {
return Collections.singleton(RestAPIVersion.V0);
public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
return Collections.singleton(RuntimeRestAPIVersion.V0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/** Message headers for {@link JarPlanHandler}. */
public abstract class AbstractJarPlanHeaders
implements MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
implements RuntimeMessageHeaders<
JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {

@Override
public Class<JobPlanInfo> getResponseClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/** Message headers for {@link JarDeleteHandler}. */
public class JarDeleteHeaders
implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> {
implements RuntimeMessageHeaders<
EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> {

private static final JarDeleteHeaders INSTANCE = new JarDeleteHeaders();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/** Message headers for the {@link JarListHandler}. */
public class JarListHeaders
implements MessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
implements RuntimeMessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> {

public static final String URL = "/jars";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/** {@link MessageHeaders} for {@link JarRunHandler}. */
public class JarRunHeaders
implements MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
implements RuntimeMessageHeaders<
JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {

private static final JarRunHeaders INSTANCE = new JarRunHeaders();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/** {@link MessageHeaders} for uploading jars. */
public final class JarUploadHeaders
implements MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
implements RuntimeMessageHeaders<
EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {

public static final String URL = "/jars/upload";
private static final JarUploadHeaders INSTANCE = new JarUploadHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.util.ConfigurationException;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -63,15 +63,15 @@ private static class StableRestApiVersionProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context)
throws Exception {
return Arrays.stream(RestAPIVersion.values())
.filter(RestAPIVersion::isStableVersion)
return Arrays.stream(RuntimeRestAPIVersion.values())
.filter(RuntimeRestAPIVersion::isStableVersion)
.map(Arguments::of);
}
}

@ParameterizedTest
@ArgumentsSource(StableRestApiVersionProvider.class)
void testDispatcherRestAPIStability(RestAPIVersion apiVersion)
void testDispatcherRestAPIStability(RuntimeRestAPIVersion apiVersion)
throws IOException, ConfigurationException {
final String versionedSnapshotFileName =
String.format(SNAPSHOT_RESOURCE_PATTERN, apiVersion.getURLVersionPrefix());
Expand Down Expand Up @@ -113,7 +113,7 @@ private static void writeSnapshot(
}

private RestAPISnapshot createSnapshot(
final DocumentingRestEndpoint restEndpoint, RestAPIVersion apiVersion) {
final DocumentingRestEndpoint restEndpoint, RuntimeRestAPIVersion apiVersion) {
final List<JsonNode> calls =
restEndpoint.getSpecs().stream()
// we only compare compatibility within the given version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,16 @@ CompletableFuture<P> sendRequest(
R request,
Collection<FileUpload> fileUploads)
throws IOException {
Collection<? extends RestAPIVersion> supportedAPIVersions =
messageHeaders.getSupportedAPIVersions();
return sendRequest(
targetAddress,
targetPort,
messageHeaders,
messageParameters,
request,
fileUploads,
RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
RestAPIVersion.getLatestVersion(supportedAPIVersions));
}

public <
Expand All @@ -323,7 +325,7 @@ CompletableFuture<P> sendRequest(
U messageParameters,
R request,
Collection<FileUpload> fileUploads,
RestAPIVersion apiVersion)
RestAPIVersion<? extends RestAPIVersion<?>> apiVersion)
throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
Expand Down Expand Up @@ -623,9 +624,9 @@ private static void checkAllEndpointsAndHandlersAreUnique(
}

final RestHandlerSpecification headers = handler.f0;
for (RestAPIVersion supportedAPIVersion : headers.getSupportedAPIVersions()) {
for (RestAPIVersion supportedRestAPIVersion : headers.getSupportedAPIVersions()) {
final String parameterizedEndpoint =
supportedAPIVersion.toString()
supportedRestAPIVersion.toString()
+ headers.getHttpMethod()
+ headers.getTargetRestEndpointURL();
// normalize path parameters; distinct path parameters still clash at runtime
Expand All @@ -636,7 +637,7 @@ private static void checkAllEndpointsAndHandlersAreUnique(
throw new FlinkRuntimeException(
String.format(
"REST handler registration overlaps with another registration for: version=%s, method=%s, url=%s.",
supportedAPIVersion,
supportedRestAPIVersion,
headers.getHttpMethod(),
headers.getTargetRestEndpointURL()));
}
Expand All @@ -662,9 +663,6 @@ public static final class RestHandlerUrlComparator
private static final Comparator<String> CASE_INSENSITIVE_ORDER =
new CaseInsensitiveOrderComparator();

private static final Comparator<RestAPIVersion> API_VERSION_ORDER =
new RestAPIVersion.RestAPIVersionComparator();

static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();

@Override
Expand All @@ -677,9 +675,13 @@ public int compare(
if (urlComparisonResult != 0) {
return urlComparisonResult;
} else {
return API_VERSION_ORDER.compare(
Collections.min(o1.f0.getSupportedAPIVersions()),
Collections.min(o2.f0.getSupportedAPIVersions()));
Collection<? extends RestAPIVersion> o1APIVersions =
o1.f0.getSupportedAPIVersions();
RestAPIVersion o1Version = Collections.min(o1APIVersions);
Collection<? extends RestAPIVersion> o2APIVersions =
o2.f0.getSupportedAPIVersions();
RestAPIVersion o2Version = Collections.min(o2APIVersions);
return o1Version.compareTo(o2Version);
}
}

Expand Down
Loading

0 comments on commit 8ff0c3f

Please sign in to comment.