Skip to content

Commit

Permalink
[FLINK-11753] [tests] Add hamcrest matchers for TypeSerializerSchemaC…
Browse files Browse the repository at this point in the history
…ompatibility
  • Loading branch information
Igal Shilman authored and tzulitai committed Feb 28, 2019
1 parent 0ab52e9 commit 4c6be5a
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 100 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils;

import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher;

import java.util.function.Predicate;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link TypeSerializerSchemaCompatibility}.
*/
public final class TypeSerializerMatchers {

private TypeSerializerMatchers() {
}

// -------------------------------------------------------------------------------------------------------------
// Matcher Factories
// -------------------------------------------------------------------------------------------------------------

/**
* Matches {@code compatibleAsIs} {@link TypeSerializerSchemaCompatibility}.
*
* @param <T> element type
* @return a {@code Matcher} that matches {@code compatibleAsIs} {@link TypeSerializerSchemaCompatibility}.
*/
public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleAsIs() {
return propertyMatcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs,
"type serializer schema that is a compatible as is");
}

/**
* Matches {@code isIncompatible} {@link TypeSerializerSchemaCompatibility}.
*
* @param <T> element type
* @return a {@code Matcher} that matches {@code isIncompatible} {@link TypeSerializerSchemaCompatibility}.
*/
public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isIncompatible() {
return propertyMatcher(TypeSerializerSchemaCompatibility::isIncompatible,
"type serializer schema that is incompatible");
}

/**
* Matches {@code isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}.
*
* @param <T> element type
* @return a {@code Matcher} that matches {@code isCompatibleAfterMigration} {@link TypeSerializerSchemaCompatibility}.
*/
public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleAfterMigration() {
return propertyMatcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration,
"type serializer schema that is compatible after migration");
}

/**
* Matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}.
*
* @param <T> element type
* @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}.
*/
public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleWithReconfiguredSerializer() {
@SuppressWarnings("unchecked") Matcher<TypeSerializer<T>> anything =
(Matcher<TypeSerializer<T>>) (Matcher<?>) CoreMatchers.anything();

return new CompatibleAfterReconfiguration<>(anything);
}

/**
* Matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}.
*
* @param reconfiguredSerializerMatcher matches the reconfigured serializer.
* @param <T> element type
* @return a {@code Matcher} that matches {@code isCompatibleWithReconfiguredSerializer} {@link TypeSerializerSchemaCompatibility}.
*/
public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> isCompatibleWithReconfiguredSerializer(
Matcher<? extends TypeSerializer<T>> reconfiguredSerializerMatcher) {

return new CompatibleAfterReconfiguration<>(reconfiguredSerializerMatcher);
}

/**
* Matches if the expected {@code TypeSerializerSchemaCompatibility} has the same compatibility as {@code expectedCompatibility}.
*
* @param expectedCompatibility the compatibility to match to.
* @param <T> element type.
* @return a {@code Matcher} that matches if it has the same compatibility as {@code expectedCompatibility}.
*/
public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> hasSameCompatibilityAs(
TypeSerializerSchemaCompatibility<T> expectedCompatibility) {

return new SchemaCompatibilitySameAs<>(expectedCompatibility);
}

// -------------------------------------------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------------------------------------------

private static <T> Matcher<T> propertyMatcher(Predicate<T> predicate, String matcherDescription) {
return new TypeSafeMatcher<T>() {

@Override
protected boolean matchesSafely(T item) {
return predicate.test(item);
}

@Override
public void describeTo(Description description) {
description.appendText(matcherDescription);
}
};
}

// -------------------------------------------------------------------------------------------------------------
// Matchers
// -------------------------------------------------------------------------------------------------------------

private static final class CompatibleAfterReconfiguration<T>
extends TypeSafeDiagnosingMatcher<TypeSerializerSchemaCompatibility<T>> {

private final Matcher<? extends TypeSerializer<T>> reconfiguredSerializerMatcher;

private CompatibleAfterReconfiguration(Matcher<? extends TypeSerializer<T>> reconfiguredSerializerMatcher) {
this.reconfiguredSerializerMatcher = checkNotNull(reconfiguredSerializerMatcher);
}

@Override
protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> item, Description mismatchDescription) {
if (!item.isCompatibleWithReconfiguredSerializer()) {
mismatchDescription.appendText("serializer schema is not compatible with a reconfigured serializer");
return false;
}
TypeSerializer<T> reconfiguredSerializer = item.getReconfiguredSerializer();
if (!reconfiguredSerializerMatcher.matches(reconfiguredSerializer)) {
reconfiguredSerializerMatcher.describeMismatch(reconfiguredSerializer, mismatchDescription);
return false;
}
return true;
}

@Override
public void describeTo(Description description) {
description.appendText("type serializer schema that is compatible after reconfiguration,")
.appendText("with a reconfigured serializer matching ")
.appendDescriptionOf(reconfiguredSerializerMatcher);
}
}

