This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bc00ef2ab5ccfc85542c31e375f222bd32a0e0b5 Author: Yaguang Jia <jiayagu...@foxmail.com> AuthorDate: Sat Jun 10 10:26:52 2023 +0800 KYLIN-5719 [FOLLOW UP] v3dct catch DeltaConcurrentModificationException --- .../spark/builder/v3dict/DictionaryBuilder.scala | 9 ++++- .../builder/v3dict/GlobalDictionarySuite.scala | 42 +++++++++++++++------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala index 6d4a177916..e47dc7107e 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala @@ -17,6 +17,7 @@ */ package org.apache.kylin.engine.spark.builder.v3dict +import io.delta.exceptions.DeltaConcurrentModificationException import io.delta.tables.DeltaTable import org.apache.hadoop.fs.Path import org.apache.kylin.common.KylinConfig @@ -167,8 +168,14 @@ object DictionaryBuilder extends Logging { try { dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath) } catch { + case e: DeltaConcurrentModificationException => + logWarning(s"Concurrent modifications occurred: $dictPath", e) + throw e case NonFatal(e) => - HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration, new Path(dictPath)) + if (!DeltaTable.isDeltaTable(dictPath)) { + logWarning(s"Try to delete v3dict: $dictPath", e) + HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration, new Path(dictPath)) + } throw e } } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala index eb9725df9f..c72de5c0b7 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionarySuite.scala @@ -18,6 +18,7 @@ package org.apache.kylin.engine.spark.builder.v3dict +import io.delta.exceptions.ConcurrentWriteException import io.delta.tables.DeltaTable import org.apache.hadoop.fs.Path import org.apache.kylin.common.KylinConfig @@ -228,34 +229,49 @@ class GlobalDictionarySuite extends SparderBaseFunSuite with LocalMetadata with ).count() assert(numFileRemaining < numOfFiles + deltaLog.snapshot.numOfRemoves) } + test("KE-41980 Test failure to initialize dictionary file") { + def isDirectory(path: String): Boolean = { + HadoopUtil.getWorkingFileSystem.isDirectory(new Path(path)) + } + val project = "project" val dbName = "db" val tableName = "table" val colName = "col" val encodeColName: String = tableName + NSparkCubingUtil.SEPARATOR + colName - val context = new DictionaryContext(project, dbName, tableName, colName, null) val dictPath = DictionaryBuilder.getDictionaryPath(context) - val dictDF = spy(genRandomData(spark, encodeColName, 10, 2)) - // mock write delta table failed and throw a Exception - when(dictDF.write.mode(SaveMode.Overwrite).format("delta").save(dictPath)) - .thenThrow(new RuntimeException()) + // make dictPath an empty dir, which is not a delta table + // so that getDictionaryPathAndCheck will throw KylinRuntimeException + HadoopUtil.mkdirIfNotExist(dictPath) + assert(isDirectory(dictPath) == true) + assert(DeltaTable.isDeltaTable(dictPath) == false) + intercept[KylinRuntimeException] { + DictionaryBuilder.getDictionaryPathAndCheck(context) + } + // When writing Delta Table throw `RuntimeException` + // `dictPath` will be delete if it is not an Delta Table + val dictDF1 = spy(genRandomData(spark, encodeColName, 5, 2)) + when(dictDF1.write).thenThrow(new RuntimeException()) intercept[RuntimeException] { - DictionaryBuilder.initAndSaveDict(dictDF, context) + DictionaryBuilder.initAndSaveDict(dictDF1, context) } + assert(isDirectory(dictPath) == false) - // after mock throw RuntimeException, dictPath will be delete - assert(HadoopUtil.getWorkingFileSystem().exists(new Path(dictPath)) == false) - // write temp file to dictPath, which is not a delta table - // so that getDictionaryPathAndCheck will throw KylinRuntimeException - HadoopUtil.writeStringToHdfs("tempString4Test", new Path(dictPath)) - intercept[KylinRuntimeException] { - DictionaryBuilder.getDictionaryPathAndCheck(context) + // write a Delta Table for testing `ConcurrentWriteException` + // Delta Table will not be deleted after `ConcurrentWriteException` + val dictDF2 = spy(genRandomData(spark, encodeColName, 5, 2)) + dictDF2.write.mode(SaveMode.Overwrite).format("delta").save(dictPath) + assert(DeltaTable.isDeltaTable(dictPath) == true) + when(dictDF2.write).thenThrow(new ConcurrentWriteException("")) + intercept[ConcurrentWriteException] { + DictionaryBuilder.initAndSaveDict(dictDF2, context) } + assert(DeltaTable.isDeltaTable(dictPath) == true) } def genBuildDictTask(spark: SparkSession, context: DictionaryContext): Runnable = {