Repository: spark
Updated Branches:
  refs/heads/master 30bdb5cbd -> 02d9c352c


[SPARK-14092] [SQL] move shouldStop() to end of while loop

## What changes were proposed in this pull request?

This PR rolls back some of the changes in #11274, which introduced a performance 
regression when doing a simple aggregation on a Parquet scan with one integer column.

I do not really understand how this change introduced such a huge impact; it may be 
related to how the JIT compiler inlines functions (I saw very different stats from 
profiling).

## How was this patch tested?

Manually ran the Parquet reader benchmark. Before this change:
```
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
Int and String Scan:                Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized                   2391 / 3107         43.9          22.8 
      1.0X
```
After this change:
```
Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
Intel(R) Core(TM) i7-4558U CPU  2.80GHz
Int and String Scan:                Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized                   2032 / 2626         51.6          19.4 
      1.0X
```

Author: Davies Liu <[email protected]>

Closes #11912 from davies/fix_regression.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02d9c352
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02d9c352
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02d9c352

Branch: refs/heads/master
Commit: 02d9c352c72a16725322678ef174c5c6e9f2c617
Parents: 30bdb5c
Author: Davies Liu <[email protected]>
Authored: Wed Mar 23 11:58:43 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Wed Mar 23 11:58:43 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/execution/ExistingRDD.scala   | 8 +++++---
 .../org/apache/spark/sql/execution/WholeStageCodegen.scala   | 8 +++++---
 .../org/apache/spark/sql/execution/basicOperators.scala      | 3 ++-
 3 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/02d9c352/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b4348d3..3e2c799 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -255,11 +255,12 @@ private[sql] case class DataSourceScan(
       |       $numOutputRows.add(numRows);
       |     }
       |
-      |     while (!shouldStop() && $idx < numRows) {
+      |     // this loop is very perf sensitive and changes to it should be 
measured carefully
+      |     while ($idx < numRows) {
       |       int $rowidx = $idx++;
       |       ${consume(ctx, columns1).trim}
+      |       if (shouldStop()) return;
       |     }
-      |     if (shouldStop()) return;
       |
       |     if (!$input.hasNext()) {
       |       $batch = null;
@@ -280,7 +281,7 @@ private[sql] case class DataSourceScan(
       s"""
        | private void $scanRows(InternalRow $row) throws java.io.IOException {
        |   boolean firstRow = true;
-       |   while (!shouldStop() && (firstRow || $input.hasNext())) {
+       |   while (firstRow || $input.hasNext()) {
        |     if (firstRow) {
        |       firstRow = false;
        |     } else {
@@ -288,6 +289,7 @@ private[sql] case class DataSourceScan(
        |     }
        |     $numOutputRows.add(1);
        |     ${consume(ctx, columns2, inputRow).trim}
+       |     if (shouldStop()) return;
        |   }
        | }""".stripMargin)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/02d9c352/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 5634e5f..0be0b80 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -103,11 +103,12 @@ trait CodegenSupport extends SparkPlan {
     *     # call child.produce()
     *     initialized = true;
     *   }
-    *   while (!shouldStop() && hashmap.hasNext()) {
+    *   while (hashmap.hasNext()) {
     *     row = hashmap.next();
     *     # build the aggregation results
     *     # create variables for results
     *     # call consume(), which will call parent.doConsume()
+   *      if (shouldStop()) return;
     *   }
     */
   protected def doProduce(ctx: CodegenContext): String
@@ -251,9 +252,10 @@ case class InputAdapter(child: SparkPlan) extends 
UnaryNode with CodegenSupport
     ctx.currentVars = null
     val columns = exprs.map(_.gen(ctx))
     s"""
-       | while (!shouldStop() && $input.hasNext()) {
+       | while ($input.hasNext()) {
        |   InternalRow $row = (InternalRow) $input.next();
        |   ${consume(ctx, columns, row).trim}
+       |   if (shouldStop()) return;
        | }
      """.stripMargin
   }
@@ -320,7 +322,7 @@ case class WholeStageCodegen(child: SparkPlan) extends 
UnaryNode with CodegenSup
       /** Codegened pipeline for:
         * ${toCommentSafeString(child.treeString.trim)}
         */
-      class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
+      final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
 
         private Object[] references;
         ${ctx.declareMutableStates()}

http://git-wip-us.apache.org/repos/asf/spark/blob/02d9c352/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 6e2a5aa..ee3f1d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -282,13 +282,14 @@ case class Range(
       |   }
       | }
       |
-      | while (!$overflow && $checkEnd && !shouldStop()) {
+      | while (!$overflow && $checkEnd) {
       |  long $value = $number;
       |  $number += ${step}L;
       |  if ($number < $value ^ ${step}L < 0) {
       |    $overflow = true;
       |  }
       |  ${consume(ctx, Seq(ev))}
+      |  if (shouldStop()) return;
       | }
      """.stripMargin
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to