This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bc00ef2ab5ccfc85542c31e375f222bd32a0e0b5
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Sat Jun 10 10:26:52 2023 +0800

    KYLIN-5719 [FOLLOW UP] v3dct catch DeltaConcurrentModificationException
---
 .../spark/builder/v3dict/DictionaryBuilder.scala   |  9 ++++-
 .../builder/v3dict/GlobalDictionarySuite.scala     | 42 +++++++++++++++-------
 2 files changed, 37 insertions(+), 14 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
index 6d4a177916..e47dc7107e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.kylin.engine.spark.builder.v3dict
 
+import io.delta.exceptions.DeltaConcurrentModificationException
 import io.delta.tables.DeltaTable
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
@@ -167,8 +168,14 @@ object DictionaryBuilder extends Logging {
     try {
       dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
     } catch {
+      case e: DeltaConcurrentModificationException =>
+        logWarning(s"Concurrent modifications occurred: $dictPath", e)
+        throw e
       case NonFatal(e) =>
-        HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration, new Path(dictPath))
+        if (!DeltaTable.isDeltaTable(dictPath)) {
+          logWarning(s"Try to delete v3dict: $dictPath", e)
+          HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration, new Path(dictPath))
+        }
         throw e
     }
   }
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
index eb9725df9f..c72de5c0b7 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark.builder.v3dict
 
+import io.delta.exceptions.ConcurrentWriteException
 import io.delta.tables.DeltaTable
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
@@ -228,34 +229,49 @@ class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata with
     ).count()
     assert(numFileRemaining < numOfFiles + deltaLog.snapshot.numOfRemoves)
   }
+
   test("KE-41980 Test failure to initialize dictionary file") {
+    def isDirectory(path: String): Boolean = {
+      HadoopUtil.getWorkingFileSystem.isDirectory(new Path(path))
+    }
+
     val project = "project"
     val dbName = "db"
     val tableName = "table"
     val colName = "col"
     val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + colName
-
     val context = new DictionaryContext(project, dbName, tableName, colName, null)
     val dictPath = DictionaryBuilder.getDictionaryPath(context)
-    val dictDF = spy(genRandomData(spark, encodeColName, 10, 2))
 
-    // mock write delta table failed and throw a Exception
-    when(dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath))
-      .thenThrow(new RuntimeException())
+    // make dictPath an empty dir, which is not a delta table
+    // so that getDictionaryPathAndCheck will throw KylinRuntimeException
+    HadoopUtil.mkdirIfNotExist(dictPath)
+    assert(isDirectory(dictPath) == true)
+    assert(DeltaTable.isDeltaTable(dictPath) == false)
+    intercept[KylinRuntimeException] {
+      DictionaryBuilder.getDictionaryPathAndCheck(context)
+    }
 
+    // When writing Delta Table throw `RuntimeException`
+    // `dictPath` will be delete if it is not an Delta Table
+    val dictDF1 = spy(genRandomData(spark, encodeColName, 5, 2))
+    when(dictDF1.write).thenThrow(new RuntimeException())
     intercept[RuntimeException] {
-      DictionaryBuilder.initAndSaveDict(dictDF, context)
+      DictionaryBuilder.initAndSaveDict(dictDF1, context)
     }
+    assert(isDirectory(dictPath) == false)
 
-    // after mock throw RuntimeException, dictPath will be delete
-    assert(HadoopUtil.getWorkingFileSystem().exists(new Path(dictPath)) == false)
 
-    // write temp file to dictPath, which is not a delta table
-    // so that getDictionaryPathAndCheck will throw KylinRuntimeException
-    HadoopUtil.writeStringToHdfs("tempString4Test", new Path(dictPath))
-    intercept[KylinRuntimeException] {
-      DictionaryBuilder.getDictionaryPathAndCheck(context)
+    // write a Delta Table for testing `ConcurrentWriteException`
+    // Delta Table will not be deleted after `ConcurrentWriteException`
+    val dictDF2 = spy(genRandomData(spark, encodeColName, 5, 2))
+    dictDF2.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)
+    assert(DeltaTable.isDeltaTable(dictPath) == true)
+    when(dictDF2.write).thenThrow(new ConcurrentWriteException(""))
+    intercept[ConcurrentWriteException] {
+      DictionaryBuilder.initAndSaveDict(dictDF2, context)
     }
+    assert(DeltaTable.isDeltaTable(dictPath) == true)
   }
 
   def genBuildDictTask(spark: SparkSession, context: DictionaryContext): Runnable = {

Reply via email to