Repository: spark
Updated Branches:
  refs/heads/master 25bef7e69 -> 28128150e
SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation

Author: Sandy Ryza <[email protected]>

Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits:

8d2e70e [Sandy Ryza] Kostas's review feedback
e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28128150
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28128150
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28128150

Branch: refs/heads/master
Commit: 28128150e7e0c2b7d1c483e67214bdaef59f7d75
Parents: 25bef7e
Author: Sandy Ryza <[email protected]>
Authored: Mon Nov 3 15:19:01 2014 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Mon Nov 3 15:19:01 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 25 ++++++++++--------
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 26 ++++++++++---------
 .../spark/metrics/InputMetricsSuite.scala       | 27 ++++++++++++++++++--
 3 files changed, 53 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 946fb56..a157e36 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -211,20 +211,11 @@ class HadoopRDD[K, V](
       val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
-      var reader: RecordReader[K, V] = null
       val jobConf = getJobConf()
-      val inputFormat = getInputFormat(jobConf)
-      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
-        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
-
-      // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
-      val key: K = reader.createKey()
-      val value: V = reader.createValue()
 
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      // Find a function that will return the FileSystem bytes read by this thread.
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
           split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
@@ -234,6 +225,18 @@ class HadoopRDD[K, V](
       if (bytesReadCallback.isDefined) {
         context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
+
+      var reader: RecordReader[K, V] = null
+      val inputFormat = getInputFormat(jobConf)
+      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+
+      // Register an on-task-completion callback to close the input stream.
+      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      val key: K = reader.createKey()
+      val value: V = reader.createValue()
+
       var recordsSinceMetricsUpdate = 0
 
       override def getNext() = {


http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6d6b867..351e145 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -107,20 +107,10 @@ class NewHadoopRDD[K, V](
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = confBroadcast.value.value
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
-      val format = inputFormatClass.newInstance
-      format match {
-        case configurable: Configurable =>
-          configurable.setConf(conf)
-        case _ =>
-      }
-      val reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
-      // Find a function that will return the FileSystem bytes read by this thread.
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
         SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
           split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
@@ -131,6 +121,18 @@ class NewHadoopRDD[K, V](
         context.taskMetrics.inputMetrics = Some(inputMetrics)
       }
 
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val format = inputFormatClass.newInstance
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
+      }
+      val reader = format.createRecordReader(
+        split.serializableHadoopSplit.value, hadoopAttemptContext)
+      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
 
       var havePair = false


http://git-wip-us.apache.org/repos/asf/spark/blob/28128150/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
index 33bd1af..48c386b 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 import java.io.{FileWriter, PrintWriter, File}
 
 class InputMetricsSuite extends FunSuite with SharedSparkContext {
-  test("input metrics when reading text file") {
+  test("input metrics when reading text file with single split") {
     val file = new File(getClass.getSimpleName + ".txt")
     val pw = new PrintWriter(new FileWriter(file))
     pw.println("some stuff")
@@ -48,6 +48,29 @@ class InputMetricsSuite extends FunSuite with SharedSparkContext {
     // Wait for task end events to come in
     sc.listenerBus.waitUntilEmpty(500)
     assert(taskBytesRead.length == 2)
-    assert(taskBytesRead.sum == file.length())
+    assert(taskBytesRead.sum >= file.length())
+  }
+
+  test("input metrics when reading text file with multiple splits") {
+    val file = new File(getClass.getSimpleName + ".txt")
+    val pw = new PrintWriter(new FileWriter(file))
+    for (i <- 0 until 10000) {
+      pw.println("some stuff")
+    }
+    pw.close()
+    file.deleteOnExit()
+
+    val taskBytesRead = new ArrayBuffer[Long]()
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+      }
+    })
+    sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+    // Wait for task end events to come in
+    sc.listenerBus.waitUntilEmpty(500)
+    assert(taskBytesRead.length == 2)
+    assert(taskBytesRead.sum >= file.length())
   }
 }
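----------------------------------------------------------------------

The heart of both RDD changes is an ordering constraint: the bytes-read callback snapshots the thread's FileSystem statistics when it is created, so it must be created before the RecordReader, whose constructor may itself read from the FileSystem (for example, to locate the first record boundary in its split). Below is a minimal, self-contained Scala sketch of that constraint. fsBytesRead, EagerReader, and bytesReadCallback are hypothetical stand-ins, not Spark or Hadoop APIs; bytesReadCallback only loosely mirrors SparkHadoopUtil.get.getFSBytesReadOnThreadCallback.

object BytesReadOrdering {
  // Hypothetical stand-in for a per-thread FileSystem bytes-read counter.
  private var fsBytesRead: Long = 0L

  // Hypothetical RecordReader whose constructor reads ahead, e.g. to find
  // the first full record in its split.
  private class EagerReader {
    fsBytesRead += 64                            // read-ahead during construction
    def nextRecord(): Unit = fsBytesRead += 10   // normal per-record read
  }

  // Loosely mirrors the real callback: snapshot a baseline now and return
  // a function that reports bytes read since that baseline.
  private def bytesReadCallback(): () => Long = {
    val baseline = fsBytesRead
    () => fsBytesRead - baseline
  }

  def main(args: Array[String]): Unit = {
    // Fixed ordering: create the callback first, then the reader, so the
    // constructor's 64 bytes land inside the measured window.
    val callback = bytesReadCallback()
    val reader = new EagerReader
    reader.nextRecord()
    println(s"after fix: ${callback()} bytes attributed")      // 74

    // Buggy ordering: the reader is built before the baseline is taken,
    // so its constructor bytes silently vanish from the metrics.
    val earlyReader = new EagerReader
    val lateCallback = bytesReadCallback()
    earlyReader.nextRecord()
    println(s"before fix: ${lateCallback()} bytes attributed") // 10, not 74
  }
}

The same accounting presumably motivates relaxing the test assertions from == to >=: once constructor reads are counted, and given that a line-oriented reader can read past its split boundary to finish its last record, the per-task byte counts can legitimately sum to more than the file's length.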
