This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new dfc9afadfff8 [SPARK-53074][SQL] Avoid partial clustering in SPJ to meet a child's required distribution
dfc9afadfff8 is described below

commit dfc9afadfff8f420ce1b3b8e8b22959820f70f6f
Author: Chirag Singh <chirag.si...@databricks.com>
AuthorDate: Tue Aug 12 20:42:28 2025 +0800

    [SPARK-53074][SQL] Avoid partial clustering in SPJ to meet a child's required distribution

    ### What changes were proposed in this pull request?

    Currently, SPJ logic can apply partial clustering (when enabled) to either side of an inner JOIN as long as the nodes between the scan and the JOIN preserve partitioning. This doesn't work if one of these nodes uses the scan's key-grouped partitioning to satisfy its required distribution (for example, a grouping aggregate or window function). This PR fixes the issue by not applying a partially clustered distribution to a JOIN's child if any node in that child relies on the KeyGroupedPartitioning to satisfy its required distribution, since doing so is not safe with a partially clustered distribution.

    ### Why are the changes needed?

    Without this fix, using a partially clustered distribution with SPJ may cause correctness issues.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    See test changes.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #51818 from chirag-s-db/fix-partial-clustered.

    Authored-by: Chirag Singh <chirag.si...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 96a4f5097ad96f2b7cdf102cdb007c896f1a8a1a)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/exchange/EnsureRequirements.scala   | 31 +++++++++-
 .../connector/KeyGroupedPartitioningSuite.scala   | 66 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index de5c3aaa4fe4..503dca02490a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -374,6 +374,26 @@ case class EnsureRequirements(
     }
   }
 
+  /**
+   * Whether partial clustering can be applied to a given child query plan. This is true if the
+   * plan consists only of a sequence of unary nodes where each node does not use the scan's
+   * key-grouped partitioning to satisfy its required distribution. Otherwise, partial clustering
+   * could be applied to a key-grouped partitioning unrelated to this join.
+   */
+  private def canApplyPartialClusteredDistribution(plan: SparkPlan): Boolean = {
+    !plan.exists {
+      // Unary nodes are safe as long as they don't have a required distribution (for example, a
+      // project or filter). If they have a required distribution, then we should assume that this
+      // plan can't be partially clustered (since the key-grouped partitioning may be needed to
+      // satisfy this distribution unrelated to this JOIN).
+      case u if u.children.length == 1 =>
+        u.requiredChildDistribution.head != UnspecifiedDistribution
+      // Only allow a non-unary node if it's a leaf node - key-grouped partitionings under other
+      // binary nodes (like another JOIN) aren't safe to partially cluster.
+      case other => other.children.nonEmpty
+    }
+  }
+
   /**
    * Checks whether two children, `left` and `right`, of a join operator have compatible
    * `KeyGroupedPartitioning`, and can benefit from storage-partitioned join.
@@ -490,9 +510,16 @@ case class EnsureRequirements(
           // whether partially clustered distribution can be applied. For instance, the
           // optimization cannot be applied to a left outer join, where the left hand
           // side is chosen as the side to replicate partitions according to stats.
+          // Similarly, the partially clustered distribution cannot be applied if the
+          // partially clustered side must use the scan's key-grouped partitioning to
+          // satisfy some unrelated required distribution in its plan (for example, for an aggregate
+          // or window function), as this will give incorrect results (for example, duplicate
+          // row_number() values).
           // Otherwise, query result could be incorrect.
-          val canReplicateLeft = canReplicateLeftSide(joinType)
-          val canReplicateRight = canReplicateRightSide(joinType)
+          val canReplicateLeft = canReplicateLeftSide(joinType) &&
+            canApplyPartialClusteredDistribution(right)
+          val canReplicateRight = canReplicateRightSide(joinType) &&
+            canApplyPartialClusteredDistribution(left)
 
           if (!canReplicateLeft && !canReplicateRight) {
             logInfo(log"Skipping partially clustered distribution as it cannot be applied for " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index c24f52bd9307..c73e8e16fbbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1063,6 +1063,72 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("[SPARK-53074] partial clustering avoided to meet a non-JOIN required distribution") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    val purchases_partitions = Array(identity("item_id"))
+    createTable(purchases, purchasesColumns, purchases_partitions)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 45.0, cast('2020-01-01' as timestamp)), " +
+      "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+      "(2, 15.0, cast('2020-01-02' as timestamp)), " +
+      "(2, 20.0, cast('2020-01-03' as timestamp)), " +
+      "(3, 20.0, cast('2020-02-01' as timestamp))")
+
+    for {
+      pushDownValues <- Seq(true, false)
+      enable <- Seq("true", "false")
+    } yield {
+      withSQLConf(
+        SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString,
+        SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
+        SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
+        // The left side uses a key-grouped partitioning to satisfy the WINDOW function's
+        // required distribution. By default, the left side will be partially clustered (since
+        // it's estimated to be larger), but this partial clustering won't be applied because the
+        // left side needs to be key-grouped partitioned to satisfy the WINDOW's required
+        // distribution.
+        // The left side needs to project additional fields to ensure it's estimated to be
+        // larger than the right side.
+        val df = sql(
+          s"""
+             |WITH purchases_windowed AS (
+             |  SELECT
+             |    ROW_NUMBER() OVER (
+             |      PARTITION BY item_id ORDER BY time DESC
+             |    ) AS RN,
+             |    item_id,
+             |    price,
+             |    STRUCT(item_id, price, time) AS purchases_struct
+             |  FROM testcat.ns.$purchases
+             |)
+             |SELECT
+             |  SUM(p.price),
+             |  SUM(p.purchases_struct.item_id),
+             |  SUM(p.purchases_struct.price),
+             |  MAX(p.purchases_struct.time)
+             |FROM
+             |  purchases_windowed p JOIN testcat.ns.$items i
+             |  ON i.id = p.item_id
+             |WHERE p.RN = 1
+             |""".stripMargin)
+        checkAnswer(df, Seq(Row(140.0, 7, 140.0, Timestamp.valueOf("2020-02-01 00:00:00"))))
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        assert(shuffles.isEmpty, "should not contain any shuffle")
+        if (pushDownValues) {
+          val scans = collectScans(df.queryExecution.executedPlan)
+          assert(scans.forall(_.inputRDD.partitions.length === 3))
+        }
+      }
+    }
+  }
+
   test("data source partitioning + dynamic partition filtering") {
     withSQLConf(
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org