Repository: kylin
Updated Branches:
  refs/heads/2.0-rc 604268d9f -> 7dae0cf4a


getting mr job status with kerberos authentication

Signed-off-by: fengyu <zhfen...@corp.netease.com>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7dae0cf4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7dae0cf4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7dae0cf4

Branch: refs/heads/2.0-rc
Commit: 7dae0cf4ad65f69ece49df13f85301c9470cf763
Parents: 604268d
Author: hzfen...@corp.netease.com <hzfen...@corp.netease.com>
Authored: Wed Dec 30 12:09:22 2015 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Jan 5 11:04:04 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |   2 +
 core-common/pom.xml                             |   4 +
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../engine/mr/common/HadoopStatusChecker.java   |   6 +-
 .../engine/mr/common/HadoopStatusGetter.java    | 104 ++++++++++++++++++-
 .../engine/mr/common/MapReduceExecutable.java   |   3 +-
 pom.xml                                         |   6 ++
 7 files changed, 124 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index f2170bd..d029b4c 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -82,6 +82,8 @@ kylin.hbase.region.cut.small=5
 kylin.hbase.region.cut.medium=10
 kylin.hbase.region.cut.large=50
 
+# whether get job status from resource manager with kerberos authentication
+kylin.job.status.with.kerberos=false
 
 ## kylin security configurations
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/core-common/pom.xml
----------------------------------------------------------------------
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 07a7ee1..8c7edb1 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -69,6 +69,10 @@
             <artifactId>commons-httpclient</artifactId>
         </dependency>
         <dependency>
