This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 931854ad7cf7053366eb54877f6358eeae0d7343
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Apr 7 14:04:10 2023 +0800

    KYLIN-5601 Temporarily disable AQE for dict build job (#30251)
    
    * KYLIN-5601 Temporarily disable AQE for dict build job
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +++
 .../engine/spark/builder/DFDictionaryBuilder.scala | 18 ++++++++++
 .../engine/spark/builder/TestGlobalDictBuild.scala | 41 ++++++++++++++++++++++
 3 files changed, 63 insertions(+)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4a852aa1e4..9557ede4d5 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1632,6 +1632,16 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.global-dict-check-enabled", FALSE));
     }
 
+    /**
+     * Whether Spark AQE may stay enabled while global dictionaries are built.
+     * Reads {@code kylin.engine.global-dict-aqe-enabled} (default {@code FALSE}); when it is not
+     * true, the dictionary builder temporarily turns {@code spark.sql.adaptive.enabled} off.
+     */
+    public boolean isGlobalDictAQEEnabled() {
+        String aqeEnabled = getOptional("kylin.engine.global-dict-aqe-enabled", FALSE);
+        return Boolean.parseBoolean(aqeEnabled);
+    }
+
     public String getJdbcSourceName() {
         return getOptional(KYLIN_SOURCE_JDBC_SOURCE_NAME_KEY);
     }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
