Repository: kylin Updated Branches: refs/heads/2.0-rc 604268d9f -> 7dae0cf4a
getting mr job status with kerberos authentication Signed-off-by: fengyu <zhfen...@corp.netease.com> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7dae0cf4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7dae0cf4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7dae0cf4 Branch: refs/heads/2.0-rc Commit: 7dae0cf4ad65f69ece49df13f85301c9470cf763 Parents: 604268d Author: hzfen...@corp.netease.com <hzfen...@corp.netease.com> Authored: Wed Dec 30 12:09:22 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Jan 5 11:04:04 2016 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 2 + core-common/pom.xml | 4 + .../apache/kylin/common/KylinConfigBase.java | 4 + .../engine/mr/common/HadoopStatusChecker.java | 6 +- .../engine/mr/common/HadoopStatusGetter.java | 104 ++++++++++++++++++- .../engine/mr/common/MapReduceExecutable.java | 3 +- pom.xml | 6 ++ 7 files changed, 124 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index f2170bd..d029b4c 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -82,6 +82,8 @@ kylin.hbase.region.cut.small=5 kylin.hbase.region.cut.medium=10 kylin.hbase.region.cut.large=50 +# whether get job status from resource manager with kerberos authentication +kylin.job.status.with.kerberos=false ## kylin security configurations http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/core-common/pom.xml ---------------------------------------------------------------------- diff --git a/core-common/pom.xml b/core-common/pom.xml index 07a7ee1..8c7edb1 100644 --- a/core-common/pom.xml +++ 
b/core-common/pom.xml @@ -69,6 +69,10 @@ <artifactId>commons-httpclient</artifactId> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5968411..ee11158 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -466,6 +466,10 @@ public class KylinConfigBase implements Serializable { public String getHiveDatabaseForIntermediateTable() { return this.getOptional("kylin.job.hive.database.for.intermediatetable", "default"); } + + public boolean isGetJobStatusWithKerberos() { + return Boolean.valueOf(this.getOptional("kylin.job.status.with.kerberos", "false")); + } public String getKylinOwner() { return this.getOptional("kylin.owner", ""); http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java index 1b71b92..ef45ed1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java @@ -39,11 +39,13 @@ public class HadoopStatusChecker { private final String yarnUrl; private final String 
mrJobID; private final StringBuilder output; + private final boolean useKerberosAuth; - public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output) { + public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output, boolean useKerberosAuth) { this.yarnUrl = yarnUrl; this.mrJobID = mrJobID; this.output = output; + this.useKerberosAuth = useKerberosAuth; } public JobStepStatusEnum checkStatus() { @@ -53,7 +55,7 @@ public class HadoopStatusChecker { } JobStepStatusEnum status = null; try { - final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(); + final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth); logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight()); output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java index fd7afd3..081070e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java @@ -24,10 +24,24 @@ import org.apache.commons.httpclient.HttpMethod; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.protocol.Protocol; import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import 
org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -35,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.MalformedURLException; +import java.security.Principal; /** */ @@ -50,14 +65,99 @@ public class HadoopStatusGetter { this.mrJobId = mrJobId; } - public Pair<RMAppState, FinalApplicationStatus> get() throws IOException { + public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberosAuth) throws IOException { String applicationId = mrJobId.replace("job", "application"); String url = yarnUrl.replace("${job_id}", applicationId); - JsonNode root = new ObjectMapper().readTree(getHttpResponse(url)); + String response = useKerberosAuth ? 
getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); + logger.debug("Hadoop job " + mrJobId + " status : " + response); + JsonNode root = new ObjectMapper().readTree(response); RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue()); FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue()); return Pair.of(state, finalStatus); } + + private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf"; + private String getHttpResponseWithKerberosAuth(String url) throws IOException { + String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); + if(krb5ConfigPath == null) { + krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; + } + boolean skipPortAtKerberosDatabaseLookup = true; + System.setProperty("java.security.krb5.conf", krb5ConfigPath); + System.setProperty("sun.security.krb5.debug", "true"); + System.setProperty("javax.security.auth.useSubjectCredsOnly","false"); + Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create() + .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)) + .build(); + CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build(); + HttpClientContext context = HttpClientContext.create(); + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + Credentials useJaasCreds = new Credentials() { + public String getPassword() { + return null; + } + public Principal getUserPrincipal() { + return null; + } + }; + + credentialsProvider.setCredentials(new AuthScope(null, -1, null), useJaasCreds); + context.setCredentialsProvider(credentialsProvider); + String response = null; + while(response == null) { + if (url.startsWith("https://")) { + registerEasyHttps(); + } + if (url.contains("anonymous=true") == false) { + url += url.contains("?") ? 
"&" : "?"; + url += "anonymous=true"; + } + HttpGet httpget = new HttpGet(url); + httpget.addHeader("accept", "application/json"); + try { + CloseableHttpResponse httpResponse = client.execute(httpget,context); + String redirect = null; + org.apache.http.Header h = httpResponse.getFirstHeader("Location"); + if (h != null) { + redirect = h.getValue(); + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000l); + continue; + } + } else { + h = httpResponse.getFirstHeader("Refresh"); + if (h != null) { + String s = h.getValue(); + int cut = s.indexOf("url="); + if (cut >= 0) { + redirect = s.substring(cut + 4); + + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000l); + continue; + } + } + } + } + + if (redirect == null) { + response = IOUtils.toString(httpResponse.getEntity().getContent()); + logger.debug("Job " + mrJobId + " get status check result.\n"); + } else { + url = redirect; + logger.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); + } + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } finally { + httpget.releaseConnection(); + } + } + + return response; + } private String getHttpResponse(String url) throws IOException { HttpClient client = new HttpClient(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 1449fa4..585e0e1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -141,7 +141,8 @@ public class 
MapReduceExecutable extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); } String mrJobId = hadoopCmdOutput.getMrJobId(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output); + boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); + HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { JobStepStatusEnum newStatus = statusChecker.checkStatus(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7dae0cf4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7a709a6..0a7a764 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ <extendedset.version>1.3.4</extendedset.version> <jetty.version>9.2.7.v20150116</jetty.version> <kryo.version>3.0.3</kryo.version> + <apache-httpclient.version>4.5</apache-httpclient.version> <!-- REST Service --> <spring.framework.version>3.1.2.RELEASE</spring.framework.version> @@ -456,6 +457,11 @@ <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${apache-httpclient.version}</version> + </dependency> </dependencies> </dependencyManagement>