Skip to content

Commit

Permalink
[FLINK-17882][table-common] Check for self references in structured t…
Browse files Browse the repository at this point in the history
…ypes

This closes apache#12294.
  • Loading branch information
twalthr committed May 28, 2020
1 parent aedb406 commit 14e11c7
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.apache.flink.table.types.extraction.ExtractionUtils.toClass;
import static org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass;
import static org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability;
import static org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredSelfReference;

/**
* Reflection-based utility that analyzes a given {@link java.lang.reflect.Type}, method, or class to
Expand Down Expand Up @@ -453,6 +454,7 @@ private DataType extractStructuredType(DataTypeTemplate template, List<Type> typ
}

validateStructuredClass(clazz);
validateStructuredSelfReference(type, typeHierarchy);

final List<Field> fields = collectStructuredFields(clazz);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,21 @@ static void validateStructuredClass(Class<?> clazz) {
}
}

/**
* Validates if a given type is not already contained in the type hierarchy of a structured type.
*
* <p>Otherwise this would lead to infinite data type extraction cycles.
*/
static void validateStructuredSelfReference(Type t, List<Type> typeHierarchy) {
final Class<?> clazz = toClass(t);
if (clazz != null && !clazz.isInterface() && clazz != Object.class && typeHierarchy.contains(t)) {
throw extractionError(
"Cyclic reference detected for class '%s'. Attributes of structured types must not " +
"(transitively) reference the structured type itself.",
clazz.getName());
}
}

/**
* Returns the fields of a class for a {@link StructuredType}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
import org.apache.flink.table.types.logical.TypeInformationRawType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
Expand Down Expand Up @@ -71,6 +72,7 @@
* Tests for {@link DataTypeExtractor}.
*/
@RunWith(Parameterized.class)
@SuppressWarnings("unused")
public class DataTypeExtractorTest {

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -398,12 +400,28 @@ public Class<?> bridgedTo() {
.forMethodOutput(IntegerVarArg.class)
.expectDataType(DataTypes.INT()),

// structured type with invalid constructor
TestSpec
.forType(SimplePojoWithInvalidConstructor.class)
.forType(
"Structured type with invalid constructor",
SimplePojoWithInvalidConstructor.class)
.expectErrorMessage(
"Class '" + SimplePojoWithInvalidConstructor.class.getName() + "' has neither a " +
"constructor that assigns all fields nor a default constructor.")
"constructor that assigns all fields nor a default constructor."),

TestSpec
.forType(
"Structured type with self reference",
PojoWithInvalidSelfReference.class)
.expectErrorMessage(
"Cyclic reference detected for class '" + PojoWithInvalidSelfReference.class.getName() + "'. Attributes " +
"of structured types must not (transitively) reference the structured type itself."),

TestSpec
.forType(
"Structured type with self reference that is avoided using RAW",
PojoWithRawSelfReference.class)
.lookupExpects(PojoWithRawSelfReference.class)
.expectDataType(getPojoWithRawSelfReferenceDataType())
);
}

