Skip to content

Commit

Permalink
[FLINK-18586][table-common] Simplify the creation of explicit structu…
Browse files Browse the repository at this point in the history
…red types

This closes apache#12887.
  • Loading branch information
twalthr committed Jul 14, 2020
1 parent c7beb3f commit 104775e
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
Expand Down Expand Up @@ -52,6 +53,8 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
Expand All @@ -74,6 +77,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass;

/**
* A {@link DataType} can be used to declare input and/or output types of operations. This class
* enumerates all pre-defined data types of the Table & SQL API.
Expand Down Expand Up @@ -786,6 +791,54 @@ public static <T> DataType RAW(TypeInformation<T> typeInformation) {
return new AtomicDataType(new TypeInformationRawType<>(typeInformation));
}

/**
* Data type of a user-defined object structured type. Structured types contain zero, one or more
* attributes. Each attribute consists of a name and a type. A type cannot be defined so that one of
* its attribute types (transitively) uses itself.
*
* <p>There are two kinds of structured types. Types that are stored in a catalog and are identified
* by an {@link ObjectIdentifier} or anonymously defined, unregistered types (usually reflectively
* extracted) that are identified by an implementation {@link Class}.
*
* <p>This method helps in manually constructing anonymous, unregistered types. This is useful in
* cases where the reflective extraction using {@link DataTypes#of(Class)} is not applicable. However,
* {@link DataTypes#of(Class)} is the recommended way of creating inline structured types as it also
* considers {@link DataTypeHint}s.
*
* <p>Structured types are converted to internal data structures by the runtime. The given implementation
* class is only used at the edges of the table ecosystem (e.g. when bridging to a function or connector).
* Serialization and equality ({@code hashCode/equals}) are handled by the runtime based on the logical
* type. An implementation class must offer a default constructor with zero arguments or a full constructor
* that assigns all attributes.
*
* <p>Note: A caller of this method must make sure that the {@link DataType#getConversionClass()} of the
* given fields matches with the attributes of the given implementation class, otherwise an exception
* might be thrown during runtime.
*
* @see DataTypes#of(Class)
* @see StructuredType
*/
public static <T> DataType STRUCTURED(Class<T> implementationClass, Field... fields) {
// some basic validation of the class to prevent common mistakes
validateStructuredClass(implementationClass);

final StructuredType.Builder builder = StructuredType.newBuilder(implementationClass);
final List<StructuredAttribute> attributes = Stream.of(fields)
.map(f ->
new StructuredAttribute(
f.getName(),
f.getDataType().getLogicalType(),
f.getDescription().orElse(null)))
.collect(Collectors.toList());
builder.attributes(attributes);
builder.setFinal(true);
builder.setInstantiable(true);
final List<DataType> fieldDataTypes = Stream.of(fields)
.map(DataTypes.Field::getDataType)
.collect(Collectors.toList());
return new FieldsDataType(builder.build(), implementationClass, fieldDataTypes);
}

// --------------------------------------------------------------------------------------------
// Helper functions
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
import org.apache.flink.table.types.utils.ClassDataTypeConverter;
import org.apache.flink.types.Row;

Expand Down Expand Up @@ -515,20 +512,11 @@ else if (constructor == null && !hasInvokableConstructor(clazz)) {
type,
fields);

final List<StructuredAttribute> attributes = createStructuredTypeAttributes(
final DataTypes.Field[] attributes = createStructuredTypeAttributes(
constructor,
fieldDataTypes);

final StructuredType.Builder builder = StructuredType.newBuilder(clazz);
builder.attributes(attributes);
builder.setFinal(true); // anonymous structured types should not allow inheritance
builder.setInstantiable(true);
return new FieldsDataType(
builder.build(),
clazz,
attributes.stream()
.map(a -> fieldDataTypes.get(a.getName()))
.collect(Collectors.toList()));
return DataTypes.STRUCTURED(clazz, attributes);
}

private Map<String, DataType> extractStructuredTypeFields(
Expand Down Expand Up @@ -560,7 +548,7 @@ private Map<String, DataType> extractStructuredTypeFields(
return fieldDataTypes;
}

private List<StructuredAttribute> createStructuredTypeAttributes(
private DataTypes.Field[] createStructuredTypeAttributes(
ExtractionUtils.AssigningConstructor constructor,
Map<String, DataType> fieldDataTypes) {
return Optional.ofNullable(constructor)
Expand All @@ -572,11 +560,8 @@ private List<StructuredAttribute> createStructuredTypeAttributes(
// field order is sorted
return fieldDataTypes.keySet().stream().sorted();
})
.map(name -> {
final LogicalType logicalType = fieldDataTypes.get(name).getLogicalType();
return new StructuredAttribute(name, logicalType);
})
.collect(Collectors.toList());
.map(name -> DataTypes.FIELD(name, fieldDataTypes.get(name)))
.toArray(DataTypes.Field[]::new);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,23 @@ public static String createMethodSignatureString(
return builder.toString();
}

/**
* Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
*/
public static void validateStructuredClass(Class<?> clazz) {
final int m = clazz.getModifiers();
if (Modifier.isAbstract(m)) {
throw extractionError("Class '%s' must not be abstract.", clazz.getName());
}
if (!Modifier.isPublic(m)) {
throw extractionError("Class '%s' is not public.", clazz.getName());
}
if (clazz.getEnclosingClass() != null &&
(clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
throw extractionError("Class '%s' is a not a static, globally accessible class.", clazz.getName());
}
}

/**
* Returns the field of a structured type. The logic is as broad as possible to support
* both Java and Scala in different flavors.
Expand Down Expand Up @@ -433,23 +450,6 @@ private static boolean typeVariableEquals(TypeVariable<?> variable, TypeVariable
currentVariable.getName().equals(variable.getName());
}

/**
* Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
*/
static void validateStructuredClass(Class<?> clazz) {
final int m = clazz.getModifiers();
if (Modifier.isAbstract(m)) {
throw extractionError("Class '%s' must not be abstract.", clazz.getName());
}
if (!Modifier.isPublic(m)) {
throw extractionError("Class '%s' is not public.", clazz.getName());
}
if (clazz.getEnclosingClass() != null &&
(clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
throw extractionError("Class '%s' is a not a static, globally accessible class.", clazz.getName());
}
}

/**
* Validates if a given type is not already contained in the type hierarchy of a structured type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,15 @@ public static List<TestSpec> testData() {
.forUnresolvedDataType(RAW(Object.class))
.expectUnresolvedString("[RAW('java.lang.Object', '?')]")
.lookupReturns(DataTypes.RAW(new GenericTypeInfo<>(Object.class)))
.expectResolvedDataType(DataTypes.RAW(new GenericTypeInfo<>(Object.class)))
.expectResolvedDataType(DataTypes.RAW(new GenericTypeInfo<>(Object.class))),

TestSpec
.forUnresolvedDataType(DataTypes.of(SimplePojo.class))
.expectResolvedDataType(
DataTypes.STRUCTURED(
SimplePojo.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class))))
);
}

Expand Down Expand Up @@ -475,4 +483,21 @@ public String toString() {
return abstractDataType.toString();
}
}

// --------------------------------------------------------------------------------------------
// Helper classes
// --------------------------------------------------------------------------------------------

/**
* Simple POJO for testing.
*/
public static class SimplePojo {
public final String name;
public final int count;

public SimplePojo(String name, int count) {
this.name = name;
this.count = count;
}
}
}

0 comments on commit 104775e

Please sign in to comment.