This is an automated email from the ASF dual-hosted git repository.

yaqian pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5d552fe61e4a05b5f061805807f866ce53f4c03a
Author: tianhui5 <tianh...@xiaomi.com>
AuthorDate: Mon Feb 22 10:15:46 2021 +0800

    KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster
---
 .../engine/spark/application/SparkApplication.java  | 20 ++++++++++++--------
 .../kylin/engine/spark/job/NSparkExecutable.java    | 15 +++++++++++----
 .../org/apache/spark/application/JobWorkSpace.scala |  6 +++++-
 3 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 10af5cd..9baa490 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.application;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -33,14 +34,15 @@ import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
@@ -79,8 +81,14 @@ public abstract class SparkApplication {
     protected BuildJobInfos infos;
 
     public void execute(String[] args) {
-        try {
-            String argsLine = Files.readAllLines(Paths.get(args[0])).get(0);
+        Path path = new Path(args[0]);
+        try (
+                FileSystem fileSystem = FileSystem.get(path.toUri(), HadoopUtil.getCurrentConfiguration());
+                FSDataInputStream inputStream = fileSystem.open(path);
+                InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
+        ) {
+            String argsLine = bufferedReader.readLine();
             if (argsLine.isEmpty()) {
                 throw new RuntimeException("Args file is empty");
             }
@@ -312,10 +320,6 @@ public abstract class SparkApplication {
             if (infos != null) {
                 infos.jobEnd();
             }
-            if (ss != null && !ss.conf().get("spark.master").startsWith("local")) {
-                //JobMetricsUtils.unRegisterListener(ss);
-                ss.stop();
-            }
         }
     }
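The execute() change above is what makes cluster deploy mode workable: in yarn-cluster mode the driver runs on an arbitrary NodeManager, so it can no longer read an args file written to the Kylin job server's local disk, and the file has to come from a shared filesystem instead. For context, a minimal stand-alone sketch of the same read pattern (class and method names here are illustrative, not from the patch; the patch itself uses HadoopUtil.getCurrentConfiguration() rather than a bare Configuration):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ArgsFileReadSketch {
        public static String readFirstLine(String uri) throws IOException {
            Path path = new Path(uri);
            // FileSystem.get resolves the scheme (hdfs://, file://, ...) from
            // the URI, so the same code works wherever the args file lives.
            try (FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
                    FSDataInputStream in = fs.open(path);
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(in, StandardCharsets.UTF_8))) {
                // Returns null if the file is empty; callers should check.
                return reader.readLine();
            }
        }
    }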
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 557627d..66fa91d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -18,6 +18,7 @@ package org.apache.kylin.engine.spark.job;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -38,8 +39,8 @@ import java.util.Map.Entry;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -169,11 +170,15 @@ public class NSparkExecutable extends AbstractExecutable {
     String dumpArgs() throws ExecuteException {
         File tmpDir = null;
         try {
-            tmpDir = File.createTempFile(MetadataConstants.P_SEGMENT_IDS, "");
-            FileUtils.writeByteArrayToFile(tmpDir, JsonUtil.writeValueAsBytes(getParams()));
+            String pathName = getId() + "_" + MetadataConstants.P_JOB_ID;
+            Path tgtPath = new Path(getConfig().getJobTmpDir(getParams().get("project")), pathName);
+            FileSystem fileSystem = FileSystem.get(tgtPath.toUri(), HadoopUtil.getCurrentConfiguration());
+            try (BufferedOutputStream outputStream = new BufferedOutputStream(fileSystem.create(tgtPath, false))) {
+                outputStream.write(JsonUtil.writeValueAsBytes(getParams()));
+            }
 
             logger.info("Spark job args json is : {}.", JsonUtil.writeValueAsString(getParams()));
-            return tmpDir.getCanonicalPath();
+            return tgtPath.toUri().toString();
         } catch (IOException e) {
             if (tmpDir != null && tmpDir.exists()) {
                 try {
@@ -382,6 +387,8 @@ public class NSparkExecutable extends AbstractExecutable {
         if (StringUtils.isNotBlank(sparkUploadFiles)) {
             sb.append("--files ").append(sparkUploadFiles).append(" ");
         }
+        sb.append("--principal ").append(config.getKerberosPrincipal()).append(" ");
+        sb.append("--keytab ").append(config.getKerberosKeytabPath()).append(" ");
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index c168632..a4ddf25 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -35,7 +35,11 @@ object JobWorkSpace extends Logging {
       val worker = new JobWorker(application, appArgs, eventLoop)
       val monitor = new JobMonitor(eventLoop)
       val workspace = new JobWorkSpace(eventLoop, monitor, worker)
-      System.exit(workspace.run())
+      if (System.getProperty("spark.master").equals("yarn") && System.getProperty("spark.submit.deployMode").equals("cluster")) {
+        workspace.run()
+      } else {
+        System.exit(workspace.run())
+      }
     } catch {
       case throwable: Throwable =>
         logError("Error occurred when init job workspace.", throwable)
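On the submit side, dumpArgs() is the matching half of the round trip: it now writes the params JSON to the job tmp directory on the shared filesystem and returns the full URI, which spark-submit hands to the driver as args[0]. A hedged sketch of the same write pattern, with jobTmpDir, jobId, and params standing in for Kylin's getConfig().getJobTmpDir(...), getId(), and getParams(), the "_job_id" suffix standing in for MetadataConstants.P_JOB_ID, and Jackson's ObjectMapper standing in for Kylin's JsonUtil:

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ArgsFileWriteSketch {
        public static String dumpArgs(String jobTmpDir, String jobId,
                Map<String, String> params) throws IOException {
            Path tgtPath = new Path(jobTmpDir, jobId + "_job_id");
            FileSystem fs = FileSystem.get(tgtPath.toUri(), new Configuration());
            // create(path, false) fails if the file already exists, so a
            // retried step never silently overwrites a previous attempt's args.
            try (BufferedOutputStream out =
                    new BufferedOutputStream(fs.create(tgtPath, false))) {
                out.write(new ObjectMapper().writeValueAsBytes(params));
            }
            // The full URI (e.g. hdfs://...) is what becomes args[0].
            return tgtPath.toUri().toString();
        }
    }

The new --principal/--keytab flags follow from the same move: once the driver runs on the cluster, it can no longer piggyback on the job server's Kerberos ticket and must authenticate with its own keytab.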
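The JobWorkSpace change skips System.exit in yarn-cluster mode, presumably because there the user class runs inside the ApplicationMaster's JVM, and an early System.exit can fire the shutdown hook before the final application status is reported, marking an otherwise successful job as failed. One caveat with the guard as written: System.getProperty returns null when a property is unset, so the equals calls can throw a NullPointerException in local runs. A null-safe variant (an illustrative sketch, not part of the commit; it assumes, as the patch does, that spark-submit exposes spark.master and spark.submit.deployMode as JVM system properties in the driver):

    public class DeployModeGuard {
        // Flipping equals() onto the literal tolerates unset properties,
        // e.g. local mode or unit tests, instead of throwing an NPE.
        static boolean isYarnClusterMode() {
            return "yarn".equals(System.getProperty("spark.master"))
                    && "cluster".equals(System.getProperty("spark.submit.deployMode"));
        }
    }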