Repository: spark
Updated Branches:
  refs/heads/master 25bef7e69 -> 28128150e


SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation

Author: Sandy Ryza <[email protected]>

Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits:

8d2e70e [Sandy Ryza] Kostas's review feedback
e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28128150
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28128150
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28128150

Branch: refs/heads/master
Commit: 28128150e7e0c2b7d1c483e67214bdaef59f7d75
Parents: 25bef7e
Author: Sandy Ryza <[email protected]>
Authored: Mon Nov 3 15:19:01 2014 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Mon Nov 3 15:19:01 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 25 ++++++++++--------
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 26 ++++++++++---------
 .../spark/metrics/InputMetricsSuite.scala       | 27 ++++++++++++++++++--
 3 files changed, 53 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 946fb56..a157e36 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -211,20 +211,11 @@ class HadoopRDD[K, V](
 
       val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
-      var reader: RecordReader[K, V] = null
       val jobConf = getJobConf()
-      val inputFormat = getInputFormat(jobConf)
-      HadoopRDD.addLocalConfiguration(new 
SimpleDateFormat("yyyyMMddHHmm").format(createTime),
-        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, 
Reporter.NULL)
-
-      // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
-      val key: K = reader.createKey()
-      val value: V = reader.createValue()
 
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      // Find a function that will return the FileSystem bytes read by this 
thread.
+      // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read 
some bytes
       val bytesReadCallback = if 
(split.inputSplit.value.isInstanceOf[FileSplit]) {
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
           split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
@@ -234,6 +225,18 @@ class HadoopRDD[K, V](
       if (bytesReadCallback.isDefined) {
         context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
+
+      var reader: RecordReader[K, V] = null
+      val inputFormat = getInputFormat(jobConf)
+      HadoopRDD.addLocalConfiguration(new 
SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, 
Reporter.NULL)
+
+      // Register an on-task-completion callback to close the input stream.
+      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      val key: K = reader.createKey()
+      val value: V = reader.createValue()
+
       var recordsSinceMetricsUpdate = 0
 
       override def getNext() = {

http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6d6b867..351e145 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -107,20 +107,10 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, 
split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
-      val format = inputFormatClass.newInstance
-      format match {
-        case configurable: Configurable =>
-          configurable.setConf(conf)
-        case _ =>
-      }
-      val reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
 
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      // Find a function that will return the FileSystem bytes read by this 
thread.
+      // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read 
some bytes
       val bytesReadCallback = if 
(split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
           split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, 
conf)
@@ -131,6 +121,18 @@ class NewHadoopRDD[K, V](
         context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
 
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, 
split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val format = inputFormatClass.newInstance
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
+      }
+      val reader = format.createRecordReader(
+        split.serializableHadoopSplit.value, hadoopAttemptContext)
+      reader.initialize(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
+
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
       var havePair = false

http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
index 33bd1af..48c386b 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 import java.io.{FileWriter, PrintWriter, File}
 
 class InputMetricsSuite extends FunSuite with SharedSparkContext {
-  test("input metrics when reading text file") {
+  test("input metrics when reading text file with single split") {
     val file = new File(getClass.getSimpleName + ".txt")
     val pw = new PrintWriter(new FileWriter(file))
     pw.println("some stuff")
@@ -48,6 +48,29 @@ class InputMetricsSuite extends FunSuite with SharedSparkContext {
     // Wait for task end events to come in
     sc.listenerBus.waitUntilEmpty(500)
     assert(taskBytesRead.length == 2)
-    assert(taskBytesRead.sum == file.length())
+    assert(taskBytesRead.sum >= file.length())
+  }
+
+  test("input metrics when reading text file with multiple splits") {
+    val file = new File(getClass.getSimpleName + ".txt")
+    val pw = new PrintWriter(new FileWriter(file))
+    for (i <- 0 until 10000) {
+      pw.println("some stuff")
+    }
+    pw.close()
+    file.deleteOnExit()
+
+    val taskBytesRead = new ArrayBuffer[Long]()
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+      }
+    })
+    sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+    // Wait for task end events to come in
+    sc.listenerBus.waitUntilEmpty(500)
+    assert(taskBytesRead.length == 2)
+    assert(taskBytesRead.sum >= file.length())
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to