Skip to content

Commit

Permalink
Showing 2 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,7 +19,9 @@
package org.apache.flink.api.java.io;

import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.memory.DataInputView;

import java.io.IOException;
@@ -28,23 +30,30 @@
* Reads elements by deserializing them with a given type serializer.
* @param <T>
*/
public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> {
public class TypeSerializerInputFormat<T> extends BinaryInputFormat<T> implements ResultTypeQueryable<T> {

private static final long serialVersionUID = 2123068581665107480L;

private transient TypeInformation<T> resultType;

private TypeSerializer<T> serializer;

public TypeSerializerInputFormat(TypeSerializer<T> serializer){
this.serializer = serializer;
public TypeSerializerInputFormat(TypeInformation<T> resultType) {
this.resultType = resultType;
this.serializer = resultType.createSerializer();
}

@Override
protected T deserialize(T reuse, DataInputView dataInput) throws IOException {
if(serializer == null){
throw new RuntimeException("TypeSerializerInputFormat requires a type serializer to " +
"be defined.");
}

return serializer.deserialize(reuse, dataInput);
}

// --------------------------------------------------------------------------------------------
// Typing
// --------------------------------------------------------------------------------------------

@Override
public TypeInformation<T> getProducedType() {
return resultType;
}
}
Original file line number Diff line number Diff line change
@@ -40,16 +40,18 @@
@RunWith(Parameterized.class)
public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer, String>> {

TypeInformation<Tuple2<Integer, String>> resultType = TypeExtractor.getForObject(getRecord(0));

private TypeSerializer<Tuple2<Integer, String>> serializer;

private BlockInfo block;

public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int degreeOfParallelism) {
super(numberOfTuples, blockSize, degreeOfParallelism);

TypeInformation<Tuple2<Integer, String>> tti = TypeExtractor.getForObject(getRecord(0));
resultType = TypeExtractor.getForObject(getRecord(0));

serializer = tti.createSerializer();
serializer = resultType.createSerializer();
}

@Before
@@ -63,7 +65,7 @@ protected BinaryInputFormat<Tuple2<Integer, String>> createInputFormat() {
configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);

final TypeSerializerInputFormat<Tuple2<Integer, String>> inputFormat = new
TypeSerializerInputFormat<Tuple2<Integer, String>>(serializer);
TypeSerializerInputFormat<Tuple2<Integer, String>>(resultType);
inputFormat.setFilePath(this.tempFile.toURI().toString());

inputFormat.configure(configuration);

0 comments on commit e3f6c9b

Please sign in to comment.