Repository: spark
Updated Branches:
refs/heads/master a26afd521 -> aef506e39
[SPARK-17739][SQL] Collapse adjacent similar Window operators
## What changes were proposed in this pull request?
Currently, Spark does not collapse adjacent windows with the same partitioning
and sorting. This PR implements a `CollapseWindow` optimizer rule that does the following:
1. If the partition specs and order specs are the same, collapse into the
parent.
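2. If the partition specs are the same and one order spec is a prefix of the
other, collapse to the more specific one.
(Note: the `CollapseWindow` rule in the diff below only matches windows whose
partition specs and order specs are both equal, i.e. case 1; the prefix case
does not appear in this diff.)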
For example:
```scala
val df = spark.range(1000).select($"id" % 100 as "grp", $"id", rand() as
"col1", rand() as "col2")
// Add summary statistics for all columns
import org.apache.spark.sql.expressions.Window
val cols = Seq("id", "col1", "col2")
val window = Window.partitionBy($"grp").orderBy($"id")
val result = cols.foldLeft(df) { (base, name) =>
base.withColumn(s"${name}_avg", avg(col(name)).over(window))
.withColumn(s"${name}_stddev", stddev(col(name)).over(window))
.withColumn(s"${name}_min", min(col(name)).over(window))
.withColumn(s"${name}_max", max(col(name)).over(window))
}
```
**Before**
```scala
scala> result.explain
== Physical Plan ==
Window [max(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST,
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#234], [grp#17L],
[id#14L ASC NULLS FIRST]
+- Window [min(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST,
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#216], [grp#17L],
[id#14L ASC NULLS FIRST]
+- Window [stddev_samp(col2#19) windowspecdefinition(grp#17L, id#14L ASC
NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
col2_stddev#191], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [avg(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS
FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#167],
[grp#17L], [id#14L ASC NULLS FIRST]
+- Window [max(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS
FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#152],
[grp#17L], [id#14L ASC NULLS FIRST]
+- Window [min(col1#18) windowspecdefinition(grp#17L, id#14L ASC
NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
col1_min#138], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [stddev_samp(col1#18) windowspecdefinition(grp#17L,
id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
col1_stddev#117], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [avg(col1#18) windowspecdefinition(grp#17L, id#14L
ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
col1_avg#97], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [max(id#14L) windowspecdefinition(grp#17L,
id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
id_max#86L], [grp#17L], [id#14L ASC NULLS FIRST]
+- Window [min(id#14L) windowspecdefinition(grp#17L,
id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS
id_min#76L], [grp#17L], [id#14L ASC NULLS FIRST]
+- *Project [grp#17L, id#14L, col1#18, col2#19,
id_avg#26, id_stddev#42]
+- Window [stddev_samp(_w0#59)
windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS id_stddev#42], [grp#17L], [id#14L ASC NULLS FIRST]
+- *Project [grp#17L, id#14L, col1#18,
col2#19, id_avg#26, cast(id#14L as double) AS _w0#59]
+- Window [avg(id#14L)
windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS id_avg#26], [grp#17L], [id#14L ASC NULLS FIRST]
+- *Sort [grp#17L ASC NULLS FIRST,
id#14L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(grp#17L,
200)
+- *Project [(id#14L % 100) AS
grp#17L, id#14L, rand(-6329949029880411066) AS col1#18,
rand(-7251358484380073081) AS col2#19]
+- *Range (0, 1000, step=1,
splits=Some(8))
```
**After**
```scala
scala> result.explain
== Physical Plan ==
Window [max(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#220, min(col2#5)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS col2_min#202, stddev_samp(col2#5)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS col2_stddev#177, avg(col2#5)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS col2_avg#153, max(col1#4)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS col1_max#138, min(col1#4)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS col1_min#124, stddev_samp(col1#4)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS col1_stddev#103, avg(col1#4)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS
col1_avg#83, max(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST,
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#72L, min(id#0L)
windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW) AS id_min#62L], [grp#3L], [id#0L ASC NULLS FIRST]
+- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, id_stddev#28]
+- Window [stddev_samp(_w0#45) windowspecdefinition(grp#3L, id#0L ASC NULLS
FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#28],
[grp#3L], [id#0L ASC NULLS FIRST]
+- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, cast(id#0L as
double) AS _w0#45]
+- Window [avg(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS
FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#12],
[grp#3L], [id#0L ASC NULLS FIRST]
+- *Sort [grp#3L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(grp#3L, 200)
+- *Project [(id#0L % 100) AS grp#3L, id#0L,
rand(6537478539664068821) AS col1#4, rand(-8961093871295252795) AS col2#5]
+- *Range (0, 1000, step=1, splits=Some(8))
```
## How was this patch tested?
Passes the Jenkins tests, including the newly added test suite `CollapseWindowSuite`.
Author: Dongjoon Hyun <[email protected]>
Closes #15317 from dongjoon-hyun/SPARK-17739.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aef506e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aef506e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aef506e3
Branch: refs/heads/master
Commit: aef506e39a41cfe7198162c324a11ef2f01136c3
Parents: a26afd5
Author: Dongjoon Hyun <[email protected]>
Authored: Fri Sep 30 21:05:06 2016 -0700
Committer: Herman van Hovell <[email protected]>
Committed: Fri Sep 30 21:05:06 2016 -0700
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 12 +++
.../optimizer/CollapseWindowSuite.scala | 78 ++++++++++++++++++++
2 files changed, 90 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/aef506e3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9df8ce1..e5e2cd7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -88,6 +88,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog,
conf: CatalystConf)
// Operator combine
CollapseRepartition,
CollapseProject,
+ CollapseWindow,
CombineFilters,
CombineLimits,
CombineUnions,
@@ -538,6 +539,17 @@ object CollapseRepartition extends Rule[LogicalPlan] {
}
/**
+ * Collapse Adjacent Window Expression.
+ * - If the partition specs and order specs are the same, collapse into the
parent.
+ */
+object CollapseWindow extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1
== ps2 && os1 == os2 =>
+ w.copy(windowExpressions = we1 ++ we2, child = grandChild)
+ }
+}
+
+/**
* Generate a list of additional filters from an operator's existing
constraint but remove those
* that are either already part of the operator's condition or are part of the
operator's child
* constraints. These filters are currently inserted to the existing
conditions in the Filter
http://git-wip-us.apache.org/repos/asf/spark/blob/aef506e3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
new file mode 100644
index 0000000..797076e
--- /dev/null
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class CollapseWindowSuite extends PlanTest {
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("CollapseWindow", FixedPoint(10),
+ CollapseWindow) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.double, 'b.double, 'c.string)
+ val a = testRelation.output(0)
+ val b = testRelation.output(1)
+ val c = testRelation.output(2)
+ val partitionSpec1 = Seq(c)
+ val partitionSpec2 = Seq(c + 1)
+ val orderSpec1 = Seq(c.asc)
+ val orderSpec2 = Seq(c.desc)
+
+ test("collapse two adjacent windows with the same partition/order") {
+ val query = testRelation
+ .window(Seq(min(a).as('min_a)), partitionSpec1, orderSpec1)
+ .window(Seq(max(a).as('max_a)), partitionSpec1, orderSpec1)
+ .window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1)
+ .window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = testRelation.window(Seq(
+ avg(b).as('avg_b),
+ sum(b).as('sum_b),
+ max(a).as('max_a),
+ min(a).as('min_a)), partitionSpec1, orderSpec1)
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("Don't collapse adjacent windows with different partitions or orders") {
+ val query1 = testRelation
+ .window(Seq(min(a).as('min_a)), partitionSpec1, orderSpec1)
+ .window(Seq(max(a).as('max_a)), partitionSpec1, orderSpec2)
+
+ val optimized1 = Optimize.execute(query1.analyze)
+ val correctAnswer1 = query1.analyze
+
+ comparePlans(optimized1, correctAnswer1)
+
+ val query2 = testRelation
+ .window(Seq(min(a).as('min_a)), partitionSpec1, orderSpec1)
+ .window(Seq(max(a).as('max_a)), partitionSpec2, orderSpec1)
+
+ val optimized2 = Optimize.execute(query2.analyze)
+ val correctAnswer2 = query2.analyze
+
+ comparePlans(optimized2, correctAnswer2)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]