This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ce3437af0179 Revert "[SPARK-53738][SQL] PlannedWrite should preserve 
custom sort order when query output contains literal"
ce3437af0179 is described below

commit ce3437af0179db986d57b464e69b845337f469ac
Author: Peter Toth <[email protected]>
AuthorDate: Thu Oct 9 10:04:40 2025 +0200

    Revert "[SPARK-53738][SQL] PlannedWrite should preserve custom sort order 
when query output contains literal"
    
    This reverts commit 22d9709c2921f358bd5c70036d886d9bb0ebb4f5.
---
 .../apache/spark/sql/catalyst/dsl/package.scala    |  1 -
 .../spark/sql/catalyst/expressions/SortOrder.scala | 49 +++++-----------------
 .../sql/catalyst/expressions/aggregate/Mode.scala  |  4 +-
 .../expressions/aggregate/percentiles.scala        |  4 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../plans/AliasAwareOutputExpression.scala         |  6 +--
 .../plans/logical/basicLogicalOperators.scala      |  3 +-
 .../sql/catalyst/expressions/OrderingSuite.scala   |  3 +-
 .../org/apache/spark/sql/execution/SortExec.scala  |  9 ++--
 .../spark/sql/execution/SortPrefixUtils.scala      |  8 ----
 .../sql/execution/columnar/InMemoryRelation.scala  | 19 +++------
 .../execution/datasources/DataSourceStrategy.scala |  3 --
 .../spark/sql/execution/datasources/V1Writes.scala | 20 ++-------
 .../window/WindowEvaluatorFactoryBase.scala        |  4 +-
 .../datasources/V1WriteCommandSuite.scala          | 46 +-------------------
 .../command/V1WriteHiveCommandSuite.scala          | 32 --------------
 16 files changed, 34 insertions(+), 179 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index e9e897cfb78e..1d7cf5455e57 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -151,7 +151,6 @@ package object dsl extends SQLConfHelper {
 
     def asc: SortOrder = SortOrder(expr, Ascending)
     def asc_nullsLast: SortOrder = SortOrder(expr, Ascending, NullsLast, 
Seq.empty)
-    def const: SortOrder = SortOrder(expr, Constant)
     def desc: SortOrder = SortOrder(expr, Descending)
     def desc_nullsFirst: SortOrder = SortOrder(expr, Descending, NullsFirst, 
Seq.empty)
     def as(alias: String): NamedExpression = Alias(expr, alias)()
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 4632e064daa5..166866c90b87 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -45,11 +45,6 @@ case object Descending extends SortDirection {
   override def defaultNullOrdering: NullOrdering = NullsLast
 }
 
-case object Constant extends SortDirection {
-  override def sql: String = "CONST"
-  override def defaultNullOrdering: NullOrdering = NullsFirst
-}
-
 case object NullsFirst extends NullOrdering {
   override def sql: String = "NULLS FIRST"
 }
@@ -74,13 +69,8 @@ case class SortOrder(
 
   override def children: Seq[Expression] = child +: sameOrderExpressions
 
-  override def checkInputDataTypes(): TypeCheckResult = {
-    if (direction == Constant) {
-      TypeCheckResult.TypeCheckSuccess
-    } else {
-      TypeUtils.checkForOrderingExpr(dataType, prettyName)
-    }
-  }
+  override def checkInputDataTypes(): TypeCheckResult =
+    TypeUtils.checkForOrderingExpr(dataType, prettyName)
 
   override def dataType: DataType = child.dataType
   override def nullable: Boolean = child.nullable
@@ -91,8 +81,8 @@ case class SortOrder(
   def isAscending: Boolean = direction == Ascending
 
   def satisfies(required: SortOrder): Boolean = {
-    children.exists(required.child.semanticEquals) && (direction == Constant ||
-      direction == required.direction && nullOrdering == required.nullOrdering)
+    children.exists(required.child.semanticEquals) &&
+      direction == required.direction && nullOrdering == required.nullOrdering
   }
 
   override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[Expression]): SortOrder =
@@ -111,38 +101,21 @@ object SortOrder {
    * Returns if a sequence of SortOrder satisfies another sequence of 
SortOrder.
    *
    * SortOrder sequence A satisfies SortOrder sequence B if and only if B is 
an equivalent of A
-   * or of A's prefix, except for SortOrder in B that satisfies any constant 
SortOrder in A.
-   *
-   * Here are examples of ordering A satisfying ordering B:
+   * or of A's prefix. Here are examples of ordering A satisfying ordering B:
    * <ul>
    *   <li>ordering A is [x, y] and ordering B is [x]</li>
-   *   <li>ordering A is [z(const), x, y] and ordering B is [x, z]</li>
    *   <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is 
[x1]</li>
    *   <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is 
[x1]</li>
    * </ul>
    */
-  def orderingSatisfies(
-      providedOrdering: Seq[SortOrder], requiredOrdering: Seq[SortOrder]): 
Boolean = {
-    if (requiredOrdering.isEmpty) {
-      return true
-    }
-
-    val (constantProvidedOrdering, nonConstantProvidedOrdering) = 
providedOrdering.partition {
-      case SortOrder(_, Constant, _, _) => true
-      case SortOrder(child, _, _, _) => child.foldable
-    }
-
-    val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder 
=>
-       constantProvidedOrdering.exists { providedOrder =>
-         providedOrder.satisfies(requiredOrder)
-       }
-    }
-
-    if (effectiveRequiredOrdering.length > nonConstantProvidedOrdering.length) 
{
+  def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): 
Boolean = {
+    if (ordering2.isEmpty) {
+      true
+    } else if (ordering2.length > ordering1.length) {
       false
     } else {
-      effectiveRequiredOrdering.zip(nonConstantProvidedOrdering).forall {
-        case (required, provided) => provided.satisfies(required)
+      ordering2.zip(ordering1).forall {
+        case (o2, o1) => o1.satisfies(o2)
       }
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
index b61dae585061..7a4f04bf04f7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, 
UnresolvedWithinGroup}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Constant, 
Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, 
SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, 
Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.Cast.toSQLExpr
 import org.apache.spark.sql.catalyst.trees.UnaryLike
 import org.apache.spark.sql.catalyst.types.PhysicalDataType
@@ -199,8 +199,6 @@ case class Mode(
             this.copy(child = child, reverseOpt = Some(true))
           case SortOrder(child, Descending, _, _) =>
             this.copy(child = child, reverseOpt = Some(false))
-          case SortOrder(child, Constant, _, _) =>
-            this.copy(child = child)
         }
       case _ => this
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index 942c06f60d12..6dfa1b499df2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -382,7 +382,7 @@ case class PercentileCont(left: Expression, right: 
Expression, reverse: Boolean
         nodeName, 1, orderingWithinGroup.length)
     }
     orderingWithinGroup.head match {
-      case SortOrder(child, Ascending | Constant, _, _) => this.copy(left = 
child)
+      case SortOrder(child, Ascending, _, _) => this.copy(left = child)
       case SortOrder(child, Descending, _, _) => this.copy(left = child, 
reverse = true)
     }
   }
@@ -440,7 +440,7 @@ case class PercentileDisc(
         nodeName, 1, orderingWithinGroup.length)
     }
     orderingWithinGroup.head match {
-      case SortOrder(expr, Ascending | Constant, _, _) => this.copy(child = 
expr)
+      case SortOrder(expr, Ascending, _, _) => this.copy(child = expr)
       case SortOrder(expr, Descending, _, _) => this.copy(child = expr, 
reverse = true)
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b5941cd5eddc..fc65c24afcb8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1919,7 +1919,7 @@ object CombineFilters extends Rule[LogicalPlan] with 
PredicateHelper {
 object EliminateSorts extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = 
plan.transformUpWithPruning(_.containsPattern(SORT)) {
     case s @ Sort(orders, _, child, _) if orders.isEmpty || 
orders.exists(_.child.foldable) =>
-      val newOrders = orders.filterNot(o => o.direction != Constant && 
o.child.foldable)
+      val newOrders = orders.filterNot(_.child.foldable)
       if (newOrders.isEmpty) {
         child
       } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index efbd7b0c8b81..e1a9e8b5ea81 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeSet, Constant, Empty2Null, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -128,8 +128,6 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
         }
       }
     }
-    newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ 
outputExpressions.collect {
-      case a @ Alias(child, _) if child.foldable => SortOrder(a.toAttribute, 
Constant)
-    }
+    newOrdering.takeWhile(_.isDefined).flatten.toSeq
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5a7548f13f1f..ad6939422b97 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -912,8 +912,7 @@ case class Sort(
   override def maxRowsPerPartition: Option[Long] = {
     if (global) maxRows else child.maxRowsPerPartition
   }
-  override def outputOrdering: Seq[SortOrder] =
-    order ++ child.outputOrdering.filter(_.direction == Constant)
+  override def outputOrdering: Seq[SortOrder] = order
   final override val nodePatterns: Seq[TreePattern] = Seq(SORT)
   override protected def withNewChildInternal(newChild: LogicalPlan): Sort = 
copy(child = newChild)
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
index 5facdaeb1aca..06c8b5ccef65 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
@@ -43,10 +43,9 @@ class OrderingSuite extends SparkFunSuite with 
ExpressionEvalHelper {
         val sortOrder = direction match {
           case Ascending => BoundReference(0, dataType, nullable = true).asc
           case Descending => BoundReference(0, dataType, nullable = true).desc
-          case Constant => BoundReference(0, dataType, nullable = true).const
         }
         val expectedCompareResult = direction match {
-          case Ascending | Constant => signum(expected)
+          case Ascending => signum(expected)
           case Descending => -1 * signum(expected)
         }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 708e054664c5..11fde41aae9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -46,8 +46,7 @@ case class SortExec(
 
   override def output: Seq[Attribute] = child.output
 
-  override def outputOrdering: Seq[SortOrder] =
-    sortOrder ++ child.outputOrdering.filter(_.direction == Constant)
+  override def outputOrdering: Seq[SortOrder] = sortOrder
 
   // sort performed is local within a given partition so will retain
   // child operator's partitioning
@@ -74,17 +73,15 @@ case class SortExec(
    * should make it public.
    */
   def createSorter(): UnsafeExternalRowSorter = {
-    val effectiveSortOrder = sortOrder.filterNot(_.direction == Constant)
-
     rowSorter = new ThreadLocal[UnsafeExternalRowSorter]()
 
     val ordering = RowOrdering.create(sortOrder, output)
 
     // The comparator for comparing prefix
-    val boundSortExpression = 
BindReferences.bindReference(effectiveSortOrder.head, output)
+    val boundSortExpression = BindReferences.bindReference(sortOrder.head, 
output)
     val prefixComparator = 
SortPrefixUtils.getPrefixComparator(boundSortExpression)
 
-    val canUseRadixSort = enableRadixSort && effectiveSortOrder.length == 1 &&
+    val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
       SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
 
     // The generator for prefix
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index a50f2ba0d04f..7332bbcb1845 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -63,8 +63,6 @@ object SortPrefixUtils {
         PrefixComparators.STRING_DESC_NULLS_FIRST
       case Descending =>
         PrefixComparators.STRING_DESC
-      case Constant =>
-        NoOpPrefixComparator
     }
   }
 
@@ -78,8 +76,6 @@ object SortPrefixUtils {
         PrefixComparators.BINARY_DESC_NULLS_FIRST
       case Descending =>
         PrefixComparators.BINARY_DESC
-      case Constant =>
-        NoOpPrefixComparator
     }
   }
 
@@ -93,8 +89,6 @@ object SortPrefixUtils {
         PrefixComparators.LONG_DESC_NULLS_FIRST
       case Descending =>
         PrefixComparators.LONG_DESC
-      case Constant =>
-        NoOpPrefixComparator
     }
   }
 
@@ -108,8 +102,6 @@ object SortPrefixUtils {
         PrefixComparators.DOUBLE_DESC_NULLS_FIRST
       case Descending =>
         PrefixComparators.DOUBLE_DESC
-      case Constant =>
-        NoOpPrefixComparator
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index bf7491625fa0..eabbc7fc74f5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -439,7 +439,9 @@ case class InMemoryRelation(
   override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
 
   override def doCanonicalize(): logical.LogicalPlan =
-    withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))
+    copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      cacheBuilder,
+      outputOrdering)
 
   @transient val partitionStatistics = new PartitionStatistics(output)
 
@@ -467,13 +469,8 @@ case class InMemoryRelation(
     }
   }
 
-  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
-    val map = AttributeMap(output.zip(newOutput))
-    val newOutputOrdering = outputOrdering
-      .map(_.transform { case a: Attribute => map(a) })
-      .asInstanceOf[Seq[SortOrder]]
-    InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, 
statsOfPlanToCache)
-  }
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
+    InMemoryRelation(newOutput, cacheBuilder, outputOrdering, 
statsOfPlanToCache)
 
   override def newInstance(): this.type = {
     InMemoryRelation(
@@ -490,12 +487,6 @@ case class InMemoryRelation(
     cloned
   }
 
-  override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
-    val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
-    copied.statsOfPlanToCache = this.statsOfPlanToCache
-    copied
-  }
-
   override def simpleString(maxFields: Int): String =
     s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], 
