This is an automated email from the ASF dual-hosted git repository. lide pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ce79ff947a [Enhancement](spark load)Support spark time out config (#17108) ce79ff947a is described below commit ce79ff947a6d6456f4719076235b0115daa9733a Author: liujinhui <965147...@qq.com> AuthorDate: Thu Mar 30 20:12:46 2023 +0800 [Enhancement](spark load)Support spark time out config (#17108) --- .../doris/load/loadv2/SparkEtlJobHandler.java | 5 ++++- .../doris/load/loadv2/SparkLauncherMonitor.java | 22 ++++++++++++++++++---- .../load/loadv2/SparkLauncherMonitorTest.java | 7 ++++++- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 57f44f41d5..57f0c58af7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -117,6 +117,8 @@ public class SparkEtlJobHandler { sparkConfigs.put("spark.yarn.stage.dir", jobStageHdfsPath); } + LOG.info("submit etl spark job, sparkConfigs:{}", sparkConfigs); + try { byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8"); BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc); @@ -154,7 +156,8 @@ public class SparkEtlJobHandler { Process process = launcher.launch(); handle.setProcess(process); if (!FeConstants.runningUnitTest) { - SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); + SparkLauncherMonitor.LogMonitor logMonitor = + SparkLauncherMonitor.createLogMonitor(handle, sparkConfigs); logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS); logMonitor.setRedirectLogPath(logFilePath); logMonitor.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index 012b7ad6d5..4cf388f53b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -20,6 +20,8 @@ package org.apache.doris.load.loadv2; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Strings; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.logging.log4j.LogManager; @@ -32,14 +34,15 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.util.List; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; public class SparkLauncherMonitor { private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitor.class); - public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) { - return new LogMonitor(handle); + public static LogMonitor createLogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) { + return new LogMonitor(handle, resourceSparkConfig); } private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) { @@ -80,18 +83,29 @@ public class SparkLauncherMonitor { // 5min private static final long DEFAULT_SUBMIT_TIMEOUT_MS = 300000L; + private static final String SUBMIT_TIMEOUT_KEY = "spark.submit.timeout"; - public LogMonitor(SparkLoadAppHandle handle) { + public LogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) { this.handle = handle; this.process = handle.getProcess(); this.isStop = false; - setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS); + + if (MapUtils.isNotEmpty(resourceSparkConfig) + && StringUtils.isNotEmpty(resourceSparkConfig.get(SUBMIT_TIMEOUT_KEY))) { + setSubmitTimeoutMs(Long.parseLong(resourceSparkConfig.get(SUBMIT_TIMEOUT_KEY))); + } else { + setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS); + } } public void setSubmitTimeoutMs(long submitTimeoutMs) { this.submitTimeoutMs = submitTimeoutMs; } + public long getSubmitTimeoutMs() { + return submitTimeoutMs; + } + public void setRedirectLogPath(String redirectLogPath) throws IOException { this.outputStream = new FileOutputStream(new File(redirectLogPath), false); this.handle.setLogPath(redirectLogPath); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java index f9874c33c7..c9f28f188e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import org.apache.commons.collections.map.HashedMap; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.junit.After; import org.junit.Assert; @@ -26,6 +27,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.net.URL; +import java.util.Map; public class SparkLauncherMonitorTest { private String appId; @@ -51,13 +53,16 @@ public class SparkLauncherMonitorTest { @Test public void testLogMonitorNormal() { + Map<String, String> config = new HashedMap(); + config.put("spark.submit.timeout", "600000"); URL log = getClass().getClassLoader().getResource("spark_launcher_monitor.log"); String cmd = "cat " + log.getPath(); SparkLoadAppHandle handle = null; try { Process process = Runtime.getRuntime().exec(cmd); handle = new SparkLoadAppHandle(process); - SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); + SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle, config); + Assert.assertEquals(logMonitor.getSubmitTimeoutMs(), 600000L); logMonitor.setRedirectLogPath(logPath); logMonitor.start(); try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org