Repository: spark
Updated Branches:
  refs/heads/branch-1.0 086ca9c86 -> 0e39c884c


[SPARK-2184][SQL] AddExchange isn't idempotent

Don't bind partitioning expressions as that breaks comparison with requiredPartitioning.

Author: Michael Armbrust <[email protected]>

Closes #1122 from marmbrus/fixAddExchange and squashes the following commits:

3417537 [Michael Armbrust] Don't bind partitioning expressions as that breaks comparison with requiredPartitioning.

(cherry picked from commit 5ff75c748a27bcfae71759d0e509218f0c5d0200)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e39c884
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e39c884
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e39c884

Branch: refs/heads/branch-1.0
Commit: 0e39c884cf840e4213fabfeebce0ad83ea7816a7
Parents: 086ca9c
Author: Michael Armbrust <[email protected]>
Authored: Wed Jun 18 17:52:42 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Jun 18 17:52:51 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/expressions/BoundAttribute.scala      | 4 ++--
 .../org/apache/spark/sql/catalyst/expressions/Row.scala      | 3 +++
 .../main/scala/org/apache/spark/sql/execution/Exchange.scala | 8 ++++----
 3 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e39c884/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index 4ebf6c4..655d4a0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -68,7 +68,7 @@ class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends 
Rule[TreeNode] {
 }
 
 object BindReferences extends Logging {
-  def bindReference(expression: Expression, input: Seq[Attribute]): Expression 
= {
+  def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A 
= {
     expression.transform { case a: AttributeReference =>
       attachTree(a, "Binding attribute") {
         val ordinal = input.indexWhere(_.exprId == a.exprId)
@@ -83,6 +83,6 @@ object BindReferences extends Logging {
           BoundReference(ordinal, a)
         }
       }
-    }
+    }.asInstanceOf[A] // Kind of a hack, but safe.  TODO: Tighten return type 
when possible.
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e39c884/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
index 77b5429..74ae723 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -208,6 +208,9 @@ class GenericMutableRow(size: Int) extends GenericRow(size) 
with MutableRow {
 
 
 class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
   def compare(a: Row, b: Row): Int = {
     var i = 0
     while (i < ordering.size) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0e39c884/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index cef2941..05dfb85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -22,7 +22,7 @@ import org.apache.spark.{HashPartitioner, RangePartitioner, 
SparkConf}
 import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.sql.{SQLConf, SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions.{MutableProjection, 
RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{NoBind, MutableProjection, 
RowOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -31,7 +31,7 @@ import org.apache.spark.util.MutablePair
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends 
UnaryNode {
+case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends 
UnaryNode with NoBind {
 
   override def outputPartitioning = newPartitioning
 
@@ -42,7 +42,7 @@ case class Exchange(newPartitioning: Partitioning, child: 
SparkPlan) extends Una
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
         val rdd = child.execute().mapPartitions { iter =>
-          val hashExpressions = new MutableProjection(expressions)
+          val hashExpressions = new MutableProjection(expressions, 
child.output)
           val mutablePair = new MutablePair[Row, Row]()
           iter.map(r => mutablePair.update(hashExpressions(r), r))
         }
@@ -53,7 +53,7 @@ case class Exchange(newPartitioning: Partitioning, child: 
SparkPlan) extends Una
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
         // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = new RowOrdering(sortingExpressions)
+        implicit val ordering = new RowOrdering(sortingExpressions, 
child.output)
 
         val rdd = child.execute().mapPartitions { iter =>
           val mutablePair = new MutablePair[Row, Null](null, null)

Reply via email to