Repository: spark
Updated Branches:
refs/heads/master 91984978e -> 1d1de28a3
[SPARK-13827][SQL] Can't add subquery to an operator with same-name outputs
while generate SQL string
## What changes were proposed in this pull request?
This PR tries to solve a fundamental issue in the `SQLBuilder`. When we want to
turn a logical plan into SQL string and put it after FROM clause, we need to
wrap it with a sub-query. However, a logical plan is allowed to have same-name
outputs with different qualifiers(e.g. the `Join` operator), and this kind of
plan can't be put under a subquery as we will erase and assign a new qualifier
to all outputs and make it impossible to distinguish same-name outputs.
To solve this problem, this PR renames all attributes with globally unique
names(using exprId), so that we don't need qualifiers to resolve ambiguity
anymore.
For example, `SELECT x.key, MAX(y.key) OVER () FROM t x JOIN t y`, we will
parse this SQL to a Window operator and a Project operator, and add a sub-query
between them. The generated SQL looks like:
```
SELECT sq_1.key, sq_1.max
FROM (
SELECT sq_0.key, sq_0.key, MAX(sq_0.key) OVER () AS max
FROM (
SELECT x.key, y.key FROM t1 AS x JOIN t2 AS y
) AS sq_0
) AS sq_1
```
You can see, the `key` columns become ambiguous after `sq_0`.
After this PR, it will generate something like:
```
SELECT attr_30 AS key, attr_37 AS max
FROM (
SELECT attr_30, attr_37
FROM (
SELECT attr_30, attr_35, MAX(attr_35) AS attr_37
FROM (
SELECT attr_30, attr_35 FROM
(SELECT key AS attr_30 FROM t1) AS sq_0
INNER JOIN
(SELECT key AS attr_35 FROM t1) AS sq_1
) AS sq_2
) AS sq_3
) AS sq_4
```
The outermost SELECT is used to turn the generated named to real names back,
and the innermost SELECT is used to alias real columns to our generated names.
Between them, there is no name ambiguity anymore.
## How was this patch tested?
existing tests and new tests in LogicalPlanToSQLSuite.
Author: Wenchen Fan <[email protected]>
Closes #11658 from cloud-fan/gensql.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d1de28a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d1de28a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d1de28a
Branch: refs/heads/master
Commit: 1d1de28a3c3c3a4bc37dc7565b9178a712df493a
Parents: 9198497
Author: Wenchen Fan <[email protected]>
Authored: Wed Mar 16 11:57:28 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Wed Mar 16 11:57:28 2016 -0700
----------------------------------------------------------------------
.../catalyst/expressions/namedExpressions.scala | 6 +-
.../org/apache/spark/sql/hive/SQLBuilder.scala | 226 +++++++++++--------
.../spark/sql/hive/LogicalPlanToSQLSuite.scala | 18 ++
3 files changed, 147 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1d1de28a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 1af5437..271ef33 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -185,8 +185,7 @@ case class Alias(child: Expression, name: String)(
override def sql: String = {
val qualifiersString =
if (qualifiers.isEmpty) "" else
qualifiers.map(quoteIdentifier).mkString("", ".", ".")
- val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
- s"${child.sql} AS $qualifiersString${quoteIdentifier(aliasName)}"
+ s"${child.sql} AS $qualifiersString${quoteIdentifier(name)}"
}
}
@@ -302,8 +301,7 @@ case class AttributeReference(
override def sql: String = {
val qualifiersString =
if (qualifiers.isEmpty) "" else
qualifiers.map(quoteIdentifier).mkString("", ".", ".")
- val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
- s"$qualifiersString${quoteIdentifier(attrRefName)}"
+ s"$qualifiersString${quoteIdentifier(name)}"
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1d1de28a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 760335b..3bc8e9a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
@@ -54,8 +55,26 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
def toSQL: String = {
val canonicalizedPlan = Canonicalizer.execute(logicalPlan)
+ val outputNames = logicalPlan.output.map(_.name)
+ val qualifiers = logicalPlan.output.flatMap(_.qualifiers).distinct
+
+ // Keep the qualifier information by using it as sub-query name, if there
is only one qualifier
+ // present.
+ val finalName = if (qualifiers.length == 1) {
+ qualifiers.head
+ } else {
+ SQLBuilder.newSubqueryName
+ }
+
+ // Canonicalizer will remove all naming information, we should add it back
by adding an extra
+ // Project and alias the outputs.
+ val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
+ case (attr, name) => Alias(attr.withQualifiers(Nil), name)()
+ }
+ val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName,
canonicalizedPlan))
+
try {
- val replaced = canonicalizedPlan.transformAllExpressions {
+ val replaced = finalPlan.transformAllExpressions {
case e: SubqueryExpression =>
SubqueryHolder(new SQLBuilder(e.query, sqlContext).toSQL)
case e: NonSQLExpression =>
@@ -109,23 +128,6 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
case Limit(limitExpr, child) =>
s"${toSQL(child)} LIMIT ${limitExpr.sql}"
- case p: Sample if p.isTableSample =>
- val fraction = math.min(100, math.max(0, (p.upperBound - p.lowerBound) *
100))
- p.child match {
- case m: MetastoreRelation =>
- val aliasName = m.alias.getOrElse("")
- build(
- s"`${m.databaseName}`.`${m.tableName}`",
- "TABLESAMPLE(" + fraction + " PERCENT)",
- aliasName)
- case s: SubqueryAlias =>
- val aliasName = if (s.child.isInstanceOf[SubqueryAlias]) s.alias
else ""
- val plan = if (s.child.isInstanceOf[SubqueryAlias]) s.child else s
- build(toSQL(plan), "TABLESAMPLE(" + fraction + " PERCENT)",
aliasName)
- case _ =>
- build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
- }
-
case Filter(condition, child) =>
val whereOrHaving = child match {
case _: Aggregate => "HAVING"
@@ -147,18 +149,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
case p: Except =>
build("(" + toSQL(p.left), ") EXCEPT (", toSQL(p.right) + ")")
- case p: SubqueryAlias =>
- p.child match {
- // Persisted data source relation
- case LogicalRelation(_, _, Some(TableIdentifier(table,
Some(database)))) =>
- s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
- // Parentheses is not used for persisted data source relations
- // e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1
- case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation) =>
- build(toSQL(p.child), "AS", p.alias)
- case _ =>
- build("(" + toSQL(p.child) + ")", "AS", p.alias)
- }
+ case p: SubqueryAlias => build("(" + toSQL(p.child) + ")", "AS", p.alias)
case p: Join =>
build(
@@ -168,11 +159,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
toSQL(p.right),
p.condition.map(" ON " + _.sql).getOrElse(""))
- case p: MetastoreRelation =>
- build(
- s"${quoteIdentifier(p.databaseName)}.${quoteIdentifier(p.tableName)}",
- p.alias.map(a => s" AS ${quoteIdentifier(a)}").getOrElse("")
- )
+ case SQLTable(database, table, _, sample) =>
+ val qualifiedName =
s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
+ sample.map { case (lowerBound, upperBound) =>
+ val fraction = math.min(100, math.max(0, (upperBound - lowerBound) *
100))
+ qualifiedName + " TABLESAMPLE(" + fraction + " PERCENT)"
+ }.getOrElse(qualifiedName)
case Sort(orders, _, RepartitionByExpression(partitionExprs, child, _))
if orders.map(_.child) == partitionExprs =>
@@ -274,8 +266,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
val groupingSetSQL = "GROUPING SETS(" +
groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ")
+ ")"
- val aggExprs = agg.aggregateExpressions.map { case expr =>
- expr.transformDown {
+ val aggExprs = agg.aggregateExpressions.map { case aggExpr =>
+ val originalAggExpr = aggExpr.transformDown {
// grouping_id() is converted to VirtualColumn.groupingIdName by
Analyzer. Revert it back.
case ar: AttributeReference if ar == gid => GroupingID(Nil)
case ar: AttributeReference if groupByAttrMap.contains(ar) =>
groupByAttrMap(ar)
@@ -286,6 +278,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
groupByExprs.lift(idx).map(Grouping).getOrElse(a)
}
+
+ originalAggExpr match {
+ // Ancestor operators may reference the output of this grouping set,
and we use exprId to
+ // generate a unique name for each attribute, so we should make sure
the transformed
+ // aggregate expression won't change the output, i.e. exprId and alias
name should remain
+ // the same.
+ case ne: NamedExpression if ne.exprId == aggExpr.exprId => ne
+ case e => Alias(e, normalizedName(aggExpr))(exprId = aggExpr.exprId)
+ }
}
build(
@@ -308,6 +309,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
)
}
+ private def normalizedName(n: NamedExpression): String = "gen_attr_" +
n.exprId.id
+
object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Collapse Project", FixedPoint(100),
@@ -316,31 +319,55 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
// `Aggregate`s.
CollapseProject),
Batch("Recover Scoping Info", Once,
- // Used to handle other auxiliary `Project`s added by analyzer (e.g.
- // `ResolveAggregateFunctions` rule)
- AddSubquery,
- // Previous rule will add extra sub-queries, this rule is used to
re-propagate and update
- // the qualifiers bottom up, e.g.:
- //
- // Sort
- // ordering = t1.a
- // Project
- // projectList = [t1.a, t1.b]
- // Subquery gen_subquery
- // child ...
- //
- // will be transformed to:
- //
- // Sort
- // ordering = gen_subquery.a
- // Project
- // projectList = [gen_subquery.a, gen_subquery.b]
- // Subquery gen_subquery
- // child ...
- UpdateQualifiers
+ // Remove all sub queries, as we will insert new ones when it's
necessary.
+ EliminateSubqueryAliases,
+ // A logical plan is allowed to have same-name outputs with different
qualifiers(e.g. the
+ // `Join` operator). However, this kind of plan can't be put under a
sub query as we will
+ // erase and assign a new qualifier to all outputs and make it
impossible to distinguish
+ // same-name outputs. This rule renames all attributes, to guarantee
different
+ // attributes(with different exprId) always have different names. It
also removes all
+ // qualifiers, as attributes have unique names now and we don't need
qualifiers to resolve
+ // ambiguity.
+ NormalizedAttribute,
+ // Finds the table relations and wrap them with `SQLTable`s. If there
are any `Sample`
+ // operators on top of a table relation, merge the sample information
into `SQLTable` of
+ // that table relation, as we can only convert table sample to
standard SQL string.
+ ResolveSQLTable,
+ // Insert sub queries on top of operators that need to appear after
FROM clause.
+ AddSubquery
)
)
+ object NormalizedAttribute extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformAllExpressions {
+ case a: AttributeReference =>
+ AttributeReference(normalizedName(a), a.dataType)(exprId = a.exprId,
qualifiers = Nil)
+ case a: Alias =>
+ Alias(a.child, normalizedName(a))(exprId = a.exprId, qualifiers =
Nil)
+ }
+ }
+
+ object ResolveSQLTable extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+ case Sample(lowerBound, upperBound, _, _, ExtractSQLTable(table)) =>
+ aliasColumns(table.withSample(lowerBound, upperBound))
+ case ExtractSQLTable(table) =>
+ aliasColumns(table)
+ }
+
+ /**
+ * Aliases the table columns to the generated attribute names, as we use
exprId to generate
+ * unique name for each attribute when normalize attributes, and we
can't reference table
+ * columns with their real names.
+ */
+ private def aliasColumns(table: SQLTable): LogicalPlan = {
+ val aliasedOutput = table.output.map { attr =>
+ Alias(attr, normalizedName(attr))(exprId = attr.exprId)
+ }
+ addSubquery(Project(aliasedOutput, table))
+ }
+ }
+
object AddSubquery extends Rule[LogicalPlan] {
override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
// This branch handles aggregate functions within HAVING clauses. For
example:
@@ -354,55 +381,56 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext:
SQLContext) extends Loggi
// +- Filter ...
// +- Aggregate ...
// +- MetastoreRelation default, src, None
- case plan @ Project(_, Filter(_, _: Aggregate)) =>
wrapChildWithSubquery(plan)
+ case p @ Project(_, f @ Filter(_, _: Aggregate)) => p.copy(child =
addSubquery(f))
- case w @ Window(_, _, _, Filter(_, _: Aggregate)) =>
wrapChildWithSubquery(w)
+ case w @ Window(_, _, _, f @ Filter(_, _: Aggregate)) => w.copy(child
= addSubquery(f))
- case plan @ Project(_,
- _: SubqueryAlias
- | _: Filter
- | _: Join
- | _: MetastoreRelation
- | OneRowRelation
- | _: LocalLimit
- | _: GlobalLimit
- | _: Sample
- ) => plan
-
- case plan: Project => wrapChildWithSubquery(plan)
+ case p: Project => p.copy(child = addSubqueryIfNeeded(p.child))
// We will generate "SELECT ... FROM ..." for Window operator, so its
child operator should
// be able to put in the FROM clause, or we wrap it with a subquery.
- case w @ Window(_, _, _,
- _: SubqueryAlias
- | _: Filter
- | _: Join
- | _: MetastoreRelation
- | OneRowRelation
- | _: LocalLimit
- | _: GlobalLimit
- | _: Sample
- ) => w
-
- case w: Window => wrapChildWithSubquery(w)
- }
+ case w: Window => w.copy(child = addSubqueryIfNeeded(w.child))
- private def wrapChildWithSubquery(plan: UnaryNode): LogicalPlan = {
- val newChild = SubqueryAlias(SQLBuilder.newSubqueryName, plan.child)
- plan.withNewChildren(Seq(newChild))
+ case j: Join => j.copy(
+ left = addSubqueryIfNeeded(j.left),
+ right = addSubqueryIfNeeded(j.right))
}
}
- object UpdateQualifiers extends Rule[LogicalPlan] {
- override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
- case plan =>
- val inputAttributes = plan.children.flatMap(_.output)
- plan transformExpressions {
- case a: AttributeReference if !plan.producedAttributes.contains(a)
=>
- val qualifier = inputAttributes.find(_ semanticEquals
a).map(_.qualifiers)
- a.withQualifiers(qualifier.getOrElse(Nil))
- }
- }
+ private def addSubquery(plan: LogicalPlan): SubqueryAlias = {
+ SubqueryAlias(SQLBuilder.newSubqueryName, plan)
+ }
+
+ private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan
match {
+ case _: SubqueryAlias => plan
+ case _: Filter => plan
+ case _: Join => plan
+ case _: LocalLimit => plan
+ case _: GlobalLimit => plan
+ case _: SQLTable => plan
+ case OneRowRelation => plan
+ case _ => addSubquery(plan)
+ }
+ }
+
+ case class SQLTable(
+ database: String,
+ table: String,
+ output: Seq[Attribute],
+ sample: Option[(Double, Double)] = None) extends LeafNode {
+ def withSample(lowerBound: Double, upperBound: Double): SQLTable =
+ this.copy(sample = Some(lowerBound -> upperBound))
+ }
+
+ object ExtractSQLTable {
+ def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
+ case l @ LogicalRelation(_, _, Some(TableIdentifier(table,
Some(database)))) =>
+ Some(SQLTable(database, table, l.output.map(_.withQualifiers(Nil))))
+
+ case m: MetastoreRelation =>
+ Some(SQLTable(m.databaseName, m.tableName,
m.output.map(_.withQualifiers(Nil))))
+
+ case _ => None
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1d1de28a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 198652b..f02ecb4 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -550,4 +550,22 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with
SQLTestUtils {
|WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
""".stripMargin)
}
+
+ test("window with join") {
+ checkHiveQl(
+ """
+ |SELECT x.key, MAX(y.key) OVER (PARTITION BY x.key % 5 ORDER BY x.key)
+ |FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key
+ """.stripMargin)
+ }
+
+ test("join 2 tables and aggregate function in having clause") {
+ checkHiveQl(
+ """
+ |SELECT COUNT(a.value), b.KEY, a.KEY
+ |FROM parquet_t1 a, parquet_t1 b
+ |GROUP BY a.KEY, b.KEY
+ |HAVING MAX(a.KEY) > 0
+ """.stripMargin)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]