Repository: spark
Updated Branches:
  refs/heads/master 6c5a837c5 -> 9797cc20c


[SPARK-14929] [SQL] Disable vectorized map for wide schemas & high-precision 
decimals

## What changes were proposed in this pull request?

While the vectorized hash map in `TungstenAggregate` is currently supported for 
all primitive data types during partial aggregation, this patch only enables 
the hash map for a subset of cases that've been verified to show performance 
improvements on our benchmarks subject to an internal conf that sets an upper 
limit on the maximum length of the aggregate key/value schema. This list of 
supported use-cases should be expanded over time.

## How was this patch tested?

This patch introduces no new functionality, so existing tests should suffice. 
Performance tests were done on TPCDS benchmarks.

Author: Sameer Agarwal <[email protected]>

Closes #12710 from sameeragarwal/vectorized-enable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9797cc20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9797cc20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9797cc20

Branch: refs/heads/master
Commit: 9797cc20c0b8fb34659df11af8eccb9ed293c52c
Parents: 6c5a837
Author: Sameer Agarwal <[email protected]>
Authored: Tue Apr 26 14:51:14 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Tue Apr 26 14:51:14 2016 -0700

----------------------------------------------------------------------
 .../execution/aggregate/TungstenAggregate.scala | 35 ++++++++++++++++----
 .../org/apache/spark/sql/internal/SQLConf.scala | 16 +++++----
 .../execution/BenchmarkWholeStageCodegen.scala  | 20 +++++------
 .../hive/execution/AggregationQuerySuite.scala  | 15 +++++----
 4 files changed, 55 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 49db75e..16362f7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -447,16 +447,37 @@ case class TungstenAggregate(
     }
   }
 
+  /**
+   * Decides whether partial aggregation may use the vectorized hash map.
+   * Every key/buffer column must be a primitive, decimal or string type,
+   * the buffer must be non-empty, every mode must be Partial or
+   * PartialMerge, no buffer column may be a byte-array decimal, and the
+   * combined key/value column count must not exceed
+   * `sqlContext.conf.vectorizedAggregateMapMaxColumns`.
+   */
+  private def enableVectorizedHashMap(ctx: CodegenContext): Boolean = {
+    val allFields = groupingKeySchema ++ bufferSchema
+    val typesSupported = allFields.map(_.dataType).forall {
+      case _: DecimalType | _: StringType => true
+      case other => ctx.isPrimitiveType(other)
+    }
+    val partialOnly = modes.forall(m => m == Partial || m == PartialMerge)
+    // Byte-array (high-precision) decimals are rejected as aggregate values:
+    // ColumnVector.putDecimal cannot update them in place, so appending
+    // the byte array in the vectorized map can be inefficient and may OOM.
+    val hasByteArrayDecimal = bufferSchema.map(_.dataType).exists {
+      case d: DecimalType => DecimalType.isByteArrayDecimalType(d)
+      case _ => false
+    }
+    typesSupported && bufferSchema.nonEmpty && partialOnly &&
+      !hasByteArrayDecimal &&
+      allFields.length <= sqlContext.conf.vectorizedAggregateMapMaxColumns
+  }
+
   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.freshName("initAgg")
     ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
-
-    // Enable vectorized hash map for all primitive data types during partial 
aggregation
-    isVectorizedHashMapEnabled = sqlContext.conf.columnarAggregateMapEnabled &&
-      (groupingKeySchema ++ bufferSchema).forall(f => 
ctx.isPrimitiveType(f.dataType) ||
-        f.dataType.isInstanceOf[DecimalType] || 
f.dataType.isInstanceOf[StringType]) &&
-      bufferSchema.forall(!_.dataType.isInstanceOf[StringType]) && 
bufferSchema.nonEmpty &&
-      modes.forall(mode => mode == Partial || mode == PartialMerge)
+    isVectorizedHashMapEnabled = enableVectorizedHashMap(ctx)
     vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap")
     val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap")
     val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, 
aggregateExpressions,

