Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.7 22fecd3e5 -> d742b7e68
[Branch-0.7] Support for Zeppelin Context redefinition on Python

Fixes a build problem in Zeppelin branch-0.7. Hotfix.
Same as PR #2207.

Author: CloverHearts <cloverhearts...@gmail.com>

Closes #2257 from cloverhearts/fix/branch-0.7-build-hotfix and squashes the following commits:

2da0b423b [CloverHearts] fix branch-0.7 build error
a3a615c3b [CloverHearts] [Zeppelin-802] Support for Zeppelin Context redefinition on Python and Pyspark

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d742b7e6
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d742b7e6
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d742b7e6

Branch: refs/heads/branch-0.7
Commit: d742b7e68ea8b8877abd3533d98d3e92515c9136
Parents: 22fecd3
Author: CloverHearts <cloverhearts...@gmail.com>
Authored: Mon Apr 17 22:39:08 2017 +0900
Committer: Jongyoul Lee <jongy...@apache.org>
Committed: Mon Apr 17 23:17:53 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/python/PythonInterpreter.java      |  2 +-
 .../python/PythonInterpreterPandasSql.java      |  3 +-
 .../main/resources/python/zeppelin_python.py    | 16 +++++---
 .../zeppelin/python/PythonInterpreterTest.java  | 13 ++++++
 .../zeppelin/spark/PySparkInterpreter.java      |  8 ++--
 .../main/resources/python/zeppelin_pyspark.py   | 42 ++++++++++++++------
 .../zeppelin/spark/PySparkInterpreterTest.java  | 15 +++++++
 7 files changed, 75 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index df62406..7f6a7eb 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -222,7 +222,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
     // Add matplotlib display hook
     InterpreterGroup intpGroup = getInterpreterGroup();
     if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
-      registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
+      registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()");
     }
     // Add matplotlib display hook
     try {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
index 6bf1970..e73d7b3 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
@@ -87,7 +87,8 @@ public class PythonInterpreterPandasSql extends Interpreter {
     LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
     Interpreter python = getPythonInterpreter();
 
-    return python.interpret("z.show(pysqldf('" + st + "'))\nz._displayhook()", context);
+    return python.interpret(
+        "__zeppelin__.show(pysqldf('" + st + "'))\n__zeppelin__._displayhook()", context);
   }
 
   @Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/main/resources/python/zeppelin_python.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py
index a537c5d..31b993d 100644
--- a/python/src/main/resources/python/zeppelin_python.py
+++ b/python/src/main/resources/python/zeppelin_python.py
@@ -195,6 +195,7 @@ host = "127.0.0.1"
 if len(sys.argv) >= 3:
   host = sys.argv[2]
 
+_zcUserQueryNameSpace = {}
 
 client = GatewayClient(address=host, port=int(sys.argv[1]))
 #gateway = JavaGateway(client, auto_convert = True)
@@ -204,8 +205,11 @@ intp = gateway.entry_point
 intp.onPythonScriptInitialized(os.getpid())
 
 java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
-z = PyZeppelinContext(intp)
-z._setup_matplotlib()
+z = __zeppelin__ = PyZeppelinContext(intp)
+__zeppelin__._setup_matplotlib()
+
+_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
+_zcUserQueryNameSpace["z"] = z
 
 output = Logger()
 sys.stdout = output
@@ -227,7 +231,7 @@ while True :
     global_hook = None
 
     try:
-      user_hook = z.getHook('post_exec')
+      user_hook = __zeppelin__.getHook('post_exec')
     except:
       user_hook = None
@@ -263,17 +267,17 @@ while True :
       for node in to_run_exec:
         mod = ast.Module([node])
         code = compile(mod, '<stdin>', 'exec')
-        exec(code)
+        exec(code, _zcUserQueryNameSpace)
 
       for node in to_run_single:
         mod = ast.Interactive([node])
         code = compile(mod, '<stdin>', 'single')
-        exec(code)
+        exec(code, _zcUserQueryNameSpace)
 
       for node in to_run_hooks:
         mod = ast.Module([node])
         code = compile(mod, '<stdin>', 'exec')
-        exec(code)
+        exec(code, _zcUserQueryNameSpace)
 
     except:
       raise Exception(traceback.format_exc())

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
index b5cd680..837626c 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
@@ -106,6 +106,19 @@ public class PythonInterpreterTest implements InterpreterOutputListener {
     assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("hi\nhi\nhi"));
   }
 
+  @Test
+  public void testRedefinitionZeppelinContext() {
+    String pyRedefinitionCode = "z = 1\n";
+    String pyRestoreCode = "z = __zeppelin__\n";
+    String pyValidCode = "z.input(\"test\")\n";
+
+    assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyValidCode, context).code());
+    assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyRedefinitionCode, context).code());
+    assertEquals(InterpreterResult.Code.ERROR, pythonInterpreter.interpret(pyValidCode, context).code());
+    assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyRestoreCode, context).code());
+    assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyValidCode, context).code());
+  }
+
   @Override
   public void onUpdateAll(InterpreterOutput out) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 371578c..3ab5676 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -113,7 +113,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     // Add matplotlib display hook
     InterpreterGroup intpGroup = getInterpreterGroup();
     if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
-      registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
+      registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()");
     }
 
     DepInterpreter depInterpreter = getDepInterpreter();
@@ -381,9 +381,9 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       return new InterpreterResult(Code.ERROR, errorMessage);
     }
     String jobGroup = sparkInterpreter.getJobGroup(context);
-    ZeppelinContext z = sparkInterpreter.getZeppelinContext();
-    z.setInterpreterContext(context);
-    z.setGui(context.getGui());
+    ZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
+    __zeppelin__.setInterpreterContext(context);
+    __zeppelin__.setGui(context.getGui());
 
     pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup);
     statementOutput = null;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 6c39400..5029d59 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -271,19 +271,37 @@ else:
 java_import(gateway.jvm, "scala.Tuple2")
 
+_zcUserQueryNameSpace = {}
+
 jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+_zcUserQueryNameSpace["_zsc_"] = _zsc_
+_zcUserQueryNameSpace["sc"] = sc
+
 if sparkVersion.isSpark2():
-  spark = SparkSession(sc, intp.getSparkSession())
-  sqlc = spark._wrapped
+  spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
+  sqlc = __zSqlc__ = __zSpark__._wrapped
+  _zcUserQueryNameSpace["sqlc"] = sqlc
+  _zcUserQueryNameSpace["__zSqlc__"] = __zSqlc__
+  _zcUserQueryNameSpace["spark"] = spark
+  _zcUserQueryNameSpace["__zSpark__"] = __zSpark__
 else:
-  sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
-sqlContext = sqlc
+  sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
+  _zcUserQueryNameSpace["sqlc"] = sqlc
+  _zcUserQueryNameSpace["__zSqlc__"] = sqlc
+
+sqlContext = __zSqlc__
+_zcUserQueryNameSpace["sqlContext"] = sqlContext
+
+completion = __zeppelin_completion__ = PySparkCompletion(intp)
+_zcUserQueryNameSpace["completion"] = completion
+_zcUserQueryNameSpace["__zeppelin_completion__"] = __zeppelin_completion__
 
-completion = PySparkCompletion(intp)
-z = PyZeppelinContext(intp.getZeppelinContext())
-z._setup_matplotlib()
+z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
+__zeppelin__._setup_matplotlib()
+_zcUserQueryNameSpace["z"] = z
+_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
 
 while True :
   req = intp.getStatements()
@@ -299,7 +317,7 @@ while True :
     global_hook = None
 
    try:
-      user_hook = z.getHook('post_exec')
+      user_hook = __zeppelin__.getHook('post_exec')
     except:
       user_hook = None
@@ -334,17 +352,17 @@ while True :
       for node in to_run_exec:
         mod = ast.Module([node])
         code = compile(mod, '<stdin>', 'exec')
-        exec(code)
+        exec(code, _zcUserQueryNameSpace)
 
       for node in to_run_single:
         mod = ast.Interactive([node])
         code = compile(mod, '<stdin>', 'single')
-        exec(code)
+        exec(code, _zcUserQueryNameSpace)
 
      for node in to_run_hooks:
         mod = ast.Module([node])
         code = compile(mod, '<stdin>', 'exec')
-        exec(code)
+        exec(code, _zcUserQueryNameSpace)
 
     except:
       raise Exception(traceback.format_exc())

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 3697512..d47a8bd 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -123,6 +123,21 @@ public class PySparkInterpreterTest {
     }
   }
 
+  @Test
+  public void testRedefinitionZeppelinContext() {
+    if (getSparkVersionNumber() > 11) {
+      String redefinitionCode = "z = 1\n";
+      String restoreCode = "z = __zeppelin__\n";
+      String validCode = "z.input(\"test\")\n";
+
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code());
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(redefinitionCode, context).code());
+      assertEquals(InterpreterResult.Code.ERROR, pySparkInterpreter.interpret(validCode, context).code());
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(restoreCode, context).code());
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code());
+    }
+  }
+
   private class infinityPythonJob implements Runnable {
     @Override
     public void run() {
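----------------------------------------------------------------------

For reference, a minimal sketch of the notebook-level behaviour this patch enables; the paragraph contents below are illustrative (they mirror the new tests) and are not part of the diff above. Because every paragraph is now exec'd against the shared _zcUserQueryNameSpace dict, user code may shadow z and later restore it from the reserved __zeppelin__ alias:

  # Paragraph 1: the Zeppelin context is bound to both z and __zeppelin__
  z.input("test")        # works: z is the PyZeppelinContext

  # Paragraph 2: user code shadows z with an arbitrary value
  z = 1
  z.input("test")        # fails: int has no attribute 'input'

  # Paragraph 3: restore the context from the reserved alias
  z = __zeppelin__
  z.input("test")        # works again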