Expand Down Expand Up @@ -550,10 +568,10 @@ static DataType getSimplePojoDataType(Class<?> simplePojoClass) {
final StructuredType.Builder builder = StructuredType.newBuilder(simplePojoClass);
builder.attributes(
Arrays.asList(
new StructuredType.StructuredAttribute("intField", new IntType(true)),
new StructuredType.StructuredAttribute("primitiveBooleanField", new BooleanType(false)),
new StructuredType.StructuredAttribute("primitiveIntField", new IntType(false)),
new StructuredType.StructuredAttribute("stringField", new VarCharType(VarCharType.MAX_LENGTH))));
new StructuredAttribute("intField", new IntType(true)),
new StructuredAttribute("primitiveBooleanField", new BooleanType(false)),
new StructuredAttribute("primitiveIntField", new IntType(false)),
new StructuredAttribute("stringField", new VarCharType(VarCharType.MAX_LENGTH))));
builder.setFinal(true);
builder.setInstantiable(true);
final StructuredType structuredType = builder.build();
Expand All @@ -575,13 +593,13 @@ static DataType getComplexPojoDataType(Class<?> complexPojoClass, Class<?> simpl
final StructuredType.Builder builder = StructuredType.newBuilder(complexPojoClass);
builder.attributes(
Arrays.asList(
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"mapField",
new MapType(new VarCharType(VarCharType.MAX_LENGTH), new IntType())),
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"simplePojoField",
getSimplePojoDataType(simplePojoClass).getLogicalType()),
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"someObject",
new TypeInformationRawType<>(new GenericTypeInfo<>(Object.class)))));
builder.setFinal(true);
Expand All @@ -604,13 +622,13 @@ public static DataType getPojoWithCustomOrderDataType(Class<?> pojoClass) {
final StructuredType.Builder builder = StructuredType.newBuilder(pojoClass);
builder.attributes(
Arrays.asList(
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"z",
new BigIntType()),
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"y",
new BooleanType()),
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"x",
new IntType())));
builder.setFinal(true);
Expand All @@ -630,10 +648,10 @@ private static DataType getOuterTupleDataType() {
final StructuredType.Builder builder = StructuredType.newBuilder(Tuple2.class);
builder.attributes(
Arrays.asList(
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"f0",
new IntType()),
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"f1",
getInnerTupleDataType().getLogicalType())));
builder.setFinal(true);
Expand All @@ -652,10 +670,10 @@ private static DataType getInnerTupleDataType() {
final StructuredType.Builder builder = StructuredType.newBuilder(Tuple2.class);
builder.attributes(
Arrays.asList(
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"f0",
new VarCharType(VarCharType.MAX_LENGTH)),
new StructuredType.StructuredAttribute(
new StructuredAttribute(
"f1",
new BooleanType())));
builder.setFinal(true);
Expand All @@ -670,6 +688,28 @@ private static DataType getInnerTupleDataType() {
return new FieldsDataType(structuredType, Tuple2.class, fieldDataTypes);
}

private static DataType getPojoWithRawSelfReferenceDataType() {
final StructuredType.Builder builder = StructuredType.newBuilder(PojoWithRawSelfReference.class);
builder.attributes(
Arrays.asList(
new StructuredAttribute(
"integer",
new IntType()),
new StructuredAttribute(
"reference",
new TypeInformationRawType<>(new GenericTypeInfo<>(PojoWithRawSelfReference.class)))));
builder.setFinal(true);
builder.setInstantiable(true);
final StructuredType structuredType = builder.build();

final List<DataType> fieldDataTypes = Arrays.asList(
DataTypes.INT(),
DataTypes.RAW(new GenericTypeInfo<>(PojoWithRawSelfReference.class))
);

return new FieldsDataType(structuredType, PojoWithRawSelfReference.class, fieldDataTypes);
}

// --------------------------------------------------------------------------------------------
// Test classes for extraction
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -970,4 +1010,32 @@ public SimplePojoWithInvalidConstructor(Integer intField) {
this.intField = intField;
}
}

// --------------------------------------------------------------------------------------------

/**
* Self reference in attribute.
*/
public static class PojoWithInvalidSelfReference {
public Integer integer;
public PojoWithInvalidSelfReferenceNested nestedPojo;
}

/**
* Nested POJO for self reference test.
*/
public static class PojoWithInvalidSelfReferenceNested {
public PojoWithInvalidSelfReference reference;
}

// --------------------------------------------------------------------------------------------

/**
* Self reference in attribute that is fixed with RAW type.
*/
public static class PojoWithRawSelfReference {
public Integer integer;
@DataTypeHint(value = "RAW", bridgedTo = PojoWithRawSelfReference.class)
public PojoWithRawSelfReference reference;
}
}

0 comments on commit 14e11c7

Please sign in to comment.