Skip to content

Commit

Permalink
[FLINK-25971][table-planner] Hide JsonSerdeUtil#getObjectMapper
Browse files Browse the repository at this point in the history
This closes apache#18878.
  • Loading branch information
slinkydeveloper authored and twalthr committed Mar 9, 2022
1 parent 92a6850 commit 02e155e
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public static boolean hasJsonCreatorAnnotation(Class<?> clazz) {
/**
* Object mapper shared instance to serialize and deserialize the plan. Note that creating and
* copying of object mappers is expensive and should be avoided.
*
* <p>This is not exposed to avoid bad usages, like adding new modules. If you need to read and
* write json persisted plans, use {@link #createObjectWriter(SerdeContext)} and {@link
* #createObjectReader(SerdeContext)}.
*/
private static final ObjectMapper OBJECT_MAPPER_INSTANCE;

Expand All @@ -109,11 +113,6 @@ public static boolean hasJsonCreatorAnnotation(Class<?> clazz) {
OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
}

/** Get the {@link ObjectMapper} instance. */
public static ObjectMapper getObjectMapper() {
return OBJECT_MAPPER_INSTANCE;
}

public static ObjectReader createObjectReader(SerdeContext serdeContext) {
return OBJECT_MAPPER_INSTANCE
.reader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,20 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.io.IOException;
import java.io.StringWriter;

import static org.junit.Assert.assertEquals;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;

/** Tests for {@link ChangelogMode} serialization and deserialization. */
public class ChangelogModeJsonSerdeTest {
@Execution(ExecutionMode.CONCURRENT)
class ChangelogModeJsonSerdeTest {

@Test
public void testChangelogModeSerde() throws IOException {
ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();

void testChangelogModeSerde() throws IOException {
ChangelogMode changelogMode =
ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
Expand All @@ -46,12 +43,6 @@ public void testChangelogModeSerde() throws IOException {
.addContainedKind(RowKind.UPDATE_BEFORE)
.build();

StringWriter writer = new StringWriter(100);
try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
gen.writeObject(changelogMode);
}
String json = writer.toString();
ChangelogMode actual = mapper.readValue(json, ChangelogMode.class);
assertEquals(changelogMode, actual);
testJsonRoundTrip(changelogMode, ChangelogMode.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.utils.CatalogManagerMocks;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
Expand All @@ -55,7 +56,6 @@
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonDoesNotContain;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectReader;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectWriter;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.getObjectMapper;
import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -632,13 +632,13 @@ void withPermanentTable() throws Exception {

private Tuple2<JsonNode, ContextResolvedTable> serDe(
SerdeContext serdeCtx, ContextResolvedTable contextResolvedTable) throws Exception {
// Serialize/Deserialize test
final byte[] actualSerialized =
createObjectWriter(serdeCtx).writeValueAsBytes(contextResolvedTable);
final JsonNode middleDeserialized = getObjectMapper().readTree(actualSerialized);

final ObjectReader objectReader = createObjectReader(serdeCtx);
final JsonNode middleDeserialized = objectReader.readTree(actualSerialized);
final ContextResolvedTable actualDeserialized =
createObjectReader(serdeCtx)
.readValue(actualSerialized, ContextResolvedTable.class);
objectReader.readValue(actualSerialized, ContextResolvedTable.class);

return Tuple2.of(middleDeserialized, actualDeserialized);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ void testFlinkVersions(FlinkVersion flinkVersion) throws IOException {

@Test
void testManualString() throws IOException {
final SerdeContext ctx = configuredSerdeContext();

final String flinkVersion = "1.15";

assertThat(toJson(configuredSerdeContext(), FlinkVersion.v1_15))
.isEqualTo(JsonSerdeUtil.getObjectMapper().writeValueAsString(flinkVersion));
assertThat(toJson(ctx, FlinkVersion.v1_15))
.isEqualTo(JsonSerdeUtil.createObjectWriter(ctx).writeValueAsString(flinkVersion));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,28 @@

import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;

/** Tests for {@link InputProperty} serialization and deserialization. */
@RunWith(Parameterized.class)
public class InputPropertySerdeTest {
@Parameterized.Parameter public InputProperty inputProperty;

@Test
public void testExecEdgeSerde() throws IOException {
ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();
@Execution(ExecutionMode.CONCURRENT)
class InputPropertySerdeTest {

StringWriter writer = new StringWriter(100);
try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
gen.writeObject(inputProperty);
}
String json = writer.toString();
InputProperty actual = mapper.readValue(json, InputProperty.class);
assertEquals(inputProperty, actual);
@ParameterizedTest
@MethodSource("testExecEdgeSerde")
void testExecEdgeSerde(InputProperty inputProperty) throws IOException {
testJsonRoundTrip(inputProperty, InputProperty.class);
}

@Parameterized.Parameters(name = "{0}")
public static List<InputProperty> testData() {
return Arrays.asList(
public static Stream<InputProperty> testExecEdgeSerde() {
return Stream.of(
InputProperty.DEFAULT,
InputProperty.builder()
.requiredDistribution(InputProperty.hashDistribution(new int[] {0, 1}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,24 @@
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;

/** Tests for {@link IntervalJoinSpec} serialization and deserialization. */
@Execution(ExecutionMode.CONCURRENT)
public class IntervalJoinSpecJsonSerdeTest {

private final ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();

@Test
public void testWindowBoundsSerde() throws IOException {
IntervalJoinSpec.WindowBounds windowBounds =
new IntervalJoinSpec.WindowBounds(true, 0L, 10L, 1, 2);
assertEquals(
windowBounds,
mapper.readValue(
mapper.writeValueAsString(windowBounds),
IntervalJoinSpec.WindowBounds.class));

testJsonRoundTrip(windowBounds, IntervalJoinSpec.WindowBounds.class);
}

@Test
Expand All @@ -58,8 +54,7 @@ public void testIntervalJoinSpecSerde() throws IOException {
IntervalJoinSpec.WindowBounds windowBounds =
new IntervalJoinSpec.WindowBounds(true, 0L, 10L, 1, 2);
IntervalJoinSpec actual = new IntervalJoinSpec(joinSpec, windowBounds);
assertEquals(
actual,
mapper.readValue(mapper.writeValueAsString(actual), IntervalJoinSpec.class));

testJsonRoundTrip(actual, IntervalJoinSpec.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,20 @@
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.io.IOException;
import java.io.StringWriter;

import static org.junit.Assert.assertEquals;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;

/** Tests for {@link JoinSpec} serialization and deserialization. */
public class JoinSpecJsonSerdeTest {

private final ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();
@Execution(ExecutionMode.CONCURRENT)
class JoinSpecJsonSerdeTest {

@Test
public void testJoinSpecSerde() throws IOException {
void testJoinSpecSerde() throws IOException {
JoinSpec joinSpec =
new JoinSpec(
FlinkJoinType.ANTI,
Expand All @@ -46,12 +43,6 @@ public void testJoinSpecSerde() throws IOException {
new boolean[] {true},
null);

StringWriter writer = new StringWriter(100);
try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
gen.writeObject(joinSpec);
}
String json = writer.toString();
JoinSpec actual = mapper.readValue(json, JoinSpec.class);
assertEquals(joinSpec, actual);
testJsonRoundTrip(joinSpec, JoinSpec.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.table.planner.expressions.RexNodeExpression;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;

import org.apache.calcite.rex.RexBuilder;
Expand All @@ -54,6 +55,7 @@
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.assertThatJsonDoesNotContain;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.configuredSerdeContext;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectReader;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
Expand Down Expand Up @@ -136,15 +138,16 @@ void testDontSerializeOptions() throws IOException {
JsonSerdeUtil.createObjectWriter(serdeCtx)
.withAttribute(ResolvedCatalogTableJsonSerializer.SERIALIZE_OPTIONS, false)
.writeValueAsBytes(FULL_RESOLVED_CATALOG_TABLE);
JsonNode actualJson = JsonSerdeUtil.getObjectMapper().readTree(actualSerialized);

final ObjectReader objectReader = createObjectReader(serdeCtx);
JsonNode actualJson = objectReader.readTree(actualSerialized);
assertThatJsonContains(actualJson, ResolvedCatalogTableJsonSerializer.RESOLVED_SCHEMA);
assertThatJsonContains(actualJson, ResolvedCatalogTableJsonSerializer.PARTITION_KEYS);
assertThatJsonDoesNotContain(actualJson, ResolvedCatalogTableJsonSerializer.OPTIONS);
assertThatJsonDoesNotContain(actualJson, ResolvedCatalogTableJsonSerializer.COMMENT);

ResolvedCatalogTable actual =
JsonSerdeUtil.createObjectReader(serdeCtx)
.readValue(actualSerialized, ResolvedCatalogTable.class);
objectReader.readValue(actualSerialized, ResolvedCatalogTable.class);

assertThat(actual)
.isEqualTo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.toJson;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectReader;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.createObjectWriter;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.getObjectMapper;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CLASS;
import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
Expand Down Expand Up @@ -760,7 +759,7 @@ private JsonNode serializePermanentFunction(SerdeContext serdeContext) throws Ex
final byte[] actualSerialized =
createObjectWriter(serdeContext)
.writeValueAsBytes(createFunctionCall(serdeContext, PERMANENT_FUNCTION));
return getObjectMapper().readTree(actualSerialized);
return createObjectReader(serdeContext).readTree(actualSerialized);
}

private ContextResolvedFunction deserialize(SerdeContext serdeContext, JsonNode node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,27 @@
package org.apache.flink.table.planner.utils;

import org.apache.flink.FlinkVersion;
import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;

/** This class contains a collection of generic utilities to deal with JSON in tests. */
public final class JsonTestUtils {

private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new ObjectMapper();

private JsonTestUtils() {}

public static JsonNode readFromResource(String path) throws IOException {
return JsonSerdeUtil.getObjectMapper().readTree(JsonTestUtils.class.getResource(path));
return OBJECT_MAPPER_INSTANCE.readTree(JsonTestUtils.class.getResource(path));
}

public static JsonNode setFlinkVersion(JsonNode target, FlinkVersion flinkVersion) {
return ((ObjectNode) target)
.set("flinkVersion", JsonSerdeUtil.getObjectMapper().valueToTree(flinkVersion));
.set("flinkVersion", OBJECT_MAPPER_INSTANCE.valueToTree(flinkVersion.toString()));
}

public static JsonNode clearFlinkVersion(JsonNode target) {
Expand Down

0 comments on commit 02e155e

Please sign in to comment.