${cacheBuilder.storageLevel}"
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 95bba45c3b1d..2e47f08ac115 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,7 +25,6 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.PREDICATES
 import org.apache.spark.rdd.RDD
@@ -828,8 +827,6 @@ object DataSourceStrategy
         val directionV2 = directionV1 match {
           case Ascending => SortDirection.ASCENDING
           case Descending => SortDirection.DESCENDING
-          case Constant =>
-            throw SparkException.internalError(s"Unexpected catalyst sort 
direction $Constant")
         }
         val nullOrderingV2 = nullOrderingV1 match {
           case NullsFirst => NullOrdering.NULLS_FIRST
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index 53e2d3f74bb3..280fe1068d81 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, 
AttributeMap, AttributeSet, BitwiseAnd, Constant, Empty2Null, Expression, 
HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, 
AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, 
Literal, NamedExpression, Pmod, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -199,27 +199,13 @@ object V1WritesUtils {
     expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
   }
 
-  // SortOrder sequence A (outputOrdering) satisfies SortOrder sequence B 
(requiredOrdering)
-  // if and only if B is an equivalent of A or of A's prefix, except for 
SortOrder in B that
-  // satisfies any constant SortOrder in A.
   def isOrderingMatched(
       requiredOrdering: Seq[Expression],
       outputOrdering: Seq[SortOrder]): Boolean = {
-    val (constantOutputOrdering, nonConstantOutputOrdering) = 
outputOrdering.partition {
-      case SortOrder(_, Constant, _, _) => true
-      case SortOrder(child, _, _, _) => child.foldable
-    }
-
-    val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder 
=>
-      constantOutputOrdering.exists { outputOrder =>
-        outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
-      }
-    }
-
-    if (effectiveRequiredOrdering.length > nonConstantOutputOrdering.length) {
+    if (requiredOrdering.length > outputOrdering.length) {
       false
     } else {
-      effectiveRequiredOrdering.zip(nonConstantOutputOrdering).forall {
+      requiredOrdering.zip(outputOrdering).forall {
         case (requiredOrder, outputOrder) =>
           outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index 3ce11acfc4d1..c2dedda832e2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Add, 
AggregateWindowFunction, Ascending, Attribute, BoundReference, Constant, 
CurrentRow, DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, 
Expression, ExtractANSIIntervalDays, FrameLessOffsetWindowFunction, FrameType, 
IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression, 
OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering, 
SortOrder, SpecifiedWindowFrame, TimestampAddInterval,  [...]
+import org.apache.spark.sql.catalyst.expressions.{Add, 
AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow, 
DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression, 
ExtractANSIIntervalDays, FrameLessOffsetWindowFunction, FrameType, 
IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression, 
OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering, 
SortOrder, SpecifiedWindowFrame, TimestampAddInterval, TimestampA [...]
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
@@ -95,7 +95,7 @@ trait WindowEvaluatorFactoryBase {
         // Flip the sign of the offset when processing the order is descending
         val boundOffset = sortExpr.direction match {
           case Descending => UnaryMinus(offset)
-          case Ascending | Constant => offset
+          case Ascending => offset
         }
 
         // Create the projection which returns the current 'value' modified by 
adding the offset.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index c16b02906724..80d771428d90 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -63,23 +63,10 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with 
AdaptiveSparkPlanHelper
       hasLogicalSort: Boolean,
       orderingMatched: Boolean,
       hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
-    executeAndCheckOrderingAndCustomValidate(
-      hasLogicalSort, orderingMatched, hasEmpty2Null)(query)(_ => ())
-  }
-
-  /**
-   * Execute a write query and check ordering of the plan, then do custom 
validation
-   */
-  protected def executeAndCheckOrderingAndCustomValidate(
-      hasLogicalSort: Boolean,
-      orderingMatched: Boolean,
-      hasEmpty2Null: Boolean = false)(query: => Unit)(
-      customValidate: LogicalPlan => Unit): Unit = {
-    @volatile var optimizedPlan: LogicalPlan = null
+    var optimizedPlan: LogicalPlan = null
 
     val listener = new QueryExecutionListener {
       override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
-        val conf = qe.sparkSession.sessionState.conf
         qe.optimizedPlan match {
           case w: V1WriteCommand =>
             if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) 
{
@@ -100,8 +87,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with 
AdaptiveSparkPlanHelper
 
     // Check whether the output ordering is matched before FileFormatWriter 
executes rdd.
     assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
-      s"Expect orderingMatched: $orderingMatched, " +
-        s"Actual: ${FileFormatWriter.outputOrderingMatched}")
+      s"Expect: $orderingMatched, Actual: 
${FileFormatWriter.outputOrderingMatched}")
 
     sparkContext.listenerBus.waitUntilEmpty()
 
@@ -117,8 +103,6 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with 
AdaptiveSparkPlanHelper
     assert(empty2nullExpr == hasEmpty2Null,
       s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr. 
Plan:\n$optimizedPlan")
 
-    customValidate(optimizedPlan)
-
     spark.listenerManager.unregister(listener)
   }
 }
@@ -407,30 +391,4 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
       }
     }
   }
-
-  test("v1 write with sort by literal column preserve custom order") {
-    withPlannedWrite { _ =>
-      withTable("t") {
-        sql(
-          """
-            |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
-            |PARTITIONED BY (k)
-            |""".stripMargin)
-        executeAndCheckOrderingAndCustomValidate(hasLogicalSort = true, 
orderingMatched = true) {
-          sql(
-            """
-              |INSERT OVERWRITE t
-              |SELECT i, j, '0' as k FROM t0 SORT BY k, i
-              |""".stripMargin)
-        } { optimizedPlan =>
-          assert {
-            optimizedPlan.outputOrdering.exists {
-              case SortOrder(attr: AttributeReference, _, _, _) => attr.name 
== "i"
-              case _ => false
-            }
-          }
-        }
-      }
-    }
-  }
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
index 55bb2c60dcca..e0e056be5987 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
SortOrder}
 import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
 import org.apache.spark.sql.hive.HiveUtils._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -127,35 +126,4 @@ class V1WriteHiveCommandSuite
       }
     }
   }
-
-  test("v1 write to hive table with sort by literal column preserve custom 
order") {
-    withCovnertMetastore { _ =>
-      withPlannedWrite { _ =>
-        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-          withTable("t") {
-            sql(
-              """
-                |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
-                |PARTITIONED BY (k)
-                |""".stripMargin)
-            executeAndCheckOrderingAndCustomValidate(
-              hasLogicalSort = true, orderingMatched = true) {
-              sql(
-                """
-                  |INSERT OVERWRITE t
-                  |SELECT i, j, '0' as k FROM t0 SORT BY k, i
-                  |""".stripMargin)
-            } { optimizedPlan =>
-              assert {
-                optimizedPlan.outputOrdering.exists {
-                  case SortOrder(attr: AttributeReference, _, _, _) => 
attr.name == "i"
-                  case _ => false
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to