This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 13366b6fb43bec19defaa1542745278ff0bf4622
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Fri Jun 9 14:23:29 2023 +0800

    KYLIN-5718 V3 Dictionary Automatic Merge

    ---------

    Co-authored-by: Mingming Ge <7mmi...@gmail.com>
---
 .../src/main/resources/config/init.properties     |  8 +-
 .../org/apache/kylin/common/KylinConfigBase.java  |  8 ++
 .../src/main/resources/kylin-defaults0.properties |  3 +
 .../src/main/resources/config/init.properties     |  8 +-
 .../src/main/resources/config/init.properties     |  8 +-
 .../src/main/resources/config/init.properties     |  8 +-
 .../spark/builder/v3dict/DictionaryBuilder.scala  | 97 +++++++++++++++++-----
 .../v3dict/PreCountDistinctTransformer.scala      |  2 +-
 .../engine/spark/NLocalWithSparkSessionTest.java  |  2 +
 .../builder/v3dict/GlobalDictionarySuite.scala    | 95 ++++++++++++++++-----
 .../spark/sql/common/SharedSparkSession.scala     |  2 +
 11 files changed, 190 insertions(+), 51 deletions(-)

diff --git a/src/common-booter/src/main/resources/config/init.properties b/src/common-booter/src/main/resources/config/init.properties
index 2bddfae312..14ad876715 100644
--- a/src/common-booter/src/main/resources/config/init.properties
+++ b/src/common-booter/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -440,6 +444,6 @@ kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index aec5a58ada..3614898c9a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3298,6 +3298,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.build.is-convert-v3dict-enable", FALSE));
     }
 
+    public long getV3DictFileNumLimit() {
+        return Long.parseLong(getOptional("kylin.build.v3dict-file-num-limit", "10"));
+    }
+
+    public long getV3DictFileRetentionHours() {
+        return TimeUtil.timeStringAs(getOptional("kylin.build.v3dict-file-retention", "3d"), TimeUnit.HOURS);
+    }
+
     public String getV3DictDBName() {
         return getOptional("kylin.build.v3dict-db-name", DEFAULT);
     }
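The two getters above expose the knobs that drive the automatic merge: kylin.build.v3dict-file-num-limit (10 files by default) and kylin.build.v3dict-file-retention ("3d" by default, converted to whole hours), both overridable in kylin.properties. A minimal sketch of how build-side code can read them, assuming only the patched KylinConfigBase above; the object name and tuple shape are illustrative, not part of the patch:

    import org.apache.kylin.common.KylinConfig

    object V3DictMergeSettings {
      // Resolves the two new properties; with the defaults this yields (10, 72).
      def current(): (Long, Long) = {
        val config = KylinConfig.getInstanceFromEnv
        (config.getV3DictFileNumLimit, config.getV3DictFileRetentionHours)
      }
    }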
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties
index 8007bc0ab8..61412133ec 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -92,8 +92,11 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
 kylin.engine.spark-conf.spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
 kylin.engine.spark-conf.spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
 
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
diff --git a/src/data-loading-booter/src/main/resources/config/init.properties b/src/data-loading-booter/src/main/resources/config/init.properties
index 2bddfae312..14ad876715 100644
--- a/src/data-loading-booter/src/main/resources/config/init.properties
+++ b/src/data-loading-booter/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -440,6 +444,6 @@ kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
diff --git a/src/query-booter/src/main/resources/config/init.properties b/src/query-booter/src/main/resources/config/init.properties
index 2bddfae312..14ad876715 100644
--- a/src/query-booter/src/main/resources/config/init.properties
+++ b/src/query-booter/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -440,6 +444,6 @@ kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
diff --git a/src/server/src/main/resources/config/init.properties b/src/server/src/main/resources/config/init.properties
index 94b780aa7a..e705611a5a 100644
--- a/src/server/src/main/resources/config/init.properties
+++ b/src/server/src/main/resources/config/init.properties
@@ -88,6 +88,10 @@ kylin.engine.spark-conf.spark.submit.deployMode=client
 kylin.engine.spark-conf.spark.sql.hive.metastore.version=1.2.2
 kylin.engine.spark-conf.spark.sql.hive.metastore.jars=${KYLIN_HOME}/spark/hive_1_2_2/*
 
+# for V3 Dictionary
+kylin.engine.spark-conf.spark.databricks.delta.retentionDurationCheck.enabled=false
+kylin.engine.spark-conf.spark.databricks.delta.vacuum.parallelDelete.enabled=true
+
 kylin.engine.spark-conf.spark.stage.maxConsecutiveAttempts=1
 
 # spark3 legacy config after calendar switch
@@ -439,6 +443,6 @@ kylin.streaming.spark-conf.spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -
 kylin.streaming.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=streaming_job
 
 # custom
-kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
-kylin.storage.columnar.spark-conf.spark.executor.cores=4
+#kylin.storage.columnar.spark-conf.spark.executor.memory=4096m
+#kylin.storage.columnar.spark-conf.spark.executor.cores=4
 kylin.metadata.random-admin-password.enabled=false
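All four booter configurations pick up the same pair of Delta Lake settings, routed to the build jobs through the kylin.engine.spark-conf.* prefix: retentionDurationCheck.enabled=false lifts Delta's default seven-day vacuum safety window so vacuum can honor a short kylin.build.v3dict-file-retention, and vacuum.parallelDelete.enabled=true lets vacuum delete obsolete files with a parallel Spark job instead of sequentially on the driver. A minimal standalone sketch of a session carrying the equivalent settings (it mirrors the test-session changes further down; Delta jars on the classpath and the local master are assumptions of the sketch):

    import org.apache.spark.sql.SparkSession

    object DeltaSessionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("v3dict-delta-session-sketch")
          .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
          .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
          // allow vacuum retention below Delta's default 7-day check
          .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
          // delete vacuumed files with a distributed job
          .config("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
          .getOrCreate()
        println(spark.conf.get("spark.databricks.delta.vacuum.parallelDelete.enabled"))
        spark.stop()
      }
    }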
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
index 3b88c8af43..f23a877c31 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.SparkInternalAgent._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Window}
+import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.functions.{col, lit, row_number}
 import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
@@ -43,18 +44,18 @@ import scala.concurrent.duration.DurationInt
 object DictionaryBuilder extends Logging {
 
   implicit val retryStrategy: RetryStrategyProducer =
-    RetryStrategy.fixedBackOff(retryDuration = 10.seconds, maxAttempts = 5)
+    RetryStrategy.randomBackOff(5.seconds, 15.seconds, maxAttempts = 20)
 
   def buildGlobalDict(
-      project: String,
-      spark: SparkSession,
-      plan: LogicalPlan): LogicalPlan = transformCountDistinct(spark, plan) transform {
+    project: String,
+    spark: SparkSession,
+    plan: LogicalPlan): LogicalPlan = transformCountDistinct(spark, plan) transform {
     case GlobalDictionaryPlaceHolder(expr: String, child: LogicalPlan, dbName: String) =>
       spark.sparkContext.setJobDescription(s"Build v3 dict $expr")
       val catalog = expr.split(NSparkCubingUtil.SEPARATOR)
-      val tableName = catalog.apply(0)
-      val columnName = catalog.apply(1)
+      val tableName = catalog(0)
+      val columnName = catalog(1)
       val context = new DictionaryContext(project, dbName, tableName, columnName, expr)
 
       // concurrent commit may cause delta ConcurrentAppendException.
@@ -79,13 +80,15 @@
    * Use Left anti join to process raw data and dictionary tables.
    */
   private def transformerDictPlan(
-      spark: SparkSession,
-      context: DictionaryContext,
-      plan: LogicalPlan): LogicalPlan = {
+    spark: SparkSession,
+    context: DictionaryContext,
+    plan: LogicalPlan): LogicalPlan = {
     val dictPath = getDictionaryPath(context)
     val dictTable: DeltaTable = DeltaTable.forPath(dictPath)
     val maxOffset = dictTable.toDF.count()
 
+    logInfo(s"Dict $dictPath item count $maxOffset")
+
     plan match {
       case Project(_, Project(_, Window(_, _, _, windowChild))) =>
         val column = context.expr
@@ -101,7 +104,9 @@
           "left_anti")
           .select(col(column).cast(StringType) as "dict_key",
             (row_number().over(windowSpec) + lit(maxOffset)).cast(LongType) as "dict_value")
+        logInfo(s"Dict logical plan : ${antiJoinDF.queryExecution.logical.treeString}")
         getLogicalPlan(antiJoinDF)
+
       case _ => plan
     }
   }
@@ -121,30 +126,39 @@
    * Build an incremental dictionary
    */
   private def incrementBuildDict(
-      spark: SparkSession,
-      plan: LogicalPlan,
-      context: DictionaryContext): Unit = {
+    spark: SparkSession,
+    plan: LogicalPlan,
+    context: DictionaryContext): Unit = {
     val dictMode = chooseDictBuildMode(context)
     logInfo(s"V3 Dict build mode is $dictMode")
     dictMode match {
       case V3INIT =>
         val dictDF = getDataFrame(spark, plan)
-        initAndSaveDictDF(dictDF, context)
+        initAndSaveDict(dictDF, context)
+
       case V3APPEND =>
         mergeIncrementDict(spark, context, plan)
-      // To be delete
+        optimizeDictTable(spark, context)
+
       case V3UPGRADE =>
+        // To be delete
        val v3OrigDict = upgradeFromOriginalV3(spark, context)
-        initAndSaveDictDF(v3OrigDict, context)
+        initAndSaveDict(v3OrigDict, context)
         mergeIncrementDict(spark, context, plan)
+
      case V2UPGRADE =>
        val v2Dict = upgradeFromV2(spark, context)
-        initAndSaveDictDF(v2Dict, context)
+        initAndSaveDict(v2Dict, context)
         mergeIncrementDict(spark, context, plan)
     }
+
+    val dictPath = getDictionaryPath(context)
+    val dictDeltaLog = DeltaLog.forTable(spark, dictPath)
+    val version = dictDeltaLog.snapshot.version
+    logInfo(s"Completed the construction of dictionary version $version for dict $dictPath")
   }
 
-  private def initAndSaveDictDF(dictDF: Dataset[Row], context: DictionaryContext): Unit = {
+  private def initAndSaveDict(dictDF: Dataset[Row], context: DictionaryContext): Unit = {
     val dictPath = getDictionaryPath(context)
     logInfo(s"Save dict values into path $dictPath.")
     dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
@@ -164,6 +178,43 @@
       .execute()
   }
 
+  /**
+   * Dictionary files must be merged periodically: otherwise every build of the dictionary adds
+   * new files, and the growing number of small files degrades build performance.
+   *
+   * The configuration `kylin.build.v3dict-file-num-limit` (default 10) controls when merging is
+   * required: once the number of dictionary files exceeds this limit, the files are compacted so
+   * that the total file count stays bounded and build performance is preserved.
+   */
+  private def optimizeDictTable(spark: SparkSession, context: DictionaryContext): Unit = {
+    val dictPath = getDictionaryPath(context)
+    val deltaLog = DeltaLog.forTable(spark, dictPath)
+    val numFile = deltaLog.snapshot.numOfFiles
+
+    val config = KylinConfig.getInstanceFromEnv
+    val v3DictFileNumLimit = config.getV3DictFileNumLimit
+    if (numFile > v3DictFileNumLimit) {
+      val optimizeStartTime = System.nanoTime()
+      val dictTable = DeltaTable.forPath(dictPath)
+      logInfo(s"Optimize the storage of dict $dictPath, " +
+        s"dict file num: $numFile, " +
+        s"spark.build.v3dict-file-num-limit: $v3DictFileNumLimit")
+      dictTable.optimize().executeCompaction()
+
+      logInfo(s"Clean up dict $dictPath files via delta vacuum")
+      val v3DictRetention = config.getV3DictFileRetentionHours
+      dictTable.vacuum(v3DictRetention)
+
+      val optimizeTaken = (System.nanoTime() - optimizeStartTime) / 1000 / 1000
+      logInfo(s"It took ${optimizeTaken}ms to optimize dict $dictPath")
+    } else {
+      logInfo(s"No need to optimize dict: $dictPath, dict file num: $numFile")
+    }
+  }
+
   private def isExistsV2Dict(context: DictionaryContext): Boolean = {
     val config = KylinConfig.getInstanceFromEnv
     val globalDict = new NGlobalDictionaryV2(context.project,
@@ -260,14 +311,14 @@
 }
 
 class DictionaryContext(
-    val project: String,
-    val dbName: String,
-    val tableName: String,
-    val columnName: String,
-    val expr: String)
+  val project: String,
+  val dbName: String,
+  val tableName: String,
+  val columnName: String,
+  val expr: String)
 
 object DictBuildMode extends Enumeration {
   val V3UPGRADE, V2UPGRADE, V3APPEND, V3INIT = Value
-}
\ No newline at end of file
+}
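At its core, optimizeDictTable is a Delta OPTIMIZE-then-VACUUM pair gated by the file-count limit. A trimmed sketch of that sequence using only the public DeltaTable API, assuming a SparkSession configured with the Delta settings shown earlier and a path that already holds a dictionary table; the object and method names here are illustrative, not the Kylin API:

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.SparkSession

    object DictCompactionSketch {
      // Compact when the current snapshot holds more files than allowed, then drop the files
      // that compaction superseded once they are older than the retention window (in hours).
      def compactIfNeeded(spark: SparkSession, dictPath: String,
                          fileNumLimit: Long, retentionHours: Double): Unit = {
        val numFiles = spark.read.format("delta").load(dictPath).inputFiles.length
        if (numFiles > fileNumLimit) {
          val dict = DeltaTable.forPath(spark, dictPath)
          dict.optimize().executeCompaction() // bin-packs small files into larger ones
          dict.vacuum(retentionHours)         // physically deletes unreferenced files
        }
      }
    }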
sparkConf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true"); ss = SparkSession.builder().withExtensions(ext -> { ext.injectOptimizerRule(ss -> new ConvertInnerJoinToSemiJoin()); return null; diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala index 8bbc4cf9ce..971203e911 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala @@ -19,19 +19,34 @@ package org.apache.kylin.engine.spark.builder.v3dict import io.delta.tables.DeltaTable +import org.apache.kylin.common.KylinConfig import org.apache.kylin.engine.spark.builder.v3dict.GlobalDictionaryBuilderHelper.{checkAnswer, genDataWithWrapEncodeCol, genRandomData} import org.apache.kylin.engine.spark.job.NSparkCubingUtil import org.apache.spark.sql.KapFunctions.dict_encode_v3 -import org.apache.spark.sql.Row import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.util.DeltaFileOperations import org.apache.spark.sql.functions.{col, count, countDistinct} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.util.SerializableConfiguration -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata with SharedSparkSession { + private var pool: ExecutorService = Executors.newFixedThreadPool(10) + implicit var ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool) + + protected override def beforeEach(): Unit = { + super.beforeEach() + if (pool.isShutdown) { + pool = Executors.newFixedThreadPool(10) + ec = ExecutionContext.fromExecutorService(pool) + } + } + test("KE-35145 Test Continuously Build Dictionary") { val project = "p1" val dbName = "db1" @@ -70,33 +85,23 @@ class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata with val colName = "c2" val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + colName val context = new DictionaryContext(project, dbName, tableName, colName, null) - val pool = Executors.newFixedThreadPool(10) - implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool) DeltaTable.createIfNotExists() - .tableName("original_c2") + .tableName("original") .addColumn(encodeColName, StringType).execute() - val buildDictTask = new Runnable { - override def run(): Unit = { - val originalDF = genRandomData(spark, encodeColName, 100, 1) - val dictDF = genDataWithWrapEncodeCol(dbName, encodeColName, originalDF) - DeltaTable.forName("original_c2") - .merge(originalDF, "1 != 1") - .whenNotMatched() - .insertAll() - .execute() - DictionaryBuilder.buildGlobalDict(project, spark, dictDF.queryExecution.analyzed) - } - } + val buildDictTask = genBuildDictTask(spark, context) - for (_ <- 0 until 10) {ec.submit(buildDictTask)} + for (_ <- 0 until 10) { + ec.submit(buildDictTask) + } + ec.shutdown() ec.awaitTermination(2, TimeUnit.MINUTES) val originalDF = 
     val originalDF = spark.sql(
       """
         |SELECT count(DISTINCT t1_0_DOT_0_c2)
-        | FROM default.original_c2
+        | FROM default.original
       """.stripMargin)
 
     val dictPath: String = DictionaryBuilder.getDictionaryPath(context)
@@ -183,4 +188,56 @@ class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata with
     val dictResultDF = DeltaTable.forPath(dictPath).toDF.agg(count(col("dict_key")))
     checkAnswer(originalDF, dictResultDF)
   }
+
+  test("KE-41744 Optimize the dict files to avoid too many small files") {
+    overwriteSystemProp("kylin.build.v3dict-file-num-limit", "5")
+    overwriteSystemProp("kylin.build.v3dict-file-retention", "0h")
+    val project = "p1"
+    val dbName = "db1"
+    val tableName = "t1"
+    val colName = "c2"
+
+    val context = new DictionaryContext(project, dbName, tableName, colName, null)
+    val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + colName
+    DeltaTable.createIfNotExists()
+      .tableName("original")
+      .addColumn(encodeColName, StringType).execute()
+
+    val buildDictTask = genBuildDictTask(spark, context)
+
+    for (_ <- 0 until 11) {
+      ec.execute(buildDictTask)
+    }
+    ec.shutdown()
+    ec.awaitTermination(10, TimeUnit.MINUTES)
+
+    val dictPath: String = DictionaryBuilder.getDictionaryPath(context)
+    val deltaLog = DeltaLog.forTable(spark, dictPath)
+    val numOfFiles = deltaLog.snapshot.numOfFiles
+    logInfo(s"Dict file num $numOfFiles")
+    assert(numOfFiles <= KylinConfig.getInstanceFromEnv.getV3DictFileNumLimit)
+
+    val numFileRemaining = DeltaFileOperations.recursiveListDirs(
+      spark,
+      Seq(dictPath),
+      spark.sparkContext.broadcast(new SerializableConfiguration(deltaLog.newDeltaHadoopConf()))
+    ).count()
+    assert(numFileRemaining < numOfFiles + deltaLog.snapshot.numOfRemoves)
+  }
+
+  def genBuildDictTask(spark: SparkSession, context: DictionaryContext): Runnable = {
+    new Runnable {
+      override def run(): Unit = {
+        val encodeColName: String = context.tableName + NSparkCubingUtil.SEPARATOR + context.columnName
+        val originalDF = genRandomData(spark, encodeColName, 100, 1)
+        val dictDF = genDataWithWrapEncodeCol(context.dbName, encodeColName, originalDF)
+        DeltaTable.forName("original")
+          .merge(originalDF, "1 != 1")
+          .whenNotMatched()
+          .insertAll()
+          .execute()
+        DictionaryBuilder.buildGlobalDict(context.project, spark, dictDF.queryExecution.analyzed)
+      }
+    }
+  }
 }
diff --git a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala
index 22414a2286..d3a9f6964e 100644
--- a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala
+++ b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/SharedSparkSession.scala
@@ -84,6 +84,8 @@ trait SharedSparkSession
       .config("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")
       .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+      .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+      .config("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
       .config(conf)
       .getOrCreate
     _jsc = new JavaSparkContext(_spark.sparkContext)