This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new e6fa6d011354 [SPARK-53738][SQL][3.5] Fix planned write when query output contains foldable orderings
e6fa6d011354 is described below
commit e6fa6d011354020c13deee92b53e55a7d60b2af3
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Oct 22 19:34:44 2025 +0200
[SPARK-53738][SQL][3.5] Fix planned write when query output contains foldable orderings
Backport #52584 to branch-3.5
### What changes were proposed in this pull request?
This is the second try of https://github.com/apache/spark/pull/52474,
following [the suggestion from
cloud-fan](https://github.com/apache/spark/pull/52474#issuecomment-3383971418)
This PR fixes a bug in planned write that occurs when the `query` output contains foldable orderings on the partition columns.
```
CREATE TABLE t (i INT, j INT, k STRING) USING PARQUET PARTITIONED BY (k);
INSERT OVERWRITE t SELECT j AS i, i AS j, '0' as k FROM t0 SORT BY k, i;
```
The evaluation of `FileFormatWriter.orderingMatched` fails because
`SortOrder(Literal)` is eliminated by `EliminateSorts`.
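To see why, here is a minimal standalone sketch (not part of the patch; the `FoldableOrderingRepro` object, the attribute names, and the `LocalRelation` child are made up for illustration) showing `EliminateSorts` dropping the foldable sort key that comes from the literal partition value:
```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, SortOrder}
import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Sort}
import org.apache.spark.sql.types.{IntegerType, StringType}

object FoldableOrderingRepro extends App {
  // Hypothetical query output: `i` is a real column, `k` is the partition column
  // that the query replaces with the literal '0'.
  val i = AttributeReference("i", IntegerType)()
  val k = AttributeReference("k", StringType)()
  val child = LocalRelation(Seq(i, k))

  // Required ordering derived from the table spec: first the partition value, then `i`.
  val requiredOrdering = Seq(SortOrder(Literal("0"), Ascending), SortOrder(i, Ascending))

  // EliminateSorts filters out foldable sort keys, so only the `i` key survives;
  // comparing the original two-key ordering against the optimized plan's output
  // ordering then reports a mismatch even though the data is sorted as requested.
  val optimized = EliminateSorts(Sort(requiredOrdering, global = false, child))
  println(optimized.treeString)
}
```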
### Why are the changes needed?
`V1Writes` will override the custom sort order when the query output ordering does not satisfy the required ordering. Before SPARK-53707, when the query's output contained literals in the partition columns, this check produced a false negative, so the custom sort order did not take effect.
SPARK-53707 partially fixed the issue on the logical plan by adding a `Project` over the query in `V1Writes`.
Before SPARK-53707
```
Sort [0 ASC NULLS FIRST, i#280 ASC NULLS FIRST], false
+- Project [j#287 AS i#280, i#286 AS j#281, 0 AS k#282]
+- Relation spark_catalog.default.t0[i#286,j#287,k#288] parquet
```
After SPARK-53707
```
Project [i#284, j#285, 0 AS k#290]
+- Sort [0 ASC NULLS FIRST, i#284 ASC NULLS FIRST], false
+- Project [i#284, j#285]
+- Relation spark_catalog.default.t0[i#284,j#285,k#286] parquet
```
Note that the issue still exists because the ordering match is checked again in `FileFormatWriter`.
This PR fixes the issue thoroughly, with new UTs added.
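Conceptually, the fix derives the effective required ordering by wrapping it in a throwaway `Sort` over the write's child and letting the optimizer strip the foldable keys before the comparison. A minimal sketch of that helper, mirroring the `V1WritesUtils.eliminateFoldableOrdering` added in the diff below (the `RequiredOrderingSketch` wrapper object is just for illustration):
```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}

object RequiredOrderingSketch {
  // Build a fake Sort over the query, let FoldablePropagation mark aliased literals as
  // foldable, then let EliminateSorts drop the foldable sort keys; the resulting plan's
  // outputOrdering is what the write should actually compare against.
  def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =
    EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))
}
```
Reusing the optimizer rules keeps the write path's notion of a "satisfied ordering" consistent with what the optimizer actually produces for the query.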
### Does this PR introduce _any_ user-facing change?
Yes, it's a bug fix.
### How was this patch tested?
New UTs are added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52697 from pan3793/SPARK-53694-3.5.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../sql/execution/columnar/InMemoryRelation.scala | 19 ++++++--
.../spark/sql/execution/datasources/V1Writes.scala | 26 +++++++---
.../datasources/V1WriteCommandSuite.scala | 55 ++++++++++++++++++++--
.../command/V1WriteHiveCommandSuite.scala | 32 +++++++++++++
4 files changed, 116 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index e26e6d1b1ddc..a811f2eb2d95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -382,9 +382,7 @@ case class InMemoryRelation(
override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
override def doCanonicalize(): logical.LogicalPlan =
- copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
- cacheBuilder,
- outputOrdering)
+ withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))
@transient val partitionStatistics = new PartitionStatistics(output)
@@ -412,8 +410,13 @@ case class InMemoryRelation(
}
}
- def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
- InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
+ def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
+ val map = AttributeMap(output.zip(newOutput))
+ val newOutputOrdering = outputOrdering
+ .map(_.transform { case a: Attribute => map(a) })
+ .asInstanceOf[Seq[SortOrder]]
+ InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)
+ }
override def newInstance(): this.type = {
InMemoryRelation(
@@ -430,6 +433,12 @@ case class InMemoryRelation(
cloned
}
+ override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
+ val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
+ copied.statsOfPlanToCache = this.statsOfPlanToCache
+ copied
+ }
+
override def simpleString(maxFields: Int): String =
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}],
${cacheBuilder.storageLevel}"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index d7a8d7aec0b7..91b0a65db75e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.optimizer.{EliminateSorts, FoldablePropagation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
@@ -98,13 +99,15 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
assert(empty2NullPlan.output.length == query.output.length)
val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))
- // Rewrite the attribute references in the required ordering to use the new output.
- val requiredOrdering = write.requiredOrdering.map(_.transform {
- case a: Attribute => attrMap.getOrElse(a, a)
- }.asInstanceOf[SortOrder])
- val outputOrdering = empty2NullPlan.outputOrdering
- val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)
- if (orderingMatched) {
+ // Rewrite the attribute references in the required ordering to use the new output,
+ // then eliminate foldable ordering.
+ val requiredOrdering = {
+ val ordering = write.requiredOrdering.map(_.transform {
+ case a: Attribute => attrMap.getOrElse(a, a)
+ }.asInstanceOf[SortOrder])
+ eliminateFoldableOrdering(ordering, empty2NullPlan).outputOrdering
+ }
+ if (isOrderingMatched(requiredOrdering.map(_.child), empty2NullPlan.outputOrdering)) {
empty2NullPlan
} else {
Sort(requiredOrdering, global = false, empty2NullPlan)
@@ -200,6 +203,15 @@ object V1WritesUtils {
expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
}
+ // SPARK-53738: the required ordering inferred from the table spec (partition, bucketing, etc.)
+ // may contain foldable sort ordering expressions, which makes the optimized query's output
+ // ordering appear mismatched. Here we calculate the required ordering more accurately by creating
+ // a fake Sort node over the input query, then removing the foldable sort ordering expressions.
+ def eliminateFoldableOrdering(ordering: Seq[SortOrder], query: LogicalPlan): LogicalPlan =
+ EliminateSorts(FoldablePropagation(Sort(ordering, global = false, query)))
+
+ // The comparison ignores SortDirection and NullOrdering since it doesn't matter
+ // for writing cases.
def isOrderingMatched(
requiredOrdering: Seq[Expression],
outputOrdering: Seq[SortOrder]): Boolean = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index ce43edb79c12..5fd27410dcb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -62,10 +62,23 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
hasLogicalSort: Boolean,
orderingMatched: Boolean,
hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
- var optimizedPlan: LogicalPlan = null
+ executeAndCheckOrderingAndCustomValidate(
+ hasLogicalSort, Some(orderingMatched), hasEmpty2Null)(query)(_ => ())
+ }
+
+ /**
+ * Execute a write query and check ordering of the plan, then do custom validation
+ */
+ protected def executeAndCheckOrderingAndCustomValidate(
+ hasLogicalSort: Boolean,
+ orderingMatched: Option[Boolean],
+ hasEmpty2Null: Boolean = false)(query: => Unit)(
+ customValidate: LogicalPlan => Unit): Unit = {
+ @volatile var optimizedPlan: LogicalPlan = null
val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+ val conf = qe.sparkSession.sessionState.conf
qe.optimizedPlan match {
case w: V1WriteCommand =>
if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
@@ -84,9 +97,12 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
query
- // Check whether the output ordering is matched before FileFormatWriter executes rdd.
- assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
- s"Expect: $orderingMatched, Actual:
${FileFormatWriter.outputOrderingMatched}")
+ orderingMatched.foreach { matched =>
+ // Check whether the output ordering is matched before FileFormatWriter executes rdd.
+ assert(FileFormatWriter.outputOrderingMatched == matched,
+ s"Expect orderingMatched: $matched, " +
+ s"Actual: ${FileFormatWriter.outputOrderingMatched}")
+ }
sparkContext.listenerBus.waitUntilEmpty()
@@ -102,6 +118,8 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
assert(empty2nullExpr == hasEmpty2Null,
s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr.
Plan:\n$optimizedPlan")
+ customValidate(optimizedPlan)
+
spark.listenerManager.unregister(listener)
}
}
@@ -390,4 +408,33 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
}
}
}
+
+ test("v1 write with sort by literal column preserve custom order") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
+ |PARTITIONED BY (k)
+ |""".stripMargin)
+ // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+ // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
+ executeAndCheckOrderingAndCustomValidate(
+ hasLogicalSort = true, orderingMatched = None) {
+ sql(
+ """
+ |INSERT OVERWRITE t
+ |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+ |""".stripMargin)
+ } { optimizedPlan =>
+ assert {
+ optimizedPlan.outputOrdering.exists {
+ case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+ case _ => false
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
index 0f219032fc05..3aa3ac99451f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -105,4 +106,35 @@ class V1WriteHiveCommandSuite
}
}
}
+
+ test("v1 write to hive table with sort by literal column preserve custom
order") {
+ withPlannedWrite { enabled =>
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
+ |PARTITIONED BY (k)
+ |""".stripMargin)
+ // Skip checking orderingMatched temporarily to avoid touching `FileFormatWriter`,
+ // see details at https://github.com/apache/spark/pull/52584#issuecomment-3407716019
+ executeAndCheckOrderingAndCustomValidate(
+ hasLogicalSort = true, orderingMatched = None) {
+ sql(
+ """
+ |INSERT OVERWRITE t
+ |SELECT i, j, '0' as k FROM t0 SORT BY k, i
+ |""".stripMargin)
+ } { optimizedPlan =>
+ assert {
+ optimizedPlan.outputOrdering.exists {
+ case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
+ case _ => false
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]