This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ce3437af0179 Revert "[SPARK-53738][SQL] PlannedWrite should preserve custom sort order when query output contains literal"
ce3437af0179 is described below
commit ce3437af0179db986d57b464e69b845337f469ac
Author: Peter Toth <[email protected]>
AuthorDate: Thu Oct 9 10:04:40 2025 +0200
Revert "[SPARK-53738][SQL] PlannedWrite should preserve custom sort order
when query output contains literal"
This reverts commit 22d9709c2921f358bd5c70036d886d9bb0ebb4f5.
---
.../apache/spark/sql/catalyst/dsl/package.scala | 1 -
.../spark/sql/catalyst/expressions/SortOrder.scala | 49 +++++-----------------
.../sql/catalyst/expressions/aggregate/Mode.scala | 4 +-
.../expressions/aggregate/percentiles.scala | 4 +-
.../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +-
.../plans/AliasAwareOutputExpression.scala | 6 +--
.../plans/logical/basicLogicalOperators.scala | 3 +-
.../sql/catalyst/expressions/OrderingSuite.scala | 3 +-
.../org/apache/spark/sql/execution/SortExec.scala | 9 ++--
.../spark/sql/execution/SortPrefixUtils.scala | 8 ----
.../sql/execution/columnar/InMemoryRelation.scala | 19 +++------
.../execution/datasources/DataSourceStrategy.scala | 3 --
.../spark/sql/execution/datasources/V1Writes.scala | 20 ++-------
.../window/WindowEvaluatorFactoryBase.scala | 4 +-
.../datasources/V1WriteCommandSuite.scala | 46 +-------------------
.../command/V1WriteHiveCommandSuite.scala | 32 --------------
16 files changed, 34 insertions(+), 179 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index e9e897cfb78e..1d7cf5455e57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -151,7 +151,6 @@ package object dsl extends SQLConfHelper {
def asc: SortOrder = SortOrder(expr, Ascending)
def asc_nullsLast: SortOrder = SortOrder(expr, Ascending, NullsLast, Seq.empty)
- def const: SortOrder = SortOrder(expr, Constant)
def desc: SortOrder = SortOrder(expr, Descending)
def desc_nullsFirst: SortOrder = SortOrder(expr, Descending, NullsFirst, Seq.empty)
def as(alias: String): NamedExpression = Alias(expr, alias)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 4632e064daa5..166866c90b87 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -45,11 +45,6 @@ case object Descending extends SortDirection {
override def defaultNullOrdering: NullOrdering = NullsLast
}
-case object Constant extends SortDirection {
- override def sql: String = "CONST"
- override def defaultNullOrdering: NullOrdering = NullsFirst
-}
-
case object NullsFirst extends NullOrdering {
override def sql: String = "NULLS FIRST"
}
@@ -74,13 +69,8 @@ case class SortOrder(
override def children: Seq[Expression] = child +: sameOrderExpressions
- override def checkInputDataTypes(): TypeCheckResult = {
- if (direction == Constant) {
- TypeCheckResult.TypeCheckSuccess
- } else {
- TypeUtils.checkForOrderingExpr(dataType, prettyName)
- }
- }
+ override def checkInputDataTypes(): TypeCheckResult =
+ TypeUtils.checkForOrderingExpr(dataType, prettyName)
override def dataType: DataType = child.dataType
override def nullable: Boolean = child.nullable
@@ -91,8 +81,8 @@ case class SortOrder(
def isAscending: Boolean = direction == Ascending
def satisfies(required: SortOrder): Boolean = {
- children.exists(required.child.semanticEquals) && (direction == Constant ||
- direction == required.direction && nullOrdering == required.nullOrdering)
+ children.exists(required.child.semanticEquals) &&
+ direction == required.direction && nullOrdering == required.nullOrdering
}
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): SortOrder =
@@ -111,38 +101,21 @@ object SortOrder {
* Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
*
* SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
- * or of A's prefix, except for SortOrder in B that satisfies any constant SortOrder in A.
- *
- * Here are examples of ordering A satisfying ordering B:
+ * or of A's prefix. Here are examples of ordering A satisfying ordering B:
* <ul>
* <li>ordering A is [x, y] and ordering B is [x]</li>
- * <li>ordering A is [z(const), x, y] and ordering B is [x, z]</li>
* <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]</li>
* <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]</li>
* </ul>
*/
- def orderingSatisfies(
- providedOrdering: Seq[SortOrder], requiredOrdering: Seq[SortOrder]): Boolean = {
- if (requiredOrdering.isEmpty) {
- return true
- }
-
- val (constantProvidedOrdering, nonConstantProvidedOrdering) = providedOrdering.partition {
- case SortOrder(_, Constant, _, _) => true
- case SortOrder(child, _, _, _) => child.foldable
- }
-
- val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder =>
- constantProvidedOrdering.exists { providedOrder =>
- providedOrder.satisfies(requiredOrder)
- }
- }
-
- if (effectiveRequiredOrdering.length > nonConstantProvidedOrdering.length) {
+ def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
+ if (ordering2.isEmpty) {
+ true
+ } else if (ordering2.length > ordering1.length) {
false
} else {
- effectiveRequiredOrdering.zip(nonConstantProvidedOrdering).forall {
- case (required, provided) => provided.satisfies(required)
+ ordering2.zip(ordering1).forall {
+ case (o2, o1) => o1.satisfies(o2)
}
}
}
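For context, a minimal sketch of the restored prefix semantics (illustrative only, not part of the commit; the attributes are hypothetical):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
    import org.apache.spark.sql.types.IntegerType

    // Hypothetical attributes, for illustration only.
    val x = AttributeReference("x", IntegerType)()
    val y = AttributeReference("y", IntegerType)()

    // After the revert, ordering A satisfies ordering B only when B matches a prefix of A.
    SortOrder.orderingSatisfies(Seq(x.asc, y.asc), Seq(x.asc))  // true: [x] is a prefix of [x, y]
    SortOrder.orderingSatisfies(Seq(x.asc, y.asc), Seq(y.asc))  // false: [y] is not a prefix
    SortOrder.orderingSatisfies(Seq(x.asc), Seq(x.asc, y.asc))  // false: B is longer than A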
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
index b61dae585061..7a4f04bf04f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Constant, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLExpr
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.types.PhysicalDataType
@@ -199,8 +199,6 @@ case class Mode(
this.copy(child = child, reverseOpt = Some(true))
case SortOrder(child, Descending, _, _) =>
this.copy(child = child, reverseOpt = Some(false))
- case SortOrder(child, Constant, _, _) =>
- this.copy(child = child)
}
case _ => this
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
index 942c06f60d12..6dfa1b499df2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala
@@ -382,7 +382,7 @@ case class PercentileCont(left: Expression, right: Expression, reverse: Boolean
nodeName, 1, orderingWithinGroup.length)
}
orderingWithinGroup.head match {
- case SortOrder(child, Ascending | Constant, _, _) => this.copy(left = child)
+ case SortOrder(child, Ascending, _, _) => this.copy(left = child)
case SortOrder(child, Descending, _, _) => this.copy(left = child, reverse = true)
}
}
@@ -440,7 +440,7 @@ case class PercentileDisc(
nodeName, 1, orderingWithinGroup.length)
}
orderingWithinGroup.head match {
- case SortOrder(expr, Ascending | Constant, _, _) => this.copy(child = expr)
+ case SortOrder(expr, Ascending, _, _) => this.copy(child = expr)
case SortOrder(expr, Descending, _, _) => this.copy(child = expr, reverse = true)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b5941cd5eddc..fc65c24afcb8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1919,7 +1919,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
object EliminateSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan =
plan.transformUpWithPruning(_.containsPattern(SORT)) {
case s @ Sort(orders, _, child, _) if orders.isEmpty || orders.exists(_.child.foldable) =>
- val newOrders = orders.filterNot(o => o.direction != Constant && o.child.foldable)
+ val newOrders = orders.filterNot(_.child.foldable)
if (newOrders.isEmpty) {
child
} else {
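A rough illustration of the restored rule, built with the catalyst DSL (relation and column names are assumptions, not taken from the commit):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.expressions.Literal

    // Hypothetical relation t with a column i, sorted by a foldable key first.
    val sorted = table("t").orderBy(Literal("0").asc, $"i".asc)
    // With the revert, EliminateSorts again filters out every foldable sort
    // key, leaving Sort [i ASC]; a sort on foldable keys alone is replaced by
    // its child.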
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index efbd7b0c8b81..e1a9e8b5ea81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans
import scala.collection.mutable
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Constant, Empty2Null, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.internal.SQLConf
/**
@@ -128,8 +128,6 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
}
}
}
- newOrdering.takeWhile(_.isDefined).flatten.toSeq ++ outputExpressions.collect {
- case a @ Alias(child, _) if child.foldable => SortOrder(a.toAttribute, Constant)
- }
+ newOrdering.takeWhile(_.isDefined).flatten.toSeq
}
}
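A sketch of what changes at this site (plan shape and names are hypothetical):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.expressions.Literal

    // A projection with a literal alias over a child sorted by [k, i].
    val plan = table("t0")
      .orderBy($"k".asc, $"i".asc)
      .select($"i", Literal("0").as("k"))
    // Before the revert, the projection appended SortOrder(k, Constant) for
    // the foldable alias, so its outputOrdering could still satisfy a
    // required [k, i]. After the revert, only orders projected from the
    // child's ordering survive.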
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 5a7548f13f1f..ad6939422b97 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -912,8 +912,7 @@ case class Sort(
override def maxRowsPerPartition: Option[Long] = {
if (global) maxRows else child.maxRowsPerPartition
}
- override def outputOrdering: Seq[SortOrder] =
- order ++ child.outputOrdering.filter(_.direction == Constant)
+ override def outputOrdering: Seq[SortOrder] = order
final override val nodePatterns: Seq[TreePattern] = Seq(SORT)
override protected def withNewChildInternal(newChild: LogicalPlan): Sort = copy(child = newChild)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
index 5facdaeb1aca..06c8b5ccef65 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
@@ -43,10 +43,9 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
val sortOrder = direction match {
case Ascending => BoundReference(0, dataType, nullable = true).asc
case Descending => BoundReference(0, dataType, nullable = true).desc
- case Constant => BoundReference(0, dataType, nullable = true).const
}
val expectedCompareResult = direction match {
- case Ascending | Constant => signum(expected)
+ case Ascending => signum(expected)
case Descending => -1 * signum(expected)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 708e054664c5..11fde41aae9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -46,8 +46,7 @@ case class SortExec(
override def output: Seq[Attribute] = child.output
- override def outputOrdering: Seq[SortOrder] =
- sortOrder ++ child.outputOrdering.filter(_.direction == Constant)
+ override def outputOrdering: Seq[SortOrder] = sortOrder
// sort performed is local within a given partition so will retain
// child operator's partitioning
@@ -74,17 +73,15 @@ case class SortExec(
* should make it public.
*/
def createSorter(): UnsafeExternalRowSorter = {
- val effectiveSortOrder = sortOrder.filterNot(_.direction == Constant)
-
rowSorter = new ThreadLocal[UnsafeExternalRowSorter]()
val ordering = RowOrdering.create(sortOrder, output)
// The comparator for comparing prefix
- val boundSortExpression = BindReferences.bindReference(effectiveSortOrder.head, output)
+ val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
- val canUseRadixSort = enableRadixSort && effectiveSortOrder.length == 1 &&
+ val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
// The generator for prefix
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index a50f2ba0d04f..7332bbcb1845 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -63,8 +63,6 @@ object SortPrefixUtils {
PrefixComparators.STRING_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.STRING_DESC
- case Constant =>
- NoOpPrefixComparator
}
}
@@ -78,8 +76,6 @@ object SortPrefixUtils {
PrefixComparators.BINARY_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.BINARY_DESC
- case Constant =>
- NoOpPrefixComparator
}
}
@@ -93,8 +89,6 @@ object SortPrefixUtils {
PrefixComparators.LONG_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.LONG_DESC
- case Constant =>
- NoOpPrefixComparator
}
}
@@ -108,8 +102,6 @@ object SortPrefixUtils {
PrefixComparators.DOUBLE_DESC_NULLS_FIRST
case Descending =>
PrefixComparators.DOUBLE_DESC
- case Constant =>
- NoOpPrefixComparator
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index bf7491625fa0..eabbc7fc74f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -439,7 +439,9 @@ case class InMemoryRelation(
override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
override def doCanonicalize(): logical.LogicalPlan =
- withOutput(output.map(QueryPlan.normalizeExpressions(_, output)))
+ copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
+ cacheBuilder,
+ outputOrdering)
@transient val partitionStatistics = new PartitionStatistics(output)
@@ -467,13 +469,8 @@ case class InMemoryRelation(
}
}
- def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
- val map = AttributeMap(output.zip(newOutput))
- val newOutputOrdering = outputOrdering
- .map(_.transform { case a: Attribute => map(a) })
- .asInstanceOf[Seq[SortOrder]]
- InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache)
- }
+ def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
+ InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
override def newInstance(): this.type = {
InMemoryRelation(
@@ -490,12 +487,6 @@ case class InMemoryRelation(
cloned
}
- override def makeCopy(newArgs: Array[AnyRef]): LogicalPlan = {
- val copied = super.makeCopy(newArgs).asInstanceOf[InMemoryRelation]
- copied.statsOfPlanToCache = this.statsOfPlanToCache
- copied
- }
-
override def simpleString(maxFields: Int): String =
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}],
${cacheBuilder.storageLevel}"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 95bba45c3b1d..2e47f08ac115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,7 +25,6 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
-import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.PREDICATES
import org.apache.spark.rdd.RDD
@@ -828,8 +827,6 @@ object DataSourceStrategy
val directionV2 = directionV1 match {
case Ascending => SortDirection.ASCENDING
case Descending => SortDirection.DESCENDING
- case Constant =>
- throw SparkException.internalError(s"Unexpected catalyst sort
direction $Constant")
}
val nullOrderingV2 = nullOrderingV1 match {
case NullsFirst => NullOrdering.NULLS_FIRST
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index 53e2d3f74bb3..280fe1068d81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Constant, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
@@ -199,27 +199,13 @@ object V1WritesUtils {
expressions.exists(_.exists(_.isInstanceOf[Empty2Null]))
}
- // SortOrder sequence A (outputOrdering) satisfies SortOrder sequence B (requiredOrdering)
- // if and only if B is an equivalent of A or of A's prefix, except for SortOrder in B that
- // satisfies any constant SortOrder in A.
def isOrderingMatched(
requiredOrdering: Seq[Expression],
outputOrdering: Seq[SortOrder]): Boolean = {
- val (constantOutputOrdering, nonConstantOutputOrdering) = outputOrdering.partition {
- case SortOrder(_, Constant, _, _) => true
- case SortOrder(child, _, _, _) => child.foldable
- }
-
- val effectiveRequiredOrdering = requiredOrdering.filterNot { requiredOrder =>
- constantOutputOrdering.exists { outputOrder =>
- outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
- }
- }
-
- if (effectiveRequiredOrdering.length > nonConstantOutputOrdering.length) {
+ if (requiredOrdering.length > outputOrdering.length) {
false
} else {
- effectiveRequiredOrdering.zip(nonConstantOutputOrdering).forall {
+ requiredOrdering.zip(outputOrdering).forall {
case (requiredOrder, outputOrder) =>
outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index 3ce11acfc4d1..c2dedda832e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Add, AggregateWindowFunction, Ascending, Attribute, BoundReference, Constant, CurrentRow, DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression, ExtractANSIIntervalDays, FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimestampAddInterval, [...]
+import org.apache.spark.sql.catalyst.expressions.{Add, AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow, DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression, ExtractANSIIntervalDays, FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimestampAddInterval, TimestampA [...]
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
@@ -95,7 +95,7 @@ trait WindowEvaluatorFactoryBase {
// Flip the sign of the offset when processing the order is descending
val boundOffset = sortExpr.direction match {
case Descending => UnaryMinus(offset)
- case Ascending | Constant => offset
+ case Ascending => offset
}
// Create the projection which returns the current 'value' modified by adding the offset.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index c16b02906724..80d771428d90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -63,23 +63,10 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
hasLogicalSort: Boolean,
orderingMatched: Boolean,
hasEmpty2Null: Boolean = false)(query: => Unit): Unit = {
- executeAndCheckOrderingAndCustomValidate(
- hasLogicalSort, orderingMatched, hasEmpty2Null)(query)(_ => ())
- }
-
- /**
- * Execute a write query and check ordering of the plan, then do custom validation
- */
- protected def executeAndCheckOrderingAndCustomValidate(
- hasLogicalSort: Boolean,
- orderingMatched: Boolean,
- hasEmpty2Null: Boolean = false)(query: => Unit)(
- customValidate: LogicalPlan => Unit): Unit = {
- @volatile var optimizedPlan: LogicalPlan = null
+ var optimizedPlan: LogicalPlan = null
val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
- val conf = qe.sparkSession.sessionState.conf
qe.optimizedPlan match {
case w: V1WriteCommand =>
if (hasLogicalSort && conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
@@ -100,8 +87,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
// Check whether the output ordering is matched before FileFormatWriter executes rdd.
assert(FileFormatWriter.outputOrderingMatched == orderingMatched,
- s"Expect orderingMatched: $orderingMatched, " +
- s"Actual: ${FileFormatWriter.outputOrderingMatched}")
+ s"Expect: $orderingMatched, Actual:
${FileFormatWriter.outputOrderingMatched}")
sparkContext.listenerBus.waitUntilEmpty()
@@ -117,8 +103,6 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils with AdaptiveSparkPlanHelper
assert(empty2nullExpr == hasEmpty2Null,
s"Expect hasEmpty2Null: $hasEmpty2Null, Actual: $empty2nullExpr.
Plan:\n$optimizedPlan")
- customValidate(optimizedPlan)
-
spark.listenerManager.unregister(listener)
}
}
@@ -407,30 +391,4 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
}
}
}
-
- test("v1 write with sort by literal column preserve custom order") {
- withPlannedWrite { _ =>
- withTable("t") {
- sql(
- """
- |CREATE TABLE t(i INT, j INT, k STRING) USING PARQUET
- |PARTITIONED BY (k)
- |""".stripMargin)
- executeAndCheckOrderingAndCustomValidate(hasLogicalSort = true, orderingMatched = true) {
- sql(
- """
- |INSERT OVERWRITE t
- |SELECT i, j, '0' as k FROM t0 SORT BY k, i
- |""".stripMargin)
- } { optimizedPlan =>
- assert {
- optimizedPlan.outputOrdering.exists {
- case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
- case _ => false
- }
- }
- }
- }
- }
- }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
index 55bb2c60dcca..e0e056be5987 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/V1WriteHiveCommandSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SortOrder}
import org.apache.spark.sql.execution.datasources.V1WriteCommandSuiteBase
import org.apache.spark.sql.hive.HiveUtils._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -127,35 +126,4 @@ class V1WriteHiveCommandSuite
}
}
}
-
- test("v1 write to hive table with sort by literal column preserve custom
order") {
- withCovnertMetastore { _ =>
- withPlannedWrite { _ =>
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- withTable("t") {
- sql(
- """
- |CREATE TABLE t(i INT, j INT, k STRING) STORED AS PARQUET
- |PARTITIONED BY (k)
- |""".stripMargin)
- executeAndCheckOrderingAndCustomValidate(
- hasLogicalSort = true, orderingMatched = true) {
- sql(
- """
- |INSERT OVERWRITE t
- |SELECT i, j, '0' as k FROM t0 SORT BY k, i
- |""".stripMargin)
- } { optimizedPlan =>
- assert {
- optimizedPlan.outputOrdering.exists {
- case SortOrder(attr: AttributeReference, _, _, _) => attr.name == "i"
- case _ => false
- }
- }
- }
- }
- }
- }
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]