This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ab4007b215453b233d0e96972bcb3955602cee8a
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Fri Jun 9 19:06:50 2023 +0800

    KYLIN-5719 add v3dict delta table check
---
 .../spark/builder/v3dict/DictionaryBuilder.scala   | 37 ++++++++++++++++------
 .../builder/v3dict/GlobalDictionarySuite.scala     | 35 +++++++++++++++++++-
 2 files changed, 61 insertions(+), 11 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
index f23a877c31..6d4a177916 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.builder.v3dict
 import io.delta.tables.DeltaTable
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.common.exception.KylinRuntimeException
 import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.builder.v3dict.DictBuildMode.{V2UPGRADE, V3APPEND, V3INIT, V3UPGRADE}
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
@@ -40,6 +41,7 @@ import util.retry.blocking.{Failure, Retry, RetryStrategy, Success}
 import java.nio.file.Paths
 import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
 
 object DictionaryBuilder extends Logging {
 
@@ -62,11 +64,12 @@ object DictionaryBuilder extends Logging {
     // so need retry commit incremental dict to delta table.
     Retry(incrementBuildDict(spark, child, context)) match {
       case Success(_) => logInfo(s"Incremental persist global dictionary for: $expr success.")
-      case Failure(e) => logInfo(s"Incremental persist global dictionary for: $expr failure.", e)
+      case Failure(e) =>
+        throw new KylinRuntimeException(s"Incremental persist global dictionary for: $expr failure.", e)
     }
     spark.sparkContext.setJobDescription(null)
 
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     val dictPlan = getLogicalPlan(spark.read.format("delta").load(dictPath))
     val (key, value) = (dictPlan.output.head, dictPlan.output(1))
     val (existKey, existValue) = (child.output.head, child.output(1))
@@ -84,7 +87,7 @@
       context: DictionaryContext,
       plan: LogicalPlan): LogicalPlan = {
 
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     val dictTable: DeltaTable = DeltaTable.forPath(dictPath)
     val maxOffset = dictTable.toDF.count()
     logInfo(s"Dict $dictPath item count $maxOffset")
@@ -152,22 +155,28 @@ object DictionaryBuilder extends Logging {
       mergeIncrementDict(spark, context, plan)
     }
 
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     val dictDeltaLog = DeltaLog.forTable(spark, dictPath)
     val version = dictDeltaLog.snapshot.version
     logInfo(s"Completed the construction of dictionary version $version for dict $dictPath")
   }
 
-  private def initAndSaveDict(dictDF: Dataset[Row], context: DictionaryContext): Unit = {
+  def initAndSaveDict(dictDF: Dataset[Row], context: DictionaryContext): Unit = {
     val dictPath = getDictionaryPath(context)
     logInfo(s"Save dict values into path $dictPath.")
-    dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
+    try {
+      dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
+    } catch {
+      case NonFatal(e) =>
+        HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration, new Path(dictPath))
+        throw e
+    }
   }
 
   private def mergeIncrementDict(spark: SparkSession, context: DictionaryContext, plan: LogicalPlan): Unit = {
     val dictPlan = transformerDictPlan(spark, context, plan)
     val incrementDictDF = getDataFrame(spark, dictPlan)
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     logInfo(s"increment build global dict $dictPath")
     val dictTable = DeltaTable.forPath(dictPath)
     dictTable.alias("dict")
@@ -190,7 +199,7 @@
    * files can be controlled to improve build performance.
    */
   private def optimizeDictTable(spark: SparkSession, context: DictionaryContext): Unit = {
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     val deltaLog = DeltaLog.forTable(spark, dictPath)
     val numFile = deltaLog.snapshot.numOfFiles
 
@@ -231,12 +240,12 @@
 
   private def isExistsV3Dict(context: DictionaryContext): Boolean = {
     val dictPath = getDictionaryPath(context)
-    HadoopUtil.getWorkingFileSystem.exists(new Path(dictPath))
+    DeltaTable.isDeltaTable(dictPath)
   }
 
   private def isExistsOriginalV3Dict(context: DictionaryContext): Boolean = {
     val dictPath = getOriginalDictionaryPath(context)
-    HadoopUtil.getWorkingFileSystem.exists(new Path(dictPath))
+    DeltaTable.isDeltaTable(dictPath)
   }
 
   private def fetchExistsOriginalV3Dict(context: DictionaryContext): Dataset[Row] = {
@@ -305,6 +314,14 @@ object DictionaryBuilder extends Logging {
     workingDir + dictDir
   }
 
+  def getDictionaryPathAndCheck(context: DictionaryContext): String = {
+    val v3ditPath = getDictionaryPath(context)
+    if (!DeltaTable.isDeltaTable(v3ditPath)) {
+      throw new KylinRuntimeException(s"This v3dict path: {$v3ditPath} is not a delta table.")
+    }
+    v3ditPath
+  }
+
   def wrapCol(ref: TblColRef): String = {
     NSparkCubingUtil.convertFromDot(ref.getBackTickIdentity)
   }
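The DictionaryBuilder change above amounts to two guards: initAndSaveDict deletes the dictionary path when the overwrite fails partway, and getDictionaryPathAndCheck (like the isExistsV3Dict checks, now backed by DeltaTable.isDeltaTable) refuses to treat a path as a dictionary unless it really is a Delta table. Below is a minimal standalone sketch of the same two guards, assuming a local SparkSession with the delta-spark extension on the classpath; the /tmp path, the dict_value column, and the plain RuntimeException are hypothetical stand-ins for Kylin's DictionaryContext plumbing and KylinRuntimeException.

import io.delta.tables.DeltaTable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.control.NonFatal

object DictGuardSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("v3dict-guard-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    val dictPath = "/tmp/v3dict-guard-sketch"

    // Init guard: if the overwrite fails midway, remove the half-written
    // directory so a later existence check cannot mistake it for a dictionary.
    try {
      spark.range(10).toDF("dict_value")
        .write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
    } catch {
      case NonFatal(e) =>
        val p = new Path(dictPath)
        p.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(p, true)
        throw e
    }

    // Read guard: fail fast with a clear message when the path exists but
    // holds no _delta_log, instead of failing later inside DeltaTable.forPath.
    if (!DeltaTable.isDeltaTable(spark, dictPath)) {
      throw new RuntimeException(s"This v3dict path: {$dictPath} is not a delta table.")
    }
    println(s"dict item count: ${DeltaTable.forPath(spark, dictPath).toDF.count()}")

    spark.stop()
  }
}

Because the existence checks now use DeltaTable.isDeltaTable instead of FileSystem.exists, a leftover non-Delta directory (for example, the residue of an interrupted first build before this patch) no longer counts as an existing dictionary.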
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
index 971203e911..eb9725df9f 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
@@ -19,7 +19,10 @@ package org.apache.kylin.engine.spark.builder.v3dict
 import io.delta.tables.DeltaTable
+import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.common.exception.KylinRuntimeException
+import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.builder.v3dict.GlobalDictionaryBuilderHelper.{checkAnswer, genDataWithWrapEncodeCol, genRandomData}
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.spark.sql.KapFunctions.dict_encode_v3
@@ -28,8 +31,9 @@ import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.functions.{col, count, countDistinct}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.util.SerializableConfiguration
+import org.mockito.Mockito.{spy, when}
 
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -224,6 +228,35 @@ class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata with
     ).count()
     assert(numFileRemaining < numOfFiles + deltaLog.snapshot.numOfRemoves)
   }
+  test("KE-41980 Test failure to initialize dictionary file") {
+    val project = "project"
+    val dbName = "db"
+    val tableName = "table"
+    val colName = "col"
+    val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + colName
+
+    val context = new DictionaryContext(project, dbName, tableName, colName, null)
+    val dictPath = DictionaryBuilder.getDictionaryPath(context)
+    val dictDF = spy(genRandomData(spark, encodeColName, 10, 2))
+
+    // mock the delta table write so that it fails with an Exception
+    when(dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath))
+      .thenThrow(new RuntimeException())
+
+    intercept[RuntimeException] {
+      DictionaryBuilder.initAndSaveDict(dictDF, context)
+    }
+
+    // after the mocked RuntimeException, dictPath will have been deleted
+    assert(HadoopUtil.getWorkingFileSystem().exists(new Path(dictPath)) == false)
+
+    // write a temp file to dictPath, which is not a delta table,
+    // so that getDictionaryPathAndCheck will throw KylinRuntimeException
+    HadoopUtil.writeStringToHdfs("tempString4Test", new Path(dictPath))
+    intercept[KylinRuntimeException] {
+      DictionaryBuilder.getDictionaryPathAndCheck(context)
+    }
+  }
 
   def genBuildDictTask(spark: SparkSession, context: DictionaryContext): Runnable = {
     new Runnable {
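The final assertion of the new test can be reproduced without the Kylin test helpers: plant any plain file where the dictionary should live, and the Delta check must reject the path. A small sketch under the same assumptions as the previous one (local SparkSession, delta-spark configured, hypothetical /tmp path standing in for DictionaryBuilder.getDictionaryPath):

import io.delta.tables.DeltaTable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object NonDeltaPathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("non-delta-path-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    val dictDir = new Path("/tmp/v3dict-nondelta-sketch")
    val fs = dictDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // Plant a plain file where a dictionary is expected; this mirrors the
    // test's HadoopUtil.writeStringToHdfs("tempString4Test", new Path(dictPath)).
    val out = fs.create(new Path(dictDir, "tempString4Test"), true)
    out.writeBytes("tempString4Test")
    out.close()

    // The path now exists, yet it is not a Delta table: exactly the state
    // that getDictionaryPathAndCheck turns into a KylinRuntimeException.
    assert(fs.exists(dictDir))
    assert(!DeltaTable.isDeltaTable(spark, dictDir.toString))

    spark.stop()
  }
}

Note that fs.exists alone still returns true here, which is precisely why the commit swaps the plain existence checks for DeltaTable.isDeltaTable.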