This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 132c25e  [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown 
flink yarn  session cluster
132c25e is described below

commit 132c25ed0d7e868f9120b193ada03af286cb8333
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Tue Jul 14 17:55:28 2020 +0800

    [ZEPPELIN-4955]. Invalid user jar cause unable to shutdown flink yarn  
session cluster
    
    ### What is this PR for?
    
    This PR checks whether the specified jars exist. Otherwise, an invalid 
user jar makes it impossible to shut down the Flink YARN session cluster in 
yarn interpreter mode.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4955
    
    ### How should this be tested?
    * CI pass
    
    https://travis-ci.org/github/zjffdu/zeppelin/builds/707928916
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3855 from zjffdu/ZEPPELIN-4955 and squashes the following commits:
    
    80bdd917c [Jeff Zhang] [ZEPPELIN-4955]. Invalid user jar cause unable to 
shutdown flink yarn session cluster
---
 .../main/java/org/apache/zeppelin/flink/HadoopUtils.java  |  3 +++
 .../org/apache/zeppelin/flink/FlinkScalaInterpreter.scala | 15 +++++++++++++--
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
index e8cefba..85d33f0 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
@@ -72,6 +72,9 @@ public class HadoopUtils {
     File tmpDir = Files.createTempDir();
     FileSystem fs = FileSystem.get(new Configuration());
     Path sourcePath = fs.makeQualified(new Path(jarOnHdfs));
+    if (!fs.exists(sourcePath)) {
+      throw new IOException("jar file: " + jarOnHdfs + " doesn't exist.");
+    }
     Path destPath = new Path(tmpDir.getAbsolutePath() + "/" + 
sourcePath.getName());
     fs.copyToLocalFile(sourcePath, destPath);
     return new File(destPath.toString()).getAbsolutePath();
diff --git 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index d18a559..2f397c1 100644
--- 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.zeppelin.flink
 
-import java.io.{BufferedReader, File}
+import java.io.{BufferedReader, File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.nio.file.Files
 import java.util.Properties
@@ -786,7 +786,18 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 
   private def getOrDownloadJars(jars: Seq[String]): Seq[String] = {
-    jars.map(jar => if (jar.contains("://")) HadoopUtils.downloadJar(jar) else 
jar)
+    jars.map(jar => {
+      if (jar.contains("://")) {
+        HadoopUtils.downloadJar(jar)
+      } else {
+        val jarFile = new File(jar)
+        if (!jarFile.exists() || !jarFile.isFile) {
+          throw new Exception(s"jar file: ${jar} doesn't exist")
+        } else {
+          jar
+        }
+      }
+    })
   }
 
   def getJobManager = this.jobManager

Reply via email to