Merge pull request twitter#1303 from JiJiTang/develop
Improve TypedParquetTuple twitter#1302
johnynek committed May 30, 2015
2 parents cf7ae11 + 6995da4 commit 7af2fae
Showing 10 changed files with 186 additions and 179 deletions.
3 changes: 2 additions & 1 deletion project/Build.scala
```diff
@@ -317,7 +317,8 @@ object ScaldingBuild extends Build {
       "org.slf4j" % "slf4j-api" % slf4jVersion,
       "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
       "org.scala-lang" % "scala-reflect" % scalaVersion,
-      "com.twitter" %% "bijection-macros" % bijectionVersion
+      "com.twitter" %% "bijection-macros" % bijectionVersion,
+      "com.twitter" %% "chill-bijection" % chillVersion
     ) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % quasiquotesVersion) else Seq())
   }, addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full))
     .dependsOn(scaldingCore, scaldingHadoopTest)
```
27 changes: 26 additions & 1 deletion scalding-parquet/README.md
````diff
@@ -4,4 +4,29 @@ The implementation is ported from code used by Twitter internally written by Sam
 ## Use com.twitter.scalding.parquet.thrift for reading apache Thrift (TBase) records
 ## Use com.twitter.scalding.parquet.scrooge for reading scrooge Thrift (ThriftStruct) records
 Located in the scalding-parquet-scrooge module
-## Use com.twitter.scalding.parquet.tuple for reading Tuple records
+## Use com.twitter.scalding.parquet.tuple for reading Tuple records
+## Use com.twitter.scalding.parquet.tuple.TypedParquet for reading or writing case classes:
+Use the macros in com.twitter.scalding.parquet.tuple.macros.Macros to generate Parquet read/write support. Here's an example:
+```scala
+import com.twitter.scalding.parquet.tuple.macros.Macros._
+
+case class SampleClass(x: Int, y: String)
+
+class WriteToTypedParquetTupleJob(args: Args) extends Job(args) {
+  val outputPath = args.required("output")
+  val sink = TypedParquetSink[SampleClass](outputPath)
+
+  TypedPipe.from(List(SampleClass(0, "foo"), SampleClass(1, "bar"))).write(sink)
+}
+
+class ReadWithFilterPredicateJob(args: Args) extends Job(args) {
+  val fp: FilterPredicate = FilterApi.eq(binaryColumn("y"), Binary.fromString("foo"))
+
+  val inputPath = args.required("input")
+  val outputPath = args.required("output")
+
+  val input = TypedParquet[SampleClass](inputPath, fp)
+
+  TypedPipe.from(input).map(_.x).write(TypedTsv[Int](outputPath))
+}
+```
````
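For reference, a self-contained version of the example above with the imports it relies on spelled out. This is a sketch: the import paths are assumptions based on the packages used elsewhere in this commit (the pre-`org.apache` `parquet.*` namespace and `com.twitter.scalding._`), not part of the README diff.

```scala
import com.twitter.scalding.{ Args, Job, TypedPipe, TypedTsv }
import com.twitter.scalding.parquet.tuple.{ TypedParquet, TypedParquetSink }
import com.twitter.scalding.parquet.tuple.macros.Macros._
import parquet.filter2.predicate.FilterApi.binaryColumn
import parquet.filter2.predicate.{ FilterApi, FilterPredicate }
import parquet.io.api.Binary

case class SampleClass(x: Int, y: String)

// Writes two SampleClass records; the ParquetWriteSupport[SampleClass]
// implicit is generated by the macros imported above.
class WriteToTypedParquetTupleJob(args: Args) extends Job(args) {
  val outputPath = args.required("output")
  val sink = TypedParquetSink[SampleClass](outputPath)

  TypedPipe.from(List(SampleClass(0, "foo"), SampleClass(1, "bar"))).write(sink)
}

// Reads back only rows whose `y` column equals "foo": the filter predicate
// is handed to the source and evaluated by parquet, not in the job.
class ReadWithFilterPredicateJob(args: Args) extends Job(args) {
  val fp: FilterPredicate = FilterApi.eq(binaryColumn("y"), Binary.fromString("foo"))

  val inputPath = args.required("input")
  val outputPath = args.required("output")

  val input = TypedParquet[SampleClass](inputPath, fp)

  TypedPipe.from(input).map(_.x).write(TypedTsv[Int](outputPath))
}
```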
TypedParquet.scala
```diff
@@ -4,9 +4,7 @@ import _root_.parquet.filter2.predicate.FilterPredicate
 import cascading.scheme.Scheme
 import com.twitter.scalding._
 import com.twitter.scalding.parquet.HasFilterPredicate
-import com.twitter.scalding.parquet.tuple.scheme.{ ParquetWriteSupport, ParquetReadSupport, TypedParquetTupleScheme }
-
-import scala.reflect.ClassTag
+import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport, TypedParquetTupleScheme }
 
 /**
  * Typed parquet tuple
@@ -16,87 +14,55 @@ object TypedParquet {
   /**
    * Create readable typed parquet source.
    * Here is an example:
-   *
-   * case class SampleClassB(string: String, int: Int, double: Option[Double], a: SampleClassA)
-   *
-   * class ReadSupport extends ParquetReadSupport[SampleClassB] {
-   *   import com.twitter.scalding.parquet.tuple.macros.Macros._
-   *   override val tupleConverter: ParquetTupleConverter[SampleClassB] = caseClassParquetTupleConverter[SampleClassB]
-   *   override val rootSchema: String = caseClassParquetSchema[SampleClassB]
-   * }
-   *
-   * val parquetTuple = TypedParquet[SampleClassB, ReadSupport](Seq(outputPath))
+   * import com.twitter.scalding.parquet.tuple.macros.Macros._
+   * val parquetTuple = TypedParquet[SampleClass](Seq(outputPath))
    *
    * @param paths paths of parquet I/O
-   * @param t Read support type tag
    * @tparam T Tuple type
-   * @tparam R Read support type
    * @return a typed parquet source.
    */
-  def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String])(implicit t: ClassTag[R]) =
-    new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null)
+  def apply[T](paths: Seq[String])(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
+    new TypedFixedPathParquetTuple[T](paths, readSupport, null)
 
+  def apply[T](path: String)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] = apply[T](Seq(path))
+
   /**
    * Create readable typed parquet source with filter predicate.
    */
-  def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String], fp: Option[FilterPredicate])(implicit t: ClassTag[R]) =
-    new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null) {
-      override def withFilter = fp
+  def apply[T](paths: Seq[String], fp: FilterPredicate)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
+    new TypedFixedPathParquetTuple[T](paths, readSupport, null) {
      override def withFilter = Some(fp)
     }
 
-  /**
-   * Create typed parquet source supports both R/W.
-   * @param paths paths of parquet I/O
-   * @param r Read support type tag
-   * @param w Write support type tag
-   * @tparam T Tuple type
-   * @tparam R Read support type
-   * @return a typed parquet source.
-   */
-  def apply[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit r: ClassTag[R],
-    w: ClassTag[W]) = {
-    val readSupport = r.runtimeClass.asInstanceOf[Class[R]]
-    val writeSupport = w.runtimeClass.asInstanceOf[Class[W]]
-    new TypedFixedPathParquetTuple[T, R, W](paths, readSupport, writeSupport)
-  }
-
+  def apply[T](path: String, fp: FilterPredicate)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
+    apply[T](Seq(path), fp)
 }
 
 object TypedParquetSink {
   /**
    * Create typed parquet sink.
    * Here is an example:
-   *
-   * case class SampleClassB(string: String, int: Int, double: Option[Double], a: SampleClassA)
-   *
-   * class WriteSupport extends ParquetWriteSupport[SampleClassB] {
-   *   import com.twitter.scalding.parquet.tuple.macros.Macros._
-   *
-   *   override def writeRecord(r: SampleClassB, rc: RecordConsumer, schema: MessageType): Unit =
-   *     caseClassWriteSupport[SampleClassB](r, rc, schema)
-   *   override val rootSchema: String = caseClassParquetSchema[SampleClassB]
-   * }
-   *
-   * val sink = TypedParquetSink[SampleClassB, WriteSupport](Seq(outputPath))
+   * import com.twitter.scalding.parquet.tuple.macros.Macros._
+   * val sink = TypedParquetSink[SampleClass](Seq(outputPath))
    *
    * @param paths paths of parquet I/O
-   * @param t Read support type tag
    * @tparam T Tuple type
-   * @tparam W Write support type
    * @return a typed parquet source.
    */
-  def apply[T, W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit t: ClassTag[W]) =
-    new TypedFixedPathParquetTuple[T, ParquetReadSupport[T], W](paths, null, t.runtimeClass.asInstanceOf[Class[W]])
+  def apply[T](paths: Seq[String])(implicit writeSupport: ParquetWriteSupport[T]): TypedParquet[T] =
+    new TypedFixedPathParquetTuple[T](paths, null, writeSupport)
 
+  def apply[T](path: String)(implicit writeSupport: ParquetWriteSupport[T]): TypedParquet[T] = apply[T](Seq(path))
 }
 
 /**
  * Typed Parquet tuple source/sink.
  */
-trait TypedParquet[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]] extends FileSource with Mappable[T]
+trait TypedParquet[T] extends FileSource with Mappable[T]
   with TypedSink[T] with HasFilterPredicate {
 
-  val readSupport: Class[R]
-  val writeSupport: Class[W]
+  def readSupport: ParquetReadSupport[T]
+  def writeSupport: ParquetWriteSupport[T]
 
   override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T])
 
@@ -108,5 +74,5 @@ trait TypedParquet[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]] e
   }
 }
 
-class TypedFixedPathParquetTuple[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](val paths: Seq[String],
-  val readSupport: Class[R], val writeSupport: Class[W]) extends FixedPathSource(paths: _*) with TypedParquet[T, R, W]
+class TypedFixedPathParquetTuple[T](val paths: Seq[String], val readSupport: ParquetReadSupport[T],
+  val writeSupport: ParquetWriteSupport[T]) extends FixedPathSource(paths: _*) with TypedParquet[T]
```
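The call-site impact of this API change, sketched below (a hypothetical snippet, not part of the commit): callers no longer define a read-support class by hand and thread it through a `ClassTag`; the macro-generated `ParquetReadSupport[T]` instance is found by implicit search.

```scala
import com.twitter.scalding.parquet.tuple.{ TypedParquet, TypedParquetSink }
import com.twitter.scalding.parquet.tuple.macros.Macros._

case class SampleClass(x: Int, y: String)

// Before: TypedParquet[SampleClass, HandWrittenReadSupport](Seq("/in"))
// After: the implicit support instance is generated at the call site,
// and the new single-path overloads avoid the Seq(...) wrapping.
val source = TypedParquet[SampleClass]("/in")
val sink = TypedParquetSink[SampleClass]("/out")
```

Note also that the filter-predicate overload now takes a plain `FilterPredicate` and wraps it in `Some` internally, instead of asking callers for an `Option`.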
Macros.scala
```diff
@@ -1,9 +1,7 @@
 package com.twitter.scalding.parquet.tuple.macros
 
-import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetSchemaProvider, ParquetTupleConverterProvider, WriteSupportProvider }
-import com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter
-import parquet.io.api.RecordConsumer
-import parquet.schema.MessageType
+import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetReadSupportProvider, ParquetSchemaProvider, WriteSupportProvider }
+import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport }
 
 import scala.language.experimental.macros
 
@@ -35,20 +33,17 @@ object Macros {
    * @tparam T Case class type that contains primitive fields or collection fields or nested case class.
    * @return Generated case class parquet message type string
    */
-  def caseClassParquetSchema[T]: String = macro ParquetSchemaProvider.toParquetSchemaImpl[T]
+  implicit def caseClassParquetSchema[T]: String = macro ParquetSchemaProvider.toParquetSchemaImpl[T]
 
   /**
-   * Macro used to generate parquet tuple converter for a given case class.
-   *
-   * @tparam T Case class type that contains primitive or collection type fields or nested case class.
-   * @return Generated parquet converter
+   * Macro generated case class read support
    */
-  def caseClassParquetTupleConverter[T]: ParquetTupleConverter[T] = macro ParquetTupleConverterProvider.toParquetTupleConverterImpl[T]
+  implicit def caseClassParquetReadSupport[T]: ParquetReadSupport[T] = macro ParquetReadSupportProvider.toParquetReadSupportImpl[T]
 
   /**
    * Macro used to generate case class write support to parquet.
    * @tparam T User defined case class tuple type.
    * @return Generated case class tuple write support function.
    */
-  def caseClassWriteSupport[T]: (T, RecordConsumer, MessageType) => Unit = macro WriteSupportProvider.toWriteSupportImpl[T]
+  implicit def caseClassParquetWriteSupport[T]: ParquetWriteSupport[T] = macro WriteSupportProvider.toWriteSupportImpl[T]
 }
```
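Because the three macros are now `implicit def`s, they are resolved automatically wherever a schema string, `ParquetReadSupport[T]`, or `ParquetWriteSupport[T]` is expected, but they can still be invoked explicitly, which is handy when debugging implicit resolution. A sketch using the names from this diff:

```scala
import com.twitter.scalding.parquet.tuple.macros.Macros
import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport }

case class SampleClass(x: Int, y: String)

// Each call expands at compile time for this concrete case class.
val schema: String = Macros.caseClassParquetSchema[SampleClass]
val readSupport: ParquetReadSupport[SampleClass] = Macros.caseClassParquetReadSupport[SampleClass]
val writeSupport: ParquetWriteSupport[SampleClass] = Macros.caseClassParquetWriteSupport[SampleClass]
```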
ParquetReadSupportProvider.scala (formerly ParquetTupleConverterProvider)
```diff
@@ -5,15 +5,15 @@ import com.twitter.scalding.parquet.tuple.scheme._
 
 import scala.reflect.macros.Context
 
-object ParquetTupleConverterProvider {
+object ParquetReadSupportProvider {
   private[this] sealed trait CollectionType
   private[this] case object NOT_A_COLLECTION extends CollectionType
   private[this] case object OPTION extends CollectionType
   private[this] case object LIST extends CollectionType
   private[this] case object SET extends CollectionType
   private[this] case object MAP extends CollectionType
 
-  def toParquetTupleConverterImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetTupleConverter[T]] = {
+  def toParquetReadSupportImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetReadSupport[T]] = {
     import ctx.universe._
 
     if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
@@ -194,6 +194,12 @@ object ParquetTupleConverterProvider {
     val groupConverter = buildGroupConverter(T.tpe, converters, converterGetters, convertersResetCalls,
       buildTupleValue(T.tpe, fieldValues))
 
-    ctx.Expr[ParquetTupleConverter[T]](groupConverter)
+    val schema = ParquetSchemaProvider.toParquetSchemaImpl[T](ctx)
+    val readSupport = q"""
+      new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetReadSupport[$T]($schema) {
+        override val tupleConverter: _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter[$T] = $groupConverter
+      }
+    """
+    ctx.Expr[ParquetReadSupport[T]](readSupport)
   }
 }
```
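The macro now returns more than a converter: it wraps the generated converter and the macro-generated schema string into an anonymous `ParquetReadSupport` subclass. Written out by hand, the generated tree has roughly this shape (a sketch, assuming, as the quasiquote above implies, that `ParquetReadSupport` takes the schema string as a constructor argument and declares an abstract `tupleConverter`):

```scala
import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetTupleConverter }

// `schemaString` and `converter` stand in for the macro-generated trees.
def generatedReadSupport[T](schemaString: String, converter: ParquetTupleConverter[T]): ParquetReadSupport[T] =
  new ParquetReadSupport[T](schemaString) {
    override val tupleConverter: ParquetTupleConverter[T] = converter
  }
```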
WriteSupportProvider.scala
```diff
@@ -1,14 +1,13 @@
 package com.twitter.scalding.parquet.tuple.macros.impl
 
 import com.twitter.bijection.macros.impl.IsCaseClassImpl
-import parquet.io.api.RecordConsumer
-import parquet.schema.MessageType
+import com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport
 
 import scala.reflect.macros.Context
 
 object WriteSupportProvider {
 
-  def toWriteSupportImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[(T, RecordConsumer, MessageType) => Unit] = {
+  def toWriteSupportImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetWriteSupport[T]] = {
     import ctx.universe._
 
     if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
@@ -39,7 +38,7 @@ object WriteSupportProvider {
 
       fieldType match {
         case tpe if tpe =:= typeOf[String] =>
-          writePrimitiveField(q"rc.addBinary(Binary.fromString($fValue))")
+          writePrimitiveField(q"rc.addBinary(_root_.parquet.io.api.Binary.fromString($fValue))")
         case tpe if tpe =:= typeOf[Boolean] =>
           writePrimitiveField(q"rc.addBoolean($fValue)")
         case tpe if tpe =:= typeOf[Short] =>
@@ -124,16 +123,17 @@ object WriteSupportProvider {
     if (finalIdx == 0)
       ctx.abort(ctx.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?")
 
-    val writeFunction: Tree = q"""
-      val writeFunc = (t: $T, rc: _root_.parquet.io.api.RecordConsumer, schema: _root_.parquet.schema.MessageType) => {
-
-        var $rootGroupName: _root_.parquet.schema.GroupType = schema
-        rc.startMessage
-        $funcBody
-        rc.endMessage
+    val schema = ParquetSchemaProvider.toParquetSchemaImpl[T](ctx)
+    val writeSupport: Tree = q"""
+      new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport[$T]($schema) {
+        override def writeRecord(t: $T, rc: _root_.parquet.io.api.RecordConsumer, schema: _root_.parquet.schema.MessageType): Unit = {
+          var $rootGroupName: _root_.parquet.schema.GroupType = schema
+          rc.startMessage
+          $funcBody
+          rc.endMessage
+        }
       }
-      writeFunc
       """
-    ctx.Expr[(T, RecordConsumer, MessageType) => Unit](writeFunction)
+    ctx.Expr[ParquetWriteSupport[T]](writeSupport)
   }
 }
```
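The same wrapping now happens on the write side: instead of returning a `(T, RecordConsumer, MessageType) => Unit` function tree, the macro emits an anonymous `ParquetWriteSupport[T]` whose `writeRecord` contains the generated field-writing code. For a concrete sense of what that generated body does at runtime, here is a hand-written equivalent for a two-field case class (a sketch, assuming `writePrimitiveField` brackets each value with the usual `startField`/`endField` calls):

```scala
import parquet.io.api.{ Binary, RecordConsumer }

case class SampleClass(x: Int, y: String)

// Roughly the choreography the generated writeRecord performs.
def writeSampleClass(t: SampleClass, rc: RecordConsumer): Unit = {
  rc.startMessage()
  rc.startField("x", 0) // field name and its index in the message type
  rc.addInteger(t.x)
  rc.endField("x", 0)
  rc.startField("y", 1)
  rc.addBinary(Binary.fromString(t.y))
  rc.endField("y", 1)
  rc.endMessage()
}
```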
ParquetTupleConverter.scala
```diff
@@ -3,7 +3,7 @@ package com.twitter.scalding.parquet.tuple.scheme
 import parquet.io.api.{ Binary, Converter, GroupConverter, PrimitiveConverter }
 import scala.util.Try
 
-trait TupleFieldConverter[+T] extends Converter {
+trait TupleFieldConverter[+T] extends Converter with Serializable {
   /**
    * Current value read from parquet column
    */
```