This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 38f9580 [SPARK-26327][SQL][BACKPORT-2.2] Bug fix for
`FileSourceScanExec` metrics update
38f9580 is described below
commit 38f95801d2ff3a2894bb6d53db1b3b77dfd752aa
Author: Yuanjian Li <[email protected]>
AuthorDate: Fri Dec 14 13:09:23 2018 -0800
[SPARK-26327][SQL][BACKPORT-2.2] Bug fix for `FileSourceScanExec` metrics
update
## What changes were proposed in this pull request?
Backport #23277 to branch 2.2 without the metrics renaming.
## How was this patch tested?
New test case in `SQLMetricsSuite`.
Closes #23300 from xuanyuanking/SPARK-26327-2.2.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/DataSourceScanExec.scala | 26 +++++++++++++++-------
.../sql/execution/metric/SQLMetricsSuite.scala | 15 +++++++++++++
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 6fb41b6..1f6a7c0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -170,19 +170,14 @@ case class FileSourceScanExec(
false
}
+ private var metadataTime = 0L
+
@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
val optimizerMetadataTimeNs =
relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
val timeTakenMs = ((System.nanoTime() - startTime) +
optimizerMetadataTimeNs) / 1000 / 1000
-
- metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
- metrics("metadataTime").add(timeTakenMs)
-
- val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
- metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+ metadataTime = timeTakenMs
ret
}
@@ -281,6 +276,8 @@ case class FileSourceScanExec(
}
private lazy val inputRDD: RDD[InternalRow] = {
+ // Update metrics here so they take effect in both code generation and normal nodes.
+ updateDriverMetrics()
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -514,6 +511,19 @@ case class FileSourceScanExec(
}
}
+ /**
+ * Send the updated metrics to the driver. By the time this function is called,
+ * selectedPartitions has been initialized. See SPARK-26327 for more detail.
+ */
+ private def updateDriverMetrics() = {
+ metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+ metrics("metadataTime").add(metadataTime)
+
+ val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+ metrics("numFiles") :: metrics("metadataTime") :: Nil)
+ }
+
override lazy val canonicalized: FileSourceScanExec = {
FileSourceScanExec(
relation,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 79d1fbf..26b822f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -372,6 +372,21 @@ class SQLMetricsSuite extends SparkFunSuite with
SharedSQLContext {
assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) ::
(0L, 300L, 0L) :: Nil)
}
}
+
+ test("SPARK-26327: FileSourceScanExec metrics") {
+ withTable("testDataForScan") {
+ spark.range(10).selectExpr("id", "id % 3 as p")
+ .write.partitionBy("p").saveAsTable("testDataForScan")
+ // The execution plan only has 1 FileScan node.
+ val df = spark.sql(
+ "SELECT * FROM testDataForScan WHERE p = 1")
+ testSparkPlanMetrics(df, 1, Map(
+ 0L -> (("Scan parquet default.testdataforscan", Map(
+ "number of output rows" -> 3L,
+ "number of files" -> 2L))))
+ )
+ }
+ }
}
object InputOutputMetricsHelper {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]