This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b11a099 [SPARK-31391][SQL][TEST] Add AdaptiveTestUtils to ease the
test of AQE
b11a099 is described below
commit b11a099ca6aa259c0818170fb962fe7cc5b746e2
Author: yi.wu <[email protected]>
AuthorDate: Mon Apr 13 14:40:53 2020 +0000
[SPARK-31391][SQL][TEST] Add AdaptiveTestUtils to ease the test of AQE
### What changes were proposed in this pull request?
This PR adds `AdaptiveTestUtils` to make AQE tests simpler. It includes:
`DisableAdaptiveExecution` - a test tag to skip a single test case if AQE
is enabled.
`EnableAdaptiveExecutionSuite` - a helper trait to enable AQE for all tests
except those tagged with `DisableAdaptiveExecution`.
`DisableAdaptiveExecutionSuite` - a helper trait to disable AQE for all
tests.
`assertExceptionMessage` - a method to check the message of a normal or AQE
exception in a consistent way.
`assertExceptionCause` - a method to check the cause of a normal or AQE
exception in a consistent way.
### Why are the changes needed?
With these utils, we can:
- reduce a lot of duplicate code;
- handle normal and AQE exceptions in a consistent way;
- improve the stability of AQE tests.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Updated existing tests to use the new utils.
Closes #28162 from Ngone51/add_aqe_test_utils.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit bbb3cd9c5e659a89c95fe718f8b63a540187fa43)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/ConfigBehaviorSuite.scala | 9 +--
.../scala/org/apache/spark/sql/ExplainSuite.scala | 15 +---
.../org/apache/spark/sql/MetadataCacheSuite.scala | 9 ++-
.../scala/org/apache/spark/sql/SubquerySuite.scala | 8 +-
.../DeprecatedWholeStageCodegenSuite.scala | 25 +++---
.../execution/LogicalPlanTagInSparkPlanSuite.scala | 18 +----
.../apache/spark/sql/execution/PlannerSuite.scala | 8 +-
.../sql/execution/WholeStageCodegenSuite.scala | 18 +----
.../sql/execution/adaptive/AdaptiveTestUtils.scala | 93 ++++++++++++++++++++++
.../sql/execution/datasources/json/JsonSuite.scala | 4 +-
.../execution/datasources/orc/OrcQuerySuite.scala | 25 +++---
.../spark/sql/execution/debug/DebuggingSuite.scala | 19 +----
.../sql/execution/metric/SQLMetricsSuite.scala | 18 +----
.../execution/ui/SQLAppStatusListenerSuite.scala | 14 ++--
.../sql/internal/ExecutorSideSQLConfSuite.scala | 7 +-
.../spark/sql/sources/BucketedReadSuite.scala | 3 +-
.../org/apache/spark/sql/test/SQLTestUtils.scala | 17 +++-
.../spark/sql/hive/HiveMetadataCacheSuite.scala | 5 +-
.../sql/hive/execution/HiveExplainSuite.scala | 32 ++++----
.../spark/sql/hive/execution/SQLMetricsSuite.scala | 19 +----
.../spark/sql/hive/execution/SQLQuerySuite.scala | 7 +-
.../org/apache/spark/sql/hive/test/TestHive.scala | 13 +--
22 files changed, 217 insertions(+), 169 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
index c3dbbb3..36989ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import org.apache.commons.math3.stat.inference.ChiSquareTest
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -27,7 +28,8 @@ class ConfigBehaviorSuite extends QueryTest with
SharedSparkSession {
import testImplicits._
- test("SPARK-22160 spark.sql.execution.rangeExchange.sampleSizePerPartition")
{
+ test("SPARK-22160 spark.sql.execution.rangeExchange.sampleSizePerPartition",
+ DisableAdaptiveExecution("Post shuffle partition number can be
different")) {
// In this test, we run a sort and compute the histogram for partition
size post shuffle.
// With a high sample count, the partition size should be more evenly
distributed, and has a
// low chi-sq test value.
@@ -53,11 +55,8 @@ class ConfigBehaviorSuite extends QueryTest with
SharedSparkSession {
dist)
}
- // When enable AQE, the post partition number is changed.
// And the ChiSquareTest result is also need updated. So disable AQE.
- withSQLConf(
- SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString,
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
// The default chi-sq value should be low
assert(computeChiSquareTest() < 100)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 16c5802..1a35e5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -18,26 +18,15 @@
package org.apache.spark.sql
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
-class ExplainSuite extends QueryTest with SharedSparkSession {
+class ExplainSuite extends QueryTest with SharedSparkSession with
DisableAdaptiveExecutionSuite {
import testImplicits._
- var originalValue: String = _
- protected override def beforeAll(): Unit = {
- super.beforeAll()
- originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
- }
-
- protected override def afterAll(): Unit = {
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
- super.afterAll()
- }
-
private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String =
{
val output = new java.io.ByteArrayOutputStream()
Console.withOut(output) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index 956bd78..a9f443b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import java.io.File
import org.apache.spark.{SparkConf, SparkException}
+import
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -55,8 +56,8 @@ abstract class MetadataCacheSuite extends QueryTest with
SharedSparkSession {
val e = intercept[SparkException] {
df.count()
}
- assert(e.getMessage.contains("FileNotFoundException"))
- assert(e.getMessage.contains("recreating the Dataset/DataFrame
involved"))
+ assertExceptionMessage(e, "FileNotFoundException")
+ assertExceptionMessage(e, "recreating the Dataset/DataFrame involved")
}
}
}
@@ -84,8 +85,8 @@ class MetadataCacheV1Suite extends MetadataCacheSuite {
val e = intercept[SparkException] {
sql("select count(*) from view_refresh").first()
}
- assert(e.getMessage.contains("FileNotFoundException"))
- assert(e.getMessage.contains("REFRESH"))
+ assertExceptionMessage(e, "FileNotFoundException")
+ assertExceptionMessage(e, "REFRESH")
// Refresh and we should be able to read it again.
spark.catalog.refreshTable("view_refresh")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index ff8f94c..760b03d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
import org.apache.spark.sql.execution.{ColumnarToRowExec,
ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec,
ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecution}
import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -1357,11 +1357,9 @@ class SubquerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
}
}
- test("SPARK-27279: Reuse Subquery") {
+ test("SPARK-27279: Reuse Subquery", DisableAdaptiveExecution("reuse is
dynamic in AQE")) {
Seq(true, false).foreach { reuse =>
- withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString,
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- // when enable AQE, the reusedExchange is inserted when executed.
+ withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
val df = sql(
"""
|SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM
testData)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
index 1e90754..b27a940 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
@@ -18,30 +18,27 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.expressions.scalalang.typed
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+// Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
@deprecated("This test suite will be removed.", "3.0.0")
class DeprecatedWholeStageCodegenSuite extends QueryTest
with SharedSparkSession
- with AdaptiveSparkPlanHelper {
+ with DisableAdaptiveExecutionSuite {
test("simple typed UDAF should be included in WholeStageCodegen") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- // With enable AQE, the WholeStageCodegenExec rule is applied when
running QueryStageExec.
- import testImplicits._
+ import testImplicits._
- val ds = Seq(("a", 10), ("b", 1), ("b", 2), ("c", 1)).toDS()
- .groupByKey(_._1).agg(typed.sum(_._2))
+ val ds = Seq(("a", 10), ("b", 1), ("b", 2), ("c", 1)).toDS()
+ .groupByKey(_._1).agg(typed.sum(_._2))
- val plan = ds.queryExecution.executedPlan
- assert(find(plan)(p =>
- p.isInstanceOf[WholeStageCodegenExec] &&
-
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
- assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
- }
+ val plan = ds.queryExecution.executedPlan
+ assert(plan.find(p =>
+ p.isInstanceOf[WholeStageCodegenExec] &&
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+ assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
index 311f84c..5bcec9b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.ClassTag
import org.apache.spark.sql.TPCDSQuerySuite
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
Complete, Final}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Generate, Join,
LocalRelation, LogicalPlan, Range, Sample, Union, Window}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.{InMemoryRelation,
InMemoryTableScanExec}
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -29,22 +30,9 @@ import
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.window.WindowExec
-import org.apache.spark.sql.internal.SQLConf
-class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite {
-
- var originalValue: String = _
- // when enable AQE, the 'AdaptiveSparkPlanExec' node does not have a logical
plan link
- override def beforeAll(): Unit = {
- super.beforeAll()
- originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
- }
-
- override def afterAll(): Unit = {
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
- super.afterAll()
- }
+// Disable AQE because AdaptiveSparkPlanExec does not have a logical plan link
+class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite with
DisableAdaptiveExecutionSuite {
override protected def checkGeneratedCode(
plan: SparkPlan, checkMethodCodeSize: Boolean = true): Unit = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 0c5e2e3..9c8e443 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation,
LogicalPlan, Range, Repartition, Sort, Union}
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecution}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.{InMemoryRelation,
InMemoryTableScanExec}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements,
ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
@@ -752,7 +752,8 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
}
test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec
" +
- "and InMemoryTableScanExec") {
+ "and InMemoryTableScanExec",
+ DisableAdaptiveExecution("Reuse is dynamic in AQE")) {
def checkOutputPartitioningRewrite(
plans: Seq[SparkPlan],
expectedPartitioningClass: Class[_]): Unit = {
@@ -782,8 +783,7 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
checkOutputPartitioningRewrite(inMemoryScan, expectedPartitioningClass)
}
// when enable AQE, the reusedExchange is inserted when executed.
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
// ReusedExchange is HashPartitioning
val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 06a016f..f7396ee 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -28,23 +29,12 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {
+// Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
+class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
+ with DisableAdaptiveExecutionSuite {
import testImplicits._
- var originalValue: String = _
- // With on AQE, the WholeStageCodegenExec is added when running
QueryStageExec.
- override def beforeAll(): Unit = {
- super.beforeAll()
- originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
- }
-
- override def afterAll(): Unit = {
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
- super.afterAll()
- }
-
test("range/filter should be combined") {
val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
val plan = df.queryExecution.executedPlan
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
new file mode 100644
index 0000000..ddaeb57
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.io.{PrintWriter, StringWriter}
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test with this tag will be ignored if the test suite extends
`EnableAdaptiveExecutionSuite`.
+ * Otherwise, it will be executed with adaptive execution disabled.
+ */
+case class DisableAdaptiveExecution(reason: String) extends
Tag("DisableAdaptiveExecution")
+
+/**
+ * Helper trait that enables AQE for all tests regardless of default config
values, except that
+ * tests tagged with [[DisableAdaptiveExecution]] will be skipped.
+ */
+trait EnableAdaptiveExecutionSuite extends SQLTestUtils {
+ protected val forceApply = true
+
+ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
+ (implicit pos: Position): Unit = {
+ if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
+ // we ignore the test here but assume that another test suite which
extends
+ // `DisableAdaptiveExecutionSuite` will test it anyway to ensure test
coverage
+ ignore(testName + " (disabled when AQE is on)", testTags: _*)(testFun)
+ } else {
+ super.test(testName, testTags: _*) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> forceApply.toString) {
+ testFun
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Helper trait that disables AQE for all tests regardless of default config
values.
+ */
+trait DisableAdaptiveExecutionSuite extends SQLTestUtils {
+ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
+ (implicit pos: Position): Unit = {
+ super.test(testName, testTags: _*) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ testFun
+ }
+ }
+ }
+}
+
+object AdaptiveTestUtils {
+ def assertExceptionMessage(e: Exception, expected: String): Unit = {
+ val stringWriter = new StringWriter()
+ e.printStackTrace(new PrintWriter(stringWriter))
+ val errorMsg = stringWriter.toString
+ assert(errorMsg.contains(expected))
+ }
+
+ def assertExceptionCause(t: Throwable, causeClass: Class[_]): Unit = {
+ var c = t.getCause
+ var foundCause = false
+ while (c != null && !foundCause) {
+ if (causeClass.isAssignableFrom(c.getClass)) {
+ foundCause = true
+ } else {
+ c = c.getCause
+ }
+ }
+ assert(foundCause, s"Can not find cause: $causeClass")
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 5466800..be95f33 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.ExternalRDD
+import
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -2192,9 +2193,8 @@ abstract class JsonSuite extends QueryTest with
SharedSparkSession with TestJson
.json(testFile(fileName))
.count()
}
- val errMsg = exception.getMessage
- assert(errMsg.contains("Malformed records are detected in record parsing"))
+ assertExceptionMessage(exception, "Malformed records are detected in
record parsing")
}
def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 5f49526..60f278b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -34,6 +34,7 @@ import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -595,22 +596,22 @@ abstract class OrcQueryTest extends OrcTest {
}
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
- val m1 = intercept[SparkException] {
+ val e1 = intercept[SparkException] {
testIgnoreCorruptFiles()
- }.getMessage
- assert(m1.contains("Malformed ORC file"))
- val m2 = intercept[SparkException] {
+ }
+ assertExceptionMessage(e1, "Malformed ORC file")
+ val e2 = intercept[SparkException] {
testIgnoreCorruptFilesWithoutSchemaInfer()
- }.getMessage
- assert(m2.contains("Malformed ORC file"))
- val m3 = intercept[SparkException] {
+ }
+ assertExceptionMessage(e2, "Malformed ORC file")
+ val e3 = intercept[SparkException] {
testAllCorruptFiles()
- }.getMessage
- assert(m3.contains("Could not read footer for file"))
- val m4 = intercept[SparkException] {
+ }
+ assertExceptionMessage(e3, "Could not read footer for file")
+ val e4 = intercept[SparkException] {
testAllCorruptFilesWithoutSchemaInfer()
- }.getMessage
- assert(m4.contains("Malformed ORC file"))
+ }
+ assertExceptionMessage(e4, "Malformed ORC file")
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 4cb845b..e9ef7c1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -24,27 +24,14 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.execution.{CodegenSupport, LeafExecNode,
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.StructType
-class DebuggingSuite extends SharedSparkSession {
-
-
- var originalValue: String = _
- // With on AQE, the WholeStageCodegenExec is added when running
QueryStageExec.
- override def beforeAll(): Unit = {
- super.beforeAll()
- originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
- }
-
- override def afterAll(): Unit = {
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
- super.afterAll()
- }
+// Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
+class DebuggingSuite extends SharedSparkSession with
DisableAdaptiveExecutionSuite {
test("DataFrame.debug()") {
testData.debug()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a5b07d5..1c4a818 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan,
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions._
@@ -33,22 +34,11 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
-class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
+// Disable AQE because metric info is different with AQE on/off
+class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
+ with DisableAdaptiveExecutionSuite {
import testImplicits._
- var originalValue: String = _
- // With AQE on/off, the metric info is different.
- override def beforeAll(): Unit = {
- super.beforeAll()
- originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
- }
-
- override def afterAll(): Unit = {
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
- super.afterAll()
- }
-
/**
* Generates a `DataFrame` by filling randomly generated bytes for hash
collision.
*/
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 949924e..0746059 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution,
SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.internal.SQLConf
@@ -623,13 +624,12 @@ class SQLAppStatusListenerSuite extends
SharedSparkSession with JsonTestUtils
assert(statusStore.execution(2) === None)
}
- test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- // with AQE on, the WholeStageCodegen rule is applied when running
QueryStageExec.
- val df = createTestDataFrame.select(count("*"))
- val sparkPlanInfo =
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
- assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)")
- }
+ test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo",
+ DisableAdaptiveExecution("WSCG rule is applied later in AQE")) {
+ // with AQE on, the WholeStageCodegen rule is applied when running
QueryStageExec.
+ val df = createTestDataFrame.select(count("*"))
+ val sparkPlanInfo =
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
+ assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)")
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 888772c..567524a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.debug.codegenStringSeq
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SQLTestUtils
@@ -98,10 +99,10 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with
SQLTestUtils {
}
}
- test("SPARK-22219: refactor to control to generate comment") {
+ test("SPARK-22219: refactor to control to generate comment",
+ DisableAdaptiveExecution("WSCG rule is applied later in AQE")) {
Seq(true, false).foreach { flag =>
- withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> flag.toString,
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> flag.toString) {
// with AQE on, the WholeStageCodegen rule is applied when running
QueryStageExec.
val res = codegenStringSeq(spark.range(10).groupBy(col("id") *
2).count()
.queryExecution.executedPlan)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 14ba008..707d9c2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec,
SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -758,7 +759,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
agged.count()
}
- assert(error.getCause().toString contains "Invalid bucket file")
+ assertExceptionMessage(error, "Invalid bucket file")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 38893f8..7be15e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,7 +27,8 @@ import scala.language.implicitConversions
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
-import org.scalatest.{BeforeAndAfterAll, Suite}
+import org.scalactic.source.Position
+import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkFunSuite
@@ -40,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.FilterExec
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.UninterruptibleThread
@@ -114,6 +116,19 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
}
}
+ override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any)
+ (implicit pos: Position): Unit = { // Registers a test; any test tagged DisableAdaptiveExecution has its body run with AQE switched off.
+ if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { // the tag is matched by type only; its reason argument is not inspected here
+ super.test(testName, testTags: _*) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { // force AQE off while this test body runs
+ testFun
+ }
+ }
+ } else {
+ super.test(testName, testTags: _*)(testFun) // untagged tests run with whatever AQE setting the suite currently has
+ }
+ }
+
/**
* Run a test on a separate `UninterruptibleThread`.
*/
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 94a55b9..743cdbd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
+import
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -99,7 +100,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
val e = intercept[SparkException] {
sql("select * from test").count()
}
- assert(e.getMessage.contains("FileNotFoundException"))
+ assertExceptionMessage(e, "FileNotFoundException")
// Test refreshing the cache.
spark.catalog.refreshTable("test")
@@ -114,7 +115,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
val e2 = intercept[SparkException] {
sql("select * from test").count()
}
- assert(e2.getMessage.contains("FileNotFoundException"))
+ assertExceptionMessage(e2, "FileNotFoundException")
spark.catalog.refreshByPath(dir.getAbsolutePath)
assert(sql("select * from test").count() == 3)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index f9a4e2c..7a913e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -133,22 +134,21 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
"src")
}
- test("explain output of physical plan should contain proper codegen stage
ID") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- checkKeywordsExist(sql(
- """
- |EXPLAIN SELECT t1.id AS a, t2.id AS b FROM
- |(SELECT * FROM range(3)) t1 JOIN
- |(SELECT * FROM range(10)) t2 ON t1.id == t2.id % 3
- """.stripMargin),
- "== Physical Plan ==",
- "*(2) Project ",
- "+- *(2) BroadcastHashJoin ",
- " :- BroadcastExchange ",
- " : +- *(1) Range ",
- " +- *(2) Range "
- )
- }
+ test("explain output of physical plan should contain proper codegen stage
ID",
+ DisableAdaptiveExecution("Adaptive explain is different")) { // the "*(n)" codegen stage IDs asserted below are those of the non-adaptive plan
+ checkKeywordsExist(sql(
+ """
+ |EXPLAIN SELECT t1.id AS a, t2.id AS b FROM
+ |(SELECT * FROM range(3)) t1 JOIN
+ |(SELECT * FROM range(10)) t2 ON t1.id == t2.id % 3
+ """.stripMargin),
+ "== Physical Plan ==",
+ "*(2) Project ",
+ "+- *(2) BroadcastHashJoin ",
+ " :- BroadcastExchange ",
+ " : +- *(1) Range ",
+ " +- *(2) Range "
+ )
}
test("EXPLAIN CODEGEN command") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
index 16668f9..4d6dafd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -17,24 +17,13 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton {
-
- var originalValue: String = _
- // With AQE on/off, the metric info is different.
- override def beforeAll(): Unit = {
- super.beforeAll()
- originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
- }
-
- override def afterAll(): Unit = {
- spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
- super.afterAll()
- }
+// Disable AQE because metric info is different with AQE on/off
+class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton
+ with DisableAdaptiveExecutionSuite {
test("writing data out metrics: hive") {
testMetricsNonDynamicPartition("hive", "t1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 65c1db5..138dcc5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils,
HiveTableRelation}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite,
EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.command.{FunctionsCommand,
LoadDataCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
import org.apache.spark.sql.functions._
@@ -67,7 +68,7 @@ case class Order(
* Hive to generate them (in contrast to HiveQuerySuite). Often this is
because the query is
* valid, but Hive currently cannot execute it.
*/
-class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton
{
+abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with
TestHiveSingleton {
import hiveContext._
import spark.implicits._
@@ -2493,3 +2494,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}
+
+class SQLQuerySuite extends SQLQuerySuiteBase with
DisableAdaptiveExecutionSuite // runs every SQLQuerySuiteBase test with AQE disabled
+class SQLQuerySuiteAE extends SQLQuerySuiteBase with
EnableAdaptiveExecutionSuite // re-runs the same tests with AQE enabled; tests tagged DisableAdaptiveExecution still run without AQE
+
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b9f10c3..a3e2444 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -234,16 +234,16 @@ private[hive] class TestHiveSparkSession(
* Dataset.ofRows that creates a TestHiveQueryExecution (rather than a
normal QueryExecution
* which wouldn't load all the test tables).
*/
- override def sql(sqlText: String): DataFrame = {
+ override def sql(sqlText: String): DataFrame = withActive { // NOTE(review): withActive presumably makes this session the active one while the plan is parsed and the Dataset built — confirm
val plan = sessionState.sqlParser.parsePlan(sqlText)
Dataset.ofRows(self, plan)
}
- override def newSession(): TestHiveSparkSession = {
+ override def newSession(): TestHiveSparkSession = withActive {
new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables) // reuses sc and sharedState; NOTE(review): if None is the parentSessionState argument, new sessions bypass the parentSessionState.isEmpty guard and reset test.tmp.dir — confirm
}
- override def cloneSession(): SparkSession = {
+ override def cloneSession(): SparkSession = withActive {
val result = new TestHiveSparkSession(
sparkContext,
Some(sharedState),
@@ -264,7 +264,10 @@ private[hive] class TestHiveSparkSession(
System.clearProperty("spark.hostPort")
// For some hive test case which contain ${system:test.tmp.dir}
- System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath)
+ // Only the root session (no parentSessionState) creates the temp dir; cloned sessions must not create a new one and overwrite test.tmp.dir.
+ if (parentSessionState.isEmpty) {
+ System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath)
+ }
/** The location of the compiled hive distribution */
lazy val hiveHome = envVarToFile("HIVE_HOME")
@@ -587,7 +590,7 @@ private[hive] class TestHiveQueryExecution(
this(TestHive.sparkSession, sql)
}
- override lazy val analyzed: LogicalPlan = {
+ override lazy val analyzed: LogicalPlan = sparkSession.withActive {
val describedTables = logical match {
case CacheTableCommand(tbl, _, _, _) => tbl :: Nil
case _ => Nil
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]