This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new e6fa6d011354 [SPARK-53738][SQL][3.5] Fix planned write when query output contains foldable orderings
e6fa6d011354 is described below

commit e6fa6d011354020c13deee92b53e55a7d60b2af3
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Oct 22 19:34:44 2025 +0200

    [SPARK-53738][SQL][3.5] Fix planned write when query output contains foldable orderings
    
    Backport #52584 to branch-3.5
    
    ### What changes were proposed in this pull request?
    
    This is the second try of https://github.com/apache/spark/pull/52474, following [the suggestion from cloud-fan](https://github.com/apache/spark/pull/52474#issuecomment-3383971418)
    
    This PR fixes a bug in `plannedWrite` that occurs when the `query` has foldable orderings in the partition columns.
    
    ```
    CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k);
    
    INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i;
    ```
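    
    The source table `t0` is not defined in the report. A minimal, assumed definition that is sufficient to reproduce the setup locally could look like this (hypothetical, for illustration only):
    
    ```
    -- Hypothetical source table; any table providing columns i, j (and k) works.
    CREATE TABLE t0 (i INT, j INT, k STRING) USING PARQUET;
    INSERT INTO t0 VALUES (1, 2, 'a'), (3, 4, 'b');
    ```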
    
    The evaluation of `FileFormatWriter.orderingMatched` fails because `SortOrder(Literal)` is eliminated by `EliminateSorts`.
    
    ### Why are the changes needed?
    
    `V1Writes` overrides the custom sort order when the query's output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contained literals in the partition columns, this check produced a false negative, so the custom sort order did not take effect.
    
    SPARK-53707 partially fixed the issue at the logical plan level by adding a `Project` over the query in `V1Writes`.
    
    Before SPARK-53707
    ```
    Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false
    +- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282]
       +- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet
    ```
    
    After SPARK-53707
    ```
    Project [i#284, j#285, 0 AS k#290]
    +- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false
       +- Project [i#284, j#285]
          +- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet
    ```
    
    Note that the issue still exists, because the ordering match is checked again in another place, `FileFormatWriter`.
    
    This PR fixes the issue thoroughly, with new UTs added.
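    
    To observe the behavior locally (a hedged check, assuming the table `t` above and the hypothetical `t0`), the optimized and physical plans of the write can be inspected; with this fix, the `Sort` on `k, i` should survive under the write command:
    
    ```
    -- Inspect the analyzed/optimized/physical plans of the write.
    EXPLAIN EXTENDED
    INSERT OVERWRITE t SELECT j AS i, i AS j, '0' AS k FROM t0 SORT BY k, i;
    ```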
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it's a bug fix.
    
    ### How was this patch tested?
    
    New UTs are added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52697 from pan3793/SPARK-53694-3.5.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Peter Toth <[email protected]>
---
 .../sql/execution/columnar/InMemoryRelation.scala  | 19 ++++++--
 .../spark/sql/execution/datasources/V1Writes.scala | 26 +++++++---
 .../datasources/V1WriteCommandSuite.scala          | 55 ++++++++++++++++++++--
 .../command/V1WriteHiveCommandSuite.scala          | 32 +++++++++++++
 4 files changed, 116 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index e26e6d1b1ddc..a811f2eb2d95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -382,9 +382,7 @@ case class InMemoryRelation(
   override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
 
   override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      cacheBuilder,
-      outputOrdering)
+    withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))
 
   @transient val partitionStatistics = new PartitionStatistics(output)
 
@@ -412,8 +410,13 @@ case class InMemoryRelation(
     }
   }
 
-  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
-    InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+    val map = AttributeMap(output.zip(newOutput))
+    val newOutputOrdering = outputOrdering
+      .map(_.transform { case a: Attribute => map(a) })
+      .asInstanceOf[Seq[SortOrder]]
+    InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)
+  }
 
   override def newInstance(): this.type = {
     InMemoryRelation(
@@ -430,6 +433,12 @@ case class InMemoryRelation(
     cloned
   }
 
+  override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
+    val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
+    copied.statsOfPlanToCache = this.statsOfPlanToCache
+    copied
+  }
+
   override def simpleString(maxFields: Int): String =
     s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index d7a8d7aec0b7..91b0a65db75e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -98,13 +99,15 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
     assert(empty2NullPlan.output.length == query.output.length)
     val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))
 
-    // Rewrite the attribute references in the required ordering to use the new output.
-    val requiredOrdering = write.requiredOrdering.map(_.transform {
-      case a: Attribute => attrMap.getOrElse(a, a)
-    }.asInstanceOf[SortOrder])
-    val outputOrdering = empty2NullPlan.outputOrdering
-    val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
-    if (orderingMatched) {
+    // Rewrite the attribute references in the required ordering to use the new output,
+    // then eliminate foldable ordering.
+    val requiredOrdering = {
+      val ordering = write.requiredOrdering.map(_.transform {
+        case a: Attribute => attrMap.getOrElse(a, a)
+      }.asInstanceOf[SortOrder])
+      eliminateFoldableOrdering(ordering, empty2NullPlan).outputOrdering
+    }
+    if (isOrderingMatched(requiredOrdering.map(_.child), empty2NullPlan.outputOrdering)) {
       empty2NullPlan
     } else {
       Sort(requiredOrdering, global = false, empty2NullPlan)
@@ -200,6 +203,15 @@ object V1WritesUtils {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }
 
+  // SPARK-53738: the required ordering inferred from table spec (partition, bucketing, etc.)
+  // may contain foldable sort ordering expressions, which causes the optimized query's output
+  // ordering mismatch, here we calculate the required ordering more accurately, by creating a
+  // fake Sort node with the input query, then remove the foldable sort ordering expressions.
+  def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =
+    EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))
+
+  // The comparison ignores SortDirection and NullOrdering since it doesn't matter
+  // for writing cases.
   def isOrderingMatched(
       requiredOrdering: Seq[Expression],
       outputOrdering: Seq[SortOrder]): Boolean = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index ce43edb79c12..5fd27410dcb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -62,10 +62,23 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
       hasLogicalSort: Boolean,
       orderingMatched: Boolean,
       hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
-    var optimizedPlan: LogicalPlan = null
+    executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort, Some(orderingMatched), hasEmpty2Null)(query)(_ => ())
+  }
+
+  /**
+   * Execute a write query and check ordering of the plan, then do custom validation
+   */
+  protected def executeAndCheckOrderingAndCustomValidate(
+      hasLogicalSort: Boolean,
+      orderingMatched: Option[Boolean],
+      hasEmpty2Null: Boolean = false)(query: => Unit)(
+      customValidate: LogicalPlan => Unit): Unit = {
+    @volatile var optimizedPlan: LogicalPlan = null
 
     val listener = new QueryExecutionListener {
       override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        val conf = qe.sparkSession.sessionState.conf
         qe.optimizedPlan match {
           case w: V1WriteCommand =>
             if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
@@ -84,9 +97,12 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
 
     query
 
-    // Check whether the output ordering is matched before FileFormatWriter executes rdd.
-    assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
-      s"Expect: $orderingMatched, Actual: ${FileFormatWriter.outputOrderingMatched}")
+    orderingMatched.foreach { matched =>
+      // Check whether the output ordering is matched before FileFormatWriter executes rdd.
+      assert(FileFormatWriter.outputOrderingMatched == matched,
+        s"Expect orderingMatched: $matched, " +
+          s"Actual: ${FileFormatWriter.outputOrderingMatched}")
+    }
 
     sparkContext.listenerBus.waitUntilEmpty()
 
@@ -102,6 +118,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
     assert(empty2nullExpr == hasEmpty2Null,
       s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. Plan:\n$optimizedPlan")
 
+    customValidate(optimizedPlan)
+
     spark.listenerManager.unregister(listener)
   }
 }
@@ -390,4 +408,33 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
       }
     }
   }
+
+  test("v1 write with sort by literal column preserve custom order") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
+            |PARTITIONED BY (k)
+            |""".stripMargin)
+        // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+        // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
+        executeAndCheckOrderingAndCustomValidate(
+          hasLogicalSort = true, orderingMatched = None) {
+          sql(
+            """
+              |INSERT OVERWRITE t
+              |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+              |""".stripMargin)
+        } { optimizedPlan =>
+          assert {
+            optimizedPlan.outputOrdering.exists {
+              case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+              case _ => false
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
index 0f219032fc05..3aa3ac99451f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
 import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -105,4 +106,35 @@ class V1WriteHiveCommandSuite
       }
     }
   }
+
+  test("v1 write to hive table with sort by literal column preserve custom order") {
+    withPlannedWrite { enabled =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
+              |PARTITIONED BY (k)
+              |""".stripMargin)
+          // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+          // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
+          executeAndCheckOrderingAndCustomValidate(
+            hasLogicalSort = true, orderingMatched = None) {
+            sql(
+              """
+                |INSERT OVERWRITE t
+                |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+                |""".stripMargin)
+          } { optimizedPlan =>
+            assert {
+              optimizedPlan.outputOrdering.exists {
+                case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+                case _ => false
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
