diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java index 9566a663169b0..8ad4e09a1e26d 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java @@ -27,8 +27,8 @@ * Configuration snapshot for serializers of Scala's {@link Either} type, * containing configuration snapshots of the Left and Right serializers. */ -public class ScalaEitherSerializerConfigSnapshot, L, R> - extends CompositeTypeSerializerConfigSnapshot { +public class ScalaEitherSerializerConfigSnapshot + extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index 82637befdcd1a..c2bda3dc3fc93 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -27,15 +27,15 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView} */ @Internal @SerialVersionUID(9219995873023657525L) -class EitherSerializer[A, B, T <: Either[A, B]]( +class EitherSerializer[A, B]( val leftSerializer: TypeSerializer[A], val rightSerializer: TypeSerializer[B]) - extends TypeSerializer[T] { + extends TypeSerializer[Either[A, B]] { - override def duplicate: EitherSerializer[A,B,T] = this + override def duplicate: EitherSerializer[A,B] = this - override def createInstance: T = { - Left(null).asInstanceOf[T] + override def createInstance: Either[A, B] = { + Left(null).asInstanceOf[Left[A, B]] } override def isImmutableType: Boolean = { @@ -45,12 +45,12 @@ class EitherSerializer[A, B, T <: Either[A, B]]( override def getLength: Int = -1 - override def copy(from: T): T = from match { - case Left(a) => Left(leftSerializer.copy(a)).asInstanceOf[T] - case Right(b) => Right(rightSerializer.copy(b)).asInstanceOf[T] + override def copy(from: Either[A, B]): Either[A, B] = from match { + case Left(a) => Left(leftSerializer.copy(a)) + case Right(b) => Right(rightSerializer.copy(b)) } - override def copy(from: T, reuse: T): T = copy(from) + override def copy(from: Either[A, B], reuse: Either[A, B]): Either[A, B] = copy(from) override def copy(source: DataInputView, target: DataOutputView): Unit = { val isLeft = source.readBoolean() @@ -62,7 +62,7 @@ class EitherSerializer[A, B, T <: Either[A, B]]( } } - override def serialize(either: T, target: DataOutputView): Unit = either match { + override def serialize(either: Either[A, B], target: DataOutputView): Unit = either match { case Left(a) => target.writeBoolean(true) leftSerializer.serialize(a, target) @@ -71,27 +71,27 @@ class EitherSerializer[A, B, T <: Either[A, B]]( rightSerializer.serialize(b, target) } - override def deserialize(source: DataInputView): T = { + override def deserialize(source: DataInputView): Either[A, B] = { val isLeft = source.readBoolean() if (isLeft) { - Left(leftSerializer.deserialize(source)).asInstanceOf[T] + Left(leftSerializer.deserialize(source)) } else { - Right(rightSerializer.deserialize(source)).asInstanceOf[T] + Right(rightSerializer.deserialize(source)) } } - override def deserialize(reuse: T, source: DataInputView): T = { + override def deserialize(reuse: Either[A, B], source: DataInputView): Either[A, B] = { val isLeft = source.readBoolean() if (isLeft) { - Left(leftSerializer.deserialize(source)).asInstanceOf[T] + Left(leftSerializer.deserialize(source)) } else { - Right(rightSerializer.deserialize(source)).asInstanceOf[T] + Right(rightSerializer.deserialize(source)) } } override def equals(obj: Any): Boolean = { obj match { - case eitherSerializer: EitherSerializer[_, _, _] => + case eitherSerializer: EitherSerializer[_, _] => eitherSerializer.canEqual(this) && leftSerializer.equals(eitherSerializer.leftSerializer) && rightSerializer.equals(eitherSerializer.rightSerializer) @@ -100,7 +100,7 @@ class EitherSerializer[A, B, T <: Either[A, B]]( } override def canEqual(obj: Any): Boolean = { - obj.isInstanceOf[EitherSerializer[_, _, _]] + obj.isInstanceOf[EitherSerializer[_, _]] } override def hashCode(): Int = { @@ -111,15 +111,15 @@ class EitherSerializer[A, B, T <: Either[A, B]]( // Serializer configuration snapshotting & compatibility // -------------------------------------------------------------------------------------------- - override def snapshotConfiguration(): ScalaEitherSerializerConfigSnapshot[T, A, B] = { - new ScalaEitherSerializerConfigSnapshot[T, A, B](leftSerializer, rightSerializer) + override def snapshotConfiguration(): ScalaEitherSerializerConfigSnapshot[A, B] = { + new ScalaEitherSerializerConfigSnapshot[A, B](leftSerializer, rightSerializer) } override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[T] = { + configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Either[A, B]] = { configSnapshot match { - case eitherSerializerConfig: ScalaEitherSerializerConfigSnapshot[T, A, B] => + case eitherSerializerConfig: ScalaEitherSerializerConfigSnapshot[A, B] => checkCompatibility(eitherSerializerConfig) // backwards compatibility path; @@ -134,7 +134,7 @@ class EitherSerializer[A, B, T <: Either[A, B]]( private def checkCompatibility( configSnapshot: CompositeTypeSerializerConfigSnapshot[_] - ): CompatibilityResult[T] = { + ): CompatibilityResult[Either[A, B]] = { val previousLeftRightSerWithConfigs = configSnapshot.getNestedSerializersAndConfigs diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala index e89730951be0c..b2a055e5efced 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala @@ -52,18 +52,18 @@ class EitherTypeInfo[A, B, T <: Either[A, B]]( @PublicEvolving def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = { - val leftSerializer = if (leftTypeInfo != null) { + val leftSerializer: TypeSerializer[A] = if (leftTypeInfo != null) { leftTypeInfo.createSerializer(executionConfig) } else { - new NothingSerializer + (new NothingSerializer).asInstanceOf[TypeSerializer[A]] } - val rightSerializer = if (rightTypeInfo != null) { + val rightSerializer: TypeSerializer[B] = if (rightTypeInfo != null) { rightTypeInfo.createSerializer(executionConfig) } else { - new NothingSerializer + (new NothingSerializer).asInstanceOf[TypeSerializer[B]] } - new EitherSerializer(leftSerializer, rightSerializer) + new EitherSerializer[A, B](leftSerializer, rightSerializer).asInstanceOf[TypeSerializer[T]] } override def equals(obj: Any): Boolean = {