index 992bd6b70c..5dfc35eeac 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
@@ -44,9 +44,18 @@ class DFDictionaryBuilder(
+  /** Builds (via `safeBuild`) a global dictionary for every column in `colRefSet`. */
   @throws[IOException]
   def buildDictSet(): Unit = {
-    colRefSet.asScala.foreach(col => safeBuild(col))
+    try {
+      colRefSet.asScala.foreach(col => safeBuild(col))
+    } finally {
+      // Restore the original AQE setting even when a dictionary build throws, so a
+      // failed build does not leave AQE disabled on `ss` by changeAQEConfig(false).
+      changeAQEConfig(true)
+    }
   }
 
   private val YARN_CLUSTER: String = "cluster"
+  private val AQE: String = "spark.sql.adaptive.enabled"
+  private val originalAQE: String = ss.conf.get(AQE) // captured once, when the builder is constructed
 
   private def tryZKJaasConfiguration(): Unit = {
     val config = KylinConfig.getInstanceFromEnv
@@ -79,6 +82,29 @@ class DFDictionaryBuilder(
     } finally lock.unlock()
   }
 
+  /**
+   * Workaround for https://olapio.atlassian.net/browse/KE-41645: switches Spark AQE
+   * (`spark.sql.adaptive.enabled`) off while a global dictionary is being built.
+   *
+   * @param isDictBuildFinished pass `false` right before a dictionary build starts and
+   *                            `true` once it has finished
+   * @return the AQE value now set on `ss`: `false` if AQE was just disabled, otherwise
+   *         the value captured in `originalAQE` when this builder was constructed
+   */
+  private[builder] def changeAQEConfig(isDictBuildFinished: Boolean = false): Boolean = {
+    // Disable AQE only for a build that is about to run and only when the user has not opted
+    // back in via `kylin.engine.global-dict-aqe-enabled`; every other call restores it.
+    if (!seg.getConfig.isGlobalDictAQEEnabled && !isDictBuildFinished) {
+      logInfo("Temporarily Close AQE for dict build job")
+      ss.conf.set(AQE, false)
+      false
+    } else {
+      logInfo(s"Restore AQE to its initial config: $originalAQE")
+      ss.conf.set(AQE, originalAQE)
+      originalAQE.toBoolean
+    }
+  }
+
   @throws[IOException]
   private[builder] def build(ref: TblColRef, bucketPartitionSize: Int,
                              afterDistinct: Dataset[Row]): Unit = 
logTime(s"building global dictionaries V2 for ${ref.getIdentity}") {
@@ -86,7 +101,9 @@ class DFDictionaryBuilder(
     globalDict.prepareWrite()
     val broadcastDict = ss.sparkContext.broadcast(globalDict)
 
+    changeAQEConfig(false)
     ss.sparkContext.setJobDescription("Build dict " + ref.getIdentity)
+
     val dictCol = col(afterDistinct.schema.fields.head.name)
     afterDistinct.filter(dictCol.isNotNull)
       .repartition(bucketPartitionSize, dictCol)
@@ -118,6 +135,7 @@ class DFDictionaryBuilder(
       }
       logInfo(s"Global dict correctness check completed, table: 
${ref.getTableAlias}, col: ${ref.getName}")
     }
+    changeAQEConfig(true)
   }
 
   private def getLockPath(pathName: String) = 
s"/${seg.getProject}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
index 30dcd4a961..764229ff44 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
@@ -164,6 +164,53 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
     Assert.assertEquals(shufflePartitionSizeInt, dictDirSize)
   }
 
+  test("global dict build and close aqe") {
+    val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, DEFAULT_PROJECT)
+    val df: NDataflow = dsMgr.getDataflow(CUBE_NAME)
+    val seg = df.getLastSegment
+    val nSpanningTree = NSpanningTreeFactory.fromLayouts(seg.getIndexPlan.getAllLayouts, df.getUuid)
+    val dictColSet = DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(seg, nSpanningTree.getAllIndexEntities)
+    val randomDataSet = generateOriginData(200, 10)
+
+    val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, seg, spark, dictColSet)
+    val col = dictColSet.iterator().next()
+    val ds = randomDataSet.select("26").distinct()
+    val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, col, ds)
+
+    val aqeKey = "spark.sql.adaptive.enabled"
+    val dictAqeKey = "kylin.engine.global-dict-aqe-enabled"
+    val originalAQE = spark.conf.get(aqeKey)
+    val originalDictAqeEnabled = seg.getConfig.isGlobalDictAQEEnabled
+    try {
+      // AQE not allowed for dict build: starting a build disables AQE,
+      // and build() restores the original value when it finishes.
+      seg.getConfig.setProperty(dictAqeKey, "FALSE")
+      Assert.assertFalse(dictionaryBuilder.changeAQEConfig(false))
+      Assert.assertFalse(spark.conf.get(aqeKey).toBoolean)
+      dictionaryBuilder.build(col, bucketPartitionSize, ds)
+      Assert.assertEquals(originalAQE, spark.conf.get(aqeKey))
+
+      // AQE not allowed, build finished: the original value is restored.
+      Assert.assertEquals(originalAQE.toBoolean, dictionaryBuilder.changeAQEConfig(true))
+      Assert.assertEquals(originalAQE, spark.conf.get(aqeKey))
+
+      // AQE allowed for dict build: starting a build keeps the original value.
+      seg.getConfig.setProperty(dictAqeKey, "TRUE")
+      Assert.assertEquals(originalAQE.toBoolean, dictionaryBuilder.changeAQEConfig(false))
+      Assert.assertEquals(originalAQE, spark.conf.get(aqeKey))
+      dictionaryBuilder.build(col, bucketPartitionSize, ds)
+      Assert.assertEquals(originalAQE, spark.conf.get(aqeKey))
+
+      // AQE allowed, build finished: the original value is kept.
+      Assert.assertEquals(originalAQE.toBoolean, dictionaryBuilder.changeAQEConfig(true))
+      Assert.assertEquals(originalAQE, spark.conf.get(aqeKey))
+    } finally {
+      // Undo the config changes so later tests sharing this session/config are unaffected.
+      seg.getConfig.setProperty(dictAqeKey, originalDictAqeEnabled.toString)
+      spark.conf.set(aqeKey, originalAQE)
+    }
+  }
+
   def buildDict(seg: NDataSegment, randomDataSet: Dataset[Row], dictColSet: 
Set[TblColRef]): NGlobalDictMetaInfo = {
     val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, seg, 
randomDataSet.sparkSession, dictColSet)
     val col = dictColSet.iterator().next()

Reply via email to