This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 92d08f19282263f70140fd93bfa7ae5a4c0d652b Author: Jiawei Li <1019037...@qq.com> AuthorDate: Tue Apr 18 11:38:41 2023 +0800 KYLIN-5638 KE create spark history dir auto in SparkApplication (#30295) --- .../engine/spark/application/SparkApplication.java | 13 +++++++- .../spark/application/SparkApplicationTest.java | 37 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index bae641e0b0..9de5b85f8a 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -590,7 +590,8 @@ public abstract class SparkApplication implements Application { } } - private void exchangeSparkConf(SparkConf sparkConf) throws Exception { + @VisibleForTesting + void exchangeSparkConf(SparkConf sparkConf) throws Exception { if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) { Map<String, String> baseSparkConf = getSparkConfigOverride(config); if (!baseSparkConf.isEmpty()) { @@ -606,6 +607,16 @@ public abstract class SparkApplication implements Application { logger.warn("Auto set spark conf failed. 
Load spark conf from system properties", e); } } + + } + val eventLogEnabled = sparkConf.getBoolean("spark.eventLog.enabled", false); + val logDir = sparkConf.get("spark.eventLog.dir", ""); + if (eventLogEnabled && !logDir.isEmpty()) { + val logPath = new Path(new URI(logDir).getPath()); + val fs = HadoopUtil.getWorkingFileSystem(); + if (!fs.exists(logPath)) { + fs.mkdirs(logPath); + } } atomicSparkConf.set(sparkConf); diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java index cc44aedd0d..80d5f6fdd1 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java @@ -25,9 +25,11 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import lombok.val; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; import org.apache.kylin.engine.spark.job.KylinBuildEnv; @@ -43,6 +45,7 @@ import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.spark.SparkConf; import org.apache.spark.sql.hive.utils.ResourceDetectUtils; import org.junit.After; import org.junit.Assert; @@ -204,4 +207,38 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTest { Assert.assertFalse(upload.exists()); } + @Test + public void testMkHistoryEventLog() throws Exception { + KylinConfig config = getTestConfig(); + SparkApplication 
application = new SparkApplication() { + @Override + protected void doExecute() { + } + }; + application.config = config; + SparkConf sparkConf = new SparkConf(); + + Path existedLogDir = new Path("/tmp/ke/testMkHistoryEventLog-existed-" + System.currentTimeMillis()); + Path notExistedLogDir = new Path("/tmp/ke/testMkHistoryEventLog-not-existed-" + System.currentTimeMillis()); + val fs = HadoopUtil.getWorkingFileSystem(); + if (!fs.exists(existedLogDir)) { + fs.mkdirs(existedLogDir); + } + if (fs.exists(notExistedLogDir)) { + fs.delete(notExistedLogDir); + } + sparkConf.set("spark.eventLog.enabled", "false"); + sparkConf.set("spark.eventLog.dir", notExistedLogDir.toString()); + application.exchangeSparkConf(sparkConf); + assert !fs.exists(notExistedLogDir); + sparkConf.set("spark.eventLog.enabled", "true"); + application.exchangeSparkConf(sparkConf); + assert fs.exists(notExistedLogDir); + sparkConf.set("spark.eventLog.dir", existedLogDir.toString()); + application.exchangeSparkConf(sparkConf); + assert fs.exists(existedLogDir); + sparkConf.set("spark.eventLog.dir", ""); + application.exchangeSparkConf(sparkConf); + } + }