Repository: spark
Updated Branches:
  refs/heads/master 416e71af4 -> c3a6269ca


[SPARK-13789] Infer additional constraints from attribute equality

## What changes were proposed in this pull request?

This PR adds support for inferring an additional set of data constraints based 
on attribute equality. For e.g., if an operator has constraints of the form (`a 
= 5`, `a = b`), we can now automatically infer an additional constraint of the 
form `b = 5`

## How was this patch tested?

Tested that new constraints are properly inferred for filters (by adding a new 
test) and equi-joins (by modifying an existing test)

Author: Sameer Agarwal <[email protected]>

Closes #11618 from sameeragarwal/infer-isequal-constraints.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3a6269c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3a6269c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3a6269c

Branch: refs/heads/master
Commit: c3a6269ca994a977303a450043a577f435565f4e
Parents: 416e71a
Author: Sameer Agarwal <[email protected]>
Authored: Thu Mar 10 17:29:45 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Thu Mar 10 17:29:45 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/QueryPlan.scala    | 21 ++++++++++++++++++++
 .../plans/ConstraintPropagationSuite.scala      | 14 +++++++++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c3a6269c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 40c06ed..c222571 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -32,6 +32,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] 
extends TreeNode[PlanT
    */
   protected def getRelevantConstraints(constraints: Set[Expression]): 
Set[Expression] = {
     constraints
+      .union(inferAdditionalConstraints(constraints))
       .union(constructIsNotNullConstraints(constraints))
       .filter(constraint =>
         constraint.references.nonEmpty && 
constraint.references.subsetOf(outputSet))
@@ -64,6 +65,26 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] 
extends TreeNode[PlanT
   }
 
   /**
+   * Infers an additional set of constraints from a given set of equality 
constraints.
+   * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), 
this returns an
+   * additional constraint of the form `b = 5`
+   */
+  private def inferAdditionalConstraints(constraints: Set[Expression]): 
Set[Expression] = {
+    var inferredConstraints = Set.empty[Expression]
+    constraints.foreach {
+      case eq @ EqualTo(l: Attribute, r: Attribute) =>
+        inferredConstraints ++= (constraints - eq).map(_ transform {
+          case a: Attribute if a.semanticEquals(l) => r
+        })
+        inferredConstraints ++= (constraints - eq).map(_ transform {
+          case a: Attribute if a.semanticEquals(r) => l
+        })
+      case _ => // No inference
+    }
+    inferredConstraints -- constraints
+  }
+
+  /**
    * An [[ExpressionSet]] that contains invariants about the rows output by 
this operator. For
    * example, if this set contains the expression `a = 2` then that expression 
is guaranteed to
    * evaluate to `true` for all rows produced.

http://git-wip-us.apache.org/repos/asf/spark/blob/c3a6269c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index e70d379..a9375a7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -158,6 +158,7 @@ class ConstraintPropagationSuite extends SparkFunSuite {
         tr2.resolveQuoted("d", caseInsensitiveResolution).get < 100,
         tr1.resolveQuoted("a", caseInsensitiveResolution).get ===
           tr2.resolveQuoted("a", caseInsensitiveResolution).get,
+        tr2.resolveQuoted("a", caseInsensitiveResolution).get > 10,
         IsNotNull(tr2.resolveQuoted("a", caseInsensitiveResolution).get),
         IsNotNull(tr1.resolveQuoted("a", caseInsensitiveResolution).get),
         IsNotNull(tr2.resolveQuoted("d", caseInsensitiveResolution).get))))
@@ -203,4 +204,17 @@ class ConstraintPropagationSuite extends SparkFunSuite {
       .join(tr2.where('d.attr < 100), FullOuter, Some("tr1.a".attr === 
"tr2.a".attr))
       .analyze.constraints.isEmpty)
   }
+
+  test("infer additional constraints in filters") {
+    val tr = LocalRelation('a.int, 'b.int, 'c.int)
+
+    verifyConstraints(tr
+      .where('a.attr > 10 && 'a.attr === 'b.attr)
+      .analyze.constraints,
+      ExpressionSet(Seq(resolveColumn(tr, "a") > 10,
+        resolveColumn(tr, "b") > 10,
+        resolveColumn(tr, "a") === resolveColumn(tr, "b"),
+        IsNotNull(resolveColumn(tr, "a")),
+        IsNotNull(resolveColumn(tr, "b")))))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to