This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new dfc9afadfff8 [SPARK-53074][SQL] Avoid partial clustering in SPJ to meet a child's required distribution
dfc9afadfff8 is described below

commit dfc9afadfff8f420ce1b3b8e8b22959820f70f6f
Author: Chirag Singh <chirag.si...@databricks.com>
AuthorDate: Tue Aug 12 20:42:28 2025 +0800

    [SPARK-53074][SQL] Avoid partial clustering in SPJ to meet a child's required distribution
    
    ### What changes were proposed in this pull request?
    Currently, SPJ logic can apply partial clustering (when enabled) to either side of an
    inner JOIN as long as the nodes between the scan and JOIN preserve partitioning. This
    doesn't work if one of these nodes uses the scan's key-grouped partitioning to satisfy
    its required distribution (for example, a grouping aggregate or window function).
    
    This PR fixes the issue by not applying a partially clustered distribution to a JOIN's
    child if any node in that child relies on the KeyGroupedPartitioning to satisfy its
    required distribution, since partial clustering is not safe in that case. A sketch of
    an affected query shape follows.
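
    For illustration, a minimal sketch of an affected query shape (hypothetical table and
    column names, assuming an active SparkSession `spark` and storage-partitioned tables
    keyed by item_id; this snippet is not part of the patch):

        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions.{col, row_number}

        // The window's required distribution (clustered by item_id) is satisfied by the
        // scan's KeyGroupedPartitioning, so this join child must not be partially clustered.
        val w = Window.partitionBy("item_id").orderBy(col("time").desc)
        val latest = spark.table("cat.ns.purchases")
          .withColumn("rn", row_number().over(w))
          .filter(col("rn") === 1)

        // SPJ can still apply to this join; only partial clustering of the window side
        // is skipped after this change.
        val joined = latest.join(spark.table("cat.ns.items"), col("item_id") === col("id"))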
    
    ### Why are the changes needed?
    Without this fix, using a partially clustered distribution with SPJ may cause
    correctness issues.
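
    To make the failure mode concrete, here is a self-contained illustration in plain
    Scala (no Spark involved; the data is made up): if partial clustering splits one key
    group across two tasks, a per-group ROW_NUMBER() computed task-locally restarts in
    each split, so an `RN = 1` filter keeps two rows instead of one.

        object DuplicateRowNumberSketch extends App {
          case class Purchase(itemId: Int, price: Double, time: Int)

          // All rows below belong to the same key group (itemId = 1).
          val group = Seq(Purchase(1, 45.0, 1), Purchase(1, 50.0, 2))

          // Partial clustering may split one key group across two tasks.
          val (taskA, taskB) = group.splitAt(1)

          // ROW_NUMBER() OVER (PARTITION BY itemId ORDER BY time DESC), computed
          // over whatever rows a single task happens to hold.
          def rowNumberDesc(rows: Seq[Purchase]): Seq[(Purchase, Int)] =
            rows.sortBy(-_.time).zipWithIndex.map { case (r, i) => (r, i + 1) }

          val perTask = Seq(taskA, taskB).flatMap(rowNumberDesc)
          // Two rows survive the RN = 1 filter; a correct plan keeps exactly one.
          println(perTask.filter(_._2 == 1).map(_._1))
        }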
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    See test changes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51818 from chirag-s-db/fix-partial-clustered.
    
    Authored-by: Chirag Singh <chirag.si...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 96a4f5097ad96f2b7cdf102cdb007c896f1a8a1a)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/exchange/EnsureRequirements.scala    | 31 +++++++++-
 .../connector/KeyGroupedPartitioningSuite.scala    | 66 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index de5c3aaa4fe4..503dca02490a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -374,6 +374,26 @@ case class EnsureRequirements(
     }
   }
 
+  /**
+   * Whether partial clustering can be applied to a given child query plan. This is true if the plan
+   * consists only of a sequence of unary nodes where each node does not use the scan's key-grouped
+   * partitioning to satisfy its required distribution. Otherwise, partial clustering could be
+   * applied to a key-grouped partitioning unrelated to this join.
+   */
+  private def canApplyPartialClusteredDistribution(plan: SparkPlan): Boolean = {
+    !plan.exists {
+      // Unary nodes are safe as long as they don't have a required distribution (for example, a
+      // project or filter). If they have a required distribution, then we should assume that this
+      // plan can't be partially clustered (since the key-grouped partitioning may be needed to
+      // satisfy this distribution unrelated to this JOIN).
+      case u if u.children.length == 1 =>
+        u.requiredChildDistribution.head != UnspecifiedDistribution
+      // Only allow a non-unary node if it's a leaf node - key-grouped partitionings under other
+      // binary nodes (like another JOIN) aren't safe to partially cluster.
+      case other => other.children.nonEmpty
+    }
+  }
+
   /**
    * Checks whether two children, `left` and `right`, of a join operator have compatible
    * `KeyGroupedPartitioning`, and can benefit from storage-partitioned join.
@@ -490,9 +510,16 @@ case class EnsureRequirements(
          // whether partially clustered distribution can be applied. For instance, the
          // optimization cannot be applied to a left outer join, where the left hand
          // side is chosen as the side to replicate partitions according to stats.
+          // Similarly, the partially clustered distribution cannot be applied if the
+          // partially clustered side must use the scan's key-grouped partitioning to
+          // satisfy some unrelated required distribution in its plan (for example, for an aggregate
+          // or window function), as this will give incorrect results (for example, duplicate
+          // row_number() values).
           // Otherwise, query result could be incorrect.
-          val canReplicateLeft = canReplicateLeftSide(joinType)
-          val canReplicateRight = canReplicateRightSide(joinType)
+          val canReplicateLeft = canReplicateLeftSide(joinType) &&
+            canApplyPartialClusteredDistribution(right)
+          val canReplicateRight = canReplicateRightSide(joinType) &&
+            canApplyPartialClusteredDistribution(left)
 
           if (!canReplicateLeft && !canReplicateRight) {
              logInfo(log"Skipping partially clustered distribution as it cannot be applied for " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index c24f52bd9307..c73e8e16fbbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1063,6 +1063,72 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("[SPARK-53074] partial clustering avoided to meet a non-JOIN required 
distribution") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    val purchases_partitions = Array(identity("item_id"))
+    createTable(purchases, purchasesColumns, purchases_partitions)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 45.0, cast('2020-01-01' as timestamp)), " +
+      "(1, 50.0, cast('2020-01-02' as timestamp)), " +
+      "(2, 15.0, cast('2020-01-02' as timestamp)), " +
+      "(2, 20.0, cast('2020-01-03' as timestamp)), " +
+      "(3, 20.0, cast('2020-02-01' as timestamp))")
+
+    for {
+      pushDownValues <- Seq(true, false)
+      enable <- Seq("true", "false")
+    } yield {
+      withSQLConf(
+          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> false.toString,
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString,
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> enable) {
+        // The left side uses a key-grouped partitioning to satisfy the WINDOW function's
+        // required distribution. By default, the left side will be partially clustered (since
+        // it's estimated to be larger), but this partial clustering won't be applied because the
+        // left side needs to be key-grouped partitioned to satisfy the WINDOW's required
+        // distribution.
+        // The left side needs to project additional fields to ensure it's estimated to be
+        // larger than the right side.
+        val df = sql(
+          s"""
+             |WITH purchases_windowed AS (
+             |  SELECT
+             |    ROW_NUMBER() OVER (
+             |      PARTITION BY item_id ORDER BY time DESC
+             |    ) AS RN,
+             |    item_id,
+             |    price,
+             |    STRUCT(item_id, price, time) AS purchases_struct
+             |  FROM testcat.ns.$purchases
+             |)
+             |SELECT
+             |  SUM(p.price),
+             |  SUM(p.purchases_struct.item_id),
+             |  SUM(p.purchases_struct.price),
+             |  MAX(p.purchases_struct.time)
+             |FROM
+             |  purchases_windowed p JOIN testcat.ns.$items i
+             |  ON i.id = p.item_id
+             |WHERE p.RN = 1
+             |""".stripMargin)
+        checkAnswer(df, Seq(Row(140.0, 7, 140.0, Timestamp.valueOf("2020-02-01 00:00:00"))))
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        assert(shuffles.isEmpty, "should not contain any shuffle")
+        if (pushDownValues) {
+          val scans = collectScans(df.queryExecution.executedPlan)
+          assert(scans.forall(_.inputRDD.partitions.length === 3))
+        }
+      }
+    }
+  }
+
   test("data source partitioning + dynamic partition filtering") {
     withSQLConf(
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",

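For readers skimming the diff, the guard's traversal reduces to the following standalone
sketch (plain Scala over a toy plan ADT, not the actual SparkPlan API; all names here are
illustrative):

    object GuardSketch extends App {
      sealed trait Node {
        def children: Seq[Node]
        def requiresDistribution: Boolean
      }
      case object Scan extends Node {
        val children = Nil
        val requiresDistribution = false
      }
      // A unary node such as Project/Filter (no required distribution) or
      // Window/Aggregate (requires clustering on some keys).
      case class Unary(child: Node, requiresDistribution: Boolean) extends Node {
        val children = Seq(child)
      }
      case class Join(left: Node, right: Node) extends Node {
        val children = Seq(left, right)
        val requiresDistribution = false
      }

      def exists(n: Node)(p: Node => Boolean): Boolean =
        p(n) || n.children.exists(c => exists(c)(p))

      // Mirrors canApplyPartialClusteredDistribution: reject the child if any
      // unary node needs a specific distribution, or if any node with children
      // is not unary (e.g. another join).
      def canPartiallyCluster(plan: Node): Boolean = !exists(plan) {
        case u if u.children.length == 1 => u.requiresDistribution
        case other => other.children.nonEmpty
      }

      assert(canPartiallyCluster(Unary(Scan, requiresDistribution = false)))  // Project chain: ok
      assert(!canPartiallyCluster(Unary(Scan, requiresDistribution = true)))  // Window/Agg: rejected
      assert(!canPartiallyCluster(Join(Scan, Scan)))                          // nested join: rejected
    }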

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
