Skip to content

Commit

Permalink
[FLINK-3306] [core] Fix auto-type registry util
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 1, 2016
1 parent d902d16 commit c4bc47a
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
Expand Down Expand Up @@ -219,41 +216,43 @@ public void testDeserialisation() throws IOException {
*/
@Test
public void testDeserializeToGenericType() throws IOException {
DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema);

FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader);
// initialize Record by reading it from disk (thats easier than creating it by hand)
GenericData.Record rec = new GenericData.Record(userSchema);
dataFileReader.next(rec);
// check if record has been read correctly
assertNotNull(rec);
assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
assertEquals(null, rec.get("type_long_test")); // it is null for the first record.

// now serialize it with our framework:

TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class);
ExecutionConfig ec = new ExecutionConfig();
Assert.assertEquals(GenericTypeInfo.class, te.getClass());
Serializers.recursivelyRegisterType(( (GenericTypeInfo) te).getTypeClass(), ec);

TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
Assert.assertTrue(
ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
tser.serialize(rec, target);

GenericData.Record newRec = tser.deserialize(target.getInputView());

// check if it is still the same
assertNotNull(newRec);
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() );
assertEquals(null, newRec.get("type_long_test"));

DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);

try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
// initialize Record by reading it from disk (thats easier than creating it by hand)
GenericData.Record rec = new GenericData.Record(userSchema);
dataFileReader.next(rec);

// check if record has been read correctly
assertNotNull(rec);
assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
assertEquals(null, rec.get("type_long_test")); // it is null for the first record.

// now serialize it with our framework:
TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);

ExecutionConfig ec = new ExecutionConfig();
Assert.assertEquals(GenericTypeInfo.class, te.getClass());

Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());

TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
Assert.assertTrue(
ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
tser.serialize(rec, target);

GenericData.Record newRec = tser.deserialize(target.getInputView());

// check if it is still the same
assertNotNull(newRec);
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
assertEquals(null, newRec.get("type_long_test"));
}
}

/**
Expand All @@ -264,28 +263,30 @@ public void testDeserializeToSpecificType() throws IOException {

DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);

FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader);
User rec = dataFileReader.next();
try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
User rec = dataFileReader.next();

// check if record has been read correctly
assertNotNull(rec);
assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());

// check if record has been read correctly
assertNotNull(rec);
assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
// now serialize it with our framework:
ExecutionConfig ec = new ExecutionConfig();
TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);

// now serialize it with our framework:
ExecutionConfig ec = new ExecutionConfig();
TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class);
Assert.assertEquals(AvroTypeInfo.class, te.getClass());
TypeSerializer<User> tser = te.createSerializer(ec);
ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
tser.serialize(rec, target);
Assert.assertEquals(AvroTypeInfo.class, te.getClass());
TypeSerializer<User> tser = te.createSerializer(ec);
ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
tser.serialize(rec, target);

User newRec = tser.deserialize(target.getInputView());
User newRec = tser.deserialize(target.getInputView());

// check if it is still the same
assertNotNull(newRec);
assertEquals("name not equal", TEST_NAME, newRec.getName().toString() );
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() );
// check if it is still the same
assertNotNull(newRec);
assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.*;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
Expand All @@ -38,7 +33,6 @@
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.io.CsvReader;
Expand All @@ -52,11 +46,7 @@
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.OperatorTranslation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.api.java.typeutils.*;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -942,30 +932,23 @@ public Plan createProgramPlan(String jobName, boolean clearSinks) {
plan.setDefaultParallelism(getParallelism());
}
plan.setExecutionConfig(getConfig());

// Check plan for GenericTypeInfo's and register the types at the serializers.
plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
@Override
public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
OperatorInformation<?> opInfo = visitable.getOperatorInfo();
TypeInformation<?> typeInfo = opInfo.getOutputType();
if(typeInfo instanceof GenericTypeInfo) {
GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo;
if(!config.isAutoTypeRegistrationDisabled()) {
Serializers.recursivelyRegisterType(genericTypeInfo.getTypeClass(), config);
}
}
if(typeInfo instanceof CompositeType) {
List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>();
Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
for(GenericTypeInfo<?> gt : genericTypesInComposite) {
Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
}
if (!config.isAutoTypeRegistrationDisabled()) {
plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

private final HashSet<Class<?>> deduplicator = new HashSet<>();

@Override
public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
OperatorInformation<?> opInfo = visitable.getOperatorInfo();
Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator);
return true;
}
return true;
}
@Override
public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
});
@Override
public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
});
}

try {
registerCachedFilesWithPlan(plan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(F
// --------------------------------------------------------------------------------------------
// Create type information
// --------------------------------------------------------------------------------------------

@SuppressWarnings("unchecked")
public static <T> TypeInformation<T> createTypeInfo(Class<T> type) {
return (TypeInformation<T>) createTypeInfo((Type) type);
}

public static TypeInformation<?> createTypeInfo(Type t) {
TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t);
Expand Down
Loading

0 comments on commit c4bc47a

Please sign in to comment.