Reuses Row object in ExistingRdd.productToRowRdd()
Author: Cheng Lian <[email protected]>

Closes apache#432 from liancheng/reuseRow and squashes the following commits:

9e6d083 [Cheng Lian] Simplified code with BufferedIterator
52acec9 [Cheng Lian] Reuses Row object in ExistingRdd.productToRowRdd()
liancheng authored and rxin committed Apr 18, 2014
1 parent e31c8ff commit 89f4743
Showing 1 changed file with 18 additions and 3 deletions.
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair

-
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
   override def output = projectList.map(_.toAttribute)

@@ -143,8 +142,24 @@ object ExistingRdd {
   }

   def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
-    // TODO: Reuse the row, don't use map on the product iterator. Maybe code gen?
-    data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
+    data.mapPartitions { iterator =>
+      if (iterator.isEmpty) {
+        Iterator.empty
+      } else {
+        val bufferedIterator = iterator.buffered
+        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
+
+        bufferedIterator.map { r =>
+          var i = 0
+          while (i < mutableRow.length) {
+            mutableRow(i) = r.productElement(i)
+            i += 1
+          }
+
+          mutableRow
+        }
+      }
+    }
   }

   def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
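The rewritten productToRowRdd allocates a single GenericMutableRow per partition, sized from the first record's productArity (which the BufferedIterator exposes without consuming the record), and overwrites that one row for every record instead of building a fresh GenericRow each time. Below is a minimal, self-contained sketch of the same pattern in plain Scala, outside Spark; the function name productsToReusedRows is hypothetical and Array[Any] stands in for GenericMutableRow:

// Illustrative sketch only, not Spark code: one buffer per partition,
// overwritten for every record.
def productsToReusedRows(iterator: Iterator[Product]): Iterator[Array[Any]] = {
  if (iterator.isEmpty) {
    Iterator.empty
  } else {
    // Buffering lets us peek at the first record's arity without consuming it.
    val buffered = iterator.buffered
    val row = new Array[Any](buffered.head.productArity)

    buffered.map { r =>
      var i = 0
      while (i < row.length) {
        row(i) = r.productElement(i)
        i += 1
      }
      row // the same array instance is handed out for every record
    }
  }
}

// Usage: callers must copy the row if they want to keep it past the current
// element, e.g.
// productsToReusedRows(Iterator((1, "a"), (2, "b"))).map(_.clone).toList

Handing out the same object repeatedly is only safe as long as downstream consumers copy any row they need to retain, which is why the usage line clones each array before collecting into a list.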
