This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5e55b761ea30e00c4488b9ef7deac3f9cc5e4a6d
Author: Yu Gan <yu....@kyligence.io>
AuthorDate: Wed Nov 30 22:58:18 2022 +0800

    KYLIN-5416 ensure at least one worker was registered before dict lock added
---
 .../job/stage/build/FlatTableAndDictBase.scala     | 38 +++++++++-
 .../spark/builder/TestBuildDictLockConflict.scala  | 88 ++++++++++++++++++++++
 2 files changed, 122 insertions(+), 4 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 4de752d79e..c44fe1f9e0 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.engine.spark.job.stage.build
 
-import java.util.{Locale, Objects}
-
+import com.google.common.collect.Sets
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.HadoopUtil
@@ -44,6 +43,8 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
@@ -51,8 +52,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 
-import com.google.common.collect.Sets
-
 abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
                                     private val dataSegment: NDataSegment,
                                     private val buildParam: BuildParam)
@@ -500,6 +499,8 @@
     if (dataSegment.isDictReady) {
       logInfo(s"Skip DICTIONARY segment $segmentId")
     } else {
+      // Ensure at least one worker is registered before the dictionary lock is added.
+      waitTillWorkerRegistered()
       buildDict(table, dictCols)
     }
 
@@ -525,6 +526,35 @@
     SparkInternalAgent.getDataFrame(sparkSession, encodePlan)
   }
 
+  def waitTillWorkerRegistered(timeout: Long = 20, unit: TimeUnit = TimeUnit.SECONDS): Unit = {
+    val cdl = new CountDownLatch(1)
+    val timer = new Timer("worker-starvation-timer", true)
+    timer.scheduleAtFixedRate(new TimerTask {
+
+      private def releaseAll(): Unit = {
+        this.cancel()
+        cdl.countDown()
+      }
+
+      override def run(): Unit = {
+        try {
+          if (sparkSession.sparkContext.statusTracker.getExecutorInfos.isEmpty) {
+            logWarning("Waiting for at least one worker to be registered before building the dictionary.")
+          } else {
+            releaseAll()
+          }
+        } catch {
+          case t: Throwable =>
+            logError("Something unexpected happened; stop waiting for resources to be ready.", t)
+            releaseAll()
+        }
+      }
+    }, 0, unit.toMillis(timeout))
+    // Wait at most 10 hours.
+    cdl.await(10, TimeUnit.HOURS)
+    timer.cancel()
+  }
+
   private def concatCCs(table: Dataset[Row], computColumns: Set[TblColRef]): Dataset[Row] = {
     val matchedCols = selectColumnsInTable(table, computColumns)
     var tableWithCcs = table
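The change above boils down to: before taking the dictionary lock, block
until Spark's status tracker reports at least one registered executor,
re-checking on a daemon timer and giving up only after a hard 10-hour
deadline. Below is a minimal standalone sketch of that polling pattern;
the object name WorkerWaitSketch and the method waitForExecutor are
illustrative only and are not part of the commit.

    import java.util.concurrent.{CountDownLatch, TimeUnit}
    import java.util.{Timer, TimerTask}

    import org.apache.spark.sql.SparkSession

    object WorkerWaitSketch {

      // Block until the status tracker reports at least one executor,
      // or until the overall deadline expires. A daemon Timer re-checks
      // periodically; a CountDownLatch parks the caller in between.
      def waitForExecutor(spark: SparkSession,
                          pollSeconds: Long = 20,
                          deadlineHours: Long = 10): Boolean = {
        val latch = new CountDownLatch(1)
        val timer = new Timer("executor-wait-sketch", true) // daemon thread
        timer.scheduleAtFixedRate(new TimerTask {
          override def run(): Unit = {
            // Note: in local mode the driver itself shows up in
            // getExecutorInfos, so the wait releases immediately there;
            // on YARN or standalone it waits for real executors.
            if (spark.sparkContext.statusTracker.getExecutorInfos.nonEmpty) {
              this.cancel()
              latch.countDown()
            }
          }
        }, 0, TimeUnit.SECONDS.toMillis(pollSeconds))
        val registered = latch.await(deadlineHours, TimeUnit.HOURS)
        timer.cancel()
        registered // false: deadline hit with no executor registered
      }
    }

One design note: CountDownLatch.await returns whether the latch actually
reached zero, so a caller could distinguish "worker registered" from
"deadline expired". The committed waitTillWorkerRegistered ignores that
return value and proceeds with the build either way, treating the 10-hour
cap purely as a safety valve against waiting forever.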
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestBuildDictLockConflict.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestBuildDictLockConflict.scala
new file mode 100644
index 0000000000..c1a8d54466
--- /dev/null
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestBuildDictLockConflict.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.builder
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.engine.spark.job.SegmentBuildJob
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.engine.spark.job.stage.build.BuildDict
+import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
+import org.apache.kylin.metadata.cube.model._
+import org.apache.kylin.metadata.model.SegmentRange
+import org.apache.spark.SparkExecutorInfo
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
+import org.mockito.Mockito
+
+import scala.collection.JavaConverters._
+
+
+class TestBuildDictLockConflict extends SparderBaseFunSuite with SharedSparkSession with LocalMetadata {
+  def getTestConfig: KylinConfig = {
+    KylinConfig.getInstanceFromEnv
+  }
+
+  test("waitTillWorkerRegistered") {
+    getTestConfig.setProperty("kylin.engine.persist-flattable-enabled", "false")
+    getTestConfig.setProperty("kylin.engine.count.lookup-table-max-time", "0")
+    getTestConfig.setProperty("kylin.source.record-source-usage-enabled", "false")
+
+    val project1 = "default"
+    val model1 = "abe3bf1a-c4bc-458d-8278-7ea8b00f5e96"
+
+    val dfMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, project1)
+    val df: NDataflow = dfMgr.getDataflow(model1)
+    // Clean up all segments first.
+    val update = new NDataflowUpdate(df.getUuid)
+    update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray)
+    dfMgr.updateDataflow(update)
+
+    val seg = dfMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
+    val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts))
+    val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, toBuildTree)
+
+    val spiedSparkSession = Mockito.spy(spark)
+    val spiedSparkContext = Mockito.spy(spark.sparkContext)
+    val spiedTracker = Mockito.spy(spark.sparkContext.statusTracker)
+    val spiedSegmentJob = Mockito.spy(classOf[SegmentBuildJob])
+    Mockito.doAnswer(_ => spiedSparkSession).when(spiedSegmentJob).getSparkSession
+    val buildParam = new BuildParam()
+    buildParam.setSpanningTree(toBuildTree)
+    buildParam.setFlatTableDesc(flatTableDesc)
+
+    val flatTable = new BuildDict(spiedSegmentJob, seg, buildParam) {
+      override def execute(): Unit = {
+        // Do nothing.
+      }
+    }
+    Mockito.when(spiedSparkSession.sparkContext).thenReturn(spiedSparkContext)
+    Mockito.when(spiedSparkContext.statusTracker).thenReturn(spiedTracker)
+    Mockito.when(spiedTracker.getExecutorInfos)
+      .thenReturn(Array.empty[SparkExecutorInfo])
+      .thenCallRealMethod()
+    flatTable.waitTillWorkerRegistered(timeout = 1L)
+
+    Mockito.when(spiedTracker.getExecutorInfos)
+      .thenThrow(new NullPointerException("Test NPE!!"))
+      .thenCallRealMethod()
+    flatTable.waitTillWorkerRegistered(timeout = 1L)
+  }
+}
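The test drives both branches of the timer task through Mockito's
consecutive stubbing: the first call to getExecutorInfos returns an empty
array (or throws), and the next call falls through to the real method, so
waitTillWorkerRegistered has to survive one bad poll before the latch
releases. A stripped-down sketch of that idiom follows; the Registry
class here is a hypothetical stand-in for the Spark status tracker, not
part of the commit.

    import org.mockito.Mockito

    // Hypothetical stand-in for SparkStatusTracker.
    class Registry {
      def workers: Array[String] = Array("w1")
    }

    object ConsecutiveStubSketch {
      def main(args: Array[String]): Unit = {
        val spied = Mockito.spy(new Registry)
        // The first call is forced down the "no workers yet" path; every
        // later call falls through to the real implementation. (With a
        // spy, Mockito invokes the real workers once while recording the
        // stub, which is harmless for a side-effect-free getter.)
        Mockito.when(spied.workers)
          .thenReturn(Array.empty[String])
          .thenCallRealMethod()
        assert(spied.workers.isEmpty)                   // poll #1: starvation
        assert(spied.workers.sameElements(Array("w1"))) // poll #2: registered
      }
    }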