Repository: spark Updated Branches: refs/heads/master 3fd2ff4dd -> e1f986c7a
[SPARK-15860] Metrics for codegen size and perf ## What changes were proposed in this pull request? Adds codahale metrics for the codegen source text size and how long it takes to compile. The size is particularly interesting, since the JVM does have hard limits on how large methods can get. To simplify, I added the metrics under a statically-initialized source that is always registered with SparkEnv. ## How was this patch tested? Unit tests Author: Eric Liang <[email protected]> Closes #13586 from ericl/spark-15860. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1f986c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1f986c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1f986c7 Branch: refs/heads/master Commit: e1f986c7a3fcc3864d53ef99ef7f14fa4d262ac3 Parents: 3fd2ff4 Author: Eric Liang <[email protected]> Authored: Sat Jun 11 23:16:21 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Sat Jun 11 23:16:21 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/metrics/MetricsSystem.scala | 3 +- .../spark/metrics/source/StaticSources.scala | 50 ++++++++++++++++++++ .../spark/metrics/MetricsSystemSuite.scala | 8 ++-- .../expressions/codegen/CodeGenerator.scala | 3 ++ .../expressions/CodeGenerationSuite.scala | 9 ++++ 5 files changed, 68 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e1f986c7/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 0fed991..9b16c11 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -28,7 +28,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.metrics.sink.{MetricsServlet, Sink} -import org.apache.spark.metrics.source.Source +import org.apache.spark.metrics.source.{Source, StaticSources} import org.apache.spark.util.Utils /** @@ -96,6 +96,7 @@ private[spark] class MetricsSystem private ( def start() { require(!running, "Attempting to start a MetricsSystem that is already running") running = true + StaticSources.allSources.foreach(registerSource) registerSources() registerSinks() sinks.foreach(_.start) http://git-wip-us.apache.org/repos/asf/spark/blob/e1f986c7/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala new file mode 100644 index 0000000..6819222 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry + +import org.apache.spark.annotation.Experimental + +private[spark] object StaticSources { + /** + * The set of all static sources. These sources may be reported to from any class, including + * static classes, without requiring reference to a SparkEnv. + */ + val allSources = Seq(CodegenMetrics) +} + +/** + * :: Experimental :: + * Metrics for code generation. + */ +@Experimental +object CodegenMetrics extends Source { + override val sourceName: String = "CodeGenerator" + override val metricRegistry: MetricRegistry = new MetricRegistry() + + /** + * Histogram of the length of source code text compiled by CodeGenerator (in characters). + */ + val METRIC_SOURCE_CODE_SIZE = metricRegistry.histogram(MetricRegistry.name("sourceCodeSize")) + + /** + * Histogram of the time it took to compile source code text (in milliseconds). + */ + val METRIC_COMPILATION_TIME = metricRegistry.histogram(MetricRegistry.name("compilationTime")) +} http://git-wip-us.apache.org/repos/asf/spark/blob/e1f986c7/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 5d85542..2400832 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.master.MasterSource -import org.apache.spark.metrics.source.Source +import org.apache.spark.metrics.source.{Source, StaticSources} class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester{ var filePath: String = _ @@ -43,7 +43,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val sources = PrivateMethod[ArrayBuffer[Source]]('sources) val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks) - assert(metricsSystem.invokePrivate(sources()).length === 0) + assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) assert(metricsSystem.invokePrivate(sinks()).length === 0) assert(metricsSystem.getServletHandlers.nonEmpty) } @@ -54,13 +54,13 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val sources = PrivateMethod[ArrayBuffer[Source]]('sources) val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks) - assert(metricsSystem.invokePrivate(sources()).length === 0) + assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) assert(metricsSystem.invokePrivate(sinks()).length === 1) assert(metricsSystem.getServletHandlers.nonEmpty) val source = new MasterSource(null) metricsSystem.registerSource(source) - assert(metricsSystem.invokePrivate(sources()).length === 1) + assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length + 1) } test("MetricsSystem with Driver instance") { http://git-wip-us.apache.org/repos/asf/spark/blob/e1f986c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index ca20292..ff97cd3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -26,6 +26,7 @@ import scala.language.existentials import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.CodegenMetrics import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} @@ -886,6 +887,8 @@ object CodeGenerator extends Logging { val result = doCompile(code) val endTime = System.nanoTime() def timeMs: Double = (endTime - startTime).toDouble / 1000000 + CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length) + CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong) logInfo(s"Code generated in $timeMs ms") result } http://git-wip-us.apache.org/repos/asf/spark/blob/e1f986c7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 8ffe390..62429a2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite +import org.apache.spark.metrics.source.CodegenMetrics import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -49,6 +50,14 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { futures.foreach(ThreadUtils.awaitResult(_, 10.seconds)) } + test("metrics are recorded on compile") { + val startCount1 = CodegenMetrics.METRIC_COMPILATION_TIME.getCount() + val startCount2 = CodegenMetrics.METRIC_SOURCE_CODE_SIZE.getCount() + GenerateOrdering.generate(Add(Literal(123), Literal(1)).asc :: Nil) + assert(CodegenMetrics.METRIC_COMPILATION_TIME.getCount() == startCount1 + 1) + assert(CodegenMetrics.METRIC_SOURCE_CODE_SIZE.getCount() == startCount2 + 1) + } + test("SPARK-8443: split wide projections into blocks due to JVM code size limit") { val length = 5000 val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1))) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
