This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new db1c3fe [SPARK-31045][SQL] Add config for AQE logging level
db1c3fe is described below
commit db1c3feacdca04c1b72191269f4f3910ad05bcb4
Author: maryannxue <[email protected]>
AuthorDate: Fri Mar 6 11:41:45 2020 +0800
[SPARK-31045][SQL] Add config for AQE logging level
### What changes were proposed in this pull request?
This PR adds an internal config, `spark.sql.adaptive.logLevel`, that sets the
log level at which adaptive query execution logs plan changes and the final
plan.
### Why are the changes needed?
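To make AQE debugging easier. Plan-change messages were previously logged only
at DEBUG level. They can now be emitted at a chosen level (TRACE, DEBUG, INFO,
WARN or ERROR) through a SQL config.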
### Does this PR introduce any user-facing change?
No. The new config is internal.
### How was this patch tested?
Added a unit test ("test log level" in AdaptiveQueryExecSuite).
Closes #27798 from maryannxue/aqe-log-level.
Authored-by: maryannxue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d705d36c0c94d3a4684de6ca0f444557c3cec25e)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 12 +++++-
.../adaptive/AdaptiveQueryExecSuite.scala | 47 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b2b3d12..cd465bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -378,6 +378,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
+ .internal()
+ .doc("Configures the log level for adaptive execution logging of plan
changes. The value " +
+ "can be 'trace', 'debug', 'info', 'warn', or 'error'. The default log
level is 'debug'.")
+ .version("3.0.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR"))
+ .createWithDefault("debug")
+
val ADVISORY_PARTITION_SIZE_IN_BYTES =
buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
.doc("The advisory size in bytes of the shuffle partition during
adaptive optimization " +
@@ -2428,6 +2438,8 @@ class SQLConf extends Serializable with Logging {
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
+ def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
+
def fetchShuffleBlocksInBatch: Boolean =
getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)
def nonEmptyPartitionRatioForBroadcastJoin: Double =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index b74401e..fc88a7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -66,6 +66,15 @@ case class AdaptiveSparkPlanExec(
@transient private val lock = new Object()
+ @transient private val logOnLevel: ( => String) => Unit =
conf.adaptiveExecutionLogLevel match {
+ case "TRACE" => logTrace(_)
+ case "DEBUG" => logDebug(_)
+ case "INFO" => logInfo(_)
+ case "WARN" => logWarning(_)
+ case "ERROR" => logError(_)
+ case _ => logDebug(_)
+ }
+
// The logical plan optimizer for re-optimizing the current logical plan.
@transient private val optimizer = new RuleExecutor[LogicalPlan] {
// TODO add more optimization rules
@@ -204,6 +213,7 @@ case class AdaptiveSparkPlanExec(
val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
if (newCost < origCost ||
(newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
+ logOnLevel(s"Plan changed from $currentPhysicalPlan to
$newPhysicalPlan")
cleanUpTempTags(newPhysicalPlan)
currentPhysicalPlan = newPhysicalPlan
currentLogicalPlan = newLogicalPlan
@@ -217,7 +227,7 @@ case class AdaptiveSparkPlanExec(
currentPhysicalPlan = applyPhysicalRules(result.newPlan,
queryStageOptimizerRules)
isFinalPlan = true
executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
- logDebug(s"Final plan: $currentPhysicalPlan")
+ logOnLevel(s"Final plan: $currentPhysicalPlan")
}
currentPhysicalPlan
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a7fa63d..25b1f89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.adaptive
import java.io.File
import java.net.URI
+import org.apache.log4j.Level
+
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobStart}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD,
SparkPlan}
@@ -729,5 +731,50 @@ class AdaptiveQueryExecSuite
s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is" +
s" enabled but is not supported for")))
}
+
+ test("test log level") {
+ def verifyLog(expectedLevel: Level): Unit = {
+ val logAppender = new LogAppender("adaptive execution")
+ withLogAppender(
+ logAppender,
+ loggerName = Some(AdaptiveSparkPlanExec.getClass.getName.dropRight(1)),
+ level = Some(Level.TRACE)) {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+ sql("SELECT * FROM testData join testData2 ON key = a where value =
'1'").collect()
+ }
+ }
+ Seq("Plan changed", "Final plan").foreach { msg =>
+ assert(
+ logAppender.loggingEvents.exists { event =>
+ event.getRenderedMessage.contains(msg) && event.getLevel ==
expectedLevel
+ })
+ }
+ }
+
+ // Verify default log level
+ verifyLog(Level.DEBUG)
+
+ // Verify custom log level
+ val levels = Seq(
+ "TRACE" -> Level.TRACE,
+ "trace" -> Level.TRACE,
+ "DEBUG" -> Level.DEBUG,
+ "debug" -> Level.DEBUG,
+ "INFO" -> Level.INFO,
+ "info" -> Level.INFO,
+ "WARN" -> Level.WARN,
+ "warn" -> Level.WARN,
+ "ERROR" -> Level.ERROR,
+ "error" -> Level.ERROR,
+ "deBUG" -> Level.DEBUG)
+
+ levels.foreach { level =>
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_LOG_LEVEL.key -> level._1) {
+ verifyLog(level._2)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]