Skip to content

Commit

Permalink
[FLINK-10839][serializer] Fix implementation of PojoSerializer.duplic…
Browse files Browse the repository at this point in the history
…ate() w.r.t. subclass serializer
  • Loading branch information
StefanRRichter committed Nov 12, 2018
1 parent 1ad17e0 commit c4d366c
Showing 1 changed file with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,17 @@

package org.apache.flink.api.java.typeutils.runtime;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand All @@ -52,6 +40,18 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

@Internal
Expand Down Expand Up @@ -181,11 +181,13 @@ public PojoSerializer<T> duplicate() {
}
}

if (stateful) {
return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
} else {
return this;
if (!stateful) {
// as a small memory optimization, we can share the same object between instances
duplicateFieldSerializers = fieldSerializers;
}

// we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
}


Expand Down

0 comments on commit c4d366c

Please sign in to comment.