This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 51ff5ec9c6959bd5238c579573494bc4b07c2095 Author: yaqian.zhang <598593...@qq.com> AuthorDate: Wed Nov 25 20:00:08 2020 +0800 KYLIN-4825 Add yarn tracking url on job step page --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/job/execution/ExecutableManager.java | 4 +- .../engine/spark/application/SparkApplication.java | 106 +++++++++++++++++++++ .../kylin/engine/spark/job/NSparkExecutable.java | 15 +-- .../kylin/rest/controller/JobController.java | 18 +++- .../kylin/rest/request/SparkJobUpdateRequest.java | 54 +++++++++++ .../org/apache/kylin/rest/service/JobService.java | 13 +++ server/src/main/resources/kylinSecurity.xml | 1 + 8 files changed, 198 insertions(+), 17 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 964eb64..8f2bf8e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2648,6 +2648,10 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.engine.driver-memory-base", "1024")); } + public boolean isTrackingUrlIpAddressEnabled() { + return Boolean.valueOf(this.getOptional("kylin.job.tracking-url-ip-address-enabled", TRUE)); + } + //Auto adjust the memory of driver public int[] getSparkEngineDriverMemoryStrategy() { String[] dft = {"2", "20", "100"}; diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 42a9c99..0bd7b6e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -672,6 +672,7 @@ public class ExecutableManager { + newStatus + ", job id: " + jobId); } jobOutput.setStatus(newStatus.toString()); + logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus); } if (info != null) { jobOutput.setInfo(info); @@ -683,7 +684,6 @@ public class ExecutableManager { jobOutput.setContent(output); } executableDao.updateJobOutput(jobOutput); - logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus); if (needDestroyProcess(oldStatus, newStatus)) { logger.debug("need kill {}, from {} to {}", jobId, oldStatus, newStatus); @@ -695,7 +695,7 @@ public class ExecutableManager { throw new RuntimeException(e); } - if (project != null) { + if (project != null && logPath != null) { updateJobOutputToHDFS(project, jobId, output, logPath); } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index 84737cb..98dde2b 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -18,6 +18,14 @@ package org.apache.kylin.engine.spark.application; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.DefaultHttpClient; import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.engine.spark.job.BuildJobInfos; import org.apache.kylin.engine.spark.job.KylinBuildEnv; @@ -26,8 +34,15 @@ import org.apache.kylin.engine.spark.job.UdfManager; import org.apache.kylin.engine.spark.utils.MetaDumpUtil; import org.apache.kylin.engine.spark.utils.SparkConfHelper; import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Locale; import java.util.Map; import org.apache.hadoop.fs.FileSystem; @@ -111,6 +126,93 @@ public abstract class SparkApplication { return YarnInfoFetcherUtils.getTrackingUrl(yarnAppId); } + /** + * http request the spark job controller + */ + public Boolean updateSparkJobInfo(String url, String json) { + String serverIp = System.getProperty("spark.driver.rest.server.ip", "127.0.0.1"); + String port = System.getProperty("spark.driver.rest.server.port", "7070"); + String requestApi = String.format(Locale.ROOT, "http://%s:%s" + url, serverIp, port); + + try { + DefaultHttpClient httpClient = new DefaultHttpClient(); + HttpPut httpPut = new HttpPut(requestApi); + httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + httpPut.setEntity(new StringEntity(json, StandardCharsets.UTF_8)); + + HttpResponse response = httpClient.execute(httpPut); + int code = response.getStatusLine().getStatusCode(); + if (code == HttpStatus.SC_OK) { + return true; + } else { + InputStream inputStream = response.getEntity().getContent(); + String responseContent = IOUtils.toString(inputStream); + logger.warn("update spark job failed, info: {}", responseContent); + } + } catch (IOException e) { + logger.error("http request {} failed!", requestApi, e); + } + return false; + } + + /** + * update spark job extra info, link yarn_application_tracking_url + */ + public Boolean updateSparkJobExtraInfo(String url, String project, String jobId, Map<String, String> extraInfo) { + Map<String, String> payload = new HashMap<>(5); + payload.put("project", project); + payload.put("taskId", System.getProperty("spark.driver.param.taskId", jobId)); + payload.putAll(extraInfo); + + try { + String payloadJson = JsonUtil.writeValueAsString(payload); + int retry = 3; + for (int i = 0; i < retry; i++) { + if (updateSparkJobInfo(url, payloadJson)) { + return Boolean.TRUE; + } + Thread.sleep(3000); + logger.warn("retry request rest api update spark extra job info"); + } + } catch (Exception e) { + logger.error("update spark job extra info failed!", e); + } + + return Boolean.FALSE; + } + + private String tryReplaceHostAddress(String url) { + String originHost = null; + try { + URI uri = URI.create(url); + originHost = uri.getHost(); + String hostAddress = InetAddress.getByName(originHost).getHostAddress(); + return url.replace(originHost, hostAddress); + } catch (UnknownHostException uhe) { + logger.error("failed to get the ip address of " + originHost + ", step back to use the origin tracking url.", uhe); + return url; + } + } + + private Map<String, String> getTrackingInfo(boolean ipAddressPreferred) { + String applicationId = ss.sparkContext().applicationId(); + Map<String, String> extraInfo = new HashMap<>(); + try { + String trackingUrl = getTrackingUrl(applicationId); + if (StringUtils.isBlank(trackingUrl)) { + logger.warn("Get tracking url of application {}, but empty url found.", applicationId); + return extraInfo; + } + if (ipAddressPreferred) { + trackingUrl = tryReplaceHostAddress(trackingUrl); + } + extraInfo.put("yarnAppUrl", trackingUrl); + } catch (Exception e) { + logger.error("get tracking url failed!", e); + } + return extraInfo; + } + final protected void execute() throws Exception { String hdfsMetalUrl = getParam(MetadataConstants.P_DIST_META_URL); @@ -180,6 +282,10 @@ public abstract class SparkApplication { }).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") .getOrCreate(); + if (isJobOnCluster(sparkConf)) { + updateSparkJobExtraInfo("/kylin/api/jobs/spark", project, jobId, + getTrackingInfo(config.isTrackingUrlIpAddressEnabled())); + } // for spark metrics //JobMetricsUtils.registerListener(ss); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index 7cd178c..393c10d 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -59,7 +59,6 @@ import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.job.common.PatternedLogger; -import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -254,24 +253,12 @@ public class NSparkExecutable extends AbstractExecutable { String kylinJobJar, String appArgs, String jobId) { PatternedLogger patternedLogger; if (config.isJobLogPrintEnabled()) { - patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() { - @Override - public void onLogEvent(String infoKey, Map<String, String> info) { - // only care three properties here - if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey) - || ExecutableConstants.YARN_APP_ID.equals(infoKey) - || ExecutableConstants.YARN_APP_URL.equals(infoKey)) { - getManager().addJobInfo(getId(), info); - } - } - }); + patternedLogger = new PatternedLogger(logger); } else { patternedLogger = new PatternedLogger(null); } try { String cmd = generateSparkCmd(config, hadoopConf, jars, kylinJobJar, appArgs); - patternedLogger.log("cmd: "); - patternedLogger.log(cmd); CliCommandExecutor exec = new CliCommandExecutor(); exec.execute(cmd, patternedLogger, jobId); diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java index cd63ac7..6b60326 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -34,6 +34,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.request.JobListRequest; +import org.apache.kylin.rest.request.SparkJobUpdateRequest; import org.apache.kylin.rest.response.EnvelopeResponse; import org.apache.kylin.rest.response.ResponseCode; import org.apache.kylin.rest.service.JobService; @@ -45,9 +46,11 @@ import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.PutMapping; import javax.servlet.http.HttpServletResponse; @@ -199,6 +202,19 @@ public class JobController extends BasicController { } /** + * RPC Call + */ + @PutMapping(value = "/spark") + @ResponseBody + public EnvelopeResponse<String> updateSparkJobInfo(@RequestBody SparkJobUpdateRequest sparkJobUpdateRequest) { + jobService.updateSparkJobInfo(sparkJobUpdateRequest.getProject(), + sparkJobUpdateRequest.getTaskId(), + sparkJobUpdateRequest.getYarnAppUrl()); + + return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", ""); + } + + /** * Resume a cube job * * @return diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java new file mode 100644 index 0000000..ce4147f --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.request; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class SparkJobUpdateRequest { + @JsonProperty + private String project; + @JsonProperty + private String taskId; + @JsonProperty + private String yarnAppUrl; + + public String getProject() { + return project; + } + + public void setProject(String project) { + this.project = project; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public String getYarnAppUrl() { + return yarnAppUrl; + } + + public void setYarnAppUrl(String yarnAppUrl) { + this.yarnAppUrl = yarnAppUrl; + } +} diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 90ee782..e543c22 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -33,6 +33,7 @@ import java.util.TimeZone; import javax.annotation.Nullable; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; @@ -54,6 +55,7 @@ import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.JobSearchResult; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.SchedulerFactory; +import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.dao.ExecutableOutputPO; @@ -467,6 +469,17 @@ public class JobService extends BasicService implements InitializingBean { } } + /** + * update the spark job yarnAppUrl. + */ + public void updateSparkJobInfo(String project, String taskId, String yarnAppUrl) { + ExecutableManager executableManager = getExecutableManager(); + Map<String, String> extraInfo = Maps.newHashMap(); + extraInfo.put(ExecutableConstants.YARN_APP_URL, yarnAppUrl); + + executableManager.updateJobOutput(project, taskId, null, extraInfo, null, null); + } + public JobInstance getJobInstance(String uuid) { AbstractExecutable job = getExecutableManager().getJob(uuid); if (job instanceof CheckpointExecutable) { diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml index 0003a5f..f73c595 100644 --- a/server/src/main/resources/kylinSecurity.xml +++ b/server/src/main/resources/kylinSecurity.xml @@ -249,6 +249,7 @@ <scr:intercept-url pattern="/api/cubes*/**" access="isAuthenticated()"/> <scr:intercept-url pattern="/api/models*/**" access="isAuthenticated()"/> <scr:intercept-url pattern="/api/streaming*/**" access="isAuthenticated()"/> + <scr:intercept-url pattern="/api/jobs/spark" access="permitAll"/> <scr:intercept-url pattern="/api/job*/**" access="isAuthenticated()"/> <scr:intercept-url pattern="/api/admin/public_config" access="permitAll"/> <scr:intercept-url pattern="/api/admin/version" access="permitAll"/>