http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bbc424c..b268a4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -484,12 +484,14 @@ object SQLConf {
       .intConf
       .createWithDefault(40)
 
-  // TODO: This is still WIP and shouldn't be turned on without extensive test 
coverage
-  val COLUMNAR_AGGREGATE_MAP_ENABLED = 
SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled")
-    .internal()
-    .doc("When true, aggregate with keys use an in-memory columnar map to 
speed up execution.")
-    .booleanConf
-    .createWithDefault(false)
+  val VECTORIZED_AGG_MAP_MAX_COLUMNS =
+    SQLConfigBuilder("spark.sql.codegen.aggregate.map.columns.max")
+      .internal()
+      .doc("Sets the maximum width of schema (aggregate keys + values) for 
which aggregate with " +
+        "keys uses an in-memory columnar map to speed up execution. Setting 
this to 0 effectively " +
+        "disables the columnar map")
+      .intConf
+      .createWithDefault(3)
 
   val FILE_SINK_LOG_DELETION = 
SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()
@@ -644,7 +646,7 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
 
-  def columnarAggregateMapEnabled: Boolean = 
getConf(COLUMNAR_AGGREGATE_MAP_ENABLED)
+  def vectorizedAggregateMapMaxColumns: Int = 
getConf(VECTORIZED_AGG_MAP_MAX_COLUMNS)
 
   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 7a120b9..841263d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -163,13 +163,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     benchmark.addCase(s"codegen = T hashmap = F") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
     benchmark.addCase(s"codegen = T hashmap = T") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
     }
 
@@ -201,13 +201,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     benchmark.addCase(s"codegen = T hashmap = F") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
     benchmark.addCase(s"codegen = T hashmap = T") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
     }
 
@@ -238,13 +238,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     benchmark.addCase(s"codegen = T hashmap = F") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
     benchmark.addCase(s"codegen = T hashmap = T") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
     }
 
@@ -275,13 +275,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     benchmark.addCase(s"codegen = T hashmap = F") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
     benchmark.addCase(s"codegen = T hashmap = T") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
     }
 
@@ -322,13 +322,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     benchmark.addCase(s"codegen = T hashmap = F") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }
 
     benchmark.addCase(s"codegen = T hashmap = T") { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "10")
       f()
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9797cc20/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index b16c9c1..0ba72b0 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -967,8 +967,9 @@ class TungstenAggregationQuerySuite extends 
AggregationQuerySuite
 class TungstenAggregationQueryWithControlledFallbackSuite extends 
AggregationQuerySuite {
 
   override protected def checkAnswer(actual: => DataFrame, expectedAnswer: 
Seq[Row]): Unit = {
-    Seq(false, true).foreach { enableColumnarHashMap =>
-      withSQLConf("spark.sql.codegen.aggregate.map.enabled" -> 
enableColumnarHashMap.toString) {
+    Seq(0, 10).foreach { maxColumnarHashMapColumns =>
+      withSQLConf("spark.sql.codegen.aggregate.map.columns.max" ->
+        maxColumnarHashMapColumns.toString) {
         (1 to 3).foreach { fallbackStartsAt =>
           withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
             s"${(fallbackStartsAt - 1).toString}, 
${fallbackStartsAt.toString}") {
@@ -981,11 +982,11 @@ class TungstenAggregationQueryWithControlledFallbackSuite 
extends AggregationQue
               case Some(errorMessage) =>
                 val newErrorMessage =
                   s"""
-                    |The following aggregation query failed when using 
TungstenAggregate with
-                    |controlled fallback (it falls back to bytes to bytes map 
once it has processed
-                    |${fallbackStartsAt -1} input rows and to sort-based 
aggregation once it has
-                    |processed $fallbackStartsAt input rows). The query is 
${actual.queryExecution}
-                    |
+                     |The following aggregation query failed when using 
TungstenAggregate with
+                     |controlled fallback (it falls back to bytes to bytes map 
once it has processed
+                     |${fallbackStartsAt - 1} input rows and to sort-based 
aggregation once it has
+                     |processed $fallbackStartsAt input rows). The query is 
${actual.queryExecution}
+                     |
                     |$errorMessage
                   """.stripMargin
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to