This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0b6cb3e92a03 [SPARK-48914][SQL][TESTS] Add OFFSET operator as an
option in the subquery generator
0b6cb3e92a03 is described below
commit 0b6cb3e92a03bc3d472f7bc03a6519c0be4187ae
Author: Avery Qi <[email protected]>
AuthorDate: Tue Jul 23 09:36:30 2024 +0800
[SPARK-48914][SQL][TESTS] Add OFFSET operator as an option in the subquery
generator
### What changes were proposed in this pull request?
This adds the OFFSET operator as an option in the subquery generator suite.
### Why are the changes needed?
Completes the subquery generator functionality by covering the OFFSET clause.
### Does this PR introduce _any_ user-facing change?
No. Previously, no subqueries containing the OFFSET operator were tested;
this change adds test coverage for the OFFSET operator.
### How was this patch tested?
Query tests (generated subquery tests run against Postgres via the JDBC integration suite).
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47375 from averyqi-db/offset_operator.
Authored-by: Avery Qi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../jdbc/querytest/GeneratedSubquerySuite.scala | 51 +++++++++++++++++-----
.../apache/spark/sql/QueryGeneratorHelper.scala | 16 +++++--
2 files changed, 51 insertions(+), 16 deletions(-)
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
index 8cde20529d7a..b526599482da 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
@@ -126,33 +126,49 @@ class GeneratedSubquerySuite extends
DockerJDBCIntegrationSuite with QueryGenera
case _ => None
}
- // For the OrderBy, consider whether or not the result of the subquery is
required to be sorted.
- // This is to maintain test determinism. This is affected by whether the
subquery has a limit
- // clause.
- val requiresLimitOne = isScalarSubquery && (operatorInSubquery match {
+ // For some situation needs exactly one row as output, we force the
+ // subquery to have a limit of 1 and no offset value (in case it outputs
+ // empty result set).
+ val requiresExactlyOneRowOutput = isScalarSubquery && (operatorInSubquery
match {
case a: Aggregate => a.groupingExpressions.nonEmpty
- case l: Limit => l.limitValue > 1
case _ => true
})
- val orderByClause = if (requiresLimitOne ||
operatorInSubquery.isInstanceOf[Limit]) {
+ // For the OrderBy, consider whether or not the result of the subquery is
required to be sorted.
+ // This is to maintain test determinism. This is affected by whether the
subquery has a limit
+ // clause or an offset clause.
+ val orderByClause = if (
+ requiresExactlyOneRowOutput ||
operatorInSubquery.isInstanceOf[LimitAndOffset]
+ ) {
Some(OrderByClause(projections))
} else {
None
}
+ // SPARK-46446: offset operator in correlated subquery is not supported
+ // as it creates incorrect results for now.
+ val requireNoOffsetInCorrelatedSubquery = correlationConditions.nonEmpty
+
// For the Limit clause, consider whether the subquery needs to return 1
row, or whether the
// operator to be included is a Limit.
- val limitClause = if (requiresLimitOne) {
- Some(Limit(1))
+ val limitAndOffsetClause = if (requiresExactlyOneRowOutput) {
+ Some(LimitAndOffset(1, 0))
} else {
operatorInSubquery match {
- case limit: Limit => Some(limit)
+ case lo: LimitAndOffset =>
+ val offsetValue = if (requireNoOffsetInCorrelatedSubquery) 0 else
lo.offsetValue
+ if (offsetValue == 0 && lo.limitValue == 0) {
+ None
+ } else {
+ Some(LimitAndOffset(lo.limitValue, offsetValue))
+ }
case _ => None
}
}
- Query(selectClause, fromClause, whereClause, groupByClause, orderByClause,
limitClause)
+ Query(
+ selectClause, fromClause, whereClause, groupByClause, orderByClause,
limitAndOffsetClause
+ )
}
/**
@@ -236,7 +252,7 @@ class GeneratedSubquerySuite extends
DockerJDBCIntegrationSuite with QueryGenera
val orderByClause = Some(OrderByClause(queryProjection))
Query(selectClause, fromClause, whereClause, groupByClause = None,
- orderByClause, limitClause = None)
+ orderByClause, limitAndOffsetClause = None)
}
private def getPostgresResult(stmt: Statement, sql: String): Array[Row] = {
@@ -340,6 +356,16 @@ class GeneratedSubquerySuite extends
DockerJDBCIntegrationSuite with QueryGenera
}
}
+ def limitAndOffsetChoices(): Seq[LimitAndOffset] = {
+ val limitValues = Seq(0, 1, 10)
+ val offsetValues = Seq(0, 1, 10)
+ limitValues.flatMap(
+ limit => offsetValues.map(
+ offset => LimitAndOffset(limit, offset)
+ )
+ ).filter(lo => !(lo.limitValue == 0 && lo.offsetValue == 0))
+ }
+
case class SubquerySpec(query: String, isCorrelated: Boolean,
subqueryType: SubqueryType.Value)
val generatedQuerySpecs = scala.collection.mutable.Set[SubquerySpec]()
@@ -363,7 +389,8 @@ class GeneratedSubquerySuite extends
DockerJDBCIntegrationSuite with QueryGenera
val aggregates = combinations.map {
case (af, groupBy) => Aggregate(Seq(af), if (groupBy)
Seq(groupByColumn) else Seq())
}
- val subqueryOperators = Seq(Limit(1), Limit(10)) ++ aggregates
+
+ val subqueryOperators = limitAndOffsetChoices() ++ aggregates
for {
subqueryOperator <- subqueryOperators
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala
b/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala
index 8e780b3ef70f..3b25edf1cf7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala
@@ -185,8 +185,16 @@ trait QueryGeneratorHelper {
f"groupingExpr=[${groupingExpressions.mkString(",")}])"
}
- case class Limit(limitValue: Int) extends Operator with Clause {
- override def toString: String = f"LIMIT $limitValue"
+ case class LimitAndOffset(limitValue: Int, offsetValue: Int) extends
Operator with Clause {
+ override def toString: String = {
+ val limitClause = if (limitValue > 0) { s"LIMIT $limitValue" } else { ""
}
+ val offsetClause = if (offsetValue > 0) { s"OFFSET $offsetValue" } else
{ "" }
+ if (limitClause.nonEmpty && offsetClause.nonEmpty) {
+ s"$limitClause $offsetClause"
+ } else {
+ s"$limitClause$offsetClause"
+ }
+ }
}
object SubqueryLocation extends Enumeration {
@@ -223,7 +231,7 @@ trait QueryGeneratorHelper {
whereClause: Option[WhereClause] = None,
groupByClause: Option[GroupByClause] = None,
orderByClause: Option[OrderByClause] = None,
- limitClause: Option[Limit] = None
+ limitAndOffsetClause: Option[LimitAndOffset] = None
) extends Operator {
override def toString: String = {
@@ -232,7 +240,7 @@ trait QueryGeneratorHelper {
f"$selectClause $fromClause${getOptionClauseString(whereClause)}" +
f"${getOptionClauseString(groupByClause)}${getOptionClauseString(orderByClause)}"
+
- f"${getOptionClauseString(limitClause)}"
+ f"${getOptionClauseString(limitAndOffsetClause)}"
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]