diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java index 0fca3e232a2be..8e92c27dbf09a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,7 +19,9 @@ package org.apache.flink.api.java.io; import org.apache.flink.api.common.io.BinaryInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.memory.DataInputView; import java.io.IOException; @@ -28,23 +30,30 @@ * Reads elements by deserializing them with a given type serializer. * @param */ -public class TypeSerializerInputFormat extends BinaryInputFormat { +public class TypeSerializerInputFormat extends BinaryInputFormat implements ResultTypeQueryable { private static final long serialVersionUID = 2123068581665107480L; + private transient TypeInformation resultType; + private TypeSerializer serializer; - public TypeSerializerInputFormat(TypeSerializer serializer){ - this.serializer = serializer; + public TypeSerializerInputFormat(TypeInformation resultType) { + this.resultType = resultType; + this.serializer = resultType.createSerializer(); } @Override protected T deserialize(T reuse, DataInputView dataInput) throws IOException { - if(serializer == null){ - throw new RuntimeException("TypeSerializerInputFormat requires a type serializer to " + - "be defined."); - } - return serializer.deserialize(reuse, dataInput); } + + // -------------------------------------------------------------------------------------------- + // Typing + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation getProducedType() { + return resultType; + } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java index ef271e7d20d07..7dd11352afd13 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java @@ -40,6 +40,8 @@ @RunWith(Parameterized.class) public class TypeSerializerFormatTest extends SequentialFormatTestBase> { + TypeInformation> resultType = TypeExtractor.getForObject(getRecord(0)); + private TypeSerializer> serializer; private BlockInfo block; @@ -47,9 +49,9 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase> tti = TypeExtractor.getForObject(getRecord(0)); + resultType = TypeExtractor.getForObject(getRecord(0)); - serializer = tti.createSerializer(); + serializer = resultType.createSerializer(); } @Before @@ -63,7 +65,7 @@ protected BinaryInputFormat> createInputFormat() { configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize); final TypeSerializerInputFormat> inputFormat = new - TypeSerializerInputFormat>(serializer); + TypeSerializerInputFormat>(resultType); inputFormat.setFilePath(this.tempFile.toURI().toString()); inputFormat.configure(configuration);