Skip to content

Commit

Permalink
[hotfix][table-planner] Use jdk 8 and java time modules for optional …
Browse files Browse the repository at this point in the history
…ser/de

Signed-off-by: slinkydeveloper <[email protected]>
  • Loading branch information
slinkydeveloper authored and fapaul committed Feb 3, 2022
1 parent 46bb038 commit 2529767
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 161 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
Expand All @@ -60,7 +63,6 @@
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.time.Duration;
import java.util.Optional;

/** An utility class that provide abilities for JSON serialization and deserialization. */
Expand Down Expand Up @@ -93,6 +95,9 @@ public static boolean hasJsonCreatorAnnotation(Class<?> clazz) {
.getTypeFactory()
.withClassLoader(JsonSerdeUtil.class.getClassLoader()));
OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
OBJECT_MAPPER_INSTANCE.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
OBJECT_MAPPER_INSTANCE.registerModule(new Jdk8Module().configureAbsentsAsNulls(true));
OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule());
OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
}

Expand Down Expand Up @@ -136,7 +141,6 @@ private static void registerSerializers(SimpleModule module) {
// RexNode is used in many exec nodes, so we register its serializer directly here
module.addSerializer(new RexNodeJsonSerializer());
module.addSerializer(new AggregateCallJsonSerializer());
module.addSerializer(new DurationJsonSerializer());
module.addSerializer(new ChangelogModeJsonSerializer());
module.addSerializer(new LogicalWindowJsonSerializer());
module.addSerializer(new RexWindowBoundJsonSerializer());
Expand Down Expand Up @@ -164,7 +168,6 @@ private static void registerDeserializers(SimpleModule module) {
// with RexLiteral instead of RexNode.
module.addDeserializer(RexLiteral.class, (StdDeserializer) new RexNodeJsonDeserializer());
module.addDeserializer(AggregateCall.class, new AggregateCallJsonDeserializer());
module.addDeserializer(Duration.class, new DurationJsonDeserializer());
module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
module.addDeserializer(LogicalWindow.class, new LogicalWindowJsonDeserializer());
module.addDeserializer(RexWindowBound.class, new RexWindowBoundJsonDeserializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.time.Duration;

import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_ALIAS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_FIELD_INDEX;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_FIELD_NAME;
Expand Down Expand Up @@ -83,7 +84,7 @@ public LogicalWindow deserialize(
if (isTimeTumblingWindow) {
Duration size =
deserializationContext.readValue(
jsonNode.get(FIELD_NAME_SIZE).traverse(jsonParser.getCodec()),
traverse(jsonNode.get(FIELD_NAME_SIZE), jsonParser.getCodec()),
Duration.class);
return new TumblingGroupWindow(
alias, timeField, new ValueLiteralExpression(size));
Expand All @@ -97,11 +98,11 @@ public LogicalWindow deserialize(
if (isTimeSlidingWindow) {
Duration size =
deserializationContext.readValue(
jsonNode.get(FIELD_NAME_SIZE).traverse(jsonParser.getCodec()),
traverse(jsonNode.get(FIELD_NAME_SIZE), jsonParser.getCodec()),
Duration.class);
Duration slide =
deserializationContext.readValue(
jsonNode.get(FIELD_NAME_SLIDE).traverse(jsonParser.getCodec()),
traverse(jsonNode.get(FIELD_NAME_SLIDE), jsonParser.getCodec()),
Duration.class);
return new SlidingGroupWindow(
alias,
Expand All @@ -120,7 +121,7 @@ public LogicalWindow deserialize(
case KIND_SESSION:
Duration gap =
deserializationContext.readValue(
jsonNode.get(FIELD_NAME_GAP).traverse(jsonParser.getCodec()),
traverse(jsonNode.get(FIELD_NAME_GAP), jsonParser.getCodec()),
Duration.class);
return new SessionGroupWindow(alias, timeField, new ValueLiteralExpression(gap));

Expand Down

This file was deleted.

0 comments on commit 2529767

Please sign in to comment.