This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b11a099  [SPARK-31391][SQL][TEST] Add AdaptiveTestUtils to ease the 
test of AQE
b11a099 is described below

commit b11a099ca6aa259c0818170fb962fe7cc5b746e2
Author: yi.wu <[email protected]>
AuthorDate: Mon Apr 13 14:40:53 2020 +0000

    [SPARK-31391][SQL][TEST] Add AdaptiveTestUtils to ease the test of AQE
    
    ### What changes were proposed in this pull request?
    
    This PR adds `AdaptiveTestUtils` to make AQE tests simpler. It includes:
    
    `DisableAdaptiveExecution` - a test tag to skip a single test case if AQE 
is enabled.
    `EnableAdaptiveExecutionSuite` - a helper trait to enable AQE for all tests 
except those tagged with `DisableAdaptiveExecution`.
    `DisableAdaptiveExecutionSuite` - a helper trait to disable AQE for all 
tests.
    `assertExceptionMessage` - a method to check the message of a normal or AQE 
exception in a consistent way.
    `assertExceptionCause` - a method to check the cause of a normal or AQE 
exception in a consistent way.
    
    ### Why are the changes needed?
    
    With these utils, we can:
    - remove a lot of duplicated code;
    - handle normal and AQE exceptions in a consistent way;
    - improve the stability of AQE tests.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated tests with the util.
    
    Closes #28162 from Ngone51/add_aqe_test_utils.
    
    Authored-by: yi.wu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit bbb3cd9c5e659a89c95fe718f8b63a540187fa43)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/ConfigBehaviorSuite.scala |  9 +--
 .../scala/org/apache/spark/sql/ExplainSuite.scala  | 15 +---
 .../org/apache/spark/sql/MetadataCacheSuite.scala  |  9 ++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala |  8 +-
 .../DeprecatedWholeStageCodegenSuite.scala         | 25 +++---
 .../execution/LogicalPlanTagInSparkPlanSuite.scala | 18 +----
 .../apache/spark/sql/execution/PlannerSuite.scala  |  8 +-
 .../sql/execution/WholeStageCodegenSuite.scala     | 18 +----
 .../sql/execution/adaptive/AdaptiveTestUtils.scala | 93 ++++++++++++++++++++++
 .../sql/execution/datasources/json/JsonSuite.scala |  4 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 25 +++---
 .../spark/sql/execution/debug/DebuggingSuite.scala | 19 +----
 .../sql/execution/metric/SQLMetricsSuite.scala     | 18 +----
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 14 ++--
 .../sql/internal/ExecutorSideSQLConfSuite.scala    |  7 +-
 .../spark/sql/sources/BucketedReadSuite.scala      |  3 +-
 .../org/apache/spark/sql/test/SQLTestUtils.scala   | 17 +++-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala    |  5 +-
 .../sql/hive/execution/HiveExplainSuite.scala      | 32 ++++----
 .../spark/sql/hive/execution/SQLMetricsSuite.scala | 19 +----
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  7 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala  | 13 +--
 22 files changed, 217 insertions(+), 169 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
