Skip to content

Commit

Permalink
[hotfix] Clean up warnings in Serializers util class
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 1, 2016
1 parent 8c8f1c4 commit d902d16
Showing 1 changed file with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
Expand Down Expand Up @@ -60,48 +61,51 @@ public class Serializers {
* to Kryo.
* It also watches for types which need special serializers.
*/
private static Set<Class<?>> alreadySeen = new HashSet<Class<?>>();
private static Set<Class<?>> alreadySeen = new HashSet<>();

public static void recursivelyRegisterType(Class<?> type, ExecutionConfig config) {
alreadySeen.add(type);
if(type.isPrimitive()) {

if (type.isPrimitive()) {
return;
}
config.registerKryoType(type);
addSerializerForType(config, type);

Field[] fields = type.getDeclaredFields();
for(Field field : fields) {
for (Field field : fields) {
if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
continue;
}
Type fieldType = field.getGenericType();
if(fieldType instanceof ParameterizedType) { // field has generics
if (fieldType instanceof ParameterizedType) { // field has generics
ParameterizedType parameterizedFieldType = (ParameterizedType) fieldType;
for(Type t: parameterizedFieldType.getActualTypeArguments()) {
if(TypeExtractor.isClassType(t) ) {
Class clazz = TypeExtractor.typeToClass(t);
if(!alreadySeen.contains(clazz)) {
for (Type t: parameterizedFieldType.getActualTypeArguments()) {
if (TypeExtractor.isClassType(t) ) {
Class<?> clazz = TypeExtractor.typeToClass(t);
if (!alreadySeen.contains(clazz)) {
recursivelyRegisterType(TypeExtractor.typeToClass(t), config);
}
}
}
}
Class<?> clazz = field.getType();
if(!alreadySeen.contains(clazz)) {
if (!alreadySeen.contains(clazz)) {
recursivelyRegisterType(clazz, config);
}
}
}

public static void addSerializerForType(ExecutionConfig reg, Class<?> type) {
if(GenericData.Record.class.isAssignableFrom(type)) {
if (GenericData.Record.class.isAssignableFrom(type)) {
registerGenericAvro(reg);
}
if(SpecificRecordBase.class.isAssignableFrom(type)) {
registerSpecificAvro(reg, (Class<? extends SpecificRecordBase>) type);
if (SpecificRecordBase.class.isAssignableFrom(type)) {
@SuppressWarnings("unchecked")
Class<? extends SpecificRecordBase> specRecordClass = (Class<? extends SpecificRecordBase>) type;
registerSpecificAvro(reg, specRecordClass);
}
if(DateTime.class.isAssignableFrom(type) || Interval.class.isAssignableFrom(type)) {
if (DateTime.class.isAssignableFrom(type) || Interval.class.isAssignableFrom(type)) {
registerJodaTime(reg);
}
}
Expand All @@ -114,6 +118,7 @@ public static void registerGenericAvro(ExecutionConfig reg) {
// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
// because Kryo is not able to serialize them properly, we use this serializer for them
reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);

// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
// Kryo is able to serialize everything in there, except for the Schema.
// This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
Expand Down Expand Up @@ -167,7 +172,10 @@ public static void registerJavaUtils(ExecutionConfig reg) {
// Custom Serializers
// --------------------------------------------------------------------------------------------

@SuppressWarnings("rawtypes")
public static class SpecificInstanceCollectionSerializerForArrayList extends SpecificInstanceCollectionSerializer<ArrayList> {
private static final long serialVersionUID = 1L;

public SpecificInstanceCollectionSerializerForArrayList() {
super(ArrayList.class);
}
Expand All @@ -177,8 +185,14 @@ public SpecificInstanceCollectionSerializerForArrayList() {
* Avro is serializing collections with an "GenericData.Array" type. Kryo is not able to handle
* this type, so we use ArrayLists.
*/
public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer implements Serializable {
@SuppressWarnings("rawtypes")
public static class SpecificInstanceCollectionSerializer<T extends Collection>
extends CollectionSerializer implements Serializable
{
private static final long serialVersionUID = 1L;

private Class<T> type;

public SpecificInstanceCollectionSerializer(Class<T> type) {
this.type = type;
}
Expand All @@ -200,6 +214,8 @@ protected Collection createCopy(Kryo kryo, Collection original) {
* Having this serializer, we are able to handle avro Records.
*/
public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable {
private static final long serialVersionUID = 1L;

@Override
public void write(Kryo kryo, Output output, Schema object) {
String schemaAsString = object.toString(false);
Expand Down

0 comments on commit d902d16

Please sign in to comment.