This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 92d08f19282263f70140fd93bfa7ae5a4c0d652b
Author: Jiawei Li <1019037...@qq.com>
AuthorDate: Tue Apr 18 11:38:41 2023 +0800

    KYLIN-5638 KE: create the Spark history dir automatically in SparkApplication (#30295)
---
 .../engine/spark/application/SparkApplication.java | 13 +++++++-
 .../spark/application/SparkApplicationTest.java    | 37 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index bae641e0b0..9de5b85f8a 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -590,7 +590,8 @@ public abstract class SparkApplication implements Application {
         }
     }
 
-    private void exchangeSparkConf(SparkConf sparkConf) throws Exception {
+    @VisibleForTesting
+    void exchangeSparkConf(SparkConf sparkConf) throws Exception {
         if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
             Map<String, String> baseSparkConf = getSparkConfigOverride(config);
             if (!baseSparkConf.isEmpty()) {
@@ -606,6 +607,16 @@ public abstract class SparkApplication implements Application {
                     logger.warn("Auto set spark conf failed. Load spark conf 
from system properties", e);
                 }
             }
+
+        }
+        val eventLogEnabled = sparkConf.getBoolean("spark.eventLog.enabled", false);
+        val logDir = sparkConf.get("spark.eventLog.dir", "");
+        if (eventLogEnabled && !logDir.isEmpty()) {
+            val logPath = new Path(new URI(logDir).getPath());
+            val fs = HadoopUtil.getWorkingFileSystem();
+            if (!fs.exists(logPath)) {
+                fs.mkdirs(logPath);
+            }
         }
 
         atomicSparkConf.set(sparkConf);
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index cc44aedd0d..80d5f6fdd1 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -25,9 +25,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import lombok.val;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
@@ -43,6 +45,7 @@ import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.SparkConf;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -204,4 +207,38 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTest {
         Assert.assertFalse(upload.exists());
     }
 
+    @Test
+    public void testMkHistoryEventLog() throws Exception {
+        KylinConfig config = getTestConfig();
+        SparkApplication application = new SparkApplication() {
+            @Override
+            protected void doExecute() {
+            }
+        };
+        application.config = config;
+        SparkConf sparkConf = new SparkConf();
+
+        Path existedLogDir = new Path("/tmp/ke/testMkHistoryEventLog-existed-" + System.currentTimeMillis());
+        Path notExistedLogDir = new Path("/tmp/ke/testMkHistoryEventLog-not-existed-" + System.currentTimeMillis());
+        val fs = HadoopUtil.getWorkingFileSystem();
+        if (!fs.exists(existedLogDir)) {
+            fs.mkdirs(existedLogDir);
+        }
+        if (fs.exists(notExistedLogDir)) {
+            fs.delete(notExistedLogDir, true);
+        }
+        sparkConf.set("spark.eventLog.enabled", "false");
+        sparkConf.set("spark.eventLog.dir", notExistedLogDir.toString());
+        application.exchangeSparkConf(sparkConf);
+        Assert.assertFalse(fs.exists(notExistedLogDir));
+        sparkConf.set("spark.eventLog.enabled", "true");
+        application.exchangeSparkConf(sparkConf);
+        Assert.assertTrue(fs.exists(notExistedLogDir));
+        sparkConf.set("spark.eventLog.dir", existedLogDir.toString());
+        application.exchangeSparkConf(sparkConf);
+        Assert.assertTrue(fs.exists(existedLogDir));
+        sparkConf.set("spark.eventLog.dir", "");
+        application.exchangeSparkConf(sparkConf);
+    }
+
 }
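For readers skimming the patch: the core of the change is the event-log bootstrap in exchangeSparkConf. When spark.eventLog.enabled is true and spark.eventLog.dir is non-empty, the directory is created on the working filesystem before the SparkConf is published, so the driver and history server do not fail on a missing path. Below is a minimal standalone sketch of that logic; the class name EventLogDirBootstrap is hypothetical, while HadoopUtil.getWorkingFileSystem() is the same Kylin helper the patch uses.

    import java.net.URI;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;
    import org.apache.spark.SparkConf;

    // Hypothetical standalone illustration of the directory bootstrap added by this patch.
    public class EventLogDirBootstrap {

        // Create the Spark event-log directory up front if it does not exist yet.
        public static void ensureEventLogDir(SparkConf sparkConf) throws Exception {
            boolean eventLogEnabled = sparkConf.getBoolean("spark.eventLog.enabled", false);
            String logDir = sparkConf.get("spark.eventLog.dir", "");
            if (eventLogEnabled && !logDir.isEmpty()) {
                // Strip any scheme/authority (e.g. hdfs://ns/...) and resolve the
                // remaining path against the working filesystem, as the patch does.
                Path logPath = new Path(new URI(logDir).getPath());
                FileSystem fs = HadoopUtil.getWorkingFileSystem();
                if (!fs.exists(logPath)) {
                    fs.mkdirs(logPath);
                }
            }
        }
    }

Note that the test exercises this path indirectly through exchangeSparkConf, which is why the patch relaxes that method's visibility from private to package-private and annotates it with @VisibleForTesting.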
