This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5e55b761ea30e00c4488b9ef7deac3f9cc5e4a6d
Author: Yu Gan <yu....@kyligence.io>
AuthorDate: Wed Nov 30 22:58:18 2022 +0800

    KYLIN-5416 Ensure at least one worker is registered before the dict lock is added
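    
    The change makes the build wait, via a polling timer, until the Spark
    status tracker reports at least one registered executor before buildDict()
    takes the dictionary lock. A condensed sketch of that waiting pattern
    (illustrative only, assuming a SparkSession named spark is in scope; the
    committed waitTillWorkerRegistered below additionally guards the poll with
    error handling):
    
        import java.util.concurrent.{CountDownLatch, TimeUnit}
        import java.util.{Timer, TimerTask}
    
        val latch = new CountDownLatch(1)
        val timer = new Timer("worker-starvation-timer", true) // daemon thread
        timer.scheduleAtFixedRate(new TimerTask {
          override def run(): Unit = {
            // Non-empty once at least one worker has registered.
            if (spark.sparkContext.statusTracker.getExecutorInfos.nonEmpty) {
              this.cancel()     // stop polling
              latch.countDown() // release the waiting caller
            }
          }
        }, 0, TimeUnit.SECONDS.toMillis(20)) // re-check every 20 seconds
        latch.await(10, TimeUnit.HOURS) // upper bound on the total wait
        timer.cancel()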
---
 .../job/stage/build/FlatTableAndDictBase.scala     | 38 +++++++++-
 .../spark/builder/TestBuildDictLockConflict.scala  | 88 ++++++++++++++++++++++
 2 files changed, 122 insertions(+), 4 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 4de752d79e..c44fe1f9e0 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.engine.spark.job.stage.build
 
-import java.util.{Locale, Objects}
-
+import com.google.common.collect.Sets
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.HadoopUtil
@@ -44,6 +43,8 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
@@ -51,8 +52,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 
-import com.google.common.collect.Sets
-
 abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
                                     private val dataSegment: NDataSegment,
                                     private val buildParam: BuildParam)
@@ -500,6 +499,8 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     if (dataSegment.isDictReady) {
       logInfo(s"Skip DICTIONARY segment $segmentId")
     } else {
+      // Ensure at least one worker is registered before the dictionary lock is added.
+      waitTillWorkerRegistered()
       buildDict(table, dictCols)
     }
 
@@ -525,6 +526,35 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     SparkInternalAgent.getDataFrame(sparkSession, encodePlan)
   }
 
+  def waitTillWorkerRegistered(timeout: Long = 20, unit: TimeUnit = TimeUnit.SECONDS): Unit = {
+    val cdl = new CountDownLatch(1)
+    val timer = new Timer("worker-starvation-timer", true)
+    timer.scheduleAtFixedRate(new TimerTask {
+
+      private def releaseAll(): Unit = {
+        this.cancel()
+        cdl.countDown()
+      }
+
+      override def run(): Unit = {
+        try {
+          if (sparkSession.sparkContext.statusTracker.getExecutorInfos.isEmpty) {
+            logWarning("Still waiting for at least one worker to register before building the dictionary.")
+          } else {
+            releaseAll()
+          }
+        } catch {
+          case t: Throwable =>
+            logError(s"Something unexpected happened, we wouldn't wait for 
resource ready.", t)
+            releaseAll()
+        }
+      }
+    }, 0, unit.toMillis(timeout))
+    // Wait at most 10 hours in total.
+    cdl.await(10, TimeUnit.HOURS)
+    timer.cancel()
+  }
+
   private def concatCCs(table: Dataset[Row], computColumns: Set[TblColRef]): Dataset[Row] = {
     val matchedCols = selectColumnsInTable(table, computColumns)
     var tableWithCcs = table
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestBuildDictLockConflict.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestBuildDictLockConflict.scala
new file mode 100644
index 0000000000..c1a8d54466
--- /dev/null
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestBuildDictLockConflict.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.builder
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.engine.spark.job.SegmentBuildJob
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.engine.spark.job.stage.build.BuildDict
+import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
+import org.apache.kylin.metadata.cube.model._
+import org.apache.kylin.metadata.model.SegmentRange
+import org.apache.spark.SparkExecutorInfo
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
+import org.mockito.Mockito
+
+import scala.collection.JavaConverters._
+
+
+class TestBuildDictLockConflict extends SparderBaseFunSuite with SharedSparkSession with LocalMetadata {
+
+  def getTestConfig: KylinConfig = {
+    KylinConfig.getInstanceFromEnv
+  }
+
+  test("waitTillWorkerRegistered") {
+    getTestConfig.setProperty("kylin.engine.persist-flattable-enabled", 
"false")
+    getTestConfig.setProperty("kylin.engine.count.lookup-table-max-time", "0")
+    getTestConfig.setProperty("kylin.source.record-source-usage-enabled", 
"false")
+
+    val project1 = "default"
+    val model1 = "abe3bf1a-c4bc-458d-8278-7ea8b00f5e96"
+
+    val dfMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, project1)
+    val df: NDataflow = dfMgr.getDataflow(model1)
+    // cleanup all segments first
+    val update = new NDataflowUpdate(df.getUuid)
+    update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray)
+    dfMgr.updateDataflow(update)
+
+    val seg = dfMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
+    val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts))
+    val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, toBuildTree)
+
+    val spiedSparkSession = Mockito.spy(spark)
+    val spiedSparkContext = Mockito.spy(spark.sparkContext)
+    val spiedTracker = Mockito.spy(spark.sparkContext.statusTracker)
+    val spiedSegmentJob = Mockito.spy(classOf[SegmentBuildJob])
+    Mockito.doAnswer(_ => spiedSparkSession).when(spiedSegmentJob).getSparkSession
+    val buildParam = new BuildParam()
+    buildParam.setSpanningTree(toBuildTree)
+    buildParam.setFlatTableDesc(flatTableDesc)
+
+    val flatTable = new BuildDict(spiedSegmentJob, seg, buildParam) {
+      override def execute(): Unit = {
+        // do nothing.
+      }
+    }
+    Mockito.when(spiedSparkSession.sparkContext).thenReturn(spiedSparkContext)
+    Mockito.when(spiedSparkContext.statusTracker).thenReturn(spiedTracker)
+    Mockito.when(spiedTracker.getExecutorInfos)
+      .thenReturn(Array.empty[SparkExecutorInfo])
+      .thenCallRealMethod()
+    flatTable.waitTillWorkerRegistered(timeout = 1L)
+
+    Mockito.when(spiedTracker.getExecutorInfos)
+      .thenThrow(new NullPointerException("Test NPE!!"))
+      .thenCallRealMethod()
+    flatTable.waitTillWorkerRegistered(timeout = 1L)
+  }
+}
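
Note on the test: the chained Mockito stubbing (thenReturn or thenThrow followed
by thenCallRealMethod) makes the first poll observe an empty executor array, or
an NPE, while the next poll falls through to the real status tracker, so each
call to waitTillWorkerRegistered returns quickly. The method's timeout parameter
is the poll interval handed to the timer; the overall wait is capped separately
at 10 hours. Invocation, as in the test:

    flatTable.waitTillWorkerRegistered(timeout = 1L)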
