This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ab4007b215453b233d0e96972bcb3955602cee8a
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Fri Jun 9 19:06:50 2023 +0800

    KYLIN-5719 add v3dict delta table check
---
 .../spark/builder/v3dict/DictionaryBuilder.scala   | 37 ++++++++++++++++------
 .../builder/v3dict/GlobalDictionarySuite.scala     | 35 +++++++++++++++++++-
 2 files changed, 61 insertions(+), 11 deletions(-)
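
The heart of the change: instead of trusting that a directory on the working filesystem is a usable dictionary, the builder now asks Delta Lake itself. A minimal sketch of that check, with a hypothetical session and path rather than Kylin's real wiring:

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val dictPath = "/tmp/v3dict/db/table/col" // placeholder path

    // A directory can exist without being a Delta table; only a valid
    // _delta_log underneath makes this return true.
    if (DeltaTable.isDeltaTable(spark, dictPath)) {
      spark.read.format("delta").load(dictPath).show()
    }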

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
index f23a877c31..6d4a177916 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.builder.v3dict
 import io.delta.tables.DeltaTable
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.common.exception.KylinRuntimeException
 import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.builder.v3dict.DictBuildMode.{V2UPGRADE, V3APPEND, V3INIT, V3UPGRADE}
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
@@ -40,6 +41,7 @@ import util.retry.blocking.{Failure, Retry, RetryStrategy, Success}
 import java.nio.file.Paths
 import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
 
 object DictionaryBuilder extends Logging {
 
@@ -62,11 +64,12 @@ object DictionaryBuilder extends Logging {
      // so we need to retry committing the incremental dict to the delta table.
       Retry(incrementBuildDict(spark, child, context)) match {
        case Success(_) => logInfo(s"Incremental persist global dictionary for: $expr success.")
-        case Failure(e) => logInfo(s"Incremental persist global dictionary for: $expr failure.", e)
+        case Failure(e) =>
+          throw new KylinRuntimeException(s"Incremental persist global dictionary for: $expr failure.", e)
       }
       spark.sparkContext.setJobDescription(null)
 
-      val dictPath = getDictionaryPath(context)
+      val dictPath = getDictionaryPathAndCheck(context)
       val dictPlan = getLogicalPlan(spark.read.format("delta").load(dictPath))
       val (key, value) = (dictPlan.output.head, dictPlan.output(1))
       val (existKey, existValue) = (child.output.head, child.output(1))
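
The Retry/Success/Failure here come from the blocking retry helper imported above: on conflict-prone delta commits, the build now fails fast once retries are exhausted instead of logging and moving on. A standalone sketch of that pattern; the retry strategy shown is an assumption for illustration, the real one is configured inside DictionaryBuilder:

    import util.retry.blocking.{Failure, Retry, RetryStrategy, Success}
    import scala.concurrent.duration.DurationInt

    // Assumed strategy for illustration only.
    implicit val retryStrategy: RetryStrategy =
      RetryStrategy.fixedBackOff(retryDuration = 1.seconds, maxAttempts = 3)

    def commitIncrement(): Unit = () // stand-in for incrementBuildDict(...)

    Retry(commitIncrement()) match {
      case Success(_) => println("incremental dict commit succeeded")
      case Failure(e) => throw new RuntimeException("commit failed after retries", e)
    }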
@@ -84,7 +87,7 @@ object DictionaryBuilder extends Logging {
                                    context: DictionaryContext,
                                    plan: LogicalPlan): LogicalPlan = {
 
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     val dictTable: DeltaTable = DeltaTable.forPath(dictPath)
     val maxOffset = dictTable.toDF.count()
     logInfo(s"Dict $dictPath item count $maxOffset")
@@ -152,22 +155,28 @@ object DictionaryBuilder extends Logging {
         mergeIncrementDict(spark, context, plan)
     }
 
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     val dictDeltaLog = DeltaLog.forTable(spark, dictPath)
     val version = dictDeltaLog.snapshot.version
     logInfo(s"Completed the construction of dictionary version $version for 
dict $dictPath")
   }
 
-  private def initAndSaveDict(dictDF: Dataset[Row], context: DictionaryContext): Unit = {
+  def initAndSaveDict(dictDF: Dataset[Row], context: DictionaryContext): Unit = {
     val dictPath = getDictionaryPath(context)
     logInfo(s"Save dict values into path $dictPath.")
-    dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
+    try {
+      dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
+    } catch {
+      case NonFatal(e) =>
+        HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration, new Path(dictPath))
+        throw e
+    }
   }
 
   private def mergeIncrementDict(spark: SparkSession, context: DictionaryContext, plan: LogicalPlan): Unit = {
     val dictPlan = transformerDictPlan(spark, context, plan)
     val incrementDictDF = getDataFrame(spark, dictPlan)
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     logInfo(s"increment build global dict $dictPath")
     val dictTable = DeltaTable.forPath(dictPath)
     dictTable.alias("dict")
@@ -190,7 +199,7 @@ object DictionaryBuilder extends Logging {
    * files can be controlled to improve build performance.
    */
   private def optimizeDictTable(spark: SparkSession, context: DictionaryContext): Unit = {
-    val dictPath = getDictionaryPath(context)
+    val dictPath = getDictionaryPathAndCheck(context)
     val deltaLog = DeltaLog.forTable(spark, dictPath)
     val numFile = deltaLog.snapshot.numOfFiles
 
@@ -231,12 +240,12 @@ object DictionaryBuilder extends Logging {
 
   private def isExistsV3Dict(context: DictionaryContext): Boolean = {
     val dictPath = getDictionaryPath(context)
-    HadoopUtil.getWorkingFileSystem.exists(new Path(dictPath))
+    DeltaTable.isDeltaTable(dictPath)
   }
 
   private def isExistsOriginalV3Dict(context: DictionaryContext): Boolean = {
     val dictPath = getOriginalDictionaryPath(context)
-    HadoopUtil.getWorkingFileSystem.exists(new Path(dictPath))
+    DeltaTable.isDeltaTable(dictPath)
   }
 
   private def fetchExistsOriginalV3Dict(context: DictionaryContext): Dataset[Row] = {
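
The two existence probes above are the companion change: FileSystem.exists answered "is there a directory here?", while DeltaTable.isDeltaTable answers "is there a readable Delta table here?". The difference is exactly the corrupt-leftover case. Roughly, assuming an active Spark session with the Delta extensions enabled and a hypothetical leftover path:

    import io.delta.tables.DeltaTable
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical leftover: a directory with no _delta_log underneath.
    val leftover = new Path("/tmp/v3dict-leftover")
    val fs = FileSystem.get(new Configuration())
    fs.mkdirs(leftover)

    fs.exists(leftover)                        // true  -- the old check passes
    DeltaTable.isDeltaTable(leftover.toString) // false -- the new check rejects it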
@@ -305,6 +314,14 @@ object DictionaryBuilder extends Logging {
     workingDir + dictDir
   }
 
+  def getDictionaryPathAndCheck(context: DictionaryContext): String = {
+    val v3dictPath = getDictionaryPath(context)
+    if (!DeltaTable.isDeltaTable(v3dictPath)) {
+      throw new KylinRuntimeException(s"The v3dict path $v3dictPath is not a delta table.")
+    }
+    v3dictPath
+  }
+
   def wrapCol(ref: TblColRef): String = {
     NSparkCubingUtil.convertFromDot(ref.getBackTickIdentity)
   }
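
For context on what mergeIncrementDict is doing around its changed line: Delta's merge-into API upserts the increment into the dictionary table. A minimal sketch with hypothetical column names; the real plan wires in dictionary key/value offsets:

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.DataFrame

    // Hypothetical upsert: insert only dictionary keys not already present.
    def upsertIncrement(dictTable: DeltaTable, increment: DataFrame): Unit = {
      dictTable.alias("dict")
        .merge(increment.alias("inc"), "dict.key = inc.key")
        .whenNotMatched()
        .insertAll()
        .execute()
    }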
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
index 971203e911..eb9725df9f 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
@@ -19,7 +19,10 @@
 package org.apache.kylin.engine.spark.builder.v3dict
 
 import io.delta.tables.DeltaTable
+import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.common.exception.KylinRuntimeException
+import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.builder.v3dict.GlobalDictionaryBuilderHelper.{checkAnswer, genDataWithWrapEncodeCol, genRandomData}
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.spark.sql.KapFunctions.dict_encode_v3
@@ -28,8 +31,9 @@ import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.functions.{col, count, countDistinct}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.util.SerializableConfiguration
+import org.mockito.Mockito.{spy, when}
 
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -224,6 +228,35 @@ class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata with
     ).count()
     assert(numFileRemaining < numOfFiles + deltaLog.snapshot.numOfRemoves)
   }
+  test("KE-41980 Test failure to initialize dictionary file") {
+    val project = "project"
+    val dbName = "db"
+    val tableName = "table"
+    val colName = "col"
+    val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + colName
+
+    val context = new DictionaryContext(project, dbName, tableName, colName, null)
+    val dictPath = DictionaryBuilder.getDictionaryPath(context)
+    val dictDF = spy(genRandomData(spark, encodeColName, 10, 2))
+
+    // mock the delta table write to fail by throwing an exception
+    when(dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath))
+      .thenThrow(new RuntimeException())
+
+    intercept[RuntimeException] {
+      DictionaryBuilder.initAndSaveDict(dictDF, context)
+    }
+
+    // after the mocked write throws a RuntimeException, dictPath should have been deleted
+    assert(HadoopUtil.getWorkingFileSystem().exists(new Path(dictPath)) == false)
+
+    // write a temp file to dictPath, which is not a delta table,
+    // so that getDictionaryPathAndCheck will throw a KylinRuntimeException
+    HadoopUtil.writeStringToHdfs("tempString4Test", new Path(dictPath))
+    intercept[KylinRuntimeException] {
+      DictionaryBuilder.getDictionaryPathAndCheck(context)
+    }
+  }
 
   def genBuildDictTask(spark: SparkSession, context: DictionaryContext): Runnable = {
     new Runnable {
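
A reading note on the test's mocking: with a Mockito spy, stubbing via when(...) evaluates the real member chain first, and dictDF.write returns a fresh, real DataFrameWriter, so the when(...).thenThrow(...) line above effectively relies on the real save call failing rather than on a recorded stub. A common spy-safe variant stubs the accessor itself before it is invoked; sketch only, assuming Dataset.write is stubbable on the spy:

    import org.mockito.Mockito.doThrow

    // Make the spied Dataset's write accessor throw, so no real Spark
    // write is attempted when initAndSaveDict calls dictDF.write.
    doThrow(new RuntimeException("mock write failure")).when(dictDF).write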
