Repository: spark
Updated Branches:
  refs/heads/master 4cb025afa -> 8f0511ed4


[SPARK-19650] Commands should not trigger a Spark job

Spark executes SQL commands eagerly. It does this by creating an RDD that 
contains the command's results. The downside is that any action on this RDD 
triggers a Spark job, which is expensive and unnecessary.

This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; 
instead, it materializes the results directly and puts them in a `LocalRelation`.
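
As a rough sketch of the observable change (not part of the patch; it assumes 
an active `SparkSession` named `spark`, e.g. in spark-shell), a `SparkListener` 
can confirm that running a command no longer schedules a job, mirroring the 
regression test below:

    import java.util.concurrent.atomic.AtomicBoolean
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    val sawJob = new AtomicBoolean(false)
    spark.sparkContext.addSparkListener(new SparkListener {
      // Record whether any Spark job was started.
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = sawJob.set(true)
    })
    spark.sql("SHOW DATABASES").collect() // a Command: served from a LocalRelation
    Thread.sleep(500) // listener delivery is asynchronous; a crude wait for a sketch
    assert(!sawJob.get(), "a Command should not trigger a Spark job")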

Added a regression test to `SQLQuerySuite`.

Author: Herman van Hovell <[email protected]>

Closes #17027 from hvanhovell/no-job-command.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0511ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0511ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0511ed

Branch: refs/heads/master
Commit: 8f0511ed49a353fb0745f320a84063ced5cc1857
Parents: 4cb025a
Author: Herman van Hovell <[email protected]>
Authored: Fri Feb 24 23:05:36 2017 -0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Feb 24 23:05:59 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 20 ++++++----------
 .../spark/sql/execution/QueryExecution.scala    |  2 --
 .../spark/sql/execution/SparkStrategies.scala   |  3 +--
 .../sql-tests/results/change-column.sql.out     |  4 ++--
 .../sql-tests/results/group-by-ordinal.sql.out  |  2 +-
 .../sql-tests/results/order-by-ordinal.sql.out  |  2 +-
 .../sql-tests/results/outer-join.sql.out        |  4 ++--
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 25 ++++++++++++++++++++
 8 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c212d6..1b04623 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -175,19 +175,13 @@ class Dataset[T] private[sql](
   }
 
   @transient private[sql] val logicalPlan: LogicalPlan = {
-    def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
-      case _: Command |
-           _: InsertIntoTable => true
-      case _ => false
-    }
-
+    // For various commands (like DDL) and queries with side effects, we force query execution
+    // to happen right away to let these side effects take place eagerly.
     queryExecution.analyzed match {
-      // For various commands (like DDL) and queries with side effects, we force query execution
-      // to happen right away to let these side effects take place eagerly.
-      case p if hasSideEffects(p) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
-      case Union(children) if children.forall(hasSideEffects) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+      case c: Command =>
+        LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+      case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+        LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
       case _ =>
         queryExecution.analyzed
     }
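
In short (paraphrasing the hunk above; `queryExecution` and `sparkSession` are
the existing fields of Dataset.scala):

    // before: command results were wrapped in an RDD, so any later action
    // (head, collect, ...) scheduled a Spark job
    LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)

    // after: command results are collected on the driver up front and held in
    // a LocalRelation, so no job is scheduled when the Dataset is consumed
    LocalRelation(c.output, queryExecution.executedPlan.executeCollect())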

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 137f7ba..6ec2f4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
     case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
       command.executeCollect().map(_.getString(1))
-    case command: ExecutedCommandExec =>
-      command.executeCollect().map(_.getString(0))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
       // We need the types so we can output struct field names

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 027b148..20bf492 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
index 59eb569..ba8bc93 100644
--- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
@@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false
 -- !query 19 schema
 struct<key:string,value:string>
 -- !query 19 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive        false
 
 
 -- !query 20
@@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true
 -- !query 21 schema
 struct<key:string,value:string>
 -- !query 21 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive        true
 
 
 -- !query 22

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
index c64520f..c0930bb 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out
@@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false
 -- !query 17 schema
 struct<key:string,value:string>
 -- !query 17 output
-spark.sql.groupByOrdinal
+spark.sql.groupByOrdinal       false
 
 
 -- !query 18

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
index 03a4e72..cc47cc6 100644
--- a/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/order-by-ordinal.sql.out
@@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false
 -- !query 9 schema
 struct<key:string,value:string>
 -- !query 9 output
-spark.sql.orderByOrdinal
+spark.sql.orderByOrdinal       false
 
 
 -- !query 10

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
index cc50b94..5db3bae 100644
--- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true
 -- !query 5 schema
 struct<key:string,value:string>
 -- !query 5 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled    true
 
 
 -- !query 6
@@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false
 -- !query 7 schema
 struct<key:string,value:string>
 -- !query 7 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled    false

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0511ed/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 40d0ce0..03cdfcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql
 import java.io.File
 import java.math.MathContext
 import java.sql.Timestamp
+import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.spark.{AccumulatorSuite, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     checkAnswer(sql(badQuery), Row(1) :: Nil)
   }
 
+  test("SPARK-19650: An action on a Command should not trigger a Spark job") {
+    // Create a listener that checks if new jobs have started.
+    val jobStarted = new AtomicBoolean(false)
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.set(true)
+      }
+    }
+
+    // Make sure no spurious job starts are pending in the listener bus.
+    sparkContext.listenerBus.waitUntilEmpty(500)
+    sparkContext.addSparkListener(listener)
+    try {
+      // Execute the command.
+      sql("show databases").head()
+
+      // Make sure we have seen all events triggered by the query above.
+      sparkContext.listenerBus.waitUntilEmpty(500)
+    } finally {
+      sparkContext.removeSparkListener(listener)
+    }
+    assert(!jobStarted.get(), "Command should not trigger a Spark job.")
+  }
 }

