Skip to content

Commit

Permalink
[FLINK-33268][rest] Skip unknown fields in REST response deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi authored Jan 15, 2024
1 parent 96eea96 commit 19cb9de
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public class RestClient implements AutoCloseableAsync {
private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
private static final ObjectMapper flexibleObjectMapper =
RestMapperUtils.getFlexibleObjectMapper();

// used to open connections to a rest server endpoint
private final Executor executor;
Expand Down Expand Up @@ -632,35 +634,34 @@ private static <P extends ResponseBody> CompletableFuture<P> parseResponse(
CompletableFuture<P> responseFuture = new CompletableFuture<>();
final JsonParser jsonParser = objectMapper.treeAsTokens(rawResponse.json);
try {
P response = objectMapper.readValue(jsonParser, responseType);
responseFuture.complete(response);
} catch (IOException originalException) {
// the received response did not matched the expected response type

// lets see if it is an ErrorResponse instead
try {
// We make sure it fits to ErrorResponseBody, this condition is enforced by test in
// RestClientTest
if (rawResponse.json.size() == 1 && rawResponse.json.has("errors")) {
ErrorResponseBody error =
objectMapper.treeToValue(rawResponse.getJson(), ErrorResponseBody.class);
responseFuture.completeExceptionally(
new RestClientException(
error.errors.toString(), rawResponse.getHttpResponseStatus()));
} catch (JsonProcessingException jpe2) {
// if this fails it is either the expected type or response type was wrong, most
// likely caused
// by a client/search MessageHeaders mismatch
LOG.error(
"Received response was neither of the expected type ({}) nor an error. Response={}",
responseType,
rawResponse,
jpe2);
responseFuture.completeExceptionally(
new RestClientException(
"Response was neither of the expected type("
+ responseType
+ ") nor an error.",
originalException,
rawResponse.getHttpResponseStatus()));
} else {
P response = flexibleObjectMapper.readValue(jsonParser, responseType);
responseFuture.complete(response);
}
} catch (IOException ex) {
// if this fails it is either the expected type or response type was wrong, most
// likely caused
// by a client/search MessageHeaders mismatch
LOG.error(
"Received response was neither of the expected type ({}) nor an error. Response={}",
responseType,
rawResponse,
ex);
responseFuture.completeExceptionally(
new RestClientException(
"Response was neither of the expected type("
+ responseType
+ ") nor an error.",
ex,
rawResponse.getHttpResponseStatus()));
}
return responseFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,36 @@

/** This class contains utilities for mapping requests and responses to/from JSON. */
public class RestMapperUtils {
private static final ObjectMapper objectMapper;
private static final ObjectMapper strictObjectMapper;
private static final ObjectMapper flexibleObjectMapper;

static {
objectMapper = JacksonMapperFactory.createObjectMapper();
objectMapper.enable(
strictObjectMapper = JacksonMapperFactory.createObjectMapper();
strictObjectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
strictObjectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

flexibleObjectMapper = strictObjectMapper.copy();
flexibleObjectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}

/**
* Returns a preconfigured {@link ObjectMapper}.
* Returns a preconfigured strict {@link ObjectMapper}.
*
* @return preconfigured object mapper
*/
public static ObjectMapper getStrictObjectMapper() {
return objectMapper;
return strictObjectMapper;
}

/**
* Returns a preconfigured flexible {@link ObjectMapper}.
*
* @return preconfigured object mapper
*/
public static ObjectMapper getFlexibleObjectMapper() {
return flexibleObjectMapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand Down Expand Up @@ -56,24 +57,54 @@ class JobDetailsTest {
+ " \"reconciling\" : 3"
+ " }"
+ "}";
private static final String UNKNOWN_FIELD_JOB_DETAILS =
"{"
+ " \"jid\" : \"7a7c3291accebd10b6be8d4f8c8d8dfc\","
+ " \"intentionally_unknown_which_must_be_skipped\" : 0,"
+ " \"name\" : \"foobar\","
+ " \"state\" : \"RUNNING\","
+ " \"start-time\" : 1,"
+ " \"end-time\" : 10,"
+ " \"duration\" : 9,"
+ " \"last-modification\" : 8,"
+ " \"tasks\" : {"
+ " \"total\" : 42,"
+ " \"created\" : 1,"
+ " \"scheduled\" : 3,"
+ " \"deploying\" : 3,"
+ " \"running\" : 4,"
+ " \"finished\" : 7,"
+ " \"canceling\" : 4,"
+ " \"canceled\" : 2,"
+ " \"failed\" : 7,"
+ " \"reconciling\" : 3"
+ " }"
+ "}";

private ObjectMapper objectMapper;
private ObjectMapper flexibleObjectMapper;

final JobDetails expected =
new JobDetails(
JobID.fromHexString("7a7c3291accebd10b6be8d4f8c8d8dfc"),
"foobar",
1L,
10L,
9L,
JobStatus.RUNNING,
8L,
new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 0},
42);

@BeforeEach
public void beforeEach() {
objectMapper = RestMapperUtils.getStrictObjectMapper();
flexibleObjectMapper = RestMapperUtils.getFlexibleObjectMapper();
}

/** Tests that we can marshal and unmarshal JobDetails instances. */
@Test
void testJobDetailsMarshalling() throws JsonProcessingException {
final JobDetails expected =
new JobDetails(
new JobID(),
"foobar",
1L,
10L,
9L,
JobStatus.RUNNING,
8L,
new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
42);

final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

final JsonNode marshalled = objectMapper.valueToTree(expected);

final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
Expand All @@ -83,23 +114,17 @@ void testJobDetailsMarshalling() throws JsonProcessingException {

@Test
void testJobDetailsCompatibleUnmarshalling() throws IOException {
final JobDetails expected =
new JobDetails(
JobID.fromHexString("7a7c3291accebd10b6be8d4f8c8d8dfc"),
"foobar",
1L,
10L,
9L,
JobStatus.RUNNING,
8L,
new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 0},
42);

final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

final JobDetails unmarshalled =
objectMapper.readValue(COMPATIBLE_JOB_DETAILS, JobDetails.class);

assertThat(unmarshalled).isEqualTo(expected);
}

@Test
void testJobDetailsCompatibleUnmarshallingSkipUnknown() throws IOException {
final JobDetails unmarshalled =
flexibleObjectMapper.readValue(UNKNOWN_FIELD_JOB_DETAILS, JobDetails.class);

assertThat(unmarshalled).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.testutils.TestingUtils;
Expand All @@ -33,6 +34,7 @@
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.CheckedSupplier;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory;
Expand All @@ -45,14 +47,19 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -70,6 +77,22 @@ class RestClientTest {

private static final long TIMEOUT = 10L;

@Test
void testErrorResponseBodyHasSpecificStrictureForErrorHandling() {
List<Field> annotatedFields =
Arrays.stream(ErrorResponseBody.class.getDeclaredFields())
.filter(
field ->
Arrays.stream(field.getDeclaredAnnotations())
.map(Annotation::annotationType)
.anyMatch(c -> c.equals(JsonProperty.class)))
.collect(Collectors.toList());
assertThat(annotatedFields).hasSize(1);

Field field = annotatedFields.get(0);
assertThat(field.getName().equals("errors") && field.getType().equals(List.class)).isTrue();
}

@Test
void testConnectionTimeout() throws Exception {
final Configuration config = new Configuration();
Expand Down

0 comments on commit 19cb9de

Please sign in to comment.