From 8d0c4c0405a24e087e66d3a4bf07d1105eda2fc9 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Sun, 11 Jun 2017 11:02:38 +0200 Subject: [PATCH] [hotfix] [cep] Fix incorrect CompatibilityResult.requiresMigration calls in CEP --- .../flink/api/common/typeutils/CompatibilityResult.java | 6 ++++-- .../src/main/java/org/apache/flink/cep/nfa/NFA.java | 2 +- .../main/java/org/apache/flink/cep/nfa/SharedBuffer.java | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java index 5ad0b5e0a9273..4c83dedfa4031 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java @@ -21,6 +21,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + /** * A {@code CompatibilityResult} contains information about whether or not data migration * is required in order to continue using new serializers for previously serialized data. @@ -60,10 +62,10 @@ public static CompatibilityResult compatible() { * * @return a result that signals migration is necessary, also providing a convert deserializer. */ - public static CompatibilityResult requiresMigration(TypeDeserializer convertDeserializer) { + public static CompatibilityResult requiresMigration(@Nonnull TypeDeserializer convertDeserializer) { Preconditions.checkNotNull(convertDeserializer, "Convert deserializer cannot be null."); - return new CompatibilityResult<>(true, Preconditions.checkNotNull(convertDeserializer)); + return new CompatibilityResult<>(true, convertDeserializer); } /** diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index cac16015934c4..a6c5bdeba2ff1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -1108,7 +1108,7 @@ public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnaps } } - return CompatibilityResult.requiresMigration(null); + return CompatibilityResult.requiresMigration(); } private void serializeStates(Set> states, DataOutputView out) throws IOException { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index d592c659fb136..59474655bdfbe 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -1163,7 +1163,7 @@ public CompatibilityResult> ensureCompatibility(TypeSerialize } } - return CompatibilityResult.requiresMigration(null); + return CompatibilityResult.requiresMigration(); } }