This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 931854ad7cf7053366eb54877f6358eeae0d7343
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Apr 7 14:04:10 2023 +0800

    KYLIN-5601 Temporarily Close AQE for dict build job (#30251)

    * KYLIN-5601 Temporarily Close AQE for dict build job
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +++
 .../engine/spark/builder/DFDictionaryBuilder.scala | 18 ++++++++++
 .../engine/spark/builder/TestGlobalDictBuild.scala | 41 ++++++++++++++++++++++
 3 files changed, 63 insertions(+)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4a852aa1e4..9557ede4d5 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1632,6 +1632,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.global-dict-check-enabled", FALSE));
     }
 
+    public boolean isGlobalDictAQEEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.global-dict-aqe-enabled", FALSE));
+    }
+
     public String getJdbcSourceName() {
         return getOptional(KYLIN_SOURCE_JDBC_SOURCE_NAME_KEY);
     }

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
index 992bd6b70c..5dfc35eeac 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
@@ -44,9 +44,12 @@ class DFDictionaryBuilder(
   @throws[IOException]
   def buildDictSet(): Unit = {
     colRefSet.asScala.foreach(col => safeBuild(col))
+    changeAQEConfig(true)
   }
 
   private val YARN_CLUSTER: String = "cluster"
+  private val AQE = "spark.sql.adaptive.enabled";
+  private val originalAQE = ss.conf.get(AQE)
 
   private def tryZKJaasConfiguration(): Unit = {
     val config = KylinConfig.getInstanceFromEnv
@@ -79,6 +82,18 @@
     } finally lock.unlock()
   }
 
+  // Workaround: https://olapio.atlassian.net/browse/KE-41645
+  private[builder] def changeAQEConfig(isDictBuildFinished: Boolean = false) : Boolean = {
+    if (!seg.getConfig.isGlobalDictAQEEnabled && !isDictBuildFinished) {
+      logInfo("Temporarily Close AQE for dict build job")
+      ss.conf.set(AQE, false)
+      return false
+    }
+    logInfo(s"Restore AQE to its initial config: $originalAQE")
+    ss.conf.set(AQE, originalAQE)
+    originalAQE.toBoolean
+  }
+
   @throws[IOException]
   private[builder] def build(ref: TblColRef, bucketPartitionSize: Int, afterDistinct: Dataset[Row]): Unit =
     logTime(s"building global dictionaries V2 for ${ref.getIdentity}") {
@@ -86,7 +101,9 @@
       globalDict.prepareWrite()
       val broadcastDict = ss.sparkContext.broadcast(globalDict)
 
+      changeAQEConfig(false)
       ss.sparkContext.setJobDescription("Build dict " + ref.getIdentity)
+
       val dictCol = col(afterDistinct.schema.fields.head.name)
       afterDistinct.filter(dictCol.isNotNull)
         .repartition(bucketPartitionSize, dictCol)
@@ -118,6 +135,7 @@
       }
       logInfo(s"Global dict correctness check completed, table: ${ref.getTableAlias}, col: ${ref.getName}")
     }
+    changeAQEConfig(true)
   }
 
   private def getLockPath(pathName: String) = s"/${seg.getProject}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"
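The heart of the change is a disable-then-restore pattern around the dictionary job: changeAQEConfig(false) turns off spark.sql.adaptive.enabled before the repartition-based build starts, and changeAQEConfig(true) writes the remembered original value back. Below is a minimal standalone sketch of that pattern, assuming only a plain local SparkSession; the session setup, object name, and the build placeholder are illustrative, not part of the patch.

    import org.apache.spark.sql.SparkSession

    object AqeToggleSketch {
      private val AQE = "spark.sql.adaptive.enabled"

      def main(args: Array[String]): Unit = {
        // Hypothetical local session; Kylin manages its own shared session.
        val ss = SparkSession.builder().master("local[1]").appName("aqe-toggle").getOrCreate()
        val originalAQE = ss.conf.get(AQE, "true") // remember the initial value, as the patch does

        ss.conf.set(AQE, "false") // temporarily disable AQE for the dict build
        try {
          // ... run the dictionary build job here ...
        } finally {
          ss.conf.set(AQE, originalAQE) // restore the initial value unconditionally
        }
        ss.stop()
      }
    }

Note that the patch itself restores the value through explicit changeAQEConfig(true) calls at the end of build() and buildDictSet() rather than a finally block, so a build that throws can leave AQE disabled on the shared session; the try/finally above is only one way to make the restore unconditional.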
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
index 30dcd4a961..764229ff44 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
@@ -164,6 +164,47 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
     Assert.assertEquals(shufflePartitionSizeInt, dictDirSize)
   }
 
+  test("global dict build and close aqe") {
+    val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, DEFAULT_PROJECT)
+    val df: NDataflow = dsMgr.getDataflow(CUBE_NAME)
+    val seg = df.getLastSegment
+    val nSpanningTree = NSpanningTreeFactory.fromLayouts(seg.getIndexPlan.getAllLayouts, df.getUuid)
+    val dictColSet = DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(seg, nSpanningTree.getAllIndexEntities)
+    val randomDataSet = generateOriginData(200, 10)
+
+    val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, seg, spark, dictColSet)
+    val col = dictColSet.iterator().next()
+    val ds = randomDataSet.select("26").distinct()
+    val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, col, ds)
+
+    val originalAQE = spark.conf.get("spark.sql.adaptive.enabled")
+
+    // false false
+    seg.getConfig.setProperty("kylin.engine.global-dict-aqe-enabled", "FALSE")
+    dictionaryBuilder.changeAQEConfig(false)
+    Assert.assertFalse(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)
+    dictionaryBuilder.build(col, bucketPartitionSize, ds)
+    Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
+
+    // false true
+    dictionaryBuilder.changeAQEConfig(true)
+    Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
+
+    // true false
+    seg.getConfig.setProperty("kylin.engine.global-dict-aqe-enabled", "TRUE")
+    dictionaryBuilder.changeAQEConfig(false)
+    Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
+
+    dictionaryBuilder.build(col, bucketPartitionSize, ds)
+    Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
+
+    // true true
+    dictionaryBuilder.changeAQEConfig(true)
+    Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
+  }
+
   def buildDict(seg: NDataSegment, randomDataSet: Dataset[Row], dictColSet: Set[TblColRef]): NGlobalDictMetaInfo = {
     val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, seg, randomDataSet.sparkSession, dictColSet)
     val col = dictColSet.iterator().next()
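The test walks the four combinations of the new kylin.engine.global-dict-aqe-enabled flag and the isDictBuildFinished argument. The decision table it pins down is small enough to state as a pure function; the sketch below extracts it for readability, where the object and the function name desiredAQE are illustrative, not part of the patch.

    object AqeDecisionSketch {
      // Value spark.sql.adaptive.enabled should end up with after changeAQEConfig runs.
      def desiredAQE(flagEnabled: Boolean, dictBuildFinished: Boolean, originalAQE: Boolean): Boolean =
        if (!flagEnabled && !dictBuildFinished) false // only this case forces AQE off
        else originalAQE                              // every other case keeps/restores the original

      def main(args: Array[String]): Unit = {
        val original = true
        assert(!desiredAQE(flagEnabled = false, dictBuildFinished = false, originalAQE = original)) // false/false: AQE off during build
        assert(desiredAQE(flagEnabled = false, dictBuildFinished = true, originalAQE = original))   // false/true: restored after build
        assert(desiredAQE(flagEnabled = true, dictBuildFinished = false, originalAQE = original))   // true/false: left at original
        assert(desiredAQE(flagEnabled = true, dictBuildFinished = true, originalAQE = original))    // true/true: restored
      }
    }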