http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 9ab7d96..945830c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -375,26 +375,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       execSummary.taskTime += info.duration
       stageData.numActiveTasks -= 1

-      val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
+      val errorMessage: Option[String] =
         taskEnd.reason match {
           case org.apache.spark.Success =>
             stageData.completedIndices.add(info.index)
             stageData.numCompleteTasks += 1
-            (None, taskEnd.taskMetrics.accumulatorUpdates())
+            None
           case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), e.accumUpdates)
+            Some(e.toErrorString)
           case e: TaskFailedReason => // All other failure cases
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), Seq.empty[AccumulableInfo])
+            Some(e.toErrorString)
         }

-      val taskMetrics =
-        if (accums.nonEmpty) {
-          Some(TaskMetrics.fromAccumulatorUpdates(accums))
-        } else {
-          None
-        }
+      val taskMetrics = Option(taskEnd.taskMetrics)
       taskMetrics.foreach { m =>
         val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
@@ -503,7 +498,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         new StageUIData
       })
       val taskData = stageData.taskData.get(taskId)
-      val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
+      val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
       taskData.foreach { t =>
         if (!t.taskInfo.finished) {
           updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics
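[Not part of the patch] For context, the listener-side effect of the hunk above is that TaskMetrics now comes straight off the SparkListenerTaskEnd event instead of being rebuilt from accumulator updates. A minimal sketch of a user listener following the same pattern; the class name and the `latestMetrics` map are illustrative, the rest is stable public API:

```scala
import scala.collection.mutable

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener, not part of this commit.
class TaskMetricsLogger extends SparkListener {
  private val latestMetrics = mutable.Map[Long, TaskMetrics]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null (e.g. for some failed tasks), hence the Option(...) guard,
    // mirroring the updated JobProgressListener code above.
    Option(taskEnd.taskMetrics).foreach { m =>
      latestMetrics(taskEnd.taskInfo.taskId) = m
    }
  }
}
```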
http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index a613fbc..aeab71d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -840,7 +840,9 @@ private[spark] object JsonProtocol { // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates") .map(_.extract[List[JValue]].map(accumulableInfoFromJson)) - .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulatorUpdates()) + .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => { + acc.toInfo(Some(acc.localValue), None) + })) ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates) case `taskResultLost` => TaskResultLost case `taskKilled` => TaskKilled http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 6063476..5f97e58 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -28,17 +28,17 @@ import scala.util.control.NonFatal import org.scalatest.Matchers import org.scalatest.exceptions.TestFailedException -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.AccumulatorParam.{ListAccumulatorParam, StringAccumulatorParam} import org.apache.spark.scheduler._ import org.apache.spark.serializer.JavaSerializer class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext { - import AccumulatorParam._ + import AccumulatorSuite.createLongAccum override def afterEach(): Unit = { try { - Accumulators.clear() + AccumulatorContext.clear() } finally { super.afterEach() } @@ -59,9 +59,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex } } + test("accumulator serialization") { + val ser = new JavaSerializer(new SparkConf).newInstance() + val acc = createLongAccum("x") + acc.add(5) + assert(acc.value == 5) + assert(acc.isAtDriverSide) + + // serialize and de-serialize it, to simulate sending accumulator to executor. + val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc)) + // value is reset on the executors + assert(acc2.localValue == 0) + assert(!acc2.isAtDriverSide) + + acc2.add(10) + // serialize and de-serialize it again, to simulate sending accumulator back to driver. 
+ val acc3 = ser.deserialize[LongAccumulator](ser.serialize(acc2)) + // value is not reset on the driver + assert(acc3.value == 10) + assert(acc3.isAtDriverSide) + } + test ("basic accumulation") { sc = new SparkContext("local", "test") - val acc : Accumulator[Int] = sc.accumulator(0) + val acc: Accumulator[Int] = sc.accumulator(0) val d = sc.parallelize(1 to 20) d.foreach{x => acc += x} @@ -75,7 +96,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex test("value not assignable from tasks") { sc = new SparkContext("local", "test") - val acc : Accumulator[Int] = sc.accumulator(0) + val acc: Accumulator[Int] = sc.accumulator(0) val d = sc.parallelize(1 to 20) an [Exception] should be thrownBy {d.foreach{x => acc.value = x}} @@ -169,14 +190,13 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex System.gc() assert(ref.get.isEmpty) - Accumulators.remove(accId) - assert(!Accumulators.originals.get(accId).isDefined) + AccumulatorContext.remove(accId) + assert(!AccumulatorContext.originals.containsKey(accId)) } test("get accum") { - sc = new SparkContext("local", "test") // Don't register with SparkContext for cleanup - var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true) + var acc = createLongAccum("a") val accId = acc.id val ref = WeakReference(acc) assert(ref.get.isDefined) @@ -188,44 +208,16 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex // Getting a garbage collected accum should throw error intercept[IllegalAccessError] { - Accumulators.get(accId) + AccumulatorContext.get(accId) } // Getting a normal accumulator. Note: this has to be separate because referencing an // accumulator above in an `assert` would keep it from being garbage collected. - val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true) - assert(Accumulators.get(acc2.id) === Some(acc2)) + val acc2 = createLongAccum("b") + assert(AccumulatorContext.get(acc2.id) === Some(acc2)) // Getting an accumulator that does not exist should return None - assert(Accumulators.get(100000).isEmpty) - } - - test("copy") { - val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false) - val acc2 = acc1.copy() - assert(acc1.id === acc2.id) - assert(acc1.value === acc2.value) - assert(acc1.name === acc2.name) - assert(acc1.countFailedValues === acc2.countFailedValues) - assert(acc1 !== acc2) - // Modifying one does not affect the other - acc1.add(44L) - assert(acc1.value === 500L) - assert(acc2.value === 456L) - acc2.add(144L) - assert(acc1.value === 500L) - assert(acc2.value === 600L) - } - - test("register multiple accums with same ID") { - val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true) - // `copy` will create a new Accumulable and register it. 
- val acc2 = acc1.copy() - assert(acc1 !== acc2) - assert(acc1.id === acc2.id) - // The second one does not override the first one - assert(Accumulators.originals.size === 1) - assert(Accumulators.get(acc1.id) === Some(acc1)) + assert(AccumulatorContext.get(100000).isEmpty) } test("string accumulator param") { @@ -257,38 +249,33 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex acc.setValue(Seq(9, 10)) assert(acc.value === Seq(9, 10)) } - - test("value is reset on the executors") { - val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing")) - val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2")) - val externalAccums = Seq(acc1, acc2) - val taskMetrics = new TaskMetrics - // Set some values; these should not be observed later on the "executors" - acc1.setValue(10) - acc2.setValue(20L) - taskMetrics.testAccum.get.setValue(30L) - // Simulate the task being serialized and sent to the executors. - val dummyTask = new DummyTask(taskMetrics, externalAccums) - val serInstance = new JavaSerializer(new SparkConf).newInstance() - val taskSer = Task.serializeWithDependencies( - dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance) - // Now we're on the executors. - // Deserialize the task and assert that its accumulators are zero'ed out. - val (_, _, _, taskBytes) = Task.deserializeWithDependencies(taskSer) - val taskDeser = serInstance.deserialize[DummyTask]( - taskBytes, Thread.currentThread.getContextClassLoader) - // Assert that executors see only zeros - taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) } - taskDeser.metrics.internalAccums.foreach { a => assert(a.localValue == a.zero) } - } - } private[spark] object AccumulatorSuite { - import InternalAccumulator._ /** + * Create a long accumulator and register it to [[AccumulatorContext]]. + */ + def createLongAccum( + name: String, + countFailedValues: Boolean = false, + initValue: Long = 0, + id: Long = AccumulatorContext.newId()): LongAccumulator = { + val acc = new LongAccumulator + acc.setValue(initValue) + acc.metadata = AccumulatorMetadata(id, Some(name), countFailedValues) + AccumulatorContext.register(acc) + acc + } + + /** + * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the + * info as an accumulator update. + */ + def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None) + + /** * Run one or more Spark jobs and verify that in at least one job the peak execution memory * accumulator is updated afterwards. */ @@ -340,7 +327,6 @@ private class SaveInfoListener extends SparkListener { if (jobCompletionCallback != null) { jobCompletionSem.acquire() if (exception != null) { - exception = null throw exception } } @@ -377,13 +363,3 @@ private class SaveInfoListener extends SparkListener { (taskEnd.stageId, taskEnd.stageAttemptId), new ArrayBuffer[TaskInfo]) += taskEnd.taskInfo } } - - -/** - * A dummy [[Task]] that contains internal and external [[Accumulator]]s. 
- */ -private[spark] class DummyTask( - metrics: TaskMetrics, - val externalAccums: Seq[Accumulator[_]]) extends Task[Int](0, 0, 0, metrics) { - override def runTask(c: TaskContext): Int = 1 -} http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 4d2b3e7..1adc90a 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -211,10 +211,10 @@ class HeartbeatReceiverSuite private def triggerHeartbeat( executorId: String, executorShouldReregister: Boolean): Unit = { - val metrics = new TaskMetrics + val metrics = TaskMetrics.empty val blockManagerId = BlockManagerId(executorId, "localhost", 12345) val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse]( - Heartbeat(executorId, Array(1L -> metrics.accumulatorUpdates()), blockManagerId)) + Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId)) if (executorShouldReregister) { assert(response.reregisterBlockManager) } else { @@ -222,7 +222,7 @@ class HeartbeatReceiverSuite // Additionally verify that the scheduler callback is called with the correct parameters verify(scheduler).executorHeartbeatReceived( Matchers.eq(executorId), - Matchers.eq(Array(1L -> metrics.accumulatorUpdates())), + Matchers.eq(Array(1L -> metrics.accumulators())), Matchers.eq(blockManagerId)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index b074b95..e4474bb 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics @@ -29,7 +30,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { override def afterEach(): Unit = { try { - Accumulators.clear() + AccumulatorContext.clear() } finally { super.afterEach() } @@ -37,9 +38,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { test("internal accumulators in TaskContext") { val taskContext = TaskContext.empty() - val accumUpdates = taskContext.taskMetrics.accumulatorUpdates() + val accumUpdates = taskContext.taskMetrics.accumulators() assert(accumUpdates.size > 0) - assert(accumUpdates.forall(_.internal)) val testAccum = taskContext.taskMetrics.testAccum.get assert(accumUpdates.exists(_.id == testAccum.id)) } @@ -51,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { sc.addSparkListener(listener) // Have each task add 1 to the internal accumulator val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 1 + TaskContext.get().taskMetrics().testAccum.get.add(1) iter } // Register asserts in job completion callback to avoid flakiness @@ -87,17 +87,17 @@ class InternalAccumulatorSuite extends 
SparkFunSuite with LocalSparkContext { val rdd = sc.parallelize(1 to 100, numPartitions) .map { i => (i, i) } .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 1 + TaskContext.get().taskMetrics().testAccum.get.add(1) iter } .reduceByKey { case (x, y) => x + y } .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 10 + TaskContext.get().taskMetrics().testAccum.get.add(10) iter } .repartition(numPartitions * 2) .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 100 + TaskContext.get().taskMetrics().testAccum.get.add(100) iter } // Register asserts in job completion callback to avoid flakiness @@ -127,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { // This should retry both stages in the scheduler. Note that we only want to fail the // first stage attempt because we want the stage to eventually succeed. val x = sc.parallelize(1 to 100, numPartitions) - .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter } + .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get.add(1); iter } .groupBy(identity) val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId val rdd = x.mapPartitionsWithIndex { case (i, iter) => @@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private val myCleaner = new SaveAccumContextCleaner(this) override def cleaner: Option[ContextCleaner] = Some(myCleaner) } - assert(Accumulators.originals.isEmpty) + assert(AccumulatorContext.originals.isEmpty) sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count() val numInternalAccums = TaskMetrics.empty.internalAccums.length // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage - assert(Accumulators.originals.size === numInternalAccums * 2) + assert(AccumulatorContext.originals.size === numInternalAccums * 2) val accumsRegistered = sc.cleaner match { case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup case _ => Seq.empty[Long] } // Make sure the same set of accumulators is registered for cleanup assert(accumsRegistered.size === numInternalAccums * 2) - assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet) + assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala) } /** @@ -212,7 +212,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) { private val accumsRegistered = new ArrayBuffer[Long] - override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = { + override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { accumsRegistered += a.id super.registerAccumulatorForCleanup(a) } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/SparkFunSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 3228752..4aae2c9 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -34,7 +34,7 @@ private[spark] abstract class SparkFunSuite protected override def afterAll(): Unit = { try { // Avoid leaking map entries in tests that use accumulators without SparkContext - 
Accumulators.clear() + AccumulatorContext.clear() } finally { super.afterAll() } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index ee70419..94f6e1a 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -20,14 +20,11 @@ package org.apache.spark.executor import org.scalatest.Assertions import org.apache.spark._ -import org.apache.spark.scheduler.AccumulableInfo -import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId} +import org.apache.spark.storage.{BlockStatus, StorageLevel, TestBlockId} class TaskMetricsSuite extends SparkFunSuite { - import AccumulatorParam._ import StorageLevel._ - import TaskMetricsSuite._ test("mutating values") { val tm = new TaskMetrics @@ -59,8 +56,8 @@ class TaskMetricsSuite extends SparkFunSuite { tm.incPeakExecutionMemory(8L) val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L)) val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L)) - tm.incUpdatedBlockStatuses(Seq(block1)) - tm.incUpdatedBlockStatuses(Seq(block2)) + tm.incUpdatedBlockStatuses(block1) + tm.incUpdatedBlockStatuses(block2) // assert new values exist assert(tm.executorDeserializeTime == 1L) assert(tm.executorRunTime == 2L) @@ -194,18 +191,19 @@ class TaskMetricsSuite extends SparkFunSuite { } test("additional accumulables") { - val tm = new TaskMetrics - val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a")) - val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b")) - val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c")) - val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true) + val tm = TaskMetrics.empty + val acc1 = AccumulatorSuite.createLongAccum("a") + val acc2 = AccumulatorSuite.createLongAccum("b") + val acc3 = AccumulatorSuite.createLongAccum("c") + val acc4 = AccumulatorSuite.createLongAccum("d", true) tm.registerAccumulator(acc1) tm.registerAccumulator(acc2) tm.registerAccumulator(acc3) tm.registerAccumulator(acc4) - acc1 += 1 - acc2 += 2 - val newUpdates = tm.accumulatorUpdates().map { a => (a.id, a) }.toMap + acc1.add(1) + acc2.add(2) + val newUpdates = tm.accumulators() + .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap assert(newUpdates.contains(acc1.id)) assert(newUpdates.contains(acc2.id)) assert(newUpdates.contains(acc3.id)) @@ -214,46 +212,14 @@ class TaskMetricsSuite extends SparkFunSuite { assert(newUpdates(acc2.id).name === Some("b")) assert(newUpdates(acc3.id).name === Some("c")) assert(newUpdates(acc4.id).name === Some("d")) - assert(newUpdates(acc1.id).update === Some(1)) - assert(newUpdates(acc2.id).update === Some(2)) - assert(newUpdates(acc3.id).update === Some(0)) - assert(newUpdates(acc4.id).update === Some(0)) + assert(newUpdates(acc1.id).value === 1) + assert(newUpdates(acc2.id).value === 2) + assert(newUpdates(acc3.id).value === 0) + assert(newUpdates(acc4.id).value === 0) assert(!newUpdates(acc3.id).countFailedValues) assert(newUpdates(acc4.id).countFailedValues) - assert(newUpdates.values.map(_.update).forall(_.isDefined)) - assert(newUpdates.values.map(_.value).forall(_.isEmpty)) assert(newUpdates.size === tm.internalAccums.size + 4) } - - 
test("from accumulator updates") { - val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a => - AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues) - } - val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1) - assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1) - // Test this with additional accumulators to ensure that we do not crash when handling - // updates from unregistered accumulators. In practice, all accumulators created - // on the driver, internal or not, should be registered with `Accumulators` at some point. - val param = IntAccumulatorParam - val registeredAccums = Seq( - new Accumulator(0, param, Some("a"), countFailedValues = true), - new Accumulator(0, param, Some("b"), countFailedValues = false)) - val unregisteredAccums = Seq( - new Accumulator(0, param, Some("c"), countFailedValues = true), - new Accumulator(0, param, Some("d"), countFailedValues = false)) - registeredAccums.foreach(Accumulators.register) - registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id))) - unregisteredAccums.foreach(a => Accumulators.remove(a.id)) - unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id))) - // set some values in these accums - registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) } - unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) } - val registeredAccumInfos = registeredAccums.map(makeInfo) - val unregisteredAccumInfos = unregisteredAccums.map(makeInfo) - val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos - // Simply checking that this does not crash: - TaskMetrics.fromAccumulatorUpdates(accumUpdates2) - } } @@ -264,21 +230,14 @@ private[spark] object TaskMetricsSuite extends Assertions { * Note: this does NOT check accumulator ID equality. */ def assertUpdatesEquals( - updates1: Seq[AccumulableInfo], - updates2: Seq[AccumulableInfo]): Unit = { + updates1: Seq[NewAccumulator[_, _]], + updates2: Seq[NewAccumulator[_, _]]): Unit = { assert(updates1.size === updates2.size) - updates1.zip(updates2).foreach { case (info1, info2) => + updates1.zip(updates2).foreach { case (acc1, acc2) => // do not assert ID equals here - assert(info1.name === info2.name) - assert(info1.update === info2.update) - assert(info1.value === info2.value) - assert(info1.countFailedValues === info2.countFailedValues) + assert(acc1.name === acc2.name) + assert(acc1.countFailedValues === acc2.countFailedValues) + assert(acc1.value == acc2.value) } } - - /** - * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the - * info as an accumulator update. 
- */ - def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None) } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b76c0a4..9912d1f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def stop() = {} override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[AccumulableInfo])], + accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager @@ -277,8 +277,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou taskSet.tasks(i), result._1, result._2, - Seq(new AccumulableInfo( - accumId, Some(""), Some(1), None, internal = false, countFailedValues = false)))) + Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accumId)))) } } } @@ -484,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def defaultParallelism(): Int = 2 override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[AccumulableInfo])], + accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -997,10 +996,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // complete two tasks runEvent(makeCompletionEvent( taskSets(0).tasks(0), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0))) + Seq.empty, createFakeTaskInfoWithId(0))) runEvent(makeCompletionEvent( taskSets(0).tasks(1), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1))) + Seq.empty, createFakeTaskInfoWithId(1))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // verify stage exists assert(scheduler.stageIdToStage.contains(0)) @@ -1009,10 +1008,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // finish other 2 tasks runEvent(makeCompletionEvent( taskSets(0).tasks(2), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2))) + Seq.empty, createFakeTaskInfoWithId(2))) runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3))) + Seq.empty, createFakeTaskInfoWithId(3))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 4) @@ -1023,14 +1022,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // with a speculative task and make sure the event is sent out runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5))) + Seq.empty, createFakeTaskInfoWithId(5))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 5) // make sure non successful tasks also send out event runEvent(makeCompletionEvent( 
taskSets(0).tasks(3), UnknownReason, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6))) + Seq.empty, createFakeTaskInfoWithId(6))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 6) } @@ -1613,37 +1612,43 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou test("accumulator not calculated for resubmitted result stage") { // just for register - val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam) + val accum = AccumulatorSuite.createLongAccum("a") val finalRdd = new MyRDD(sc, 1, Nil) submit(finalRdd, Array(0)) completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) - val accVal = Accumulators.originals(accum.id).get.get.value - - assert(accVal === 1) - + assert(accum.value === 1) assertDataStructuresEmpty() } test("accumulators are updated on exception failures") { - val acc1 = sc.accumulator(0L, "ingenieur") - val acc2 = sc.accumulator(0L, "boulanger") - val acc3 = sc.accumulator(0L, "agriculteur") - assert(Accumulators.get(acc1.id).isDefined) - assert(Accumulators.get(acc2.id).isDefined) - assert(Accumulators.get(acc3.id).isDefined) - val accInfo1 = acc1.toInfo(Some(15L), None) - val accInfo2 = acc2.toInfo(Some(13L), None) - val accInfo3 = acc3.toInfo(Some(18L), None) - val accumUpdates = Seq(accInfo1, accInfo2, accInfo3) - val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates) + val acc1 = AccumulatorSuite.createLongAccum("ingenieur") + val acc2 = AccumulatorSuite.createLongAccum("boulanger") + val acc3 = AccumulatorSuite.createLongAccum("agriculteur") + assert(AccumulatorContext.get(acc1.id).isDefined) + assert(AccumulatorContext.get(acc2.id).isDefined) + assert(AccumulatorContext.get(acc3.id).isDefined) + val accUpdate1 = new LongAccumulator + accUpdate1.metadata = acc1.metadata + accUpdate1.setValue(15) + val accUpdate2 = new LongAccumulator + accUpdate2.metadata = acc2.metadata + accUpdate2.setValue(13) + val accUpdate3 = new LongAccumulator + accUpdate3.metadata = acc3.metadata + accUpdate3.setValue(18) + val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3) + val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo) + val exceptionFailure = new ExceptionFailure( + new SparkException("fondue?"), + accumInfo).copy(accums = accumUpdates) submit(new MyRDD(sc, 1, Nil), Array(0)) runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result")) - assert(Accumulators.get(acc1.id).get.value === 15L) - assert(Accumulators.get(acc2.id).get.value === 13L) - assert(Accumulators.get(acc3.id).get.value === 18L) + assert(AccumulatorContext.get(acc1.id).get.value === 15L) + assert(AccumulatorContext.get(acc2.id).get.value === 13L) + assert(AccumulatorContext.get(acc3.id).get.value === 18L) } test("reduce tasks should be placed locally with map output") { @@ -2007,12 +2012,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou task: Task[_], reason: TaskEndReason, result: Any, - extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo], + extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty, taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = { val accumUpdates = reason match { - case Success => task.metrics.accumulatorUpdates() - case ef: ExceptionFailure => ef.accumUpdates - case _ => Seq.empty[AccumulableInfo] + case Success => task.metrics.accumulators() + case ef: 
ExceptionFailure => ef.accums + case _ => Seq.empty } CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo) } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 9971d48..16027d9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -17,12 +17,11 @@ package org.apache.spark.scheduler -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId -class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext -{ +class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext { test("launch of backend and scheduler") { val conf = new SparkConf().setMaster("myclusterManager"). setAppName("testcm").set("spark.driver.allowMultipleContexts", "true") @@ -68,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler { override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[AccumulableInfo])], + accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], blockManagerId: BlockManagerId): Boolean = true } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index d55f6f6..9aca4db 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -162,18 +162,17 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark }.count() // The one that counts failed values should be 4x the one that didn't, // since we ran each task 4 times - assert(Accumulators.get(acc1.id).get.value === 40L) - assert(Accumulators.get(acc2.id).get.value === 10L) + assert(AccumulatorContext.get(acc1.id).get.value === 40L) + assert(AccumulatorContext.get(acc2.id).get.value === 10L) } test("failed tasks collect only accumulators whose values count during failures") { sc = new SparkContext("local", "test") - val param = AccumulatorParam.LongAccumulatorParam - val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true) - val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false) + val acc1 = AccumulatorSuite.createLongAccum("x", true) + val acc2 = AccumulatorSuite.createLongAccum("y", false) // Create a dummy task. We won't end up running this; we just want to collect // accumulator updates from it. 
- val taskMetrics = new TaskMetrics + val taskMetrics = TaskMetrics.empty val task = new Task[Int](0, 0, 0) { context = new TaskContextImpl(0, 0, 0L, 0, new TaskMemoryManager(SparkEnv.get.memoryManager, 0L), @@ -186,12 +185,11 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark } // First, simulate task success. This should give us all the accumulators. val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false) - val accumUpdates2 = (taskMetrics.internalAccums ++ Seq(acc1, acc2)) - .map(TaskMetricsSuite.makeInfo) + val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2) TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2) // Now, simulate task failures. This should give us only the accums that count failed values. val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true) - val accumUpdates4 = (taskMetrics.internalAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo) + val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1) TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4) } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index b5385c1..9e472f9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -241,8 +241,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(resultGetter.taskResults.size === 1) val resBefore = resultGetter.taskResults.head val resAfter = captor.getValue - val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update) - val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update) + val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value) + val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value) assert(resSizeBefore.exists(_ == 0L)) assert(resSizeAfter.exists(_.toString.toLong > 0L)) } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ecf4b76..339fc42 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[AccumulableInfo], + accumUpdates: Seq[NewAccumulator[_, _]], taskInfo: TaskInfo) { taskScheduler.endedTasks(taskInfo.index) = reason } @@ -166,8 +166,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) - val accumUpdates = - taskSet.tasks.head.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) } + val accumUpdates 
= taskSet.tasks.head.metrics.internalAccums // Offer a host with NO_PREF as the constraint, // we should get a nopref task immediately since that's what we only have @@ -185,8 +184,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(3) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task => - task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) } + val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums } // First three offers should all find tasks @@ -792,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg private def createTaskResult( id: Int, - accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]): DirectTaskResult[Int] = { + accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) } http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 2211248..ce7d51d 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -183,7 +183,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with test("test executor id to summary") { val conf = new SparkConf() val listener = new JobProgressListener(conf) - val taskMetrics = new TaskMetrics() + val taskMetrics = TaskMetrics.empty val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics() assert(listener.stageIdToData.size === 0) @@ -230,7 +230,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with test("test task success vs failure counting for different task end reasons") { val conf = new SparkConf() val listener = new JobProgressListener(conf) - val metrics = new TaskMetrics() + val metrics = TaskMetrics.empty val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 val task = new ShuffleMapTask(0) @@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with val execId = "exe-1" def makeTaskMetrics(base: Int): TaskMetrics = { - val taskMetrics = new TaskMetrics + val taskMetrics = TaskMetrics.empty val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics() val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics val inputMetrics = taskMetrics.inputMetrics @@ -300,9 +300,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L))) listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array( - (1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()), - (1235L, 0, 0, makeTaskMetrics(100).accumulatorUpdates()), - (1236L, 1, 0, makeTaskMetrics(200).accumulatorUpdates())))) + (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)), 
+ (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)), + (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo))))) var stage0Data = listener.stageIdToData.get((0, 0)).get var stage1Data = listener.stageIdToData.get((1, 0)).get http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index d3b6cdf..6fda737 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -85,7 +85,8 @@ class JsonProtocolSuite extends SparkFunSuite { // Use custom accum ID for determinism val accumUpdates = makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true) - .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) } + .accumulators().map(AccumulatorSuite.makeInfo) + .zipWithIndex.map { case (a, i) => a.copy(id = i) } SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates))) } @@ -385,7 +386,7 @@ class JsonProtocolSuite extends SparkFunSuite { // "Task Metrics" field, if it exists. val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true) val tmJson = JsonProtocol.taskMetricsToJson(tm) - val accumUpdates = tm.accumulatorUpdates() + val accumUpdates = tm.accumulators().map(AccumulatorSuite.makeInfo) val exception = new SparkException("sentimental") val exceptionFailure = new ExceptionFailure(exception, accumUpdates) val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure) @@ -813,7 +814,7 @@ private[spark] object JsonProtocolSuite extends Assertions { hasHadoopInput: Boolean, hasOutput: Boolean, hasRecords: Boolean = true) = { - val t = new TaskMetrics + val t = TaskMetrics.empty t.setExecutorDeserializeTime(a) t.setExecutorRunTime(b) t.setResultSize(c) http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 0f8648f..6fc49a0 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -688,6 +688,18 @@ object MimaExcludes { ) ++ Seq( // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable") + ) ++ Seq( + // SPARK-14654: New accumulator API + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ExceptionFailure$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.metrics"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.this"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.ShuffleReadMetrics.remoteBlocksFetched"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.ShuffleReadMetrics.totalBlocksFetched"), + 
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.ShuffleReadMetrics.localBlocksFetched"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.remoteBlocksFetched"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.localBlocksFetched") ) case v if v.startsWith("1.6") => Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 520ceaa..d6516f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -106,7 +106,7 @@ private[sql] case class RDDScanExec( override val nodeName: String) extends LeafExecNode { private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") @@ -147,7 +147,7 @@ private[sql] case class RowDataSourceScanExec( extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = - Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) val outputUnsafeRows = relation match { case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => @@ -216,7 +216,7 @@ private[sql] case class BatchedDataSourceScanExec( extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = - Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), + Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) protected override def doExecute(): RDD[InternalRow] = { http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index 7c47566..c201822 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -40,7 +40,7 @@ case class ExpandExec( extends UnaryExecNode with CodegenSupport { private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) // The GroupExpressions can output data with arbitrary partitioning, so set it // as UNKNOWN partitioning http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 10cfec3..934bc38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -56,7 +56,7 @@ case class GenerateExec( extends UnaryExecNode { private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def producedAttributes: AttributeSet = AttributeSet(output) http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 4ab447a..c5e78b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -31,7 +31,7 @@ private[sql] case class LocalTableScanExec( rows: Seq[InternalRow]) extends LeafExecNode { private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) private val unsafeRows: Array[InternalRow] = { val proj = UnsafeProjection.create(output, output) http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 861ff3c..0bbe970 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric} +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.DataType import org.apache.spark.util.ThreadUtils @@ -77,7 +77,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** * Return all metrics containing metrics of this SparkPlan. */ - private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty + private[sql] def metrics: Map[String, SQLMetric] = Map.empty /** * Reset all the metrics. @@ -89,8 +89,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** * Return a LongSQLMetric according to the name. */ - private[sql] def longMetric(name: String): LongSQLMetric = - metrics(name).asInstanceOf[LongSQLMetric] + private[sql] def longMetric(name: String): SQLMetric = metrics(name) // TODO: Move to `DistributedPlan` /** Specifies how data is partitioned across different nodes in the cluster. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index cb4b1cf..f84070a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -55,8 +55,7 @@ private[sql] object SparkPlanInfo { case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => - new SQLMetricInfo(metric.name.getOrElse(key), metric.id, - Utils.getFormattedClassName(metric.param)) + new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType) } new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index 362d0d7..4849234 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -26,7 +26,7 @@ import com.google.common.io.ByteStreams import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.execution.metric.LongSQLMetric +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.unsafe.Platform /** @@ -42,7 +42,7 @@ import org.apache.spark.unsafe.Platform */ private[sql] class UnsafeRowSerializer( numFields: Int, - dataSize: LongSQLMetric = null) extends Serializer with Serializable { + dataSize: SQLMetric = null) extends Serializer with Serializable { override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields, dataSize) override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true @@ -50,7 +50,7 @@ private[sql] class UnsafeRowSerializer( private class UnsafeRowSerializerInstance( numFields: Int, - dataSize: LongSQLMetric) extends SerializerInstance { + dataSize: SQLMetric) extends SerializerInstance { /** * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes. 
@@ -60,13 +60,10 @@ private class UnsafeRowSerializerInstance(
     private[this] val dOut: DataOutputStream =
       new DataOutputStream(new BufferedOutputStream(out))
 
-    // LongSQLMetricParam.add() is faster than LongSQLMetric.+=
-    val localDataSize = if (dataSize != null) dataSize.localValue else null
-
     override def writeValue[T: ClassTag](value: T): SerializationStream = {
       val row = value.asInstanceOf[UnsafeRow]
-      if (localDataSize != null) {
-        localDataSize.add(row.getSizeInBytes)
+      if (dataSize != null) {
+        dataSize.add(row.getSizeInBytes)
       }
       dOut.writeInt(row.getSizeInBytes)
       row.writeToStream(dOut, writeBuffer)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 6a03bd0..15b4abe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
-import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -52,11 +52,7 @@ trait CodegenSupport extends SparkPlan {
    * @return name of the variable representing the metric
    */
   def metricTerm(ctx: CodegenContext, name: String): String = {
-    val metric = ctx.addReferenceObj(name, longMetric(name))
-    val value = ctx.freshName("metricValue")
-    val cls = classOf[LongSQLMetricValue].getName
-    ctx.addMutableState(cls, value, s"$value = ($cls) $metric.localValue();")
-    value
+    ctx.addReferenceObj(name, longMetric(name))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
index 3169e0a..2e74d59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
@@ -46,7 +46,7 @@ case class SortBasedAggregateExec(
     AttributeSet(aggregateBufferAttributes)
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index c35d781..f392b13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
-import org.apache.spark.sql.execution.metric.LongSQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
 
 /**
  * An iterator used to evaluate [[AggregateFunction]]. It assumes the input rows have been
@@ -35,7 +35,7 @@ class SortBasedAggregationIterator(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
-    numOutputRows: LongSQLMetric)
+    numOutputRows: SQLMetric)
   extends AggregationIterator(
     groupingExpressions,
     valueAttributes,

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 16362f7..d0ba37e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 
@@ -51,7 +51,7 @@ case class TungstenAggregate(
     aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
@@ -309,8 +309,8 @@ case class TungstenAggregate(
   def finishAggregate(
       hashMap: UnsafeFixedWidthAggregationMap,
       sorter: UnsafeKVExternalSorter,
-      peakMemory: LongSQLMetricValue,
-      spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = {
+      peakMemory: SQLMetric,
+      spillSize: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
 
     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 9db5087..243aa15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
 import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, UnsafeKVExternalSorter}
-import org.apache.spark.sql.execution.metric.LongSQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.KVIterator
 
@@ -86,9 +86,9 @@ class TungstenAggregationIterator(
     originalInputAttributes: Seq[Attribute],
     inputIter: Iterator[InternalRow],
     testFallbackStartsAt: Option[(Int, Int)],
-    numOutputRows: LongSQLMetric,
-    peakMemory: LongSQLMetric,
-    spillSize: LongSQLMetric)
+    numOutputRows: SQLMetric,
+    peakMemory: SQLMetric,
+    spillSize: SQLMetric)
   extends AggregationIterator(
     groupingExpressions,
     originalInputAttributes,

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 83f527f..77be613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -103,7 +103,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   }
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
@@ -229,7 +229,7 @@ case class SampleExec(
   override def output: Seq[Attribute] = child.output
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
     if (withReplacement) {
@@ -322,7 +322,7 @@ case class RangeExec(
   extends LeafExecNode with CodegenSupport {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   // output attributes should not affect the results
   override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index cb957b9..577c34b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Accumulable, Accumulator, Accumulators}
+import org.apache.spark.{Accumulable, Accumulator, AccumulatorContext}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -204,7 +204,7 @@ private[sql] case class InMemoryRelation(
     Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats)
 
   private[sql] def uncache(blocking: Boolean): Unit = {
-    Accumulators.remove(batchStats.id)
+    AccumulatorContext.remove(batchStats.id)
     cachedColumnBuffers.unpersist(blocking)
     _cachedColumnBuffers = null
   }
@@ -217,7 +217,7 @@ private[sql] case class InMemoryTableScanExec(
   extends LeafExecNode {
 
   private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = attributes
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 573ca19..b6ecd3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -38,10 +38,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends Exchange {
 
   override private[sql] lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"),
-    "collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"),
-    "buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"),
-    "broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)"))
+    "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
+    "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"),
+    "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"),
+    "broadcastTime" -> SQLMetrics.createMetric(sparkContext, "time to broadcast (ms)"))
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index b0a6b8f..587c603 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -46,7 +46,7 @@ case class BroadcastHashJoinExec(
   extends BinaryExecNode with HashJoin with CodegenSupport {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 51afa00..a659bf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -35,7 +35,7 @@ case class BroadcastNestedLoopJoinExec(
     condition: Option[Expression]) extends BinaryExecNode {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   /** BuildRight means the right relation <=> the broadcast relation. */
   private val (streamed, broadcast) = buildSide match {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 67f5919..8d7ecc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -86,7 +86,7 @@ case class CartesianProductExec(
   override def output: Seq[Attribute] = left.output ++ right.output
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index d6feedc..9c173d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{RowIterator, SparkPlan}
-import org.apache.spark.sql.execution.metric.LongSQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.{IntegralType, LongType}
 
 trait HashJoin {
@@ -201,7 +201,7 @@ trait HashJoin {
   protected def join(
       streamedIter: Iterator[InternalRow],
       hashed: HashedRelation,
-      numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
 
     val joinedIter = joinType match {
       case Inner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index a242a07..3ef2fec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -40,7 +40,7 @@ case class ShuffledHashJoinExec(
   extends BinaryExecNode with HashJoin {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
     "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index a4c5491..775f8ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, RowIterator, SparkPlan}
-import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.collection.BitSet
 
 /**
@@ -41,7 +41,7 @@ case class SortMergeJoinExec(
     right: SparkPlan) extends BinaryExecNode with CodegenSupport {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = {
     joinType match {
@@ -734,7 +734,7 @@ private class LeftOuterIterator(
     rightNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
     resultProj: InternalRow => InternalRow,
-    numOutputRows: LongSQLMetric)
+    numOutputRows: SQLMetric)
   extends OneSideOuterIterator(
     smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows) {
 
@@ -750,7 +750,7 @@ private class RightOuterIterator(
     leftNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
     resultProj: InternalRow => InternalRow,
-    numOutputRows: LongSQLMetric)
+    numOutputRows: SQLMetric)
   extends OneSideOuterIterator(smjScanner, leftNullRow, boundCondition, resultProj, numOutputRows) {
 
   protected override def setStreamSideOutput(row: InternalRow): Unit = joinedRow.withRight(row)
@@ -778,7 +778,7 @@ private abstract class OneSideOuterIterator(
     bufferedSideNullRow: InternalRow,
     boundCondition: InternalRow => Boolean,
     resultProj: InternalRow => InternalRow,
-    numOutputRows: LongSQLMetric) extends RowIterator {
+    numOutputRows: SQLMetric) extends RowIterator {
 
   // A row to store the joined result, reused many times
   protected[this] val joinedRow: JoinedRow = new JoinedRow()
@@ -1016,7 +1016,7 @@ private class SortMergeFullOuterJoinScanner(
 private class FullOuterIterator(
     smjScanner: SortMergeFullOuterJoinScanner,
     resultProj: InternalRow => InternalRow,
-    numRows: LongSQLMetric) extends RowIterator {
+    numRows: SQLMetric) extends RowIterator {
   private[this] val joinedRow: JoinedRow = smjScanner.getJoinedRow()
 
   override def advanceNext(): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
index 2708219..adb8151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
@@ -27,4 +27,4 @@ import org.apache.spark.annotation.DeveloperApi
 class SQLMetricInfo(
     val name: String,
     val accumulatorId: Long,
-    val metricParam: String)
+    val metricType: String)
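Taken together, the hunks above follow one pattern: the LongSQLMetric / LongSQLMetricValue pair is collapsed into a single SQLMetric accumulator, SQLMetrics.createLongMetric becomes SQLMetrics.createMetric, and call sites update metrics with add() instead of going through localValue. A minimal sketch of what an operator looks like after this change; the operator itself is invented for illustration (it assumes a home in the org.apache.spark.sql.execution package, and UnaryExecNode alongside the LeafExecNode/BinaryExecNode seen above), and only createMetric, longMetric and add are taken from this patch:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.metric.SQLMetrics

// Hypothetical operator, not part of the patch: declares one SQLMetric and
// bumps it directly on the executor side.
case class ExampleCountingExec(child: SparkPlan) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output

  // Metrics are now plain SQLMetric accumulators keyed by name.
  override private[sql] lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows") // SQLMetric, no longer LongSQLMetric
    child.execute().mapPartitions { iter =>
      iter.map { row =>
        numOutputRows.add(1) // direct add(); the localValue indirection is gone
        row
      }
    }
  }
}

On the driver side, the accumulated value then surfaces through the same plumbing shown in the SparkPlanInfo and SQLMetricInfo hunks, with metric.metricType replacing the old metric param class name.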
