This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 13366b6fb43bec19defaa1542745278ff0bf4622
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Fri Jun 9 14:23:29 2023 +0800

    KYLIN-5718 V3 Dictionary Automatic Merge
    
    ---------
    
    Co-authored-by: Mingming Ge <7mmi...@gmail.com>
---
 .../src/main/resources/config/init.properties      |  8 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 ++
 .../src/main/resources/kylin-defaults0.properties  |  3 +
 .../src/main/resources/config/init.properties      |  8 +-
 .../src/main/resources/config/init.properties      |  8 +-
 .../src/main/resources/config/init.properties      |  8 +-
 .../spark/builder/v3dict/DictionaryBuilder.scala   | 97 +++++++++++++++++-----
 .../v3dict/PreCountDistinctTransformer.scala       |  2 +-
 .../engine/spark/NLocalWithSparkSessionTest.java   |  2 +
 .../builder/v3dict/GlobalDictionarySuite.scala     | 95 ++++++++++++++++-----
 .../spark/sql/common/SharedSparkSession.scala      |  2 +
 11 files changed, 190 insertions(+), 51 deletions(-)

diff --git a/src/common-booter/src/main/resources/config/init.properties 
b/src/common-booter/src/main/resources/config/init.properties
index 2bddfae312..14ad876715 100644
--- a/src/common-booter/src/main/resources/config/init.properties
+++ b/src/common-booter/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 
kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -440,6 +444,6 @@ 
kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 
kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8
 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} 
-Dkap.metadata.identifier=${kylin.metadata.url.identifier} 
-Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index aec5a58ada..3614898c9a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3298,6 +3298,14 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(getOptional("kylin.build.is-convert-v3dict-enable", 
FALSE));
     }
 
+    public long getV3DictFileNumLimit() {
+        return Long.parseLong(getOptional("kylin.build.v3dict-file-num-limit", 
"10"));
+    }
+
+    public long getV3DictFileRetentionHours() {
+        return 
TimeUtil.timeStringAs(getOptional("kylin.build.v3dict-file-retention", "3d"), 
TimeUnit.HOURS);
+    }
+
     public String getV3DictDBName() {
         return getOptional("kylin.build.v3dict-db-name", DEFAULT);
     }
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties 
b/src/core-common/src/main/resources/kylin-defaults0.properties
index 8007bc0ab8..61412133ec 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -92,8 +92,11 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 
kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
 
kylin.engine.spark-conf.spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
 
kylin.engine.spark-conf.spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
 
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
diff --git a/src/data-loading-booter/src/main/resources/config/init.properties 
b/src/data-loading-booter/src/main/resources/config/init.properties
index 2bddfae312..14ad876715 100644
--- a/src/data-loading-booter/src/main/resources/config/init.properties
+++ b/src/data-loading-booter/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 
kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -440,6 +444,6 @@ 
kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 
kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8
 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} 
-Dkap.metadata.identifier=${kylin.metadata.url.identifier} 
-Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
diff --git a/src/query-booter/src/main/resources/config/init.properties 
b/src/query-booter/src/main/resources/config/init.properties
index 2bddfae312..14ad876715 100644
--- a/src/query-booter/src/main/resources/config/init.properties
+++ b/src/query-booter/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 
kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -440,6 +444,6 @@ 
kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 
kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8
 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} 
-Dkap.metadata.identifier=${kylin.metadata.url.identifier} 
-Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
diff --git a/src/server/src/main/resources/config/init.properties 
b/src/server/src/main/resources/config/init.properties
index 94b780aa7a..e705611a5a 100644
--- a/src/server/src/main/resources/config/init.properties
+++ b/src/server/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 
kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -439,6 +443,6 @@ 
kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 
kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8
 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} 
-Dkap.metadata.identifier=${kylin.metadata.url.identifier} 
-Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
index 3b88c8af43..f23a877c31 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.SparkInternalAgent._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Window}
+import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.functions.{col, lit, row_number}
 import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