+               <groupId>org.apache.httpcomponents</groupId>
+               <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5968411..ee11158 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -466,6 +466,10 @@ public class KylinConfigBase implements Serializable {
     public String getHiveDatabaseForIntermediateTable() {
         return 
this.getOptional("kylin.job.hive.database.for.intermediatetable", "default");
     }
+    
+    public boolean isGetJobStatusWithKerberos() {
+        return 
Boolean.valueOf(this.getOptional("kylin.job.status.with.kerberos", "false"));
+    }
 
     public String getKylinOwner() {
         return this.getOptional("kylin.owner", "");

http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
index 1b71b92..ef45ed1 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java
@@ -39,11 +39,13 @@ public class HadoopStatusChecker {
     private final String yarnUrl;
     private final String mrJobID;
     private final StringBuilder output;
+    private final boolean useKerberosAuth;
 
-    public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder 
output) {
+    public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder 
output, boolean useKerberosAuth) {
         this.yarnUrl = yarnUrl;
         this.mrJobID = mrJobID;
         this.output = output;
+        this.useKerberosAuth = useKerberosAuth;
     }
 
     public JobStepStatusEnum checkStatus() {
@@ -53,7 +55,7 @@ public class HadoopStatusChecker {
         }
         JobStepStatusEnum status = null;
         try {
-            final Pair<RMAppState, FinalApplicationStatus> result = new 
HadoopStatusGetter(yarnUrl, mrJobID).get();
+            final Pair<RMAppState, FinalApplicationStatus> result = new 
HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth);
             logger.debug("State of Hadoop job: " + mrJobID + ":" + 
result.getLeft() + "-" + result.getRight());
             output.append(new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + 
result.getLeft() + " - " + result.getRight() + "\n");
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
index fd7afd3..081070e 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java
@@ -24,10 +24,24 @@ import org.apache.commons.httpclient.HttpMethod;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.protocol.Protocol;
 import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.http.auth.AuthSchemeProvider;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.config.AuthSchemes;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Lookup;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -35,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.security.Principal;
 
 /**
  */
@@ -50,14 +65,99 @@ public class HadoopStatusGetter {
         this.mrJobId = mrJobId;
     }
 
-    public Pair<RMAppState, FinalApplicationStatus> get() throws IOException {
+    public Pair<RMAppState, FinalApplicationStatus> get(boolean 
useKerberosAuth) throws IOException {
         String applicationId = mrJobId.replace("job", "application");
         String url = yarnUrl.replace("${job_id}", applicationId);
-        JsonNode root = new ObjectMapper().readTree(getHttpResponse(url));
+        String response = useKerberosAuth ? 
getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); 
+        logger.debug("Hadoop job " + mrJobId + " status : " + response);
+        JsonNode root = new ObjectMapper().readTree(response);
         RMAppState state = 
RMAppState.valueOf(root.findValue("state").getTextValue());
         FinalApplicationStatus finalStatus = 
FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue());
         return Pair.of(state, finalStatus);
     }
+    
+    private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf";
+    private String getHttpResponseWithKerberosAuth(String url) throws 
IOException {
+       String krb5ConfigPath = System.getProperty("java.security.krb5.conf");
+       if(krb5ConfigPath == null) {
+               krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION;
+       }
+       boolean skipPortAtKerberosDatabaseLookup = true;
+       System.setProperty("java.security.krb5.conf", krb5ConfigPath);
+       System.setProperty("sun.security.krb5.debug", "true");
+       System.setProperty("javax.security.auth.useSubjectCredsOnly","false");
+       Lookup<AuthSchemeProvider> authSchemeRegistry = 
RegistryBuilder.<AuthSchemeProvider>create()
+                .register(AuthSchemes.SPNEGO, new 
SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup))
+                .build();
+       CloseableHttpClient client = 
HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build();
+       HttpClientContext context = HttpClientContext.create();
+        BasicCredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+        Credentials useJaasCreds = new Credentials() {
+               public String getPassword() {
+                       return null;
+               }
+               public Principal getUserPrincipal() {
+                       return null;
+               }
+        };
+        
+        credentialsProvider.setCredentials(new AuthScope(null, -1, null), 
useJaasCreds);
+        context.setCredentialsProvider(credentialsProvider);
+        String response = null;
+        while(response == null) {
+            if (url.startsWith("https://";)) {
+                registerEasyHttps();
+            }
+            if (url.contains("anonymous=true") == false) {
+                url = url.contains("?") ? "&" : "?";
+                url = "anonymous=true";
+            }
+            HttpGet httpget = new HttpGet(url);
+            httpget.addHeader("accept", "application/json");
+            try {
+                CloseableHttpResponse httpResponse = 
client.execute(httpget,context);
+                String redirect = null;
+                org.apache.http.Header h = 
httpResponse.getFirstHeader("Location");
+                if (h != null) {
+                       redirect = h.getValue();
+                    if (isValidURL(redirect) == false) {
+                        logger.info("Get invalid redirect url, skip it: " + 
redirect);
+                        Thread.sleep(1000l);
+                        continue;
+                    }
+                } else {
+                       h = httpResponse.getFirstHeader("Refresh");
+                       if (h != null) {
+                        String s = h.getValue();
+                        int cut = s.indexOf("url=");
+                        if (cut >= 0) {
+                            redirect = s.substring(cut + 4);
+
+                            if (isValidURL(redirect) == false) {
+                                logger.info("Get invalid redirect url, skip 
it: " + redirect);
+                                Thread.sleep(1000l);
+                                continue;
+                            }
+                        }
+                    }
+                }
+    
+                if (redirect == null) {
+                    response = 
IOUtils.toString(httpResponse.getEntity().getContent());
+                    logger.debug("Job " + mrJobId + " get status check 
result.\n");
+                } else {
+                    url = redirect;
+                    logger.debug("Job " + mrJobId + " check redirect url " + 
url + ".\n");
+                } 
+            } catch (InterruptedException e) {
+               logger.error(e.getMessage());
+            } finally {
+                httpget.releaseConnection();
+            }
+        }
+        
+        return response;
+    }
 
     private String getHttpResponse(String url) throws IOException {
         HttpClient client = new HttpClient();

http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 1449fa4..585e0e1 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -141,7 +141,8 @@ public class MapReduceExecutable extends AbstractExecutable 
{
                 return new ExecuteResult(ExecuteResult.State.ERROR, 
"restStatusCheckUrl is null");
             }
             String mrJobId = hadoopCmdOutput.getMrJobId();
-            HadoopStatusChecker statusChecker = new 
HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
+            boolean useKerberosAuth = 
context.getConfig().isGetJobStatusWithKerberos();
+            HadoopStatusChecker statusChecker = new 
HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth);
             JobStepStatusEnum status = JobStepStatusEnum.NEW;
             while (!isDiscarded()) {
                 JobStepStatusEnum newStatus = statusChecker.checkStatus();

http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7a709a6..0a7a764 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
         <extendedset.version>1.3.4</extendedset.version>
         <jetty.version>9.2.7.v20150116</jetty.version>
         <kryo.version>3.0.3</kryo.version>
+        <apache-httpclient.version>4.5</apache-httpclient.version>
 
         <!-- REST Service -->
         <spring.framework.version>3.1.2.RELEASE</spring.framework.version>
@@ -456,6 +457,11 @@
                 <artifactId>curator-recipes</artifactId>
                 <version>${curator.version}</version>
             </dependency>
+               <dependency>    
+                       <groupId>org.apache.httpcomponents</groupId>
+                       <artifactId>httpclient</artifactId>
+                       <version>${apache-httpclient.version}</version>
+               </dependency>
 
         </dependencies>
     </dependencyManagement>

Reply via email to