diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java new file mode 100644 index 0000000000000..975d6e36942f2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +/** + * A utility class for describing generic types. It can be used to obtain a type information via: + * + *
{@code
+ * TypeInformation> info = TypeInformation.of(new TypeHint>(){});
+ * }
+ * or + *
{@code
+ * TypeInformation> info = new TypeHint>(){}.getTypeInfo();
+ * }
+ * + * @param The type information to hint. + */ +@Public +public abstract class TypeHint { + + /** The type information described by the hint */ + private final TypeInformation typeInfo; + + /** + * Creates a hint for the generic type in the class signature. + */ + public TypeHint() { + this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0); + } + + // ------------------------------------------------------------------------ + + /** + * Gets the type information described by this TypeHint. + * @return The type information described by this TypeHint. + */ + public TypeInformation getTypeInfo() { + return typeInfo; + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return typeInfo.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return obj == this || + obj instanceof TypeHint && this.typeInfo.equals(((TypeHint) obj).typeInfo); + } + + @Override + public String toString() { + return "TypeHint: " + typeInfo; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java index 1c95be01b8be0..95eed6be27224 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java @@ -22,9 +22,10 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; import java.io.Serializable; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; /** @@ -128,7 +129,7 @@ public abstract class TypeInformation implements Serializable { @PublicEvolving public List> getGenericParameters() { // Return an empty list as the default implementation - return new LinkedList<>(); + return Collections.emptyList(); } /** @@ -175,4 +176,39 @@ public boolean isSortKeyType() { * @return true if obj can be equaled with this, otherwise false */ public abstract boolean canEqual(Object obj); + + // ------------------------------------------------------------------------ + + /** + * Creates a TypeInformation for the type described by the given class. + * + *

