[ZEPPELIN-2075] Can't stop infinite `while` statement in pyspark Interpreter.
### What is this PR for? If following code runs with Pyspark Interpreter, there is no way to cancel except Zeppelin Server restart. ``` %spark.pyspark import time while True: time.sleep(1) print("running..") ``` ### What type of PR is it? Bug Fix | Improvement ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-2075 ### How should this be tested? Run above code with Pyspark Interpreter and try to cancel. ### Screenshots (if appropriate) - before  - after  ### Questions: * Does the licenses files need update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Author: astroshim <hss...@zepl.com> Closes #1985 from astroshim/ZEPPELIN-2075 and squashes the following commits: 84bf09a [astroshim] fix testcase bc12eaa [astroshim] pass pid to java b60d89a [astroshim] Merge branch 'master' into ZEPPELIN-2075 f26eacf [astroshim] add test-case for canceling. c0cac4e [astroshim] fix logging 678c183 [astroshim] remove signal handler 65d8cc6 [astroshim] init python pid variable 6731e56 [astroshim] add signal to cancel job (cherry picked from commit 9f22db91c279b7daf6a13b2d805a874074b070fd) Signed-off-by: Jongyoul Lee <jongy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/730784ba Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/730784ba Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/730784ba Branch: refs/heads/branch-0.7 Commit: 730784bab1004e5ecf6d938b26380c3cd4ca6d1f Parents: a90004b Author: astroshim <hss...@zepl.com> Authored: Sun Feb 19 00:36:45 2017 +0900 Committer: Jongyoul Lee <jongy...@apache.org> Committed: Tue Mar 14 15:32:53 2017 +0900 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 20 ++++++++++++- .../main/resources/python/zeppelin_pyspark.py | 2 +- .../zeppelin/spark/PySparkInterpreterTest.java | 30 ++++++++++++++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/730784ba/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 5a8e040..371578c 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -73,10 +73,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private String scriptPath; boolean pythonscriptRunning = false; private static final int MAX_TIMEOUT_SEC = 10; + private long pythonPid; public PySparkInterpreter(Properties property) { super(property); + pythonPid = -1; try { File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py"); scriptPath = scriptFile.getAbsolutePath(); @@ -310,7 +312,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand boolean pythonScriptInitialized = false; Integer pythonScriptInitializeNotifier = new Integer(0); - public void onPythonScriptInitialized() { + public void onPythonScriptInitialized(long pid) { + pythonPid = pid; synchronized (pythonScriptInitializeNotifier) { pythonScriptInitialized = true; pythonScriptInitializeNotifier.notifyAll(); @@ -411,10 +414,25 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } + public void interrupt() throws IOException { + if (pythonPid > -1) { + logger.info("Sending SIGINT signal to PID : " + pythonPid); + Runtime.getRuntime().exec("kill -SIGINT " + pythonPid); + } else { + logger.warn("Non UNIX/Linux system, close the interpreter"); + close(); + } + } + @Override public void cancel(InterpreterContext context) { SparkInterpreter sparkInterpreter = getSparkInterpreter(); sparkInterpreter.cancel(context); + try { + interrupt(); + } catch (IOException e) { + logger.error("Error", e); + } } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/730784ba/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index c59d2f4..d9c68c2 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -252,7 +252,7 @@ java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") intp = gateway.entry_point -intp.onPythonScriptInitialized() +intp.onPythonScriptInitialized(os.getpid()) jsc = intp.getJavaSparkContext() http://git-wip-us.apache.org/repos/asf/zeppelin/blob/730784ba/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 35b876d..55c405d 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -36,6 +36,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.junit.Assert.*; @@ -121,4 +123,32 @@ public class PySparkInterpreterTest { assertTrue(completions.size() > 0); } } + + private class infinityPythonJob implements Runnable { + @Override + public void run() { + String code = "import time\nwhile True:\n time.sleep(1)" ; + InterpreterResult ret = pySparkInterpreter.interpret(code, context); + assertNotNull(ret); + Pattern expectedMessage = Pattern.compile("KeyboardInterrupt"); + Matcher m = expectedMessage.matcher(ret.message().toString()); + assertTrue(m.find()); + } + } + + @Test + public void testCancelIntp() throws InterruptedException { + if (getSparkVersionNumber() > 11) { + assertEquals(InterpreterResult.Code.SUCCESS, + pySparkInterpreter.interpret("a = 1\n", context).code()); + + Thread t = new Thread(new infinityPythonJob()); + t.start(); + Thread.sleep(5000); + pySparkInterpreter.cancel(context); + assertTrue(t.isAlive()); + t.join(2000); + assertFalse(t.isAlive()); + } + } }