This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ed8b34665df96225ea949643cdbe48a3934f738e
Author: Mingming Ge <7mmi...@gmail.com>
AuthorDate: Mon Apr 17 18:24:38 2023 +0800

    KYLIN-5641 Fix setting Spark conf in serverless mode
---
 .../job/stage/build/FlatTableAndDictBase.scala     | 13 ++++++----
 .../kylin/engine/spark/utils/SparkConfHelper.java  | 28 +++++++++++++++-------
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 5678b2f5f6..3e94fbefee 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -32,7 +32,7 @@ import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob, TableMetaMana
 import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
 import org.apache.kylin.engine.spark.model.planner.{CuboIdToLayoutUtils, FlatTableToCostUtils}
 import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
-import org.apache.kylin.engine.spark.utils.LogEx
+import org.apache.kylin.engine.spark.utils.{LogEx, SparkConfHelper}
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.guava30.shaded.common.collect.Sets
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
@@ -347,11 +347,14 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     }
     logInfo(s"Segment $segmentId persist flat table: $flatTablePath")
     sparkSession.sparkContext.setJobDescription(s"Segment $segmentId persist flat table.")
-    if (config.isFlatTableRedistributionEnabled) {
-      sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", "true")
-    }
+    SparkConfHelper.setLocalPropertyIfNeeded(sparkSession,
+      config.isFlatTableRedistributionEnabled,
+      "spark.sql.sources.repartitionWritingDataSource",
+      "true");
     tableDS.write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString)
-    sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", null)
+    SparkConfHelper.resetLocalPropertyIfNeeded(sparkSession,
+      config.isFlatTableRedistributionEnabled,
+      "spark.sql.sources.repartitionWritingDataSource");
     DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => {
       copied.setFlatTableReady(true)
       if (dataSegment.isFlatTableReady) {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java
index 7ba17d3927..a0ee8446d5 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/utils/SparkConfHelper.java
@@ -18,15 +18,14 @@
 
 package org.apache.kylin.engine.spark.utils;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.spark.SparkConf;
 import org.apache.spark.conf.rule.ExecutorCoreRule;
 import org.apache.spark.conf.rule.ExecutorInstancesRule;
@@ -36,12 +35,13 @@ import org.apache.spark.conf.rule.ShufflePartitionsRule;
 import org.apache.spark.conf.rule.SparkConfRule;
 import org.apache.spark.conf.rule.StandaloneConfRule;
 import org.apache.spark.conf.rule.YarnConfRule;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class SparkConfHelper {
     protected static final Logger logger = LoggerFactory.getLogger(SparkConfHelper.class);
@@ -115,4 +115,16 @@ public class SparkConfHelper {
     public boolean hasCountDistinct() {
         return "true".equalsIgnoreCase(getConf(COUNT_DISTICT));
     }
+
+    public static void setLocalPropertyIfNeeded(SparkSession ss, boolean config, String key, String value) {
+        if (config) {
+            ss.sessionState().conf().setLocalProperty(key, value);
+        }
+    }
+
+    public static void resetLocalPropertyIfNeeded(SparkSession ss, boolean config, String key) {
+        if (config) {
+            ss.sessionState().conf().setLocalProperty(key, "");
+        }
+    }
 }
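
The two new helpers centralize the set-then-reset pattern around the flat table write: the session-local property is toggled only when flat-table redistribution is enabled, and it is cleared again after the write so later jobs in the same session are unaffected. Below is a minimal, self-contained sketch of that pattern, not code from this commit: it uses vanilla Spark's SparkContext.setLocalProperty, whereas the commit goes through sessionState.conf.setLocalProperty from Kylin's Spark build, and the flag, key handling, path, and object name here are illustrative stand-ins.

import org.apache.spark.sql.{SaveMode, SparkSession}

object LocalPropertySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("local-property-sketch").getOrCreate()
    // Illustrative stand-ins for config.isFlatTableRedistributionEnabled and flatTablePath.
    val redistributionEnabled = true
    val key = "spark.sql.sources.repartitionWritingDataSource"

    // Mirrors SparkConfHelper.setLocalPropertyIfNeeded: touch the property only when the flag is on.
    if (redistributionEnabled) spark.sparkContext.setLocalProperty(key, "true")
    try {
      spark.range(100).toDF("id").write.mode(SaveMode.Overwrite).parquet("/tmp/flat_table_sketch")
    } finally {
      // Mirrors SparkConfHelper.resetLocalPropertyIfNeeded: always clear the property afterwards.
      if (redistributionEnabled) spark.sparkContext.setLocalProperty(key, "")
    }
    spark.stop()
  }
}

Note that the new reset helper writes an empty string where the old inline code passed null, and that both guards share the same flag, so the property is reset exactly when it was set.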