private static class SchemaCompatibilitySameAs<T> extends TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>> {

private final TypeSerializerSchemaCompatibility<T> expectedCompatibility;

private SchemaCompatibilitySameAs(TypeSerializerSchemaCompatibility<T> expectedCompatibility) {
this.expectedCompatibility = checkNotNull(expectedCompatibility);
}

@Override
protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) {
if (expectedCompatibility.isCompatibleAsIs()) {
return testResultCompatibility.isCompatibleAsIs();
}
else if (expectedCompatibility.isIncompatible()) {
return testResultCompatibility.isIncompatible();
}
else if (expectedCompatibility.isCompatibleAfterMigration()) {
return testResultCompatibility.isCompatibleAfterMigration();
}
else if (expectedCompatibility.isCompatibleWithReconfiguredSerializer()) {
return testResultCompatibility.isCompatibleWithReconfiguredSerializer();
}
return false;
}

@Override
public void describeTo(Description description) {
description.appendText("same compatibility as ").appendValue(expectedCompatibility);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.TestLogger;

import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -42,6 +40,7 @@
import java.util.List;
import java.util.function.Supplier;

import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.allOf;
Expand Down Expand Up @@ -286,7 +285,7 @@ public TestSpecification<T> withNewSerializerProvider(
Supplier<? extends TypeSerializer<T>> serializerProvider,
TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult) {
this.serializerProvider = serializerProvider;
this.schemaCompatibilityMatcher = hasSameCompatibilityType(expectedCompatibilityResult);
this.schemaCompatibilityMatcher = hasSameCompatibilityAs(expectedCompatibilityResult);
return this;
}

Expand Down Expand Up @@ -524,32 +523,4 @@ private static String getSpecNameForVersion(String baseName, MigrationVersion te
protected interface TestResourceFilenameSupplier {
String get(MigrationVersion testVersion);
}

// --------------------------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------------------------

public static <T> Matcher<TypeSerializerSchemaCompatibility<T>> hasSameCompatibilityType(TypeSerializerSchemaCompatibility<T> expectedCompatibilty) {
return new TypeSafeMatcher<TypeSerializerSchemaCompatibility<T>>() {

@Override
protected boolean matchesSafely(TypeSerializerSchemaCompatibility<T> testResultCompatibility) {
if (expectedCompatibilty.isCompatibleAsIs()) {
return testResultCompatibility.isCompatibleAsIs();
} else if (expectedCompatibilty.isIncompatible()) {
return testResultCompatibility.isIncompatible();
} else if (expectedCompatibilty.isCompatibleAfterMigration()) {
return testResultCompatibility.isCompatibleAfterMigration();
} else if (expectedCompatibilty.isCompatibleWithReconfiguredSerializer()) {
return testResultCompatibility.isCompatibleWithReconfiguredSerializer();
}
return false;
}

@Override
public void describeTo(Description description) {
description.appendText("same compatibility as ").appendValue(expectedCompatibilty);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

package org.apache.flink.api.common.typeutils.base;

import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.testutils.migration.MigrationVersion;

import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer;
import static org.apache.flink.api.common.typeutils.base.TestEnum.BAR;
import static org.apache.flink.api.common.typeutils.base.TestEnum.EMMA;
import static org.apache.flink.api.common.typeutils.base.TestEnum.FOO;
Expand Down Expand Up @@ -62,43 +63,25 @@ public static Collection<TestSpecification<?>> testSpecifications() {
EnumSerializer.class,
EnumSerializer.EnumSerializerSnapshot.class,
() -> new EnumSerializer(TestEnum.class),
hasExpectedEnumSerializer(previousEnumValues));
isCompatibleWithReconfiguredSerializer(enumSerializerWith(previousEnumValues))
);

return testSpecifications.get();
}

private static Matcher<? extends TypeSerializer<TestEnum>> enumSerializerWith(final TestEnum[] expectedEnumValues) {
return new TypeSafeMatcher<EnumSerializer<TestEnum>>() {

private static Matcher<TypeSerializerSchemaCompatibility<TestEnum>> hasExpectedEnumSerializer(
final TestEnum[] expectedEnumValues) {

return new TypeSafeDiagnosingMatcher<TypeSerializerSchemaCompatibility<TestEnum>>() {
@Override
protected boolean matchesSafely(
TypeSerializerSchemaCompatibility<TestEnum> item,
Description mismatchDescription) {
if (!item.isCompatibleWithReconfiguredSerializer()) {
mismatchDescription.appendText("compatibility mismatch ").appendValue(item);
return false;
}

EnumSerializer<TestEnum> reconfiguredSerialized =
(EnumSerializer<TestEnum>) item.getReconfiguredSerializer();

if (!Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues)) {
mismatchDescription
.appendText("reconfigured values are ")
.appendValueList("{", ", ", "}", reconfiguredSerialized.getValues());
return false;
}

return true;
protected boolean matchesSafely(EnumSerializer<TestEnum> reconfiguredSerialized) {
return Arrays.equals(reconfiguredSerialized.getValues(), expectedEnumValues);
}

@Override
public void describeTo(Description description) {
description
.appendText("EnumSerializer with values ")
.appendValueList("{", ", ", "}", expectedEnumValues);
.appendText("EnumSerializer with values ")
.appendValueList("{", ", ", "}", expectedEnumValues);
}
};
}
Expand Down
Loading

0 comments on commit 4c6be5a

Please sign in to comment.