Repository: kylin
Updated Branches:
  refs/heads/master 3152264c1 -> 07cf14282


KYLIN-2562 Allow configuring yarn app tracking URL pattern


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07cf1428
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07cf1428
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07cf1428

Branch: refs/heads/master
Commit: 07cf142820ed2033d9af4278c8f4166c1815bd0d
Parents: 3152264
Author: shaofengshi <shaofeng...@apache.org>
Authored: Mon Apr 24 16:54:53 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Apr 24 18:15:26 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  3 +++
 .../kylin/job/common/PatternedLogger.java       |  8 ++++++
 .../kylin/job/execution/ExecutableManager.java  | 26 ++++++++++++++++++++
 3 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/07cf1428/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bf6cdb8..5490e93 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -989,4 +989,7 @@ abstract public class KylinConfigBase implements 
Serializable {
         return getResourceStoreImpls().get(key);
     }
 
+    public String getJobTrackingURLPattern() {
+        return getOptional("kylin.job.tracking-url-pattern", "");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/07cf1428/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java 
b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 8399b44..8be5d02 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -45,6 +45,8 @@ public class PatternedLogger extends BufferedLogger {
     private static final Pattern PATTERN_HIVE_APP_ID_URL = 
Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
     private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = 
Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
 
+    private static final Pattern PATTERN_HIVE_APP_ID_URL_2 = 
Pattern.compile("Executing on YARN cluster with App id  (.*?)");
+
     // spark
     private static final Pattern PATTERN_SPARK_APP_ID = 
Pattern.compile("Submitted application (.*?)");
     private static final Pattern PATTERN_SPARK_APP_URL = 
Pattern.compile("tracking URL: (.*)");
@@ -100,6 +102,12 @@ public class PatternedLogger extends BufferedLogger {
             String trackingUrl = matcher.group(2);
             info.put(ExecutableConstants.MR_JOB_ID, jobId);
             info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+        } else {
+            matcher = PATTERN_HIVE_APP_ID_URL_2.matcher(message);
+            if (matcher.find()) {
+                String jobId = matcher.group(1);
+                info.put(ExecutableConstants.YARN_APP_ID, jobId);
+            }
         }
 
         matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);

http://git-wip-us.apache.org/repos/asf/kylin/blob/07cf1428/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 0c86d72..2272582 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -20,13 +20,16 @@ package org.apache.kylin.job.execution;
 
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
+import java.util.IllegalFormatException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
@@ -39,6 +42,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import static org.apache.kylin.job.constant.ExecutableConstants.MR_JOB_ID;
+import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
+import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
+
 /**
  */
 public class ExecutableManager {
@@ -412,6 +419,25 @@ public class ExecutableManager {
         if (info == null) {
             return;
         }
+
+        // post process
+        if (info.containsKey(MR_JOB_ID) && 
!info.containsKey(ExecutableConstants.YARN_APP_ID)) {
+            String jobId = info.get(MR_JOB_ID);
+            if (jobId.startsWith("job_")) {
+                info.put(YARN_APP_ID, jobId.replace("job_", "application_"));
+            }
+        }
+
+        if (info.containsKey(YARN_APP_ID) && 
!StringUtils.isEmpty(config.getJobTrackingURLPattern())) {
+            String pattern = config.getJobTrackingURLPattern();
+            try {
+                String newTrackingURL = String.format(pattern, 
info.get(YARN_APP_ID));
+                info.put(YARN_APP_URL, newTrackingURL);
+            } catch (IllegalFormatException ife) {
+                logger.error("Illegal tracking url pattern: " + 
config.getJobTrackingURLPattern());
+            }
+        }
+
         try {
             ExecutableOutputPO output = executableDao.getJobOutput(id);
             Preconditions.checkArgument(output != null, "there is no related 
output for job id:" + id);

Reply via email to