Skip to content

Commit

Permalink
Rename ScalaTupleTypeInfo to CaseClassTypeInfo
Browse files Browse the repository at this point in the history
This better reflects what it is actually for. It is still derived
from TupleTypeInfoBase, though.
  • Loading branch information
aljoscha committed Sep 22, 2014
1 parent 0385651 commit ca4fa3d
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
import org.apache.flink.api.scala.typeutils.ScalaTupleTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.types.TypeInformation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.flink.api.scala

import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.scala.operators.ScalaAggregateOperator
import org.apache.flink.api.scala.typeutils.ScalaTupleTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

import scala.collection.JavaConverters._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.types.TypeInformation
import org.apache.flink.util.Collector

Expand Down Expand Up @@ -177,7 +177,7 @@ private[flink] class UnfinishedCoGroupOperationImpl[T: ClassTag, O: ClassTag](
val rightArrayType =
ObjectArrayTypeInfo.getInfoFor(new Array[O](0).getClass, rightSet.set.getType)

val returnType = new ScalaTupleTypeInfo[(Array[T], Array[O])](
val returnType = new CaseClassTypeInfo[(Array[T], Array[O])](
classOf[(Array[T], Array[O])], Seq(leftArrayType, rightArrayType), Array("_1", "_2")) {

override def createSerializer: TypeSerializer[(Array[T], Array[O])] = {
Expand All @@ -186,7 +186,7 @@ private[flink] class UnfinishedCoGroupOperationImpl[T: ClassTag, O: ClassTag](
fieldSerializers(i) = types(i).createSerializer
}

new ScalaTupleSerializer[(Array[T], Array[O])](
new CaseClassSerializer[(Array[T], Array[O])](
classOf[(Array[T], Array[O])],
fieldSerializers) {
override def createInstance(fields: Array[AnyRef]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.flink.api.scala.codegen

import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.types.{Value, TypeInformation}
import org.apache.hadoop.io.Writable

Expand Down Expand Up @@ -70,14 +70,14 @@ private[flink] trait TypeInformationGen[C <: Context] {
val fieldNames = desc.getters map { f => Literal(Constant(f.getter.name.toString)) } toList
val fieldNamesExpr = c.Expr[Seq[String]](mkSeq(fieldNames))
reify {
new ScalaTupleTypeInfo[T](tpeClazz.splice, fieldsExpr.splice, fieldNamesExpr.splice) {
new CaseClassTypeInfo[T](tpeClazz.splice, fieldsExpr.splice, fieldNamesExpr.splice) {
override def createSerializer: TypeSerializer[T] = {
val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
for (i <- 0 until getArity) {
fieldSerializers(i) = types(i).createSerializer
}

new ScalaTupleSerializer[T](tupleType, fieldSerializers) {
new CaseClassSerializer[T](tupleType, fieldSerializers) {
override def createInstance(fields: Array[AnyRef]): T = {
instance.splice
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.types.TypeInformation
import org.apache.flink.util.Collector

Expand Down Expand Up @@ -109,7 +109,7 @@ private[flink] object CrossDataSetImpl {
(left, right)
}
}
val returnType = new ScalaTupleTypeInfo[(T, O)](
val returnType = new CaseClassTypeInfo[(T, O)](
classOf[(T, O)], Seq(leftSet.getType, rightSet.getType), Array("_1", "_2")) {

override def createSerializer: TypeSerializer[(T, O)] = {
Expand All @@ -118,7 +118,7 @@ private[flink] object CrossDataSetImpl {
fieldSerializers(i) = types(i).createSerializer
}

new ScalaTupleSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
new CaseClassSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
override def createInstance(fields: Array[AnyRef]) = {
(fields(0).asInstanceOf[T], fields(1).asInstanceOf[O])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlat
import org.apache.flink.api.java.operators.JoinOperator.{EquiJoin, JoinHint}
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.types.TypeInformation
import org.apache.flink.util.Collector

Expand Down Expand Up @@ -179,7 +179,7 @@ private[flink] class UnfinishedJoinOperationImpl[T, O](
out.collect((left, right))
}
}
val returnType = new ScalaTupleTypeInfo[(T, O)](
val returnType = new CaseClassTypeInfo[(T, O)](
classOf[(T, O)], Seq(leftSet.set.getType, rightSet.set.getType), Array("_1", "_2")) {

override def createSerializer: TypeSerializer[(T, O)] = {
Expand All @@ -188,7 +188,7 @@ private[flink] class UnfinishedJoinOperationImpl[T, O](
fieldSerializers(i) = types(i).createSerializer
}

new ScalaTupleSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
new CaseClassSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
override def createInstance(fields: Array[AnyRef]) = {
(fields(0).asInstanceOf[T], fields(1).asInstanceOf[O])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.flink.api
import _root_.scala.reflect.ClassTag
import language.experimental.macros
import org.apache.flink.types.TypeInformation
import org.apache.flink.api.scala.typeutils.{ScalaTupleTypeInfo, TypeUtils}
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
import org.apache.flink.api.java.{DataSet => JavaDataSet}

package object scala {
Expand All @@ -36,7 +36,7 @@ package object scala {
typeInfo: TypeInformation[_],
fields: Array[String]): Array[Int] = {
typeInfo match {
case ti: ScalaTupleTypeInfo[_] =>
case ti: CaseClassTypeInfo[_] =>
ti.getFieldIndices(fields)

case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldExcepti
;

/**
* Comparator for Scala Tuples. Access is different from
* Comparator for Case Classes. Access is different from
* our Java Tuples so we have to treat them differently.
*/
class ScalaTupleComparator[T <: Product](
class CaseClassComparator[T <: Product](
keys: Array[Int],
scalaComparators: Array[TypeComparator[_]],
scalaSerializers: Array[TypeSerializer[_]] )
Expand All @@ -39,7 +39,7 @@ class ScalaTupleComparator[T <: Product](
def duplicate: TypeComparator[T] = {
// ensure that the serializers are available
instantiateDeserializationUtils()
val result = new ScalaTupleComparator[T](keyPositions, comparators, serializers)
val result = new CaseClassComparator[T](keyPositions, comparators, serializers)
result.privateDuplicate(this)
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
;

/**
* Serializer for Scala Tuples. Creation and access is different from
* Serializer for Case Classes. Creation and access is different from
* our Java Tuples so we have to treat them differently.
*/
abstract class ScalaTupleSerializer[T <: Product](
tupleClass: Class[T],
abstract class CaseClassSerializer[T <: Product](
clazz: Class[T],
scalaFieldSerializers: Array[TypeSerializer[_]])
extends TupleSerializerBase[T](tupleClass, scalaFieldSerializers) {
extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {

def createInstance: T = {
val fields: Array[AnyRef] = new Array(arity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import org.apache.flink.types.TypeInformation
import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}

/**
* TypeInformation for Scala Tuples. Creation and access is different from
* TypeInformation for Case Classes. Creation and access is different from
* our Java Tuples so we have to treat them differently.
*/
abstract class ScalaTupleTypeInfo[T <: Product](
tupleClass: Class[T],
abstract class CaseClassTypeInfo[T <: Product](
clazz: Class[T],
fieldTypes: Seq[TypeInformation[_]],
val fieldNames: Seq[String])
extends TupleTypeInfoBase[T](tupleClass, fieldTypes: _*) {
extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {

def createComparator(logicalKeyFields: Array[Int], orders: Array[Boolean]): TypeComparator[T] = {
// sanity checks
Expand Down Expand Up @@ -72,14 +72,14 @@ abstract class ScalaTupleTypeInfo[T <: Product](
fieldSerializers(i) = types(i).createSerializer
}

new ScalaTupleComparator[T](logicalKeyFields, fieldComparators, fieldSerializers)
new CaseClassComparator[T](logicalKeyFields, fieldComparators, fieldSerializers)
}

def getFieldIndices(fields: Array[String]): Array[Int] = {
val result = fields map { x => fieldNames.indexOf(x) }
if (result.contains(-1)) {
throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
"' are not valid for " + tupleClass + " with fields '" + fieldNames.mkString(", ") + "'.")
"' are not valid for " + clazz + " with fields '" + fieldNames.mkString(", ") + "'.")
}
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.operators.Keys
import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
import org.apache.flink.api.scala.typeutils.ScalaTupleTypeInfo
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.types.TypeInformation

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.base.{DoubleComparator, DoubleSeria

import org.apache.flink.api.java.typeutils.runtime.{GenericPairComparator, TupleComparator}
import org.apache.flink.api.scala.runtime.tuple.base.PairComparatorTestBase
import org.apache.flink.api.scala.typeutils.ScalaTupleComparator
import org.apache.flink.api.scala.typeutils.CaseClassComparator

class GenericPairComparatorTest
extends PairComparatorTestBase[(Int, String, Double), (Int, Float, Long, Double)] {
Expand All @@ -42,8 +42,8 @@ class GenericPairComparatorTest
val sers2 =
Array[TypeSerializer[_]](IntSerializer.INSTANCE, DoubleSerializer.INSTANCE)

val comp1 = new ScalaTupleComparator[(Int, String, Double)](fields1, comps1, sers1)
val comp2 = new ScalaTupleComparator[(Int, Float, Long, Double)](fields2, comps2, sers2)
val comp1 = new CaseClassComparator[(Int, String, Double)](fields1, comps1, sers1)
val comp2 = new CaseClassComparator[(Int, Float, Long, Double)](fields2, comps2, sers2)

new GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)](comp1, comp2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.api.scala.runtime.tuple.base

import org.apache.flink.api.common.typeutils.ComparatorTestBase
import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleComparator}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassComparator}
import org.junit.Assert._

abstract class TupleComparatorTestBase[T <: Product] extends ComparatorTestBase[T] {
Expand Down

0 comments on commit ca4fa3d

Please sign in to comment.