This method only works for non-generic types. For generic types, use the + * {@link #of(TypeHint)} method. + * + * @param typeClass The class of the type. + * @param The generic type. + * + * @return The TypeInformation object for the type described by the hint. + */ + public static TypeInformation of(Class typeClass) { + return TypeExtractor.createTypeInfo(typeClass); + } + + /** + * Creates a TypeInformation for a generic type via a utility "type hint". + * This method can be used as follows: + *

+	 * {@code
+	 * TypeInformation> info = TypeInformation.of(new TypeHint>(){});
+	 * }
+	 * 
+ * + * @param typeHint The hint for the generic type. + * @param The generic type. + * + * @return The TypeInformation object for the type described by the hint. + */ + public static TypeInformation of(TypeHint typeHint) { + return typeHint.getTypeInfo(); + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java new file mode 100644 index 0000000000000..60232f201d728 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeHintTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TypeHintTest { + + @Test + public void testTypeInfoDirect() { + + // simple (non-generic case) + TypeHint stringInfo1 = new TypeHint(){}; + TypeHint stringInfo2 = new TypeHint(){}; + + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, stringInfo1.getTypeInfo()); + + assertTrue(stringInfo1.hashCode() == stringInfo2.hashCode()); + assertTrue(stringInfo1.equals(stringInfo2)); + assertTrue(stringInfo1.toString().equals(stringInfo2.toString())); + + // generic case + TypeHint> generic = new TypeHint>(){}; + + TypeInformation> tupleInfo = + new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + + assertEquals(tupleInfo, generic.getTypeInfo()); + } + + @Test + public void testTypeInfoOf() { + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(String.class)); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint(){})); + + + TypeInformation> tupleInfo = + new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO); + + assertEquals(tupleInfo, TypeInformation.of(new TypeHint>(){})); + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index c31592020263c..b186c3c776926 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -104,7 +104,6 @@ * * @param The type of the DataSet, i.e., the type of the elements of the DataSet. */ - @Public public abstract class DataSet { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java index 22cf0892fa859..eb485fe764714 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java @@ -26,18 +26,21 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import static java.util.Objects.requireNonNull; + /** * The SingleInputUdfOperator is the base class of all unary operators that execute * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that @@ -185,6 +188,92 @@ public O withForwardedFields(String... forwardedFields) { O returnType = (O) this; return returnType; } + + // ------------------------------------------------------------------------ + // type hinting + // ------------------------------------------------------------------------ + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

Classes can be used as type hints for non-generic types (classes without generic parameters), + * but not for generic types like for example Tuples. For those generic types, please + * use the {@link #returns(TypeHint)} method. + * + *

Use this method the following way: + *

{@code
+	 *     DataSet result =
+	 *         data.flatMap(new FunctionWithNonInferrableReturnType())
+	 *             .returns(String[].class);
+	 * }
+ * + * @param typeClass The class of the returned data type. + * @return This operator with the type information corresponding to the given type class. + */ + public O returns(Class typeClass) { + requireNonNull(typeClass, "type class must not be null"); + + try { + return returns(TypeInformation.of(typeClass)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the class alone." + + "This is most likely because the class represents a generic type. In that case," + + "please use the 'returns(TypeHint)' method instead.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

Use this method the following way: + *

{@code
+	 *     DataSet> result =
+	 *         data.flatMap(new FunctionWithNonInferrableReturnType())
+	 *             .returns(new TypeHint>(){});
+	 * }
+ * + * @param typeHint The type hint for the returned data type. + * @return This operator with the type information corresponding to the given type hint. + */ + public O returns(TypeHint typeHint) { + requireNonNull(typeHint, "TypeHint must not be null"); + + try { + return returns(TypeInformation.of(typeHint)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the type hint. " + + "Make sure that the TypeHint does not use any generic type variables."); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} + * are preferable. + * + * @param typeInfo The type information for the returned data type. + * @return This operator using the given type information for the return type. + */ + public O returns(TypeInformation typeInfo) { + requireNonNull(typeInfo, "TypeInformation must not be null"); + + fillInType(typeInfo); + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } /** * Adds a type information hint about the return type of this operator. @@ -220,7 +309,11 @@ public O withForwardedFields(String... forwardedFields) { * @param typeInfoString * type information string to be parsed * @return This operator with a given return type hint. + * + * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead. */ + @Deprecated + @PublicEvolving public O returns(String typeInfoString) { if (typeInfoString == null) { throw new IllegalArgumentException("Type information string must not be null."); @@ -228,78 +321,6 @@ public O returns(String typeInfoString) { return returns(TypeInfoParser.parse(typeInfoString)); } - /** - * Adds a type information hint about the return type of this operator. - * - *

- * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - *

- * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: - * - *

    - *
  • {@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}
  • - *
  • {@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.TupleTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.PojoTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.WritableTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.ValueTypeInfo}
  • - *
  • etc.
  • - *
- * - * @param typeInfo - * type information as a return type hint - * @return This operator with a given return type hint. - */ - public O returns(TypeInformation typeInfo) { - if (typeInfo == null) { - throw new IllegalArgumentException("Type information must not be null."); - } - fillInType(typeInfo); - @SuppressWarnings("unchecked") - O returnType = (O) this; - return returnType; - } - - /** - * Adds a type information hint about the return type of this operator. - * - *

- * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - *

- * This method takes a class that will be analyzed by Flink's type extraction capabilities. - * - *

- * Examples for classes are: - *

    - *
  • Basic types such as Integer.class, String.class, etc.
  • - *
  • POJOs such as MyPojo.class
  • - *
  • Classes that extend tuples. Classes like Tuple1.class,Tuple2.class, etc. are not sufficient.
  • - *
  • Arrays such as String[].class, etc.
  • - *
- * - * @param typeClass - * class as a return type hint - * @return This operator with a given return type hint. - */ - @SuppressWarnings("unchecked") - public O returns(Class typeClass) { - if (typeClass == null) { - throw new IllegalArgumentException("Type class must not be null."); - } - - try { - TypeInformation ti = (TypeInformation) TypeExtractor.createTypeInfo(typeClass); - return returns(ti); - } - catch (InvalidTypesException e) { - throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); - } - } - // -------------------------------------------------------------------------------------------- // Accessors // -------------------------------------------------------------------------------------------- @@ -360,12 +381,7 @@ protected SingleInputSemanticProperties extractSemanticAnnotations(Class udfC } protected boolean udfWithForwardedFieldsAnnotation(Class udfClass) { - - if (udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null || - udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null) { - return true; - } else { - return false; - } + return udfClass.getAnnotation(FunctionAnnotation.ForwardedFields.class) != null || + udfClass.getAnnotation(FunctionAnnotation.NonForwardedFields.class) != null; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java index 34a5518b2bf54..695ed3a4958cd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java @@ -26,18 +26,21 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import static java.util.Objects.requireNonNull; + /** * The TwoInputUdfOperator is the base class of all binary operators that execute * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that @@ -262,14 +265,102 @@ public O withForwardedFieldsSecond(String... forwardedFieldsSecond) { O returnType = (O) this; return returnType; } - + + // ------------------------------------------------------------------------ + // type hinting + // ------------------------------------------------------------------------ + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

Classes can be used as type hints for non-generic types (classes without generic parameters), + * but not for generic types like for example Tuples. For those generic types, please + * use the {@link #returns(TypeHint)} method. + * + *

Use this method the following way: + *

{@code
+	 *     DataSet result = 
+	 *         data1.join(data2).where("id").equalTo("fieldX")
+	 *              .with(new JoinFunctionWithNonInferrableReturnType())
+	 *              .returns(String[].class);
+	 * }
+ * + * @param typeClass The class of the returned data type. + * @return This operator with the type information corresponding to the given type class. + */ + public O returns(Class typeClass) { + requireNonNull(typeClass, "type class must not be null"); + + try { + return returns(TypeInformation.of(typeClass)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the class alone." + + "This is most likely because the class represents a generic type. In that case," + + "please use the 'returns(TypeHint)' method instead.", e); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

Use this method the following way: + *

{@code
+	 *     DataSet> result = 
+	 *         data1.join(data2).where("id").equalTo("fieldX")
+	 *              .with(new JoinFunctionWithNonInferrableReturnType())
+	 *              .returns(new TypeHint>(){});
+	 * }
+ * + * @param typeHint The type hint for the returned data type. + * @return This operator with the type information corresponding to the given type hint. + */ + public O returns(TypeHint typeHint) { + requireNonNull(typeHint, "TypeHint must not be null"); + + try { + return returns(TypeInformation.of(typeHint)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the type hint. " + + "Make sure that the TypeHint does not use any generic type variables."); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} + * are preferable. + * + * @param typeInfo The type information for the returned data type. + * @return This operator using the given type information for the return type. + */ + public O returns(TypeInformation typeInfo) { + requireNonNull(typeInfo, "TypeInformation must not be null"); + + fillInType(typeInfo); + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } + /** * Adds a type information hint about the return type of this operator. - * + * *

* Type hints are important in cases where the Java compiler * throws away generic type information necessary for efficient execution. - * + * *

* This method takes a type information string that will be parsed. A type information string can contain the following * types: @@ -297,86 +388,17 @@ public O withForwardedFieldsSecond(String... forwardedFieldsSecond) { * @param typeInfoString * type information string to be parsed * @return This operator with a given return type hint. + * + * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead. */ + @Deprecated + @PublicEvolving public O returns(String typeInfoString) { if (typeInfoString == null) { throw new IllegalArgumentException("Type information string must not be null."); } return returns(TypeInfoParser.parse(typeInfoString)); } - - /** - * Adds a type information hint about the return type of this operator. - * - *

- * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - *

- * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: - * - *

    - *
  • {@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}
  • - *
  • {@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.TupleTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.PojoTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.WritableTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.ValueTypeInfo}
  • - *
  • etc.
  • - *
- * - * @param typeInfo - * type information as a return type hint - * @return This operator with a given return type hint. - */ - public O returns(TypeInformation typeInfo) { - if (typeInfo == null) { - throw new IllegalArgumentException("Type information must not be null."); - } - fillInType(typeInfo); - - @SuppressWarnings("unchecked") - O returnType = (O) this; - return returnType; - } - - /** - * Adds a type information hint about the return type of this operator. - * - *

- * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - *

- * This method takes a class that will be analyzed by Flink's type extraction capabilities. - * - *

- * Examples for classes are: - *

    - *
  • Basic types such as Integer.class, String.class, etc.
  • - *
  • POJOs such as MyPojo.class
  • - *
  • Classes that extend tuples. Classes like Tuple1.class,Tuple2.class, etc. are not sufficient.
  • - *
  • Arrays such as String[].class, etc.
  • - *
- * - * @param typeClass - * class as a return type hint - * @return This operator with a given return type hint. - */ - @SuppressWarnings("unchecked") - public O returns(Class typeClass) { - if (typeClass == null) { - throw new IllegalArgumentException("Type class must not be null."); - } - - try { - TypeInformation ti = (TypeInformation) TypeExtractor.createTypeInfo(typeClass); - return returns(ti); - } - catch (InvalidTypesException e) { - throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); - } - } // -------------------------------------------------------------------------------------------- // Accessors diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 5846c26c03f71..2c7b5cb7c8276 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -20,8 +20,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import static java.util.Objects.requireNonNull; + /** * {@code SingleOutputStreamOperator} represents a user defined transformation * applied on a {@link DataStream} with one predefined output type. @@ -152,6 +154,83 @@ public SingleOutputStreamOperator startNewChain() { return setChainingStrategy(ChainingStrategy.HEAD); } + // ------------------------------------------------------------------------ + // Type hinting + // ------------------------------------------------------------------------ + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

Classes can be used as type hints for non-generic types (classes without generic parameters), + * but not for generic types like for example Tuples. For those generic types, please + * use the {@link #returns(TypeHint)} method. + * + * @param typeClass The class of the returned data type. + * @return This operator with the type information corresponding to the given type class. + */ + public SingleOutputStreamOperator returns(Class typeClass) { + requireNonNull(typeClass, "type class must not be null."); + + try { + return returns(TypeInformation.of(typeClass)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the class alone." + + "This is most likely because the class represents a generic type. In that case," + + "please use the 'returns(TypeHint)' method instead."); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

Use this method the following way: + *

{@code
+	 *     DataStream> result = 
+	 *         stream.flatMap(new FunctionWithNonInferrableReturnType())
+	 *               .returns(new TypeHint>(){});
+	 * }
+ * + * @param typeHint The type hint for the returned data type. + * @return This operator with the type information corresponding to the given type hint. + */ + public SingleOutputStreamOperator returns(TypeHint typeHint) { + requireNonNull(typeHint, "TypeHint must not be null"); + + try { + return returns(TypeInformation.of(typeHint)); + } + catch (InvalidTypesException e) { + throw new InvalidTypesException("Cannot infer the type information from the type hint. " + + "Make sure that the TypeHint does not use any generic type variables."); + } + } + + /** + * Adds a type information hint about the return type of this operator. This method + * can be used in cases where Flink cannot determine automatically what the produced + * type of a function is. That can be the case if the function uses generic type variables + * in the return type that cannot be inferred from the input type. + * + *

In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} + * are preferable. + * + * @param typeInfo type information as a return type hint + * @return This operator with a given return type hint. + */ + public SingleOutputStreamOperator returns(TypeInformation typeInfo) { + requireNonNull(typeInfo, "TypeInformation must not be null"); + + transformation.setOutputType(typeInfo); + return this; + } + /** * Adds a type information hint about the return type of this operator. * @@ -186,7 +265,11 @@ public SingleOutputStreamOperator startNewChain() { * @param typeInfoString * type information string to be parsed * @return This operator with a given return type hint. + * + * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead. */ + @Deprecated + @PublicEvolving public SingleOutputStreamOperator returns(String typeInfoString) { if (typeInfoString == null) { throw new IllegalArgumentException("Type information string must not be null."); @@ -194,74 +277,9 @@ public SingleOutputStreamOperator returns(String typeInfoString) { return returns(TypeInfoParser.parse(typeInfoString)); } - /** - * Adds a type information hint about the return type of this operator. - * - *

- * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - *

- * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as: - * - *

    - *
  • {@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}
  • - *
  • {@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.TupleTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.PojoTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.WritableTypeInfo}
  • - *
  • {@link org.apache.flink.api.java.typeutils.ValueTypeInfo}
  • - *
  • etc.
  • - *
- * - * @param typeInfo type information as a return type hint - * @return This operator with a given return type hint. - */ - public SingleOutputStreamOperator returns(TypeInformation typeInfo) { - if (typeInfo == null) { - throw new IllegalArgumentException("Type information must not be null."); - } - transformation.setOutputType(typeInfo); - return this; - } - - /** - * Adds a type information hint about the return type of this operator. - * - *

- * Type hints are important in cases where the Java compiler - * throws away generic type information necessary for efficient execution. - * - *

- * This method takes a class that will be analyzed by Flink's type extraction capabilities. - * - *

- * Examples for classes are: - *

    - *
  • Basic types such as Integer.class, String.class, etc.
  • - *
  • POJOs such as MyPojo.class
  • - *
  • Classes that extend tuples. Classes like Tuple1.class,Tuple2.class, etc. are not sufficient.
  • - *
  • Arrays such as String[].class, etc.
  • - *
- * - * @param typeClass - * class as a return type hint - * @return This operator with a given return type hint. - */ - @SuppressWarnings("unchecked") - public SingleOutputStreamOperator returns(Class typeClass) { - if (typeClass == null) { - throw new IllegalArgumentException("Type class must not be null."); - } - - try { - TypeInformation ti = (TypeInformation) TypeExtractor.createTypeInfo(typeClass); - return returns(ti); - } - catch (InvalidTypesException e) { - throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e); - } - } + // ------------------------------------------------------------------------ + // Miscellaneous + // ------------------------------------------------------------------------ @Override protected DataStream setConnectionType(StreamPartitioner partitioner) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index ddda82dba779b..acbb5b4858219 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -44,36 +45,35 @@ public void test() { try { env.addSource(new TestSource()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + DataStream source = env.generateSequence(1, 10); try { source.map(new TestMap()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + try { source.flatMap(new TestFlatMap()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + try { source.connect(source).map(new TestCoMap()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} + try { source.connect(source).flatMap(new TestCoFlatMap()).print(); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} - env.addSource(new TestSource()).returns("Integer"); + env.addSource(new TestSource()).returns(Integer.class); source.map(new TestMap()).returns(Long.class).print(); - source.flatMap(new TestFlatMap()).returns("Long").print(); - source.connect(source).map(new TestCoMap()).returns("Integer").print(); + source.flatMap(new TestFlatMap()).returns(new TypeHint(){}).print(); + source.connect(source).map(new TestCoMap()).returns(BasicTypeInfo.INT_TYPE_INFO).print(); source.connect(source).flatMap(new TestCoFlatMap()) .returns(BasicTypeInfo.INT_TYPE_INFO).print(); @@ -90,10 +90,9 @@ public String map(Long value) throws Exception { map.print(); try { - map.returns("String"); + map.returns(String.class); fail(); - } catch (Exception e) { - } + } catch (Exception ignored) {} } @@ -101,14 +100,10 @@ private class TestSource implements SourceFunction { private static final long serialVersionUID = 1L; @Override - public void run(SourceContext ctx) throws Exception { - - } + public void run(SourceContext ctx) throws Exception {} @Override - public void cancel() { - - } + public void cancel() {} } private class TestMap implements MapFunction { @@ -120,8 +115,7 @@ public O map(T value) throws Exception { private class TestFlatMap implements FlatMapFunction { @Override - public void flatMap(T value, Collector out) throws Exception { - } + public void flatMap(T value, Collector out) throws Exception {} } private class TestCoMap implements CoMapFunction { @@ -141,12 +135,10 @@ public OUT map2(IN2 value) { private class TestCoFlatMap implements CoFlatMapFunction { @Override - public void flatMap1(IN1 value, Collector out) throws Exception { - } + public void flatMap1(IN1 value, Collector out) throws Exception {} @Override - public void flatMap2(IN2 value, Collector out) throws Exception { - } + public void flatMap2(IN2 value, Collector out) throws Exception {} } }