This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f350b760ea11 [SPARK-47270][SQL] Dataset.isEmpty projects 
CommandResults locally
f350b760ea11 is described below

commit f350b760ea117a2d670720e37ba7be3fd8c0de0a
Author: Zhen Wang <[email protected]>
AuthorDate: Mon Mar 11 19:33:43 2024 +0800

    [SPARK-47270][SQL] Dataset.isEmpty projects CommandResults locally
    
    ### What changes were proposed in this pull request?
    
    Similar to #40779, `Dataset.isEmpty` should also not trigger job execution 
on CommandResults.
    
    This PR converts `CommandResult` to `LocalRelation` in `Dataset.isEmpty` 
method.
    
    ### Why are the changes needed?
    
    A simple `spark.sql("show tables").isEmpty` shouldn not require an executor.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added new UT.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45373 from wForget/SPARK-47270.
    
    Authored-by: Zhen Wang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 22 ++++++++++++++--------
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 19 +++++++++++++++++++
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f0c9f7ae53fc..c2e51b574df7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -271,13 +271,7 @@ class Dataset[T] private[sql](
   private[sql] def getRows(
       numRows: Int,
       truncate: Int): Seq[Seq[String]] = {
-    val newDf = logicalPlan match {
-      case c: CommandResult =>
-        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the 
casting locally to
-        // avoid triggering a job
-        Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
-      case _ => toDF()
-    }
+    val newDf = commandResultOptimized.toDF()
     val castCols = newDf.logicalPlan.output.map { col =>
       Column(ToPrettyString(col))
     }
@@ -655,7 +649,8 @@ class Dataset[T] private[sql](
    * @group basic
    * @since 2.4.0
    */
-  def isEmpty: Boolean = withAction("isEmpty", 
select().limit(1).queryExecution) { plan =>
+  def isEmpty: Boolean = withAction("isEmpty",
+      commandResultOptimized.select().limit(1).queryExecution) { plan =>
     plan.executeTake(1).isEmpty
   }
 
@@ -4471,6 +4466,17 @@ class Dataset[T] private[sql](
     }
   }
 
+  /** Returns a optimized plan for CommandResult, convert to `LocalRelation`. 
*/
+  private def commandResultOptimized: Dataset[T] = {
+    logicalPlan match {
+      case c: CommandResult =>
+        // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the 
casting locally to
+        // avoid triggering a job
+        Dataset(sparkSession, LocalRelation(c.output, c.rows))
+      case _ => this
+    }
+  }
+
   /** Convert to an RDD of serialized ArrowRecordBatches. */
   private[sql] def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
     val schemaCaptured = this.schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index bb18769a23b4..91057dcc98e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2719,6 +2719,25 @@ class DatasetSuite extends QueryTest
     checkDataset(ds.map(t => t),
       WithSet(0, HashSet("foo", "bar")), WithSet(1, HashSet("bar", "zoo")))
   }
+
+  test("SPARK-47270: isEmpty does not trigger job execution on 
CommandResults") {
+    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
+      withTable("t1") {
+        sql("create table t1(c int) using parquet")
+
+        @volatile var jobCounter = 0
+        val listener = new SparkListener {
+          override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+            jobCounter += 1
+          }
+        }
+        withListener(spark.sparkContext, listener) { _ =>
+          sql("show tables").isEmpty
+        }
+        assert(jobCounter === 0)
+      }
+    }
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to