Repository: spark Updated Branches: refs/heads/master e31c8ffca -> 89f47434e
Reuses Row object in ExistingRdd.productToRowRdd() Author: Cheng Lian <[email protected]> Closes #432 from liancheng/reuseRow and squashes the following commits: 9e6d083 [Cheng Lian] Simplified code with BufferedIterator 52acec9 [Cheng Lian] Reuses Row object in ExistingRdd.productToRowRdd() Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89f47434 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89f47434 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89f47434 Branch: refs/heads/master Commit: 89f47434e2a6c2f8b80c44d08f866d3a8b8e85c3 Parents: e31c8ff Author: Cheng Lian <[email protected]> Authored: Fri Apr 18 10:02:27 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri Apr 18 10:02:27 2014 -0700 ---------------------------------------------------------------------- .../spark/sql/execution/basicOperators.scala | 21 +++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/89f47434/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index ab2e624..eedcc7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution} import org.apache.spark.util.MutablePair - case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode { override def output = projectList.map(_.toAttribute) @@ 
-143,8 +142,24 @@ object ExistingRdd {
   }
 
   def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
-    // TODO: Reuse the row, don't use map on the product iterator. Maybe code gen?
-    data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
+    data.mapPartitions { iterator =>
+      if (iterator.isEmpty) {
+        Iterator.empty
+      } else {
+        val bufferedIterator = iterator.buffered
+        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
+
+        bufferedIterator.map { r =>
+          var i = 0
+          while (i < mutableRow.length) {
+            mutableRow(i) = r.productElement(i)
+            i += 1
+          }
+
+          mutableRow
+        }
+      }
+    }
   }
 
   def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
