This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fbf255ef7b01 [SPARK-50379][SQL] Fix DayTimeIntervalType handling in
WindowExecBase
fbf255ef7b01 is described below
commit fbf255ef7b01e7016f368b6afbc4234b79130af4
Author: Mihailo Milosevic <[email protected]>
AuthorDate: Thu Nov 21 10:55:07 2024 +0100
[SPARK-50379][SQL] Fix DayTimeIntervalType handling in WindowExecBase
### What changes were proposed in this pull request?
Initial PR missed adding DayTimeIntervalType handling with DateType
https://github.com/apache/spark/pull/32294. This PR adds cases for
DayTimeIntervalType.
### Why are the changes needed?
This is needed because the documentation describes this behavior as supported.
### Does this PR introduce _any_ user-facing change?
Yes, we enable queries.
### How was this patch tested?
Golden file test added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48893 from mihailom-db/SPARK-35110.
Authored-by: Mihailo Milosevic <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 2 +-
.../catalyst/expressions/windowExpressions.scala | 2 +
.../window/WindowEvaluatorFactoryBase.scala | 5 +-
.../sql-tests/analyzer-results/window.sql.out | 60 ++++++++++++++++++++
.../src/test/resources/sql-tests/inputs/window.sql | 4 ++
.../resources/sql-tests/results/window.sql.out | 65 ++++++++++++++++++++++
6 files changed, 136 insertions(+), 2 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 272a0f92431e..38b1656ac05c 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -965,7 +965,7 @@
},
"RANGE_FRAME_INVALID_TYPE" : {
"message" : [
- "The data type <orderSpecType> used in the order specification does
not match the data type <valueBoundaryType> which is used in the range frame."
+ "The data type <orderSpecType> used in the order specification does
not support the data type <valueBoundaryType> which is used in the range frame."
]
},
"RANGE_FRAME_MULTI_ORDER" : {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index ecc32bc8d0ef..ab787663c992 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.trees.{BinaryLike,
LeafLike, TernaryLike, U
import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern,
UNRESOLVED_WINDOW_EXPRESSION, WINDOW_EXPRESSION}
import org.apache.spark.sql.errors.{DataTypeErrorsBase,
QueryCompilationErrors, QueryErrorsBase, QueryExecutionErrors}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.DayTimeIntervalType.DAY
/**
* The trait of the Window Specification (specified in the OVER clause or
WINDOW clause) for
@@ -108,6 +109,7 @@ case class WindowSpecDefinition(
private def isValidFrameType(ft: DataType): Boolean =
(orderSpec.head.dataType, ft) match {
case (DateType, IntegerType) => true
case (DateType, _: YearMonthIntervalType) => true
+ case (DateType, DayTimeIntervalType(DAY, DAY)) => true
case (TimestampType | TimestampNTZType, CalendarIntervalType) => true
case (TimestampType | TimestampNTZType, _: YearMonthIntervalType) => true
case (TimestampType | TimestampNTZType, _: DayTimeIntervalType) => true
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index bdaccd43c1b7..7d13dbbe2a06 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -22,11 +22,12 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Add,
AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow,
DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression,
FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral,
MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression,
RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimeAdd,
TimestampAddYMInterval, UnaryMinus, UnboundedFol [...]
+import org.apache.spark.sql.catalyst.expressions.{Add,
AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow,
DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression,
ExtractANSIIntervalDays, FrameLessOffsetWindowFunction, FrameType,
IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression,
OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering,
SortOrder, SpecifiedWindowFrame, TimeAdd, TimestampAddYMInterval, [...]
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{CalendarIntervalType, DateType,
DayTimeIntervalType, DecimalType, IntegerType, TimestampNTZType, TimestampType,
YearMonthIntervalType}
+import org.apache.spark.sql.types.DayTimeIntervalType.DAY
import org.apache.spark.util.collection.Utils
trait WindowEvaluatorFactoryBase {
@@ -101,6 +102,8 @@ trait WindowEvaluatorFactoryBase {
val boundExpr = (expr.dataType, boundOffset.dataType) match {
case (DateType, IntegerType) => DateAdd(expr, boundOffset)
case (DateType, _: YearMonthIntervalType) => DateAddYMInterval(expr,
boundOffset)
+ case (DateType, DayTimeIntervalType(DAY, DAY)) =>
+ DateAdd(expr, ExtractANSIIntervalDays(boundOffset))
case (TimestampType | TimestampNTZType, CalendarIntervalType) =>
TimeAdd(expr, boundOffset, Some(timeZone))
case (TimestampType | TimestampNTZType, _: YearMonthIntervalType) =>
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/window.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/window.sql.out
index 367d5b016701..db49b1bfd39d 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/window.sql.out
@@ -1414,3 +1414,63 @@ Project [cate#x, val#x, r#x]
+- Project [val#x, val_long#xL, val_double#x,
val_date#x, val_timestamp#x, cate#x]
+- SubqueryAlias testData
+- LocalRelation [val#x, val_long#xL,
val_double#x, val_date#x, val_timestamp#x, cate#x]
+
+
+-- !query
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE
INTERVAL '5' DAY PRECEDING) AS mean FROM testData
+-- !query analysis
+Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x,
cate#x, mean#x]
++- Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x,
cate#x, mean#x, mean#x]
+ +- Window [mean(val_double#x) windowspecdefinition(val#x, val_date#x ASC
NULLS FIRST, specifiedwindowframe(RangeFrame, -INTERVAL '5' DAY,
currentrow$())) AS mean#x], [val#x], [val_date#x ASC NULLS FIRST]
+ +- Project [val#x, val_long#xL, val_double#x, val_date#x,
val_timestamp#x, cate#x]
+ +- SubqueryAlias testdata
+ +- View (`testData`, [val#x, val_long#xL, val_double#x,
val_date#x, val_timestamp#x, cate#x])
+ +- Project [cast(val#x as int) AS val#x, cast(val_long#xL as
bigint) AS val_long#xL, cast(val_double#x as double) AS val_double#x,
cast(val_date#x as date) AS val_date#x, cast(val_timestamp#x as timestamp) AS
val_timestamp#x, cast(cate#x as string) AS cate#x]
+ +- Project [val#x, val_long#xL, val_double#x, val_date#x,
val_timestamp#x, cate#x]
+ +- SubqueryAlias testData
+ +- LocalRelation [val#x, val_long#xL, val_double#x,
val_date#x, val_timestamp#x, cate#x]
+
+
+-- !query
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE
INTERVAL '1 2:3:4.001' DAY TO SECOND PRECEDING) AS mean FROM testData
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.RANGE_FRAME_INVALID_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "orderSpecType" : "\"DATE\"",
+ "sqlExpr" : "\"(PARTITION BY val ORDER BY val_date ASC NULLS FIRST RANGE
BETWEEN INTERVAL '1 02:03:04.001' DAY TO SECOND PRECEDING AND CURRENT ROW)\"",
+ "valueBoundaryType" : "\"INTERVAL DAY TO SECOND\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 33,
+ "stopIndex" : 121,
+ "fragment" : "(partition BY val ORDER BY val_date RANGE INTERVAL '1
2:3:4.001' DAY TO SECOND PRECEDING)"
+ } ]
+}
+
+
+-- !query
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE DATE
'2024-01-01' FOLLOWING) AS mean FROM testData
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.SPECIFIED_WINDOW_FRAME_UNACCEPTED_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "expectedType" : "(\"NUMERIC\" or \"INTERVAL DAY TO SECOND\" or \"INTERVAL
YEAR TO MONTH\" or \"INTERVAL\")",
+ "exprType" : "\"DATE\"",
+ "location" : "lower",
+ "sqlExpr" : "\"RANGE BETWEEN DATE '2024-01-01' FOLLOWING AND CURRENT ROW\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 33,
+ "stopIndex" : 102,
+ "fragment" : "(partition BY val ORDER BY val_date RANGE DATE '2024-01-01'
FOLLOWING)"
+ } ]
+}
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql
b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index f94ff0f0a68a..f3cbf6ef1ccb 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -478,3 +478,7 @@ SELECT * FROM (SELECT cate, val, dense_rank()
OVER(PARTITION BY cate ORDER BY va
SELECT * FROM (SELECT cate, val, dense_rank() OVER(PARTITION BY cate ORDER BY
val) as r FROM testData) where r <= 2;
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY
val) as r FROM testData) where r = 1;
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY
val) as r FROM testData) where r <= 2;
+
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE
INTERVAL '5' DAY PRECEDING) AS mean FROM testData;
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE
INTERVAL '1 2:3:4.001' DAY TO SECOND PRECEDING) AS mean FROM testData;
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE DATE
'2024-01-01' FOLLOWING) AS mean FROM testData;
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out
b/sql/core/src/test/resources/sql-tests/results/window.sql.out
index 182a4b819fcb..87381b64638b 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1442,3 +1442,68 @@ a 1 2
a NULL 1
b 1 1
b 2 2
+
+
+-- !query
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE
INTERVAL '5' DAY PRECEDING) AS mean FROM testData
+-- !query schema
+struct<val:int,val_long:bigint,val_double:double,val_date:date,val_timestamp:timestamp,cate:string,mean:double>
+-- !query output
+1 1 1.0 2017-08-01 2017-07-31 17:00:00 a 1.0
+1 2 2.5 2017-08-02 2017-08-05 23:13:20 a 1.5
+1 NULL 1.0 2017-08-01 2017-07-31 17:00:00 b 1.0
+2 2147483650 100.001 2020-12-31 2020-12-30 16:00:00 a
100.001
+2 3 3.3 2017-08-03 2017-08-17 13:00:00 b 3.3
+3 1 1.0 2017-08-01 2017-07-31 17:00:00 NULL 1.0
+3 2147483650 100.001 2020-12-31 2020-12-30 16:00:00 b
100.001
+NULL 1 1.0 2017-08-01 2017-07-31 17:00:00 a 1.0
+NULL NULL NULL NULL NULL NULL NULL
+
+
+-- !query
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE
INTERVAL '1 2:3:4.001' DAY TO SECOND PRECEDING) AS mean FROM testData
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.RANGE_FRAME_INVALID_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "orderSpecType" : "\"DATE\"",
+ "sqlExpr" : "\"(PARTITION BY val ORDER BY val_date ASC NULLS FIRST RANGE
BETWEEN INTERVAL '1 02:03:04.001' DAY TO SECOND PRECEDING AND CURRENT ROW)\"",
+ "valueBoundaryType" : "\"INTERVAL DAY TO SECOND\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 33,
+ "stopIndex" : 121,
+ "fragment" : "(partition BY val ORDER BY val_date RANGE INTERVAL '1
2:3:4.001' DAY TO SECOND PRECEDING)"
+ } ]
+}
+
+
+-- !query
+SELECT *, mean(val_double) over (partition BY val ORDER BY val_date RANGE DATE
'2024-01-01' FOLLOWING) AS mean FROM testData
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.SPECIFIED_WINDOW_FRAME_UNACCEPTED_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "expectedType" : "(\"NUMERIC\" or \"INTERVAL DAY TO SECOND\" or \"INTERVAL
YEAR TO MONTH\" or \"INTERVAL\")",
+ "exprType" : "\"DATE\"",
+ "location" : "lower",
+ "sqlExpr" : "\"RANGE BETWEEN DATE '2024-01-01' FOLLOWING AND CURRENT ROW\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 33,
+ "stopIndex" : 102,
+ "fragment" : "(partition BY val ORDER BY val_date RANGE DATE '2024-01-01'
FOLLOWING)"
+ } ]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]