@@ -43,18 +44,18 @@ import scala.concurrent.duration.DurationInt
 object DictionaryBuilder extends Logging {
 
   implicit val retryStrategy: RetryStrategyProducer =
-    RetryStrategy.fixedBackOff(retryDuration = 10.seconds, maxAttempts = 5)
+    RetryStrategy.randomBackOff(5.seconds, 15.seconds, maxAttempts = 20)
 
   def buildGlobalDict(
-     project: String,
-     spark: SparkSession,
-     plan: LogicalPlan): LogicalPlan = transformCountDistinct(spark, plan) 
transform {
+                       project: String,
+                       spark: SparkSession,
+                       plan: LogicalPlan): LogicalPlan = 
transformCountDistinct(spark, plan) transform {
 
     case GlobalDictionaryPlaceHolder(expr: String, child: LogicalPlan, dbName: 
String) =>
       spark.sparkContext.setJobDescription(s"Build v3 dict $expr")
       val catalog = expr.split(NSparkCubingUtil.SEPARATOR)
-      val tableName = catalog.apply(0)
-      val columnName = catalog.apply(1)
+      val tableName = catalog(0)
+      val columnName = catalog(1)
       val context = new DictionaryContext(project, dbName, tableName, 
columnName, expr)
 
       // concurrent commit may cause delta ConcurrentAppendException.
@@ -79,13 +80,15 @@ object DictionaryBuilder extends Logging {
    * Use Left anti join to process raw data and dictionary tables.
    */
   private def transformerDictPlan(
-     spark: SparkSession,
-     context: DictionaryContext,
-     plan: LogicalPlan): LogicalPlan = {
+                                   spark: SparkSession,
+                                   context: DictionaryContext,
+                                   plan: LogicalPlan): LogicalPlan = {
 
     val dictPath = getDictionaryPath(context)
     val dictTable: DeltaTable = DeltaTable.forPath(dictPath)
     val maxOffset = dictTable.toDF.count()
+    logInfo(s"Dict $dictPath item count $maxOffset")
+
     plan match {
       case Project(_, Project(_, Window(_, _, _, windowChild))) =>
         val column = context.expr
@@ -101,7 +104,9 @@ object DictionaryBuilder extends Logging {
             "left_anti")
           .select(col(column).cast(StringType) as "dict_key",
             (row_number().over(windowSpec) + lit(maxOffset)).cast(LongType) as 
"dict_value")
+        logInfo(s"Dict logical plan : 
${antiJoinDF.queryExecution.logical.treeString}")
         getLogicalPlan(antiJoinDF)
+
       case _ => plan
     }
   }
@@ -121,30 +126,39 @@ object DictionaryBuilder extends Logging {
    * Build an incremental dictionary
    */
   private def incrementBuildDict(
-    spark: SparkSession,
-    plan: LogicalPlan,
-    context: DictionaryContext): Unit = {
+                                  spark: SparkSession,
+                                  plan: LogicalPlan,
+                                  context: DictionaryContext): Unit = {
     val dictMode = chooseDictBuildMode(context)
     logInfo(s"V3 Dict build mode is $dictMode")
     dictMode match {
       case V3INIT =>
         val dictDF = getDataFrame(spark, plan)
-        initAndSaveDictDF(dictDF, context)
+        initAndSaveDict(dictDF, context)
+
       case V3APPEND =>
         mergeIncrementDict(spark, context, plan)
-      // To be delete
+        optimizeDictTable(spark, context)
+
       case V3UPGRADE =>
+        // To be deleted
         val v3OrigDict = upgradeFromOriginalV3(spark, context)
-        initAndSaveDictDF(v3OrigDict, context)
+        initAndSaveDict(v3OrigDict, context)
         mergeIncrementDict(spark, context, plan)
+
       case V2UPGRADE =>
         val v2Dict = upgradeFromV2(spark, context)
-        initAndSaveDictDF(v2Dict, context)
+        initAndSaveDict(v2Dict, context)
         mergeIncrementDict(spark, context, plan)
     }
+
+    val dictPath = getDictionaryPath(context)
+    val dictDeltaLog = DeltaLog.forTable(spark, dictPath)
+    val version = dictDeltaLog.snapshot.version
+    logInfo(s"Completed the construction of dictionary version $version for 
dict $dictPath")
   }
 
-  private def initAndSaveDictDF(dictDF: Dataset[Row], context: 
DictionaryContext): Unit = {
+  private def initAndSaveDict(dictDF: Dataset[Row], context: 
DictionaryContext): Unit = {
     val dictPath = getDictionaryPath(context)
     logInfo(s"Save dict values into path $dictPath.")
     dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
@@ -164,6 +178,43 @@ object DictionaryBuilder extends Logging {
       .execute()
   }
 
+  /**
+   * Compacts the Delta files of a dictionary once too many of them have piled up.
+   * Continuous dictionary construction keeps increasing the number of dictionary files;
+   * too many small files reduce build performance, so they are merged past a threshold.
+   *
+   * The threshold is read from `kylin.build.v3dict-file-num-limit` (default 10). When the
+   * snapshot holds more files than that, the table is compacted and then vacuumed, keeping
+   * files for the duration configured by `kylin.build.v3dict-file-retention` (default 3d).
+   *
+   * Otherwise nothing is changed and only an informational message is logged.
+   */
+  private def optimizeDictTable(spark: SparkSession, context: DictionaryContext): Unit = {
+    val dictPath = getDictionaryPath(context)
+    val deltaLog = DeltaLog.forTable(spark, dictPath)
+    val numFile = deltaLog.snapshot.numOfFiles
+
+    val config = KylinConfig.getInstanceFromEnv
+    val v3DictFileNumLimit = config.getV3DictFileNumLimit
+    if (numFile > v3DictFileNumLimit) {
+      val optimizeStartTime = System.nanoTime()
+      val dictTable = DeltaTable.forPath(dictPath)
+      logInfo(s"Optimize the storage of dict $dictPath, " +
+        s"dict file num: $numFile, " +
+        s"kylin.build.v3dict-file-num-limit: $v3DictFileNumLimit")
+      dictTable.optimize().executeCompaction()
+
+      logInfo(s"Clean up dict $dictPath files via delta vacuum")
+      val v3DictRetention = config.getV3DictFileRetentionHours
+      dictTable.vacuum(v3DictRetention)
+
+      val optimizeTaken = (System.nanoTime() - optimizeStartTime) / 1000 / 1000
+      logInfo(s"It took ${optimizeTaken}ms to optimize dict $dictPath")
+    } else {
+      logInfo(s"No need to optimize dict: $dictPath, dict file num: $numFile")
+    }
+  }
+
   private def isExistsV2Dict(context: DictionaryContext): Boolean = {
     val config = KylinConfig.getInstanceFromEnv
     val globalDict = new NGlobalDictionaryV2(context.project,
@@ -260,14 +311,14 @@ object DictionaryBuilder extends Logging {
 }
 
 class DictionaryContext(
-   val project: String,
-   val dbName: String,
-   val tableName: String,
-   val columnName: String,
-   val expr: String)
+                         val project: String,
+                         val dbName: String,
+                         val tableName: String,
+                         val columnName: String,
+                         val expr: String)
 
 object DictBuildMode extends Enumeration {
 
   val V3UPGRADE, V2UPGRADE, V3APPEND, V3INIT = Value
 
-}
\ No newline at end of file
+}
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/PreCountDistinctTransformer.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/PreCountDistinctTransformer.scala
index f4218df160..97f7fc78ae 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/PreCountDistinctTransformer.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/PreCountDistinctTransformer.scala
@@ -55,7 +55,7 @@ class PreCountDistinctTransformer(spark: SparkSession) 
extends Rule[LogicalPlan]
             val key = dictPlan.output.head
             val value = dictPlan.output(1)
             val valueAlias = Alias(value, encodedAttr.name)(encodedAttr.exprId)
-            (Project(Seq(key, valueAlias), dictPlan), (childExpr, 
encodedAttr), dbName)
+            (Project(Seq(key, valueAlias), dictPlan), (childExpr, encodedAttr))
         }
 
         val result = dictionaries.foldLeft(child) {
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
index 5c3c07a358..7aaafd2feb 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java
@@ -123,6 +123,8 @@ public class NLocalWithSparkSessionTest extends 
NLocalFileMetadataTestCase imple
             sparkConf.set("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension");
         }
         sparkConf.set("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog");
+        sparkConf.set("spark.databricks.delta.retentionDurationCheck.enabled", 
"false");
+        sparkConf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", 
"true");
         ss = SparkSession.builder().withExtensions(ext -> {
             ext.injectOptimizerRule(ss -> new ConvertInnerJoinToSemiJoin());
             return null;
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
index 8bbc4cf9ce..971203e911 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
@@ -19,19 +19,34 @@
 package org.apache.kylin.engine.spark.builder.v3dict
 
 import io.delta.tables.DeltaTable
+import org.apache.kylin.common.KylinConfig
 import 
org.apache.kylin.engine.spark.builder.v3dict.GlobalDictionaryBuilderHelper.{checkAnswer,
 genDataWithWrapEncodeCol, genRandomData}
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.spark.sql.KapFunctions.dict_encode_v3
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, 
SparderBaseFunSuite}
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.functions.{col, count, countDistinct}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.util.SerializableConfiguration
 
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, 
Future}
 
 class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata 
with SharedSparkSession {
 
+  private var pool: ExecutorService = Executors.newFixedThreadPool(10)
+  implicit var ec: ExecutionContextExecutorService = 
ExecutionContext.fromExecutorService(pool)
+
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
+    if (pool.isShutdown) {
+      pool = Executors.newFixedThreadPool(10)
+      ec = ExecutionContext.fromExecutorService(pool)
+    }
+  }
+
   test("KE-35145 Test Continuously Build Dictionary") {
     val project = "p1"
     val dbName = "db1"
@@ -70,33 +85,23 @@ class GlobalDictionarySuite extends SparderBaseFunSuite 
with LocalMetadata with
     val colName = "c2"
     val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + 
colName
     val context = new DictionaryContext(project, dbName, tableName, colName, 
null)
-    val pool = Executors.newFixedThreadPool(10)
-    implicit val ec: ExecutionContextExecutorService = 
ExecutionContext.fromExecutorService(pool)
 
     DeltaTable.createIfNotExists()
-      .tableName("original_c2")
+      .tableName("original")
       .addColumn(encodeColName, StringType).execute()
 
-    val buildDictTask = new Runnable {
-      override def run(): Unit = {
-        val originalDF = genRandomData(spark, encodeColName, 100, 1)
-        val dictDF = genDataWithWrapEncodeCol(dbName, encodeColName, 
originalDF)
-        DeltaTable.forName("original_c2")
-          .merge(originalDF, "1 != 1")
-          .whenNotMatched()
-          .insertAll()
-          .execute()
-        DictionaryBuilder.buildGlobalDict(project, spark, 
dictDF.queryExecution.analyzed)
-      }
-    }
+    val buildDictTask = genBuildDictTask(spark, context)
 
-    for (_ <- 0 until 10) {ec.submit(buildDictTask)}
+    for (_ <- 0 until 10) {
+      ec.submit(buildDictTask)
+    }
+    ec.shutdown()
     ec.awaitTermination(2, TimeUnit.MINUTES)
 
     val originalDF = spark.sql(
       """
         |SELECT count(DISTINCT t1_0_DOT_0_c2)
-        |   FROM default.original_c2
+        |   FROM default.original
       """.stripMargin)
 
     val dictPath: String = DictionaryBuilder.getDictionaryPath(context)
@@ -183,4 +188,56 @@ class GlobalDictionarySuite extends SparderBaseFunSuite 
with LocalMetadata with
     val dictResultDF = 
DeltaTable.forPath(dictPath).toDF.agg(count(col("dict_key")))
     checkAnswer(originalDF, dictResultDF)
   }
+
+  test("KE-41744 Optimize the dict files to avoid too many small files") {
+    overwriteSystemProp("kylin.build.v3dict-file-num-limit", "5")
+    overwriteSystemProp("kylin.build.v3dict-file-retention", "0h")
+    val project = "p1"
+    val dbName = "db1"
+    val tableName = "t1"
+    val colName = "c2"
+
+    val context = new DictionaryContext(project, dbName, tableName, colName, 
null)
+    val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + 
colName
+    DeltaTable.createIfNotExists()
+      .tableName("original")
+      .addColumn(encodeColName, StringType).execute()
+
+    val buildDictTask = genBuildDictTask(spark, context)
+
+    for (_ <- 0 until 11) {
+      ec.execute(buildDictTask)
+    }
+    ec.shutdown()
+    ec.awaitTermination(10, TimeUnit.MINUTES)
+
+    val dictPath: String = DictionaryBuilder.getDictionaryPath(context)
+    val deltaLog = DeltaLog.forTable(spark, dictPath)
+    val numOfFiles = deltaLog.snapshot.numOfFiles
+    logInfo(s"Dict file num $numOfFiles")
+    assert(numOfFiles <= KylinConfig.getInstanceFromEnv.getV3DictFileNumLimit)
+
+    val numFileRemaining = DeltaFileOperations.recursiveListDirs(
+      spark,
+      Seq(dictPath),
+      spark.sparkContext.broadcast(new 
SerializableConfiguration(deltaLog.newDeltaHadoopConf()))
+    ).count()
+    assert(numFileRemaining < numOfFiles + deltaLog.snapshot.numOfRemoves)
+  }
+
+  def genBuildDictTask(spark: SparkSession, context: DictionaryContext): 
Runnable = {
+    new Runnable {
+      override def run(): Unit = {
+        val encodeColName: String = context.tableName + 
NSparkCubingUtil.SEPARATOR + context.columnName
+        val originalDF = genRandomData(spark, encodeColName, 100, 1)
+        val dictDF = genDataWithWrapEncodeCol(context.dbName, encodeColName, 
originalDF)
+        DeltaTable.forName("original")
+          .merge(originalDF, "1 != 1")
+          .whenNotMatched()
+          .insertAll()
+          .execute()
+        DictionaryBuilder.buildGlobalDict(context.project, spark, 
dictDF.queryExecution.analyzed)
+      }
+    }
+  }
 }
diff --git 
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala
 
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala
index 22414a2286..d3a9f6964e 100644
--- 
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala
+++ 
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala
@@ -84,6 +84,8 @@ trait SharedSparkSession
       .config("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")
       .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
       .config("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
+      .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+      .config("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
       .config(conf)
       .getOrCreate
     _jsc = new JavaSparkContext(_spark.sparkContext)

Reply via email to