This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ed8b34665df96225ea949643cdbe48a3934f738e
Author: Mingming Ge <7mmi...@gmail.com>
AuthorDate: Mon Apr 17 18:24:38 2023 +0800

    KYLIN-5641 fix set spark conf in serverless mode
---
 .../job/stage/build/FlatTableAndDictBase.scala     | 13 ++++++----
 .../kylin/engine/spark/utils/SparkConfHelper.java  | 28 +++++++++++++++-------
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 5678b2f5f6..3e94fbefee 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -32,7 +32,7 @@ import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob, TableMetaMana
 import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
 import org.apache.kylin.engine.spark.model.planner.{CuboIdToLayoutUtils, FlatTableToCostUtils}
 import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
-import org.apache.kylin.engine.spark.utils.LogEx
+import org.apache.kylin.engine.spark.utils.{LogEx, SparkConfHelper}
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.guava30.shaded.common.collect.Sets
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
@@ -347,11 +347,14 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     }
     logInfo(s"Segment $segmentId persist flat table: $flatTablePath")
     sparkSession.sparkContext.setJobDescription(s"Segment $segmentId persist flat table.")
-    if (config.isFlatTableRedistributionEnabled) {
-      sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", "true")
-    }
+    SparkConfHelper.setLocalPropertyIfNeeded(sparkSession,
+      config.isFlatTableRedistributionEnabled,
+      "spark.sql.sources.repartitionWritingDataSource",
+      "true");
     tableDS.write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString)
-    sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", null)
+    SparkConfHelper.resetLocalPropertyIfNeeded(sparkSession,
+      config.isFlatTableRedistributionEnabled,
+      "spark.sql.sources.repartitionWritingDataSource");
     DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => {
       copied.setFlatTableReady(true)
       if (dataSegment.isFlatTableReady) {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java
index 7ba17d3927..a0ee8446d5 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java
@@ -18,15 +18,14 @@
 
 package org.apache.kylin.engine.spark.utils;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.spark.SparkConf;
 import org.apache.spark.conf.rule.ExecutorCoreRule;
 import org.apache.spark.conf.rule.ExecutorInstancesRule;
@@ -36,12 +35,13 @@ import org.apache.spark.conf.rule.ShufflePartitionsRule;
 import org.apache.spark.conf.rule.SparkConfRule;
 import org.apache.spark.conf.rule.StandaloneConfRule;
 import org.apache.spark.conf.rule.YarnConfRule;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class SparkConfHelper {
     protected static final Logger logger = LoggerFactory.getLogger(SparkConfHelper.class);
@@ -115,4 +115,16 @@ public class SparkConfHelper {
     public boolean hasCountDistinct() {
         return "true".equalsIgnoreCase(getConf(COUNT_DISTICT));
     }
+
+    public static void setLocalPropertyIfNeeded(SparkSession ss, boolean config, String key, String value) {
+        if (config) {
+            ss.sessionState().conf().setLocalProperty(key, value);
+        }
+    }
+
+    public static void resetLocalPropertyIfNeeded(SparkSession ss, boolean config, String key) {
+        if (config) {
+            ss.sessionState().conf().setLocalProperty(key, "");
+        }
+    }
 }
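
For context, the new static helpers are meant to be called in a matching pair around
a write, with the same feature flag guarding both the set and the reset, so the local
property is only ever touched when the feature is enabled. Below is a minimal,
hedged sketch of that calling pattern; it is not part of this commit. PersistSketch,
flagEnabled, ds and outputPath are made-up names, and the try/finally is an extra
safeguard that the committed call site in FlatTableAndDictBase does not use.

    // Hypothetical sketch only; assumes Kylin's engine-spark module (which
    // provides SparkConfHelper) and Spark are on the classpath.
    import org.apache.kylin.engine.spark.utils.SparkConfHelper
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    object PersistSketch {
      private val RepartitionKey = "spark.sql.sources.repartitionWritingDataSource"

      def persist(spark: SparkSession, ds: DataFrame, outputPath: String,
                  flagEnabled: Boolean): Unit = {
        // Set the session-local property only when the flag is on; the same
        // flag guards the reset, keeping the set/reset pair symmetric.
        SparkConfHelper.setLocalPropertyIfNeeded(spark, flagEnabled,
          RepartitionKey, "true")
        try {
          ds.write.mode(SaveMode.Overwrite).parquet(outputPath)
        } finally {
          // Added safeguard (not in the committed code): clear the property
          // even if the write fails.
          SparkConfHelper.resetLocalPropertyIfNeeded(spark, flagEnabled,
            RepartitionKey)
        }
      }
    }

One detail visible in the diff: resetLocalPropertyIfNeeded clears the property by
setting it to the empty string, whereas the old inline Scala code passed null.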