index c3dbbb3..36989ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.commons.math3.stat.inference.ChiSquareTest
 
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -27,7 +28,8 @@ class ConfigBehaviorSuite extends QueryTest with 
SharedSparkSession {
 
   import testImplicits._
 
-  test("SPARK-22160 spark.sql.execution.rangeExchange.sampleSizePerPartition") 
{
+  test("SPARK-22160 spark.sql.execution.rangeExchange.sampleSizePerPartition",
+    DisableAdaptiveExecution("Post shuffle partition number can be 
different")) {
     // In this test, we run a sort and compute the histogram for partition 
size post shuffle.
     // With a high sample count, the partition size should be more evenly 
distributed, and has a
     // low chi-sq test value.
@@ -53,11 +55,8 @@ class ConfigBehaviorSuite extends QueryTest with 
SharedSparkSession {
         dist)
     }
 
-    // When enable AQE, the post partition number is changed.
     // And the ChiSquareTest result is also need updated. So disable AQE.
-    withSQLConf(
-        SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString,
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
       // The default chi-sq value should be low
       assert(computeChiSquareTest() < 100)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 16c5802..1a35e5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -18,26 +18,15 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 
-class ExplainSuite extends QueryTest with SharedSparkSession {
+class ExplainSuite extends QueryTest with SharedSparkSession with 
DisableAdaptiveExecutionSuite {
   import testImplicits._
 
-  var originalValue: String = _
-  protected override def beforeAll(): Unit = {
-    super.beforeAll()
-    originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-  }
-
-  protected override def afterAll(): Unit = {
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
-    super.afterAll()
-  }
-
   private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = 
{
     val output = new java.io.ByteArrayOutputStream()
     Console.withOut(output) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index 956bd78..a9f443b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.io.File
 
 import org.apache.spark.{SparkConf, SparkException}
+import 
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -55,8 +56,8 @@ abstract class MetadataCacheSuite extends QueryTest with 
SharedSparkSession {
       val e = intercept[SparkException] {
         df.count()
       }
-      assert(e.getMessage.contains("FileNotFoundException"))
-      assert(e.getMessage.contains("recreating the Dataset/DataFrame 
involved"))
+      assertExceptionMessage(e, "FileNotFoundException")
+      assertExceptionMessage(e, "recreating the Dataset/DataFrame involved")
     }
   }
 }
@@ -84,8 +85,8 @@ class MetadataCacheV1Suite extends MetadataCacheSuite {
       val e = intercept[SparkException] {
         sql("select count(*) from view_refresh").first()
       }
-      assert(e.getMessage.contains("FileNotFoundException"))
-      assert(e.getMessage.contains("REFRESH"))
+      assertExceptionMessage(e, "FileNotFoundException")
+      assertExceptionMessage(e, "REFRESH")
 
       // Refresh and we should be able to read it again.
       spark.catalog.refreshTable("view_refresh")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index ff8f94c..760b03d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, 
ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, 
ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.datasources.FileScanRDD
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1357,11 +1357,9 @@ class SubquerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     }
   }
 
-  test("SPARK-27279: Reuse Subquery") {
+  test("SPARK-27279: Reuse Subquery", DisableAdaptiveExecution("reuse is 
dynamic in AQE")) {
     Seq(true, false).foreach { reuse =>
-      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString,
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-        // when enable AQE, the reusedExchange is inserted when executed.
+      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
         val df = sql(
           """
             |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM 
testData)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
index 1e90754..b27a940 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/DeprecatedWholeStageCodegenSuite.scala
@@ -18,30 +18,27 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.expressions.scalalang.typed
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
+// Disable AQE because the WholeStageCodegenExec is added when running 
QueryStageExec
 @deprecated("This test suite will be removed.", "3.0.0")
 class DeprecatedWholeStageCodegenSuite extends QueryTest
   with SharedSparkSession
-  with AdaptiveSparkPlanHelper {
+  with DisableAdaptiveExecutionSuite {
 
   test("simple typed UDAF should be included in WholeStageCodegen") {
-    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-      // With enable AQE, the WholeStageCodegenExec rule is applied when 
running QueryStageExec.
-      import testImplicits._
+    import testImplicits._
 
-      val ds = Seq(("a", 10), ("b", 1), ("b", 2), ("c", 1)).toDS()
-        .groupByKey(_._1).agg(typed.sum(_._2))
+    val ds = Seq(("a", 10), ("b", 1), ("b", 2), ("c", 1)).toDS()
+      .groupByKey(_._1).agg(typed.sum(_._2))
 
-      val plan = ds.queryExecution.executedPlan
-      assert(find(plan)(p =>
-        p.isInstanceOf[WholeStageCodegenExec] &&
-          
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
-      assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
-    }
+    val plan = ds.queryExecution.executedPlan
+    assert(plan.find(p =>
+      p.isInstanceOf[WholeStageCodegenExec] &&
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
+    assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
index 311f84c..5bcec9b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.sql.TPCDSQuerySuite
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Complete, Final}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Generate, Join, 
LocalRelation, LogicalPlan, Range, Sample, Union, Window}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, 
InMemoryTableScanExec}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -29,22 +30,9 @@ import 
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
-import org.apache.spark.sql.internal.SQLConf
 
-class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite {
-
-  var originalValue: String = _
-  // when enable AQE, the 'AdaptiveSparkPlanExec' node does not have a logical 
plan link
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-  }
-
-  override def afterAll(): Unit = {
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
-    super.afterAll()
-  }
+// Disable AQE because AdaptiveSparkPlanExec does not have a logical plan link
+class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite with 
DisableAdaptiveExecutionSuite {
 
   override protected def checkGeneratedCode(
       plan: SparkPlan, checkMethodCodeSize: Boolean = true): Unit = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 0c5e2e3..9c8e443 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan, Range, Repartition, Sort, Union}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, 
InMemoryTableScanExec}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
@@ -752,7 +752,8 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
   }
 
   test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec 
" +
-    "and InMemoryTableScanExec") {
+    "and InMemoryTableScanExec",
+    DisableAdaptiveExecution("Reuse is dynamic in AQE")) {
     def checkOutputPartitioningRewrite(
         plans: Seq[SparkPlan],
         expectedPartitioningClass: Class[_]): Unit = {
@@ -782,8 +783,7 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
       checkOutputPartitioningRewrite(inMemoryScan, expectedPartitioningClass)
     }
     // when enable AQE, the reusedExchange is inserted when executed.
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       // ReusedExchange is HashPartitioning
       val df1 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
       val df2 = Seq(1 -> "a").toDF("i", "j").repartition($"i")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 06a016f..f7396ee 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, 
CodeAndComment, CodeGenerator}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -28,23 +29,12 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {
+// Disable AQE because the WholeStageCodegenExec is added when running 
QueryStageExec
+class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
+  with DisableAdaptiveExecutionSuite {
 
   import testImplicits._
 
-  var originalValue: String = _
-  // With on AQE, the WholeStageCodegenExec is added when running 
QueryStageExec.
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-  }
-
-  override def afterAll(): Unit = {
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
-    super.afterAll()
-  }
-
   test("range/filter should be combined") {
     val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
     val plan = df.queryExecution.executedPlan
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
new file mode 100644
index 0000000..ddaeb57
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.io.{PrintWriter, StringWriter}
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test with this tag will be ignored if the test suite extends 
`EnableAdaptiveExecutionSuite`.
+ * Otherwise, it will be executed with adaptive execution disabled.
+ */
+case class DisableAdaptiveExecution(reason: String) extends 
Tag("DisableAdaptiveExecution")
+
+/**
+ * Helper trait that enables AQE for all tests regardless of default config 
values, except that
+ * tests tagged with [[DisableAdaptiveExecution]] will be skipped.
+ */
+trait EnableAdaptiveExecutionSuite extends SQLTestUtils {
+  protected val forceApply = true
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)
+      (implicit pos: Position): Unit = {
+    if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
+      // we ignore the test here but assume that another test suite which 
extends
+      // `DisableAdaptiveExecutionSuite` will test it anyway to ensure test 
coverage
+      ignore(testName + " (disabled when AQE is on)", testTags: _*)(testFun)
+    } else {
+      super.test(testName, testTags: _*) {
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+          SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> forceApply.toString) {
+          testFun
+        }
+      }
+    }
+  }
+}
+
+/**
+ * Helper trait that disables AQE for all tests regardless of default config 
values.
+ */
+trait DisableAdaptiveExecutionSuite extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)
+      (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        testFun
+      }
+    }
+  }
+}
+
+object AdaptiveTestUtils {
+  def assertExceptionMessage(e: Exception, expected: String): Unit = {
+    val stringWriter = new StringWriter()
+    e.printStackTrace(new PrintWriter(stringWriter))
+    val errorMsg = stringWriter.toString
+    assert(errorMsg.contains(expected))
+  }
+
+  def assertExceptionCause(t: Throwable, causeClass: Class[_]): Unit = {
+    var c = t.getCause
+    var foundCause = false
+    while (c != null && !foundCause) {
+      if (causeClass.isAssignableFrom(c.getClass)) {
+        foundCause = true
+      } else {
+        c = c.getCause
+      }
+    }
+    assert(foundCause, s"Can not find cause: $causeClass")
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 5466800..be95f33 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.ExternalRDD
+import 
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -2192,9 +2193,8 @@ abstract class JsonSuite extends QueryTest with 
SharedSparkSession with TestJson
         .json(testFile(fileName))
         .count()
     }
-    val errMsg = exception.getMessage
 
-    assert(errMsg.contains("Malformed records are detected in record parsing"))
+    assertExceptionMessage(exception, "Malformed records are detected in 
record parsing")
   }
 
   def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 5f49526..60f278b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -34,6 +34,7 @@ import org.apache.orc.mapreduce.OrcInputFormat
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import 
org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -595,22 +596,22 @@ abstract class OrcQueryTest extends OrcTest {
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
-      val m1 = intercept[SparkException] {
+      val e1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
-      }.getMessage
-      assert(m1.contains("Malformed ORC file"))
-      val m2 = intercept[SparkException] {
+      }
+      assertExceptionMessage(e1, "Malformed ORC file")
+      val e2 = intercept[SparkException] {
         testIgnoreCorruptFilesWithoutSchemaInfer()
-      }.getMessage
-      assert(m2.contains("Malformed ORC file"))
-      val m3 = intercept[SparkException] {
+      }
+      assertExceptionMessage(e2, "Malformed ORC file")
+      val e3 = intercept[SparkException] {
         testAllCorruptFiles()
-      }.getMessage
-      assert(m3.contains("Could not read footer for file"))
-      val m4 = intercept[SparkException] {
+      }
+      assertExceptionMessage(e3, "Could not read footer for file")
+      val e4 = intercept[SparkException] {
         testAllCorruptFilesWithoutSchemaInfer()
-      }.getMessage
-      assert(m4.contains("Malformed ORC file"))
+      }
+      assertExceptionMessage(e4, "Malformed ORC file")
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 4cb845b..e9ef7c1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -24,27 +24,14 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.execution.{CodegenSupport, LeafExecNode, 
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.StructType
 
-class DebuggingSuite extends SharedSparkSession {
-
-
-  var originalValue: String = _
-  // With on AQE, the WholeStageCodegenExec is added when running 
QueryStageExec.
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-  }
-
-  override def afterAll(): Unit = {
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
-    super.afterAll()
-  }
+// Disable AQE because the WholeStageCodegenExec is added when running 
QueryStageExec
+class DebuggingSuite extends SharedSparkSession with 
DisableAdaptiveExecutionSuite {
 
   test("DataFrame.debug()") {
     testData.debug()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a5b07d5..1c4a818 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, 
WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
@@ -33,22 +34,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
 
-class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
+// Disable AQE because metric info is different with AQE on/off
+class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
+  with DisableAdaptiveExecutionSuite {
   import testImplicits._
 
-  var originalValue: String = _
-  // With AQE on/off, the metric info is different.
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-  }
-
-  override def afterAll(): Unit = {
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
-    super.afterAll()
-  }
-
   /**
    * Generates a `DataFrame` by filling randomly generated bytes for hash 
collision.
    */
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 949924e..0746059 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, 
SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
@@ -623,13 +624,12 @@ class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTestUtils
     assert(statusStore.execution(2) === None)
   }
 
-  test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo") {
-    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-      // with AQE on, the WholeStageCodegen rule is applied when running 
QueryStageExec.
-      val df = createTestDataFrame.select(count("*"))
-      val sparkPlanInfo = 
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
-      assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)")
-    }
+  test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo",
+    DisableAdaptiveExecution("WSCG rule is applied later in AQE")) {
+    // with AQE on, the WholeStageCodegen rule is applied when running 
QueryStageExec.
+    val df = createTestDataFrame.select(count("*"))
+    val sparkPlanInfo = 
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
+    assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)")
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 888772c..567524a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
 import org.apache.spark.sql.execution.debug.codegenStringSeq
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SQLTestUtils
@@ -98,10 +99,10 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
     }
   }
 
-  test("SPARK-22219: refactor to control to generate comment") {
+  test("SPARK-22219: refactor to control to generate comment",
+    DisableAdaptiveExecution("WSCG rule is applied later in AQE")) {
     Seq(true, false).foreach { flag =>
-      withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> flag.toString,
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> flag.toString) {
         // with AQE on, the WholeStageCodegen rule is applied when running QueryStageExec.
         val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
           .queryExecution.executedPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 14ba008..707d9c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -758,7 +759,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         agged.count()
       }
 
-      assert(error.getCause().toString contains "Invalid bucket file")
+      assertExceptionMessage(error, "Invalid bucket file")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 38893f8..7be15e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,7 +27,8 @@ import scala.language.implicitConversions
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
-import org.scalatest.{BeforeAndAfterAll, Suite}
+import org.scalactic.source.Position
+import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
@@ -40,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.FilterExec
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.UninterruptibleThread
@@ -114,6 +116,19 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
     }
   }
 
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
+      (implicit pos: Position): Unit = {
+    if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
+      super.test(testName, testTags: _*) {
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+          testFun
+        }
+      }
+    } else {
+      super.test(testName, testTags: _*)(testFun)
+    }
+  }
+
   /**
    * Run a test on a separate `UninterruptibleThread`.
    */
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 94a55b9..743cdbd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -99,7 +100,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
             val e = intercept[SparkException] {
               sql("select * from test").count()
             }
-            assert(e.getMessage.contains("FileNotFoundException"))
+            assertExceptionMessage(e, "FileNotFoundException")
 
             // Test refreshing the cache.
             spark.catalog.refreshTable("test")
@@ -114,7 +115,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
             val e2 = intercept[SparkException] {
               sql("select * from test").count()
             }
-            assert(e2.getMessage.contains("FileNotFoundException"))
+            assertExceptionMessage(e2, "FileNotFoundException")
             spark.catalog.refreshByPath(dir.getAbsolutePath)
             assert(sql("select * from test").count() == 3)
           }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index f9a4e2c..7a913e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -133,22 +134,21 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       "src")
   }
 
-  test("explain output of physical plan should contain proper codegen stage ID") {
-    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-      checkKeywordsExist(sql(
-        """
-          |EXPLAIN SELECT t1.id AS a, t2.id AS b FROM
-          |(SELECT * FROM range(3)) t1 JOIN
-          |(SELECT * FROM range(10)) t2 ON t1.id == t2.id % 3
-        """.stripMargin),
-        "== Physical Plan ==",
-        "*(2) Project ",
-        "+- *(2) BroadcastHashJoin ",
-        "   :- BroadcastExchange ",
-        "   :  +- *(1) Range ",
-        "   +- *(2) Range "
-      )
-    }
+  test("explain output of physical plan should contain proper codegen stage ID",
+    DisableAdaptiveExecution("Adaptive explain is different")) {
+    checkKeywordsExist(sql(
+      """
+        |EXPLAIN SELECT t1.id AS a, t2.id AS b FROM
+        |(SELECT * FROM range(3)) t1 JOIN
+        |(SELECT * FROM range(10)) t2 ON t1.id == t2.id % 3
+      """.stripMargin),
+      "== Physical Plan ==",
+      "*(2) Project ",
+      "+- *(2) BroadcastHashJoin ",
+      "   :- BroadcastExchange ",
+      "   :  +- *(1) Range ",
+      "   +- *(2) Range "
+    )
   }
 
   test("EXPLAIN CODEGEN command") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
index 16668f9..4d6dafd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -17,24 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
 
-class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton {
-
-  var originalValue: String = _
-  // With AQE on/off, the metric info is different.
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    originalValue = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-  }
-
-  override def afterAll(): Unit = {
-    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, originalValue)
-    super.afterAll()
-  }
+// Disable AQE because metric info is different with AQE on/off
+class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton
+  with DisableAdaptiveExecutionSuite {
 
   test("writing data out metrics: hive") {
     testMetricsNonDynamicPartition("hive", "t1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 65c1db5..138dcc5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.command.{FunctionsCommand, LoadDataCommand}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
@@ -67,7 +68,7 @@ case class Order(
  * Hive to generate them (in contrast to HiveQuerySuite).  Often this is because the query is
  * valid, but Hive currently cannot execute it.
  */
-class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext._
   import spark.implicits._
 
@@ -2493,3 +2494,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 }
+
+class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite
+class SQLQuerySuiteAE extends SQLQuerySuiteBase with EnableAdaptiveExecutionSuite
+
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b9f10c3..a3e2444 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -234,16 +234,16 @@ private[hive] class TestHiveSparkSession(
    * Dataset.ofRows that creates a TestHiveQueryExecution (rather than a normal QueryExecution
    * which wouldn't load all the test tables).
    */
-  override def sql(sqlText: String): DataFrame = {
+  override def sql(sqlText: String): DataFrame = withActive {
     val plan = sessionState.sqlParser.parsePlan(sqlText)
     Dataset.ofRows(self, plan)
   }
 
-  override def newSession(): TestHiveSparkSession = {
+  override def newSession(): TestHiveSparkSession = withActive {
     new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
   }
 
-  override def cloneSession(): SparkSession = {
+  override def cloneSession(): SparkSession = withActive {
     val result = new TestHiveSparkSession(
       sparkContext,
       Some(sharedState),
@@ -264,7 +264,10 @@ private[hive] class TestHiveSparkSession(
   System.clearProperty("spark.hostPort")
 
   // For some hive test case which contain ${system:test.tmp.dir}
-  System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath)
+  // Make sure it is not called again when cloning sessions.
+  if (parentSessionState.isEmpty) {
+    System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath)
+  }
 
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
@@ -587,7 +590,7 @@ private[hive] class TestHiveQueryExecution(
     this(TestHive.sparkSession, sql)
   }
 
-  override lazy val analyzed: LogicalPlan = {
+  override lazy val analyzed: LogicalPlan = sparkSession.withActive {
     val describedTables = logical match {
       case CacheTableCommand(tbl, _, _, _) => tbl :: Nil
       case _ => Nil


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to