This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 132c25e [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown flink yarn session cluster 132c25e is described below commit 132c25ed0d7e868f9120b193ada03af286cb8333 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue Jul 14 17:55:28 2020 +0800 [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown flink yarn session cluster ### What is this PR for? This PR checks whether the specified jars exist; without this check, an invalid user jar makes it impossible to shut down the flink yarn session cluster in yarn interpreter mode. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4955 ### How should this be tested? * CI pass https://travis-ci.org/github/zjffdu/zeppelin/builds/707928916 ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? NO * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3855 from zjffdu/ZEPPELIN-4955 and squashes the following commits: 80bdd917c [Jeff Zhang] [ZEPPELIN-4955]. 
Invalid user jar cause unable to shutdown flink yarn session cluster --- .../main/java/org/apache/zeppelin/flink/HadoopUtils.java | 3 +++ .../org/apache/zeppelin/flink/FlinkScalaInterpreter.scala | 15 +++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java index e8cefba..85d33f0 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java @@ -72,6 +72,9 @@ public class HadoopUtils { File tmpDir = Files.createTempDir(); FileSystem fs = FileSystem.get(new Configuration()); Path sourcePath = fs.makeQualified(new Path(jarOnHdfs)); + if (!fs.exists(sourcePath)) { + throw new IOException("jar file: " + jarOnHdfs + " doesn't exist."); + } Path destPath = new Path(tmpDir.getAbsolutePath() + "/" + sourcePath.getName()); fs.copyToLocalFile(sourcePath, destPath); return new File(destPath.toString()).getAbsolutePath(); diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index d18a559..2f397c1 100644 --- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -18,7 +18,7 @@ package org.apache.zeppelin.flink -import java.io.{BufferedReader, File} +import java.io.{BufferedReader, File, IOException} import java.net.{URL, URLClassLoader} import java.nio.file.Files import java.util.Properties @@ -786,7 +786,18 @@ class FlinkScalaInterpreter(val properties: Properties) { } private def getOrDownloadJars(jars: Seq[String]): Seq[String] = { - jars.map(jar => if (jar.contains("://")) HadoopUtils.downloadJar(jar) else jar) + jars.map(jar => { + if 
(jar.contains("://")) { + HadoopUtils.downloadJar(jar) + } else { + val jarFile = new File(jar) + if (!jarFile.exists() || !jarFile.isFile) { + throw new Exception(s"jar file: ${jar} doesn't exist") + } else { + jar + } + } + }) } def getJobManager = this.jobManager