This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new e229682 [ZEPPELIN-4790]. Throw exception when using flink for scala 2.12 e229682 is described below commit e229682affc9dc46a5256e4d8587ecd8f059df94 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed May 6 22:02:13 2020 +0800 [ZEPPELIN-4790]. Throw exception when using flink for scala 2.12 ### What is this PR for? This PR will throw exception when using flink for scala 2.12 which is not supported now. See the screenshot below. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4790 ### How should this be tested? * Manually tested. ### Screenshots (if appropriate)  ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3763 from zjffdu/ZEPPELIN-4790 and squashes the following commits: eac7b9a65 [Jeff Zhang] address comment 57b0737da [Jeff Zhang] [ZEPPELIN-4790]. Throw exception when using flink for scala 2.12 --- .../apache/zeppelin/flink/FlinkInterpreter.java | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 4565fc0..f02c21b 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -17,13 +17,10 @@ package org.apache.zeppelin.flink; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.scala.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.scala.StreamTableEnvironment; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -48,18 +45,33 @@ public class FlinkInterpreter extends Interpreter { public FlinkInterpreter(Properties properties) { super(properties); - this.innerIntp = new FlinkScalaInterpreter(getProperties()); + } + + private void checkScalaVersion() throws InterpreterException { + String scalaVersionString = scala.util.Properties.versionString(); + LOGGER.info("Using Scala: " + scalaVersionString); + if (scalaVersionString.contains("version 2.11")) { + return; + } else { + throw new InterpreterException("Unsupported scala version: " + scalaVersionString + + ", Only scala 2.11 is supported"); + } } @Override public void open() throws InterpreterException { + checkScalaVersion(); + + this.innerIntp = new FlinkScalaInterpreter(getProperties()); this.innerIntp.open(); this.z = this.innerIntp.getZeppelinContext(); } @Override public void close() throws InterpreterException { - this.innerIntp.close(); + if (this.innerIntp != null) { + this.innerIntp.close(); + } } @Override