This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b6cb3e92a03 [SPARK-48914][SQL][TESTS] Add OFFSET operator as an 
option in the subquery generator
0b6cb3e92a03 is described below

commit 0b6cb3e92a03bc3d472f7bc03a6519c0be4187ae
Author: Avery Qi <[email protected]>
AuthorDate: Tue Jul 23 09:36:30 2024 +0800

    [SPARK-48914][SQL][TESTS] Add OFFSET operator as an option in the subquery 
generator
    
    ### What changes were proposed in this pull request?
    This adds the OFFSET operator to the subquery generator suite.
    
    ### Why are the changes needed?
    This completes the subquery generator's functionality.
    
    ### Does this PR introduce _any_ user-facing change?
    Previously, no subqueries with an OFFSET operator were tested. This change
    adds the OFFSET operator to the generated subqueries.
    
    ### How was this patch tested?
    query test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47375 from averyqi-db/offset_operator.
    
    Authored-by: Avery Qi <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../jdbc/querytest/GeneratedSubquerySuite.scala    | 51 +++++++++++++++++-----
 .../apache/spark/sql/QueryGeneratorHelper.scala    | 16 +++++--
 2 files changed, 51 insertions(+), 16 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
index 8cde20529d7a..b526599482da 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/querytest/GeneratedSubquerySuite.scala
@@ -126,33 +126,49 @@ class GeneratedSubquerySuite extends 
DockerJDBCIntegrationSuite with QueryGenera
       case _ => None
     }
 
-    // For the OrderBy, consider whether or not the result of the subquery is 
required to be sorted.
-    // This is to maintain test determinism. This is affected by whether the 
subquery has a limit
-    // clause.
-    val requiresLimitOne = isScalarSubquery && (operatorInSubquery match {
+    // When the subquery must output exactly one row, we force it to have a
+    // limit of 1 and no offset value (an offset could make it output an
+    // empty result set).
+    val requiresExactlyOneRowOutput = isScalarSubquery && (operatorInSubquery 
match {
       case a: Aggregate => a.groupingExpressions.nonEmpty
-      case l: Limit => l.limitValue > 1
       case _ => true
     })
 
-    val orderByClause = if (requiresLimitOne || 
operatorInSubquery.isInstanceOf[Limit]) {
+    // For the OrderBy, consider whether or not the result of the subquery is 
required to be sorted.
+    // This is to maintain test determinism. This is affected by whether the 
subquery has a limit
+    // clause or an offset clause.
+    val orderByClause = if (
+      requiresExactlyOneRowOutput || 
operatorInSubquery.isInstanceOf[LimitAndOffset]
+    ) {
       Some(OrderByClause(projections))
     } else {
       None
     }
 
+    // SPARK-46446: offset operator in correlated subquery is not supported
+    // as it creates incorrect results for now.
+    val requireNoOffsetInCorrelatedSubquery = correlationConditions.nonEmpty
+
     // For the Limit clause, consider whether the subquery needs to return 1 
row, or whether the
     // operator to be included is a Limit.
-    val limitClause = if (requiresLimitOne) {
-      Some(Limit(1))
+    val limitAndOffsetClause = if (requiresExactlyOneRowOutput) {
+      Some(LimitAndOffset(1, 0))
     } else {
       operatorInSubquery match {
-        case limit: Limit => Some(limit)
+        case lo: LimitAndOffset =>
+          val offsetValue = if (requireNoOffsetInCorrelatedSubquery) 0 else 
lo.offsetValue
+          if (offsetValue == 0 && lo.limitValue == 0) {
+            None
+          } else {
+            Some(LimitAndOffset(lo.limitValue, offsetValue))
+          }
         case _ => None
       }
     }
 
-    Query(selectClause, fromClause, whereClause, groupByClause, orderByClause, 
limitClause)
+    Query(
+      selectClause, fromClause, whereClause, groupByClause, orderByClause, 
limitAndOffsetClause
+    )
   }
 
   /**
@@ -236,7 +252,7 @@ class GeneratedSubquerySuite extends 
DockerJDBCIntegrationSuite with QueryGenera
     val orderByClause = Some(OrderByClause(queryProjection))
 
     Query(selectClause, fromClause, whereClause, groupByClause = None,
-      orderByClause, limitClause = None)
+      orderByClause, limitAndOffsetClause = None)
   }
 
   private def getPostgresResult(stmt: Statement, sql: String): Array[Row] = {
@@ -340,6 +356,16 @@ class GeneratedSubquerySuite extends 
DockerJDBCIntegrationSuite with QueryGenera
       }
     }
 
+    def limitAndOffsetChoices(): Seq[LimitAndOffset] = {
+      val limitValues = Seq(0, 1, 10)
+      val offsetValues = Seq(0, 1, 10)
+      limitValues.flatMap(
+        limit => offsetValues.map(
+          offset => LimitAndOffset(limit, offset)
+        )
+      ).filter(lo => !(lo.limitValue == 0 && lo.offsetValue == 0))
+    }
+
     case class SubquerySpec(query: String, isCorrelated: Boolean, 
subqueryType: SubqueryType.Value)
 
     val generatedQuerySpecs = scala.collection.mutable.Set[SubquerySpec]()
@@ -363,7 +389,8 @@ class GeneratedSubquerySuite extends 
DockerJDBCIntegrationSuite with QueryGenera
       val aggregates = combinations.map {
         case (af, groupBy) => Aggregate(Seq(af), if (groupBy) 
Seq(groupByColumn) else Seq())
       }
-      val subqueryOperators = Seq(Limit(1), Limit(10)) ++ aggregates
+
+      val subqueryOperators = limitAndOffsetChoices() ++ aggregates
 
       for {
         subqueryOperator <- subqueryOperators
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala
index 8e780b3ef70f..3b25edf1cf7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryGeneratorHelper.scala
@@ -185,8 +185,16 @@ trait QueryGeneratorHelper {
       f"groupingExpr=[${groupingExpressions.mkString(",")}])"
   }
 
-  case class Limit(limitValue: Int) extends Operator with Clause {
-    override def toString: String = f"LIMIT $limitValue"
+  case class LimitAndOffset(limitValue: Int, offsetValue: Int) extends 
Operator with Clause {
+    override def toString: String = {
+      val limitClause = if (limitValue > 0) { s"LIMIT $limitValue" } else { "" 
}
+      val offsetClause = if (offsetValue > 0) { s"OFFSET $offsetValue" } else 
{ "" }
+      if (limitClause.nonEmpty && offsetClause.nonEmpty) {
+        s"$limitClause $offsetClause"
+      } else {
+        s"$limitClause$offsetClause"
+      }
+    }
   }
 
   object SubqueryLocation extends Enumeration {
@@ -223,7 +231,7 @@ trait QueryGeneratorHelper {
       whereClause: Option[WhereClause] = None,
       groupByClause: Option[GroupByClause] = None,
       orderByClause: Option[OrderByClause] = None,
-      limitClause: Option[Limit] = None
+      limitAndOffsetClause: Option[LimitAndOffset] = None
   ) extends Operator {
 
     override def toString: String = {
@@ -232,7 +240,7 @@ trait QueryGeneratorHelper {
 
       f"$selectClause $fromClause${getOptionClauseString(whereClause)}" +
         
f"${getOptionClauseString(groupByClause)}${getOptionClauseString(orderByClause)}"
 +
-        f"${getOptionClauseString(limitClause)}"
+        f"${getOptionClauseString(limitAndOffsetClause)}"
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to