Skip to content

Commit

Permalink
[FLINK-26835][serialization] Fix concurrent modification exception
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchaoming authored and pnowojski committed Apr 7, 2022
1 parent 1465633 commit 3ec3868
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}

cl = Thread.currentThread().getContextClassLoader();
subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
subclassSerializerCache = new HashMap<>();
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public final class RuntimeSerializerFactory<T>

private TypeSerializer<T> serializer;

private boolean firstSerializer = true;

private Class<T> clazz;

// Because we read the class from the TaskConfig and instantiate ourselves
Expand All @@ -62,7 +60,6 @@ public void writeParametersToConfig(Configuration config) {
}
}

@SuppressWarnings("unchecked")
@Override
public void readParametersFromConfig(Configuration config, ClassLoader cl)
throws ClassNotFoundException {
Expand All @@ -71,12 +68,8 @@ public void readParametersFromConfig(Configuration config, ClassLoader cl)
}

try {
this.clazz =
(Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
this.serializer =
(TypeSerializer<T>)
InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
firstSerializer = true;
this.clazz = InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
this.serializer = InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
} catch (ClassNotFoundException e) {
throw e;
} catch (Exception e) {
Expand All @@ -87,12 +80,7 @@ public void readParametersFromConfig(Configuration config, ClassLoader cl)
@Override
public TypeSerializer<T> getSerializer() {
if (this.serializer != null) {
if (firstSerializer) {
firstSerializer = false;
return this.serializer;
} else {
return this.serializer.duplicate();
}
return this.serializer.duplicate();
} else {
throw new RuntimeException(
"SerializerFactory has not been initialized from configuration.");
Expand Down

0 comments on commit 3ec3868

Please sign in to comment.