Skip to content

Commit

Permalink
[hotfix] [Scala API] Simplify generics in EitherSerializer
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen authored and tillrohrmann committed Nov 1, 2018
1 parent 9ad932c commit 089a9fb
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
* Configuration snapshot for serializers of Scala's {@link Either} type,
* containing configuration snapshots of the Left and Right serializers.
*/
public class ScalaEitherSerializerConfigSnapshot<E extends Either<L, R>, L, R>
extends CompositeTypeSerializerConfigSnapshot<E> {
public class ScalaEitherSerializerConfigSnapshot<L, R>
extends CompositeTypeSerializerConfigSnapshot<Either<L, R>> {

private static final int VERSION = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
*/
@Internal
@SerialVersionUID(9219995873023657525L)
class EitherSerializer[A, B, T <: Either[A, B]](
class EitherSerializer[A, B](
val leftSerializer: TypeSerializer[A],
val rightSerializer: TypeSerializer[B])
extends TypeSerializer[T] {
extends TypeSerializer[Either[A, B]] {

override def duplicate: EitherSerializer[A,B,T] = this
override def duplicate: EitherSerializer[A,B] = this

override def createInstance: T = {
Left(null).asInstanceOf[T]
override def createInstance: Either[A, B] = {
Left(null).asInstanceOf[Left[A, B]]
}

override def isImmutableType: Boolean = {
Expand All @@ -45,12 +45,12 @@ class EitherSerializer[A, B, T <: Either[A, B]](

override def getLength: Int = -1

override def copy(from: T): T = from match {
case Left(a) => Left(leftSerializer.copy(a)).asInstanceOf[T]
case Right(b) => Right(rightSerializer.copy(b)).asInstanceOf[T]
override def copy(from: Either[A, B]): Either[A, B] = from match {
case Left(a) => Left(leftSerializer.copy(a))
case Right(b) => Right(rightSerializer.copy(b))
}

override def copy(from: T, reuse: T): T = copy(from)
override def copy(from: Either[A, B], reuse: Either[A, B]): Either[A, B] = copy(from)

override def copy(source: DataInputView, target: DataOutputView): Unit = {
val isLeft = source.readBoolean()
Expand All @@ -62,7 +62,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](
}
}

override def serialize(either: T, target: DataOutputView): Unit = either match {
override def serialize(either: Either[A, B], target: DataOutputView): Unit = either match {
case Left(a) =>
target.writeBoolean(true)
leftSerializer.serialize(a, target)
Expand All @@ -71,27 +71,27 @@ class EitherSerializer[A, B, T <: Either[A, B]](
rightSerializer.serialize(b, target)
}

override def deserialize(source: DataInputView): T = {
override def deserialize(source: DataInputView): Either[A, B] = {
val isLeft = source.readBoolean()
if (isLeft) {
Left(leftSerializer.deserialize(source)).asInstanceOf[T]
Left(leftSerializer.deserialize(source))
} else {
Right(rightSerializer.deserialize(source)).asInstanceOf[T]
Right(rightSerializer.deserialize(source))
}
}

override def deserialize(reuse: T, source: DataInputView): T = {
override def deserialize(reuse: Either[A, B], source: DataInputView): Either[A, B] = {
val isLeft = source.readBoolean()
if (isLeft) {
Left(leftSerializer.deserialize(source)).asInstanceOf[T]
Left(leftSerializer.deserialize(source))
} else {
Right(rightSerializer.deserialize(source)).asInstanceOf[T]
Right(rightSerializer.deserialize(source))
}
}

override def equals(obj: Any): Boolean = {
obj match {
case eitherSerializer: EitherSerializer[_, _, _] =>
case eitherSerializer: EitherSerializer[_, _] =>
eitherSerializer.canEqual(this) &&
leftSerializer.equals(eitherSerializer.leftSerializer) &&
rightSerializer.equals(eitherSerializer.rightSerializer)
Expand All @@ -100,7 +100,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](
}

override def canEqual(obj: Any): Boolean = {
obj.isInstanceOf[EitherSerializer[_, _, _]]
obj.isInstanceOf[EitherSerializer[_, _]]
}

override def hashCode(): Int = {
Expand All @@ -111,15 +111,15 @@ class EitherSerializer[A, B, T <: Either[A, B]](
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------

override def snapshotConfiguration(): ScalaEitherSerializerConfigSnapshot[T, A, B] = {
new ScalaEitherSerializerConfigSnapshot[T, A, B](leftSerializer, rightSerializer)
override def snapshotConfiguration(): ScalaEitherSerializerConfigSnapshot[A, B] = {
new ScalaEitherSerializerConfigSnapshot[A, B](leftSerializer, rightSerializer)
}

override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[T] = {
configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Either[A, B]] = {

configSnapshot match {
case eitherSerializerConfig: ScalaEitherSerializerConfigSnapshot[T, A, B] =>
case eitherSerializerConfig: ScalaEitherSerializerConfigSnapshot[A, B] =>
checkCompatibility(eitherSerializerConfig)

// backwards compatibility path;
Expand All @@ -134,7 +134,7 @@ class EitherSerializer[A, B, T <: Either[A, B]](

private def checkCompatibility(
configSnapshot: CompositeTypeSerializerConfigSnapshot[_]
): CompatibilityResult[T] = {
): CompatibilityResult[Either[A, B]] = {

val previousLeftRightSerWithConfigs =
configSnapshot.getNestedSerializersAndConfigs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](

@PublicEvolving
def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
val leftSerializer = if (leftTypeInfo != null) {
val leftSerializer: TypeSerializer[A] = if (leftTypeInfo != null) {
leftTypeInfo.createSerializer(executionConfig)
} else {
new NothingSerializer
(new NothingSerializer).asInstanceOf[TypeSerializer[A]]
}

val rightSerializer = if (rightTypeInfo != null) {
val rightSerializer: TypeSerializer[B] = if (rightTypeInfo != null) {
rightTypeInfo.createSerializer(executionConfig)
} else {
new NothingSerializer
(new NothingSerializer).asInstanceOf[TypeSerializer[B]]
}
new EitherSerializer(leftSerializer, rightSerializer)
new EitherSerializer[A, B](leftSerializer, rightSerializer).asInstanceOf[TypeSerializer[T]]
}

override def equals(obj: Any): Boolean = {
Expand Down

0 comments on commit 089a9fb

Please sign in to comment.