Repository: spark
Updated Branches:
  refs/heads/branch-2.3 4059454f9 -> d68198d26


[SPARK-23223][SQL] Make stacking dataset transforms more performant

## What changes were proposed in this pull request?
It is a common pattern to apply multiple transforms to a `Dataset` (using 
`Dataset.withColumn`, for example). This is currently quite expensive because we 
run `CheckAnalysis` on the full plan and create an encoder for each 
intermediate `Dataset`.

This PR extends the usage of the `AnalysisBarrier` to include `CheckAnalysis`. 
By doing this we hide the already analyzed plan from `CheckAnalysis` because 
the barrier is a `LeafNode`. The `AnalysisBarrier` is now removed by 
`EliminateBarriers` in `Analyzer.executeAndCheck`, after `CheckAnalysis` has 
run, instead of in the analyzer's `Cleanup` batch.

We also make binding the `Dataset` encoder lazy. The bound encoder is only 
needed when we materialize the dataset.

## How was this patch tested?
Existing tests should cover this.

Author: Herman van Hovell <[email protected]>

Closes #20402 from hvanhovell/SPARK-23223.

(cherry picked from commit 2d903cf9d3a827e54217dfc9f1e4be99d8204387)
Signed-off-by: Herman van Hovell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d68198d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d68198d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d68198d2

Branch: refs/heads/branch-2.3
Commit: d68198d26e32ce98cbf0d3f8755d21dc72b3756d
Parents: 4059454
Author: Herman van Hovell <[email protected]>
Authored: Mon Jan 29 09:00:54 2018 -0800
Committer: Herman van Hovell <[email protected]>
Committed: Mon Jan 29 09:01:13 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala      | 16 ++++++++++++++--
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala |  1 +
 .../spark/sql/catalyst/analysis/AnalysisTest.scala  |  3 +--
 .../main/scala/org/apache/spark/sql/Dataset.scala   |  8 ++++++--
 .../apache/spark/sql/execution/QueryExecution.scala | 16 ++--------------
 .../org/apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 6 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d68198d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2b14c82..91cb036 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -98,6 +98,19 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
+  def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
+    val analyzed = execute(plan)
+    try {
+      checkAnalysis(analyzed)
+      EliminateBarriers(analyzed)
+    } catch {
+      case e: AnalysisException =>
+        val ae = new AnalysisException(e.message, e.line, e.startPosition, 
Option(analyzed))
+        ae.setStackTrace(e.getStackTrace)
+        throw ae
+    }
+  }
+
   override def execute(plan: LogicalPlan): LogicalPlan = {
     AnalysisContext.reset()
     try {
@@ -178,8 +191,7 @@ class Analyzer(
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
-      CleanupAliases,
-      EliminateBarriers)
+      CleanupAliases)
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d68198d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index bbcec56..0d189b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -348,6 +348,7 @@ trait CheckAnalysis extends PredicateHelper {
     }
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
+      case AnalysisBarrier(child) if !child.resolved => checkAnalysis(child)
       case o if !o.resolved => failAnalysis(s"unresolved operator 
${o.simpleString}")
       case _ =>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d68198d2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 549a435..3d7c918 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -54,8 +54,7 @@ trait AnalysisTest extends PlanTest {
       expectedPlan: LogicalPlan,
       caseSensitive: Boolean = true): Unit = {
     val analyzer = getAnalyzer(caseSensitive)
-    val actualPlan = analyzer.execute(inputPlan)
-    analyzer.checkAnalysis(actualPlan)
+    val actualPlan = analyzer.executeAndCheck(inputPlan)
     comparePlans(actualPlan, expectedPlan)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d68198d2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index edb6644..cc5b647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,11 @@ import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
   def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): 
Dataset[T] = {
-    new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
+    val dataset = new Dataset(sparkSession, logicalPlan, 
implicitly[Encoder[T]])
+    // Eagerly bind the encoder so we verify that the encoder matches the 
underlying
+    // schema. The user will get an error if this is not the case.
+    dataset.deserializer
+    dataset
   }
 
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame 
= {
@@ -204,7 +208,7 @@ class Dataset[T] private[sql](
 
   // The deserializer expression which can be used to build a projection and 
turn rows to objects
   // of type T, after collecting rows to the driver side.
-  private val deserializer =
+  private lazy val deserializer =
     exprEnc.resolveAndBind(logicalPlan.output, 
sparkSession.sessionState.analyzer).deserializer
 
   private implicit def classTag = exprEnc.clsTag

http://git-wip-us.apache.org/repos/asf/spark/blob/d68198d2/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8bfe3ef..7cae24b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -44,19 +44,7 @@ class QueryExecution(val sparkSession: SparkSession, val 
logical: LogicalPlan) {
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
 
-  def assertAnalyzed(): Unit = {
-    // Analyzer is invoked outside the try block to avoid calling it again 
from within the
-    // catch block below.
-    analyzed
-    try {
-      sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
-    } catch {
-      case e: AnalysisException =>
-        val ae = new AnalysisException(e.message, e.line, e.startPosition, 
Option(analyzed))
-        ae.setStackTrace(e.getStackTrace)
-        throw ae
-    }
-  }
+  def assertAnalyzed(): Unit = analyzed
 
   def assertSupported(): Unit = {
     if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
@@ -66,7 +54,7 @@ class QueryExecution(val sparkSession: SparkSession, val 
logical: LogicalPlan) {
 
   lazy val analyzed: LogicalPlan = {
     SparkSession.setActiveSession(sparkSession)
-    sparkSession.sessionState.analyzer.execute(logical)
+    sparkSession.sessionState.analyzer.executeAndCheck(logical)
   }
 
   lazy val withCachedData: LogicalPlan = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d68198d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7287e20..59708e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -575,7 +575,7 @@ private[hive] class TestHiveQueryExecution(
     logDebug(s"Query references test tables: 
${referencedTestTables.mkString(", ")}")
     referencedTestTables.foreach(sparkSession.loadTestTable)
     // Proceed with analysis.
-    sparkSession.sessionState.analyzer.execute(logical)
+    sparkSession.sessionState.analyzer.executeAndCheck(logical)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to