This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 62d987a  [ZEPPELIN-4714]. Flink table api doesn't work in multiple threads
62d987a is described below

commit 62d987a603226ae8bbe9077207884e65e5b067c3
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Apr 2 11:43:38 2020 +0800

    [ZEPPELIN-4714]. Flink table api doesn't work in multiple threads
    
    ### What is this PR for?
    This PR works around FLINK-16936 by recreating the TableEnvironment
    planner before executing any Scala or Python code. Building the
    TableEnvironment is lightweight and does not cost much time, so the
    workaround is acceptable. The other issue this PR fixes is the
    ClassLoader problem in PyFlinkInterpreter: the proper ClassLoader is
    now always set before executing Python code, so that the PyFlink API
    can call UDFs defined in Scala.
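
    In outline, the workaround applied in both interpreters looks like the
    following minimal sketch (not the literal patch; error handling and the
    Python-side round trip are omitted):

        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
          // switch to the Flink scala-shell classloader so planner/TableFactory
          // discovery resolves against the right classes
          Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
          // rebuilding the planner is cheap, so do it before every execution
          flinkInterpreter.createPlannerAgain();
          // run the user's scala/python code
          return super.interpret(st, context);
        } finally {
          // always restore the original classloader afterwards
          Thread.currentThread().setContextClassLoader(original);
        }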
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4714
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3715 from zjffdu/ZEPPELIN-4714 and squashes the following commits:
    
    55d613576 [Jeff Zhang] [ZEPPELIN-4714]. Flink table api doesn't work in multiple threads
    
    (cherry picked from commit 7bd8e288b79033eded10ecbe075b4c938979bb68)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .travis.yml                                        |   1 +
 flink/pom.xml                                      |   5 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |   7 ++
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  46 +++++--
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |  54 ++++++--
 .../org/apache/zeppelin/flink/TableEnvFactory.java |  14 +++
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  21 +++-
 .../flink/FlinkBatchSqlInterpreterTest.java        |   2 +-
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    | 139 ++++++++++++++++-----
 .../zeppelin/flink/PyFlinkInterpreterTest.java     |  17 +--
 flink/src/test/resources/log4j.properties          |   3 +-
 flink/src/test/resources/log4j2.properties         |   2 +-
 .../apache/zeppelin/python/PythonInterpreter.java  |   6 +-
 .../zeppelin/python/IPythonInterpreterTest.java    |   2 +-
 .../zeppelin/jupyter/JupyterKernelInterpreter.java |   2 -
 15 files changed, 246 insertions(+), 75 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 5082aac..d607a44 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -221,3 +221,4 @@ after_failure:
  - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stdout
  - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stderr
  - cat zeppelin-zengine/target/org.apache.zeppelin.interpreter.MiniHadoopCluster/*/*/*/stdout
+  - cat flink/*.log
diff --git a/flink/pom.xml b/flink/pom.xml
index 21ec182..e5dca7b 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -637,7 +637,10 @@
           <skip>false</skip>
           <forkCount>1</forkCount>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx3072m -XX:MaxPermSize=256m </argLine>
+          <!-- set sun.zip.disableMemoryMapping=true because of
+          https://blogs.oracle.com/poonam/crashes-in-zipgetentry
+          https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
+          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
 
           <environmentVariables>
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index b089b36..af4de3d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -137,6 +137,13 @@ public class FlinkInterpreter extends Interpreter {
     return this.innerIntp.getDefaultSqlParallelism();
   }
 
+  /**
+   * Workaround for issue of FLINK-16936.
+   */
+  public void createPlannerAgain() {
+    this.innerIntp.createPlannerAgain();
+  }
+
   public ClassLoader getFlinkScalaShellLoader() {
     return innerIntp.getFlinkScalaShellLoader();
   }
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 970f6cf..5564a57 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -40,6 +40,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
   private FlinkInterpreter flinkInterpreter;
   private InterpreterContext curInterpreterContext;
   private boolean opened = false;
+  private ClassLoader originalClassLoader;
 
   public IPyFlinkInterpreter(Properties property) {
     super(property);
@@ -78,16 +79,26 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
   public InterpreterResult internalInterpret(String st,
                                              InterpreterContext context)
           throws InterpreterException {
-    // set InterpreterContext in the python thread first, otherwise flink job could not be
-    // associated with paragraph in JobListener
-    this.curInterpreterContext = context;
-    InterpreterResult result =
-            super.internalInterpret("intp.setInterpreterContextInPythonThread()", context);
-    if (result.code() != InterpreterResult.Code.SUCCESS) {
-      throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
-              result.toString());
+    try {
+      // set InterpreterContext in the python thread first, otherwise flink job could not be
+      // associated with paragraph in JobListener
+      this.curInterpreterContext = context;
+      InterpreterResult result =
+              super.internalInterpret("intp.initJavaThread()", context);
+      if (result.code() != InterpreterResult.Code.SUCCESS) {
+        throw new InterpreterException("Fail to initJavaThread: " +
+                result.toString());
+      }
+      return super.internalInterpret(st, context);
+    } finally {
+      if (getKernelProcessLauncher().isRunning()) {
+        InterpreterResult result =
+                super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
+        if (result.code() != InterpreterResult.Code.SUCCESS) {
+          LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+        }
+      }
     }
-    return super.internalInterpret(st, context);
   }
 
   @Override
@@ -105,8 +116,23 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
     }
   }
 
-  public void setInterpreterContextInPythonThread() {
+  /**
+   * Called by python process.
+   */
+  public void initJavaThread() {
     InterpreterContext.set(curInterpreterContext);
+    originalClassLoader = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
+    flinkInterpreter.createPlannerAgain();
+  }
+
+  /**
+   * Called by python process.
+   */
+  public void resetClassLoaderInPythonThread() {
+    if (originalClassLoader != null) {
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
+    }
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 4ce4605..91ec0fe 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -46,6 +46,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
   private FlinkInterpreter flinkInterpreter;
   private InterpreterContext curInterpreterContext;
   private boolean isOpened = false;
+  private ClassLoader originalClassLoader;
 
   public PyFlinkInterpreter(Properties properties) {
     super(properties);
@@ -103,22 +104,53 @@ public class PyFlinkInterpreter extends PythonInterpreter {
 
   @Override
  public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
-    if (isOpened) {
-      // set InterpreterContext in the python thread first, otherwise flink job could not be
-      // associated with paragraph in JobListener
-      this.curInterpreterContext = context;
-      InterpreterResult result =
-              super.interpret("intp.setInterpreterContextInPythonThread()", context);
-      if (result.code() != InterpreterResult.Code.SUCCESS) {
-        throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
-                result.toString());
+    try {
+      if (isOpened) {
+        // set InterpreterContext in the python thread first, otherwise flink job could not be
+        // associated with paragraph in JobListener
+        this.curInterpreterContext = context;
+        InterpreterResult result =
+                super.interpret("intp.initJavaThread()", context);
+        if (result.code() != InterpreterResult.Code.SUCCESS) {
+          throw new InterpreterException("Fail to initJavaThread: " +
+                  result.toString());
+        }
+      }
+      flinkInterpreter.createPlannerAgain();
+      return super.interpret(st, context);
+    } finally {
+      if (getPythonProcessLauncher().isRunning()) {
+        InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
+        if (result.code() != InterpreterResult.Code.SUCCESS) {
+          LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+        }
       }
     }
-    return super.interpret(st, context);
   }
 
-  public void setInterpreterContextInPythonThread() {
+  /**
+   * Called by python process.
+   */
+  public void initJavaThread() {
     InterpreterContext.set(curInterpreterContext);
+    originalClassLoader = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
+    flinkInterpreter.createPlannerAgain();
+  }
+
+  /**
+   * Called by python process.
+   */
+  public void resetClassLoaderInPythonThread() {
+    if (originalClassLoader != null) {
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    super.cancel(context);
+    flinkInterpreter.cancel(context);
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 5ab551e..6720bf2 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -182,6 +182,20 @@ public class TableEnvFactory {
             settings.isStreamingMode());
   }
 
+  public void createPlanner(EnvironmentSettings settings) {
+    Map<String, String> executorProperties = settings.toExecutorProperties();
+    Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
+
+    Map<String, String> plannerProperties = settings.toPlannerProperties();
+    ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(
+                    plannerProperties,
+                    executor,
+                    tblConfig,
+                    blinkFunctionCatalog,
+                    catalogManager);
+  }
+
   public StreamTableEnvironment createJavaBlinkStreamTableEnvironment(
           EnvironmentSettings settings) {
 
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 4e6f3d0..32962da 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -79,6 +79,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   private var mode: ExecutionMode.Value = _
 
+  private var tblEnvFactory: TableEnvFactory = _
   private var benv: ExecutionEnvironment = _
   private var senv: StreamExecutionEnvironment = _
 
@@ -229,7 +230,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
         config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
       val classLoader = Thread.currentThread().getContextClassLoader
       try {
-        // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could find
+        // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
         // the TableFactory properly
         Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
        val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
@@ -299,7 +300,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
      val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
      val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
 
-      val tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
+      this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
        catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
 
       // blink planner
@@ -547,7 +548,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
     field.get(obj)
   }
 
+  /**
+   * This is just a workaround to make table api work in multiple threads.
+   */
+  def createPlannerAgain(): Unit = {
+    val originalClassLoader = Thread.currentThread().getContextClassLoader
+    try {
+      Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
+      val stEnvSetting =
+        EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
+      this.tblEnvFactory.createPlanner(stEnvSetting)
+    } finally {
+      Thread.currentThread().setContextClassLoader(originalClassLoader)
+    }
+  }
+
  def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+    createPlannerAgain()
     val originalStdOut = System.out
     val originalStdErr = System.err;
     if (context != null) {
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 651645b..fb51d57 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -84,7 +84,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     // select which use scala udf
     context = getInterpreterContext();
    result = sqlInterpreter.interpret("SELECT addOne(id) as add_one FROM source_table", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index fda41ad..eb678a2 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -33,6 +33,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Properties;
 
@@ -45,6 +46,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
 
   private RemoteInterpreterEventClient mockIntpEventClient =
           mock(RemoteInterpreterEventClient.class);
+  private LazyOpenInterpreter flinkScalaInterpreter;
 
   protected Properties initIntpProperties() {
     Properties p = new Properties();
@@ -62,12 +64,12 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     context.setIntpEventClient(mockIntpEventClient);
     InterpreterContext.set(context);
 
-    LazyOpenInterpreter flinkInterpreter = new LazyOpenInterpreter(
+    this.flinkScalaInterpreter = new LazyOpenInterpreter(
         new FlinkInterpreter(properties));
     intpGroup = new InterpreterGroup();
     intpGroup.put("session_1", new ArrayList<Interpreter>());
-    intpGroup.get("session_1").add(flinkInterpreter);
-    flinkInterpreter.setInterpreterGroup(intpGroup);
+    intpGroup.get("session_1").add(flinkScalaInterpreter);
+    flinkScalaInterpreter.setInterpreterGroup(intpGroup);
 
     LazyOpenInterpreter pyFlinkInterpreter =
         new LazyOpenInterpreter(new PyFlinkInterpreter(properties));
@@ -94,17 +96,17 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
 
   @Test
   public void testBatchIPyFlink() throws InterpreterException {
-    testBatchPyFlink(interpreter);
+    testBatchPyFlink(interpreter, flinkScalaInterpreter);
   }
 
   @Test
-  public void testStreamIPyFlink() throws InterpreterException {
-    testStreamPyFlink(interpreter);
+  public void testStreamIPyFlink() throws InterpreterException, IOException {
+    testStreamPyFlink(interpreter, flinkScalaInterpreter);
   }
 
-  public static void testBatchPyFlink(Interpreter interpreter) throws InterpreterException {
+  public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException {
     InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
-    InterpreterResult result = interpreter.interpret(
+    InterpreterResult result = pyflinkInterpreter.interpret(
         "import tempfile\n" +
         "import os\n" +
         "import shutil\n" +
@@ -131,6 +133,77 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
         "bt_env.execute(\"batch_job\")"
             , context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // use group by
+    context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    result = pyflinkInterpreter.interpret(
+            "import tempfile\n" +
+            "import os\n" +
+            "import shutil\n" +
+            "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+            "if os.path.exists(sink_path):\n" +
+            "    if os.path.isfile(sink_path):\n" +
+            "      os.remove(sink_path)\n" +
+            "    else:\n" +
+            "      shutil.rmtree(sink_path)\n" +
+            "b_env.set_parallelism(1)\n" +
+            "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 
'hello')], ['a', 'b', 'c'])\n" +
+            "bt_env.connect(FileSystem().path(sink_path)) \\\n" +
+            "    .with_format(OldCsv()\n" +
+            "      .field_delimiter(',')\n" +
+            "      .field(\"a\", DataTypes.STRING())\n" +
+            "      .field(\"b\", DataTypes.BIGINT())\n" +
+            "      .field(\"c\", DataTypes.BIGINT())) \\\n" +
+            "    .with_schema(Schema()\n" +
+            "      .field(\"a\", DataTypes.STRING())\n" +
+            "      .field(\"b\", DataTypes.BIGINT())\n" +
+            "      .field(\"c\", DataTypes.BIGINT())) \\\n" +
+            "    .register_table_sink(\"batch_sink4\")\n" +
+            "t.group_by(\"c\").select(\"c, sum(a), 
count(b)\").insert_into(\"batch_sink4\")\n" +
+            "bt_env.execute(\"batch_job4\")"
+            , context);
+    assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
+
+    // use scala udf in pyflink
+    // define scala udf
+    result = flinkScalaInterpreter.interpret(
+            "class AddOne extends ScalarFunction {\n" +
+                    "  def eval(a: java.lang.Long): String = a + \"\1\"\n" +
+                    "}", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = flinkScalaInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
+            context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    result = pyflinkInterpreter.interpret(
+            "import tempfile\n" +
+            "import os\n" +
+            "import shutil\n" +
+            "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+            "if os.path.exists(sink_path):\n" +
+            "    if os.path.isfile(sink_path):\n" +
+            "      os.remove(sink_path)\n" +
+            "    else:\n" +
+            "      shutil.rmtree(sink_path)\n" +
+            "b_env.set_parallelism(1)\n" +
+            "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 
'hello')], ['a', 'b', 'c'])\n" +
+            "bt_env.connect(FileSystem().path(sink_path)) \\\n" +
+            "    .with_format(OldCsv()\n" +
+            "      .field_delimiter(',')\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .with_schema(Schema()\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .register_table_sink(\"batch_sink3\")\n" +
+            "t.select(\"a, addOne(a), c\").insert_into(\"batch_sink3\")\n" +
+            "bt_env.execute(\"batch_job3\")"
+            , context);
+    assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
   }
 
   @Override
@@ -149,33 +222,33 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     }
   }
 
-  public static void testStreamPyFlink(Interpreter interpreter) throws InterpreterException {
+  public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
     InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
     InterpreterResult result = interpreter.interpret(
-          "import tempfile\n" +
-          "import os\n" +
-          "import shutil\n" +
-          "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
-          "if os.path.exists(sink_path):\n" +
-          "    if os.path.isfile(sink_path):\n" +
-          "      os.remove(sink_path)\n" +
-          "    else:\n" +
-          "      shutil.rmtree(sink_path)\n" +
-          "s_env.set_parallelism(1)\n" +
-          "t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], 
['a', 'b', 'c'])\n" +
-          "st_env.connect(FileSystem().path(sink_path)) \\\n" +
-          "    .with_format(OldCsv()\n" +
-          "      .field_delimiter(',')\n" +
-          "      .field(\"a\", DataTypes.BIGINT())\n" +
-          "      .field(\"b\", DataTypes.STRING())\n" +
-          "      .field(\"c\", DataTypes.STRING())) \\\n" +
-          "    .with_schema(Schema()\n" +
-          "      .field(\"a\", DataTypes.BIGINT())\n" +
-          "      .field(\"b\", DataTypes.STRING())\n" +
-          "      .field(\"c\", DataTypes.STRING())) \\\n" +
-          "    .register_table_sink(\"stream_sink\")\n" +
-          "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
-          "st_env.execute(\"stream_job\")"
+            "import tempfile\n" +
+            "import os\n" +
+            "import shutil\n" +
+            "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+            "if os.path.exists(sink_path):\n" +
+            "    if os.path.isfile(sink_path):\n" +
+            "      os.remove(sink_path)\n" +
+            "    else:\n" +
+            "      shutil.rmtree(sink_path)\n" +
+            "s_env.set_parallelism(1)\n" +
+            "t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 
'hello')], ['a', 'b', 'c'])\n" +
+            "st_env.connect(FileSystem().path(sink_path)) \\\n" +
+            "    .with_format(OldCsv()\n" +
+            "      .field_delimiter(',')\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .with_schema(Schema()\n" +
+            "      .field(\"a\", DataTypes.BIGINT())\n" +
+            "      .field(\"b\", DataTypes.STRING())\n" +
+            "      .field(\"c\", DataTypes.STRING())) \\\n" +
+            "    .register_table_sink(\"stream_sink\")\n" +
+            "t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
+            "st_env.execute(\"stream_job\")"
             , context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
   }
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
index a42d594..7bbc1dd 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.flink;
 
 
 import com.google.common.io.Files;
-import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -32,6 +31,8 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.python.PythonInterpreterTest;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -48,7 +49,7 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
   private RemoteInterpreterEventClient mockRemoteEventClient =
           mock(RemoteInterpreterEventClient.class);
 
-  private Interpreter flinkInterpreter;
+  private Interpreter flinkScalaInterpreter;
   private Interpreter streamSqlInterpreter;
   private Interpreter batchSqlInterpreter;
 
@@ -77,9 +78,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
         .setIntpEventClient(mockRemoteEventClient)
         .build();
     InterpreterContext.set(context);
-    flinkInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
-    intpGroup.get("session_1").add(flinkInterpreter);
-    flinkInterpreter.setInterpreterGroup(intpGroup);
+    flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
+    intpGroup.get("session_1").add(flinkScalaInterpreter);
+    flinkScalaInterpreter.setInterpreterGroup(intpGroup);
 
     LazyOpenInterpreter iPyFlinkInterpreter =
         new LazyOpenInterpreter(new IPyFlinkInterpreter(properties));
@@ -108,9 +109,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
   }
 
   @Test
-  public void testPyFlink() throws InterpreterException {
-    IPyFlinkInterpreterTest.testBatchPyFlink(interpreter);
-    IPyFlinkInterpreterTest.testStreamPyFlink(interpreter);
+  public void testPyFlink() throws InterpreterException, IOException {
+    IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+    IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
   }
 
   protected InterpreterContext getInterpreterContext() {
diff --git a/flink/src/test/resources/log4j.properties b/flink/src/test/resources/log4j.properties
index 0d84434..8017840 100644
--- a/flink/src/test/resources/log4j.properties
+++ b/flink/src/test/resources/log4j.properties
@@ -15,11 +15,12 @@
 # limitations under the License.
 #
 
-log4j.rootLogger = WARN, stdout
+log4j.rootLogger = INFO, stdout
 
 log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
 log4j.logger.org.apache.hive=WARN
+log4j.logger.org.apache.flink=WARN
 
diff --git a/flink/src/test/resources/log4j2.properties b/flink/src/test/resources/log4j2.properties
index cf94a3e..1bce906 100755
--- a/flink/src/test/resources/log4j2.properties
+++ b/flink/src/test/resources/log4j2.properties
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-status = WARN
+status = INFO
 name = HiveLog4j2
 packages = org.apache.hadoop.hive.ql.log
 
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index 0b55018..e403a59 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.python;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
 import org.apache.commons.exec.CommandLine;
@@ -60,7 +59,7 @@ public class PythonInterpreter extends Interpreter {
   private static final int MAX_TIMEOUT_SEC = 30;
 
   private GatewayServer gatewayServer;
-  private PythonProcessLauncher pythonProcessLauncher;
+  protected PythonProcessLauncher pythonProcessLauncher;
   private File pythonWorkDir;
   protected boolean useBuiltinPy4j = true;
 
@@ -163,7 +162,6 @@ public class PythonInterpreter extends Interpreter {
     }
   }
 
-  @VisibleForTesting
   public PythonProcessLauncher getPythonProcessLauncher() {
     return pythonProcessLauncher;
   }
@@ -572,7 +570,7 @@ public class PythonInterpreter extends Interpreter {
     LOGGER.debug("Python Process Output: " + message);
   }
 
-  class PythonProcessLauncher extends ProcessLauncher {
+  public class PythonProcessLauncher extends ProcessLauncher {
 
     PythonProcessLauncher(CommandLine commandLine, Map<String, String> envs) {
       super(commandLine, envs);
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index cabcabc..f3361a4 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -146,7 +146,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
     assertEquals(Code.ERROR, result.code());
     output = context.out.toInterpreterResultMessage().get(0);
     assertTrue(output.getData(),
-            output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+            output.getData().contains("Ipython kernel has been stopped. Please check logs. "
         + "It might be because of an out of memory issue."));
   }
 
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
index 5f8164f..dff1900 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.jupyter;
 
-import com.google.common.annotations.VisibleForTesting;
 import io.grpc.ManagedChannelBuilder;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.environment.EnvironmentUtils;
@@ -213,7 +212,6 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
     return EnvironmentUtils.getProcEnvironment();
   }
 
-  @VisibleForTesting
   public JupyterKernelProcessLauncher getKernelProcessLauncher() {
     return jupyterKernelProcessLauncher;
   }
