Repository: zeppelin Updated Branches: refs/heads/master 04d34547a -> 2fcfaa8c7
ZEPPELIN-1432. Support cancellation of paragraph execution ### What is this PR for? Livy 0.3 supports the cancel operation; this PR adds cancel support to the Livy interpreter. First we check the Livy version; then, based on that version, we call the Livy REST API to cancel the statement. ### What type of PR is it? Improvement | Feature ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1432 ### How should this be tested? Tested manually, because cancel is only available in Livy 0.3, which has not been released yet. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #1859 from zjffdu/ZEPPELIN-1432 and squashes the following commits: 83eaf83 [Jeff Zhang] minor update 200ca71 [Jeff Zhang] address comments 1cbeb26 [Jeff Zhang] add zeppelin.livy.pull_status.interval.millis 070fea0 [Jeff Zhang] ZEPPELIN-1432. 
Support cancellation of paragraph execution Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2fcfaa8c Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2fcfaa8c Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2fcfaa8c Branch: refs/heads/master Commit: 2fcfaa8c74cad5adf9adcdf76987e4ffbe5983c7 Parents: 04d3454 Author: Jeff Zhang <zjf...@apache.org> Authored: Mon Jan 16 09:01:54 2017 +0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Mon Jan 16 21:01:53 2017 -0800 ---------------------------------------------------------------------- docs/interpreter/livy.md | 5 + .../zeppelin/livy/APINotFoundException.java | 43 +++++ .../zeppelin/livy/BaseLivyInterprereter.java | 156 +++++++++++++------ .../zeppelin/livy/LivySparkSQLInterpreter.java | 9 +- .../org/apache/zeppelin/livy/LivyVersion.java | 96 ++++++++++++ .../src/main/resources/interpreter-setting.json | 5 + 6 files changed, 264 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2fcfaa8c/docs/interpreter/livy.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index 067a317..47ebc46 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -66,6 +66,11 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory` <td>Whether to display app info</td> </tr> <tr> + <td>zeppelin.livy.pull_status.interval.millis</td> + <td>1000</td> + <td>The interval for checking paragraph execution status</td> + </tr> + <tr> <td>livy.spark.driver.cores</td> <td></td> <td>Driver cores. 
ex) 1, 2.</td> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2fcfaa8c/livy/src/main/java/org/apache/zeppelin/livy/APINotFoundException.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/APINotFoundException.java b/livy/src/main/java/org/apache/zeppelin/livy/APINotFoundException.java new file mode 100644 index 0000000..3c4b714 --- /dev/null +++ b/livy/src/main/java/org/apache/zeppelin/livy/APINotFoundException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.livy; + +/** + * APINotFoundException happens because we may introduce new apis in new livy version. 
+ */ +public class APINotFoundException extends LivyException { + public APINotFoundException() { + } + + public APINotFoundException(String message) { + super(message); + } + + public APINotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public APINotFoundException(Throwable cause) { + super(cause); + } + + public APINotFoundException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2fcfaa8c/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 8ed4622..3d84363 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentHashMap; /** * Base class for livy interpreters. @@ -48,15 +49,23 @@ public abstract class BaseLivyInterprereter extends Interpreter { protected volatile SessionInfo sessionInfo; private String livyURL; - private long sessionCreationTimeout; + private int sessionCreationTimeout; + private int pullStatusInterval; protected boolean displayAppInfo; private AtomicBoolean sessionExpired = new AtomicBoolean(false); + private LivyVersion livyVersion; + + // keep tracking the mapping between paragraphId and statementId, so that we can cancel the + // statement after we execute it. 
+ private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMapping = new ConcurrentHashMap<>(); public BaseLivyInterprereter(Properties property) { super(property); this.livyURL = property.getProperty("zeppelin.livy.url"); - this.sessionCreationTimeout = Long.parseLong( + this.sessionCreationTimeout = Integer.parseInt( property.getProperty("zeppelin.livy.create.session.timeout", 120 + "")); + this.pullStatusInterval = Integer.parseInt( + property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + "")); } public abstract String getSessionKind(); @@ -89,23 +98,33 @@ public abstract class BaseLivyInterprereter extends Interpreter { // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it // explicitly by ourselves. sessionInfo.appId = extractStatementResult( - interpret("sc.applicationId", false, false).message() + interpret("sc.applicationId", null, false, false).message() .get(0).getData()); } interpret( "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", - false, false); + null, false, false); if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) { sessionInfo.webUIAddress = extractStatementResult( interpret( - "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false) + "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false) .message().get(0).getData()); } else { sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl"); } LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}", sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress); + } else { + LOGGER.info("Create livy session successfully with sessionId: {}", this.sessionInfo.id); + } + // check livy version + try { + this.livyVersion = getLivyVersion(); + LOGGER.info("Use livy " + livyVersion); + } catch (APINotFoundException e) { + this.livyVersion = new LivyVersion("0.2.0"); + LOGGER.info("Use livy 0.2.0"); } } @@ -120,7 +139,7 @@ public 
abstract class BaseLivyInterprereter extends Interpreter { } try { - return interpret(st, this.displayAppInfo, true); + return interpret(st, context.getParagraphId(), this.displayAppInfo, true); } catch (LivyException e) { LOGGER.error("Fail to interpret:" + st, e); return new InterpreterResult(InterpreterResult.Code.ERROR, @@ -148,7 +167,21 @@ public abstract class BaseLivyInterprereter extends Interpreter { @Override public void cancel(InterpreterContext context) { - //TODO(zjffdu). Use livy cancel api which is available in livy 0.3 + if (livyVersion.isCancelSupported()) { + String paraId = context.getParagraphId(); + Integer stmtId = paragraphId2StmtIdMapping.get(paraId); + try { + if (stmtId != null) { + cancelStatement(stmtId); + } + } catch (LivyException e) { + LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " + paraId, e); + } finally { + paragraphId2StmtIdMapping.remove(paraId); + } + } else { + LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion); + } } @Override @@ -192,7 +225,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { LOGGER.error(msg); throw new LivyException(msg); } - Thread.sleep(1000); + Thread.sleep(pullStatusInterval); sessionInfo = getSessionInfo(sessionInfo.id); } return sessionInfo; @@ -206,44 +239,58 @@ public abstract class BaseLivyInterprereter extends Interpreter { return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET")); } - public InterpreterResult interpret(String code, boolean displayAppInfo, - boolean appendSessionExpired) - throws LivyException { + public InterpreterResult interpret(String code, + String paragraphId, + boolean displayAppInfo, + boolean appendSessionExpired) throws LivyException { StatementInfo stmtInfo = null; boolean sessionExpired = false; try { - stmtInfo = executeStatement(new ExecuteRequest(code)); - } catch (SessionNotFoundException e) { - LOGGER.warn("Livy session {} is expired, new session will be created.", 
sessionInfo.id); - sessionExpired = true; - // we don't want to create multiple sessions because it is possible to have multiple thread - // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need - // to check session status again in this sync block - synchronized (this) { - if (isSessionExpired()) { - initLivySession(); + try { + stmtInfo = executeStatement(new ExecuteRequest(code)); + } catch (SessionNotFoundException e) { + LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id); + sessionExpired = true; + // we don't want to create multiple sessions because it is possible to have multiple thread + // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need + // to check session status again in this sync block + synchronized (this) { + if (isSessionExpired()) { + initLivySession(); + } } + stmtInfo = executeStatement(new ExecuteRequest(code)); } - stmtInfo = executeStatement(new ExecuteRequest(code)); - } - // pull the statement status - while (!stmtInfo.isAvailable()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOGGER.error("InterruptedException when pulling statement status.", e); - throw new LivyException(e); + if (paragraphId != null) { + paragraphId2StmtIdMapping.put(paragraphId, stmtInfo.id); + } + // pull the statement status + while (!stmtInfo.isAvailable()) { + try { + Thread.sleep(pullStatusInterval); + } catch (InterruptedException e) { + LOGGER.error("InterruptedException when pulling statement status.", e); + throw new LivyException(e); + } + stmtInfo = getStatementInfo(stmtInfo.id); + } + if (appendSessionExpired) { + return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo), + sessionExpired); + } else { + return getResultFromStatementInfo(stmtInfo, displayAppInfo); + } + } finally { + if (paragraphId != null) { + paragraphId2StmtIdMapping.remove(paragraphId); } - stmtInfo = 
getStatementInfo(stmtInfo.id); - } - if (appendSessionExpired) { - return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo), - sessionExpired); - } else { - return getResultFromStatementInfo(stmtInfo, displayAppInfo); } } + private LivyVersion getLivyVersion() throws LivyException { + return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version)); + } + private boolean isSessionExpired() throws LivyException { try { getSessionInfo(sessionInfo.id); @@ -270,6 +317,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { } } + private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo, boolean displayAppInfo) { if (stmtInfo.output.isError()) { @@ -341,6 +389,10 @@ public abstract class BaseLivyInterprereter extends Interpreter { callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET")); } + private void cancelStatement(int statementId) throws LivyException { + callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId + "/cancel", "POST"); + } + private RestTemplate getRestTemplate() { String keytabLocation = property.getProperty("zeppelin.livy.keytab"); String principal = property.getProperty("zeppelin.livy.principal"); @@ -385,21 +437,20 @@ public abstract class BaseLivyInterprereter extends Interpreter { LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(), response.getBody()); if (response.getStatusCode().value() == 200 - || response.getStatusCode().value() == 201 - || response.getStatusCode().value() == 404) { - String responseBody = response.getBody(); - if (responseBody.matches("\"Session '\\d+' not found.\"")) { - throw new SessionNotFoundException(responseBody); + || response.getStatusCode().value() == 201) { + return response.getBody(); + } else if (response.getStatusCode().value() == 404) { + if (response.getBody().matches("Session '\\d+' not found.")) { + throw new 
SessionNotFoundException(response.getBody()); } else { - return responseBody; + throw new APINotFoundException("No rest api found for " + targetURL + + ", " + response.getStatusCode()); } } else { String responseString = response.getBody(); if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) { return responseString; } - LOGGER.error(String.format("Error with %s StatusCode: %s", - response.getStatusCode().value(), responseString)); throw new LivyException(String.format("Error with %s StatusCode: %s", response.getStatusCode().value(), responseString)); } @@ -502,7 +553,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { } public boolean isAvailable() { - return state.equals("available"); + return state.equals("available") || state.equals("cancelled"); } private static class StatementOutput { @@ -543,4 +594,17 @@ public abstract class BaseLivyInterprereter extends Interpreter { } } + private static class LivyVersionResponse { + public String url; + public String branch; + public String revision; + public String version; + public String date; + public String user; + + public static LivyVersionResponse fromJson(String json) { + return gson.fromJson(json, LivyVersionResponse.class); + } + } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2fcfaa8c/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 0e78860..471d199 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -51,7 +51,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession // to judge 
whether it is using spark2. try { - InterpreterResult result = sparkInterpreter.interpret("spark", false, false); + InterpreterResult result = sparkInterpreter.interpret("spark", null, false, false); if (result.code() == InterpreterResult.Code.SUCCESS && result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) { LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}", @@ -59,7 +59,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { isSpark2 = true; } else { // spark 1.x - result = sparkInterpreter.interpret("sqlContext", false, false); + result = sparkInterpreter.interpret("sqlContext", null, false, false); if (result.code() == InterpreterResult.Code.SUCCESS) { LOGGER.info("sqlContext is detected."); } else if (result.code() == InterpreterResult.Code.ERROR) { @@ -68,7 +68,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves"); result = sparkInterpreter.interpret( "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" - + "import sqlContext.implicits._", false, false); + + "import sqlContext.implicits._", null, false, false); if (result.code() == InterpreterResult.Code.ERROR) { throw new LivyException("Fail to create SQLContext," + result.message().get(0).getData()); @@ -113,7 +113,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter { } else { sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")"; } - InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true); + InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(), + this.displayAppInfo, true); if (result.code() == InterpreterResult.Code.SUCCESS) { InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2fcfaa8c/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java new file mode 100644 index 0000000..1b7fe30 --- /dev/null +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java @@ -0,0 +1,96 @@ +package org.apache.zeppelin.livy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provide reading comparing capability of livy version + */ +public class LivyVersion { + private static final Logger logger = LoggerFactory.getLogger(LivyVersion.class); + + private static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0"); + private static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0"); + + private int version; + private String versionString; + + LivyVersion(String versionString) { + this.versionString = versionString; + + try { + int pos = versionString.indexOf('-'); + + String numberPart = versionString; + if (pos > 0) { + numberPart = versionString.substring(0, pos); + } + + String versions[] = numberPart.split("\\."); + int major = Integer.parseInt(versions[0]); + int minor = Integer.parseInt(versions[1]); + int patch = Integer.parseInt(versions[2]); + // version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602) + version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch)); + } catch (Exception e) { + logger.error("Can not recognize Livy version " + versionString + + ". 
Assume it's a future release", e); + + // assume it is future release + version = 99999; + } + } + + public int toNumber() { + return version; + } + + public String toString() { + return versionString; + } + + public static LivyVersion fromVersionString(String versionString) { + return new LivyVersion(versionString); + } + + public boolean isCancelSupported() { + return this.newerThanEquals(LIVY_0_3_0); + } + + public boolean equals(Object versionToCompare) { + return version == ((LivyVersion) versionToCompare).version; + } + + public boolean newerThan(LivyVersion versionToCompare) { + return version > versionToCompare.version; + } + + public boolean newerThanEquals(LivyVersion versionToCompare) { + return version >= versionToCompare.version; + } + + public boolean olderThan(LivyVersion versionToCompare) { + return version < versionToCompare.version; + } + + public boolean olderThanEquals(LivyVersion versionToCompare) { + return version <= versionToCompare.version; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2fcfaa8c/livy/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json index 4be537a..a2b9758 100644 --- a/livy/src/main/resources/interpreter-setting.json +++ b/livy/src/main/resources/interpreter-setting.json @@ -77,6 +77,11 @@ "defaultValue": "", "description": "Kerberos keytab to authenticate livy" }, + "zeppelin.livy.pull_status.interval.millis": { + "propertyName": "zeppelin.livy.pull_status.interval.millis", + "defaultValue": "1000", + "description": "The interval for checking paragraph execution status" + }, "livy.spark.jars.packages": { "propertyName": "livy.spark.jars.packages", "defaultValue": "",