Repository: spark Updated Branches: refs/heads/master 6c5a837c5 -> 9797cc20c
[SPARK-14929] [SQL] Disable vectorized map for wide schemas & high-precision decimals ## What changes were proposed in this pull request? While the vectorized hash map in `TungstenAggregate` is currently supported for all primitive data types during partial aggregation, this patch only enables the hash map for a subset of cases that have been verified to show performance improvements on our benchmarks, subject to an internal conf that sets an upper limit on the maximum length of the aggregate key/value schema. This list of supported use-cases should be expanded over time. ## How was this patch tested? There is no change in functionality, so existing tests should suffice. Performance tests were done on TPCDS benchmarks. Author: Sameer Agarwal <[email protected]> Closes #12710 from sameeragarwal/vectorized-enable. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9797cc20 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9797cc20 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9797cc20 Branch: refs/heads/master Commit: 9797cc20c0b8fb34659df11af8eccb9ed293c52c Parents: 6c5a837 Author: Sameer Agarwal <[email protected]> Authored: Tue Apr 26 14:51:14 2016 -0700 Committer: Davies Liu <[email protected]> Committed: Tue Apr 26 14:51:14 2016 -0700 ---------------------------------------------------------------------- .../execution/aggregate/TungstenAggregate.scala | 35 ++++++++++++++++---- .../org/apache/spark/sql/internal/SQLConf.scala | 16 +++++---- .../execution/BenchmarkWholeStageCodegen.scala | 20 +++++------ .../hive/execution/AggregationQuerySuite.scala | 15 +++++---- 4 files changed, 55 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 49db75e..16362f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -447,16 +447,37 @@ case class TungstenAggregate( } } + /** + * Using the vectorized hash map in TungstenAggregate is currently supported for all primitive + * data types during partial aggregation. However, we currently only enable the hash map for a + * subset of cases that've been verified to show performance improvements on our benchmarks + * subject to an internal conf that sets an upper limit on the maximum length of the aggregate + * key/value schema. + * + * This list of supported use-cases should be expanded over time. + */ + private def enableVectorizedHashMap(ctx: CodegenContext): Boolean = { + val schemaLength = (groupingKeySchema ++ bufferSchema).length + val isSupported = + (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) || + f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) && + bufferSchema.nonEmpty && modes.forall(mode => mode == Partial || mode == PartialMerge) + + // We do not support byte array based decimal type for aggregate values as + // ColumnVector.putDecimal for high-precision decimals doesn't currently support in-place + // updates. Due to this, appending the byte array in the vectorized hash map can turn out to be + // quite inefficient and can potentially OOM the executor. 
+ val isNotByteArrayDecimalType = bufferSchema.map(_.dataType).filter(_.isInstanceOf[DecimalType]) + .forall(!DecimalType.isByteArrayDecimalType(_)) + + isSupported && isNotByteArrayDecimalType && + schemaLength <= sqlContext.conf.vectorizedAggregateMapMaxColumns + } + private def doProduceWithKeys(ctx: CodegenContext): String = { val initAgg = ctx.freshName("initAgg") ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") - - // Enable vectorized hash map for all primitive data types during partial aggregation - isVectorizedHashMapEnabled = sqlContext.conf.columnarAggregateMapEnabled && - (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) || - f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) && - bufferSchema.forall(!_.dataType.isInstanceOf[StringType]) && bufferSchema.nonEmpty && - modes.forall(mode => mode == Partial || mode == PartialMerge) + isVectorizedHashMapEnabled = enableVectorizedHashMap(ctx) vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap") val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap") val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, aggregateExpressions, http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bbc424c..b268a4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -484,12 +484,14 @@ object SQLConf { .intConf .createWithDefault(40) - // TODO: This is still WIP and shouldn't be turned on without extensive test coverage - val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled") - .internal() - 
.doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.") - .booleanConf - .createWithDefault(false) + val VECTORIZED_AGG_MAP_MAX_COLUMNS = + SQLConfigBuilder("spark.sql.codegen.aggregate.map.columns.max") + .internal() + .doc("Sets the maximum width of schema (aggregate keys + values) for which aggregate with" + + "keys uses an in-memory columnar map to speed up execution. Setting this to 0 effectively" + + "disables the columnar map") + .intConf + .createWithDefault(3) val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion") .internal() @@ -644,7 +646,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES) - def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED) + def vectorizedAggregateMapMaxColumns: Int = getConf(VECTORIZED_AGG_MAP_MAX_COLUMNS) def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED) http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 7a120b9..841263d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -163,13 +163,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.addCase(s"codegen = T hashmap = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") f() } 
benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -201,13 +201,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.addCase(s"codegen = T hashmap = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -238,13 +238,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.addCase(s"codegen = T hashmap = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -275,13 +275,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.addCase(s"codegen = T hashmap = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - 
sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -322,13 +322,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.addCase(s"codegen = T hashmap = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "10") f() } http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index b16c9c1..0ba72b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -967,8 +967,9 @@ class TungstenAggregationQuerySuite extends AggregationQuerySuite class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite { override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = { - Seq(false, true).foreach { enableColumnarHashMap => - withSQLConf("spark.sql.codegen.aggregate.map.enabled" -> enableColumnarHashMap.toString) { + Seq(0, 10).foreach { maxColumnarHashMapColumns => + withSQLConf("spark.sql.codegen.aggregate.map.columns.max" -> + maxColumnarHashMapColumns.toString) { (1 to 3).foreach { fallbackStartsAt => 
withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") { @@ -981,11 +982,11 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue case Some(errorMessage) => val newErrorMessage = s""" - |The following aggregation query failed when using TungstenAggregate with - |controlled fallback (it falls back to bytes to bytes map once it has processed - |${fallbackStartsAt -1} input rows and to sort-based aggregation once it has - |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution} - | + |The following aggregation query failed when using TungstenAggregate with + |controlled fallback (it falls back to bytes to bytes map once it has processed + |${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has + |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution} + | |$errorMessage """.stripMargin --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
