Repository: spark
Updated Branches:
  refs/heads/master 52e00f706 -> 2ac895be9


[SPARK-23247][SQL] combines Unsafe operations and statistics operations in Scan 
Data Source

## What changes were proposed in this pull request?

Currently, when executing a data source scan, we first apply the unsafe 
projection to each row of data, and then traverse the data again (an extra 
`map`) to count the rows. In terms of performance, this is unnecessary. This PR 
combines the two operations and counts the output rows while performing the 
unsafe projection.

Before the change:

```
val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
 proj.initialize(index)
iter.map(proj)
}

val numOutputRows = longMetric("numOutputRows")
unsafeRow.map { r =>
numOutputRows += 1
 r
}
```
After the change:

    val numOutputRows = longMetric("numOutputRows")

    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
      val proj = UnsafeProjection.create(schema)
      proj.initialize(index)
      iter.map( r => {
        numOutputRows += 1
        proj(r)
      })
    }

## How was this patch tested?

Existing test cases.

Author: caoxuewen <[email protected]>

Closes #20415 from heary-cao/DataSourceScanExec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ac895be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ac895be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ac895be

Branch: refs/heads/master
Commit: 2ac895be909de7e58e1051dc2a1bba98a25bf4be
Parents: 52e00f7
Author: caoxuewen <[email protected]>
Authored: Thu Feb 1 12:05:12 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Feb 1 12:05:12 2018 +0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 45 ++++++++++----------
 1 file changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ac895be/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7732e2..ba1157d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -90,16 +90,15 @@ case class RowDataSourceScanExec(
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+    val numOutputRows = longMetric("numOutputRows")
+
+    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
       val proj = UnsafeProjection.create(schema)
       proj.initialize(index)
-      iter.map(proj)
-    }
-
-    val numOutputRows = longMetric("numOutputRows")
-    unsafeRow.map { r =>
-      numOutputRows += 1
-      r
+      iter.map( r => {
+        numOutputRows += 1
+        proj(r)
+      })
     }
   }
 
@@ -326,22 +325,22 @@ case class FileSourceScanExec(
       // 2) the number of columns should be smaller than 
spark.sql.codegen.maxFields
       WholeStageCodegenExec(this)(codegenStageId = 0).execute()
     } else {
-      val unsafeRows = {
-        val scan = inputRDD
-        if (needsUnsafeRowConversion) {
-          scan.mapPartitionsWithIndexInternal { (index, iter) =>
-            val proj = UnsafeProjection.create(schema)
-            proj.initialize(index)
-            iter.map(proj)
-          }
-        } else {
-          scan
-        }
-      }
       val numOutputRows = longMetric("numOutputRows")
-      unsafeRows.map { r =>
-        numOutputRows += 1
-        r
+
+      if (needsUnsafeRowConversion) {
+        inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+          val proj = UnsafeProjection.create(schema)
+          proj.initialize(index)
+          iter.map( r => {
+            numOutputRows += 1
+            proj(r)
+          })
+        }
+      } else {
+        inputRDD.map { r =>
+          numOutputRows += 1
+          r
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to