This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 81639090622f [SPARK-45882][SQL][3.4] BroadcastHashJoinExec propagate
partitioning should respect CoalescedHashPartitioning
81639090622f is described below
commit 81639090622fbaa171527783bf9667a01a9511bb
Author: ulysses-you <[email protected]>
AuthorDate: Tue Nov 14 19:55:53 2023 +0800
[SPARK-45882][SQL][3.4] BroadcastHashJoinExec propagate partitioning should
respect CoalescedHashPartitioning
This pr backport https://github.com/apache/spark/pull/43753 to branch-3.4
### What changes were proposed in this pull request?
Add HashPartitioningLike trait and make HashPartitioning and
CoalescedHashPartitioning extend it. When we propagate output partiitoning, we
should handle HashPartitioningLike instead of HashPartitioning. This pr also
changes the BroadcastHashJoinExec to use HashPartitioningLike to avoid
regression.
### Why are the changes needed?
Avoid unnecessary shuffle exchange.
### Does this PR introduce _any_ user-facing change?
yes, avoid regression
### How was this patch tested?
add test
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #43793 from ulysses-you/partitioning-3.4.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: youxiduo <[email protected]>
---
.../sql/catalyst/plans/physical/partitioning.scala | 46 ++++++++++------------
.../execution/joins/BroadcastHashJoinExec.scala | 11 +++---
.../scala/org/apache/spark/sql/JoinSuite.scala | 28 ++++++++++++-
3 files changed, 54 insertions(+), 31 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 1eefe65859bd..211b5a05eb70 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -258,18 +258,8 @@ case object SinglePartition extends Partitioning {
SinglePartitionShuffleSpec
}
-/**
- * Represents a partitioning where rows are split up across partitions based
on the hash
- * of `expressions`. All rows where `expressions` evaluate to the same values
are guaranteed to be
- * in the same partition.
- *
- * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and
Spark requires
- * stateful operators to retain the same physical partitioning during the
lifetime of the query
- * (including restart), the result of evaluation on `partitionIdExpression`
must be unchanged
- * across Spark versions. Violation of this requirement may bring silent
correctness issue.
- */
-case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
- extends Expression with Partitioning with Unevaluable {
+trait HashPartitioningLike extends Expression with Partitioning with
Unevaluable {
+ def expressions: Seq[Expression]
override def children: Seq[Expression] = expressions
override def nullable: Boolean = false
@@ -294,6 +284,20 @@ case class HashPartitioning(expressions: Seq[Expression],
numPartitions: Int)
}
}
}
+}
+
+/**
+ * Represents a partitioning where rows are split up across partitions based
on the hash
+ * of `expressions`. All rows where `expressions` evaluate to the same values
are guaranteed to be
+ * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and
Spark requires
+ * stateful operators to retain the same physical partitioning during the
lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression`
must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent
correctness issue.
+ */
+case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
+ extends HashPartitioningLike {
override def createShuffleSpec(distribution: ClusteredDistribution):
ShuffleSpec =
HashShuffleSpec(this, distribution)
@@ -306,7 +310,6 @@ case class HashPartitioning(expressions: Seq[Expression],
numPartitions: Int)
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions
= newChildren)
-
}
case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
@@ -316,25 +319,18 @@ case class CoalescedBoundary(startReducerIndex: Int,
endReducerIndex: Int)
* fewer number of partitions.
*/
case class CoalescedHashPartitioning(from: HashPartitioning, partitions:
Seq[CoalescedBoundary])
- extends Expression with Partitioning with Unevaluable {
-
- override def children: Seq[Expression] = from.expressions
- override def nullable: Boolean = from.nullable
- override def dataType: DataType = from.dataType
+ extends HashPartitioningLike {
- override def satisfies0(required: Distribution): Boolean =
from.satisfies0(required)
+ override def expressions: Seq[Expression] = from.expressions
override def createShuffleSpec(distribution: ClusteredDistribution):
ShuffleSpec =
CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)
- override protected def withNewChildrenInternal(
- newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
- copy(from = from.copy(expressions = newChildren))
-
override val numPartitions: Int = partitions.length
- override def toString: String = from.toString
- override def sql: String = from.sql
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
+ copy(from = from.copy(expressions = newChildren))
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 69c760b5a00b..d46e2efb2d10 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
BuildSide}
import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution,
Distribution, HashPartitioning, Partitioning, PartitioningCollection,
UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution,
Distribution, HashPartitioningLike, Partitioning, PartitioningCollection,
UnspecifiedDistribution}
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -73,7 +73,7 @@ case class BroadcastHashJoinExec(
joinType match {
case _: InnerLike if conf.broadcastHashJoinOutputPartitioningExpandLimit
> 0 =>
streamedPlan.outputPartitioning match {
- case h: HashPartitioning => expandOutputPartitioning(h)
+ case h: HashPartitioningLike => expandOutputPartitioning(h)
case c: PartitioningCollection => expandOutputPartitioning(c)
case other => other
}
@@ -99,7 +99,7 @@ case class BroadcastHashJoinExec(
private def expandOutputPartitioning(
partitioning: PartitioningCollection): PartitioningCollection = {
PartitioningCollection(partitioning.partitionings.flatMap {
- case h: HashPartitioning => expandOutputPartitioning(h).partitionings
+ case h: HashPartitioningLike => expandOutputPartitioning(h).partitionings
case c: PartitioningCollection => Seq(expandOutputPartitioning(c))
case other => Seq(other)
})
@@ -111,7 +111,8 @@ case class BroadcastHashJoinExec(
// the expanded partitioning will have the following expressions:
// Seq("a", "b", "c"), Seq("a", "b", "y"), Seq("a", "x", "c"), Seq("a", "x",
"y").
// The expanded expressions are returned as PartitioningCollection.
- private def expandOutputPartitioning(partitioning: HashPartitioning):
PartitioningCollection = {
+ private def expandOutputPartitioning(
+ partitioning: HashPartitioningLike): PartitioningCollection = {
val maxNumCombinations =
conf.broadcastHashJoinOutputPartitioningExpandLimit
var currentNumCombinations = 0
@@ -133,7 +134,7 @@ case class BroadcastHashJoinExec(
PartitioningCollection(
generateExprCombinations(partitioning.expressions, Nil)
- .map(HashPartitioning(_, partitioning.numPartitions)))
+ .map(exprs =>
partitioning.withNewChildren(exprs).asInstanceOf[HashPartitioningLike]))
}
protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 832b6ee5c329..5125708be32d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending,
GenericRow, SortOrd
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec,
ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec,
ShuffleExchangeLike}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.BatchEvalPythonExec
import org.apache.spark.sql.internal.SQLConf
@@ -1564,4 +1564,30 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
checkAnswer(sql(query), expected)
}
}
+
+ test("SPARK-45882: BroadcastHashJoinExec propagate partitioning should
respect " +
+ "CoalescedHashPartitioning") {
+ withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key ->
"true") {
+ val cached = spark.sql(
+ """
+ |select /*+ broadcast(testData) */ key, value, a
+ |from testData join (
+ | select a from testData2 group by a
+ |)tmp on key = a
+ |""".stripMargin).cache()
+ try {
+ // materialize aqe first to propagate output partitioning
+ cached.count()
+ val df = cached.groupBy("key").count()
+ val expected = Seq(Row(1, 1), Row(2, 1), Row(3, 1))
+ checkAnswer(df, expected)
+ assert(find(df.queryExecution.executedPlan) {
+ case _: ShuffleExchangeLike => true
+ case _ => false
+ }.isEmpty, df.queryExecution)
+ } finally {
+ cached.unpersist()
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]