Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 27f4b5917 -> 1b868e8e9


ZEPPELIN-3707. In yarn cluster mode, zeppelin does not allow specifying 
additional artifacts to be uploaded via distributed cache

Put all the additional files to `spark.files` instead of `--files`. Because 
`--files` will overwrite `spark.files`

[Bug Fix]

* [ ] - Task

* https://issues.apache.org/jira/browse/ZEPPELIN-3707

* CI pass

* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3135 from zjffdu/ZEPPELIN-3707 and squashes the following commits:

c31a1008a [Jeff Zhang] ZEPPELIN-3707. In yarn cluster mode, zeppelin does not 
allow specifying additional artifacts to be uploaded via distributed cache

(cherry picked from commit 839db5ec58fd5d852706e10f7a05bb756c8e75b9)
Signed-off-by: Jeff Zhang <zjf...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1b868e8e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1b868e8e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1b868e8e

Branch: refs/heads/branch-0.8
Commit: 1b868e8e98a2480a6155e257aa441982bbab4c2b
Parents: 27f4b59
Author: Jeff Zhang <zjf...@apache.org>
Authored: Sun Aug 12 11:04:53 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Tue Aug 14 21:33:22 2018 +0800

----------------------------------------------------------------------
 .../launcher/SparkInterpreterLauncher.java      |  7 ++-
 .../launcher/SparkInterpreterLauncherTest.java  | 48 +++++++++++++++++++-
 2 files changed, 52 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b868e8e/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 688d95f..b6e901b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -65,7 +65,12 @@ public class SparkInterpreterLauncher extends 
ShellScriptLauncher {
       sparkConfBuilder.append(" --master " + sparkMaster);
     }
     if (isYarnMode() && getDeployMode().equals("cluster")) {
-      sparkConfBuilder.append(" --files " + zConf.getConfDir() + 
"/log4j_yarn_cluster.properties");
+      if (sparkProperties.containsKey("spark.files")) {
+        sparkProperties.put("spark.files", 
sparkProperties.getProperty("spark.files") + "," +
+            zConf.getConfDir() + "/log4j_yarn_cluster.properties");
+      } else {
+        sparkProperties.put("spark.files", zConf.getConfDir() + 
"/log4j_yarn_cluster.properties");
+      }
     }
     for (String name : sparkProperties.stringPropertyNames()) {
       sparkConfBuilder.append(" --conf " + name + "=" + 
sparkProperties.getProperty(name));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b868e8e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index ef5a04e..e8df174 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -17,12 +17,16 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.Path;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -129,7 +133,7 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getEnv().size() >= 3);
     assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
     assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn-cluster --files 
.//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf 
spark.jars='jar_1' --conf spark.yarn.isPython=true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    assertEquals(" --master yarn-cluster --conf 
spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf 
spark.jars='jar_1' --conf spark.yarn.isPython=true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
   @Test
@@ -147,6 +151,11 @@ public class SparkInterpreterLauncherTest {
     InterpreterOption option = new InterpreterOption();
     option.setUserImpersonate(true);
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark");
+    Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), 
context.getInterpreterSettingId());
+    FileUtils.deleteDirectory(localRepoPath.toFile());
+    Files.createDirectories(localRepoPath);
+    Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), 
"test.jar"));
+
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
@@ -157,6 +166,41 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getEnv().size() >= 3);
     assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
     assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties 
--conf spark.files='file_1' --conf spark.jars='jar_1' --conf 
spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --proxy-user 
user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    assertEquals(" --master yarn --conf 
spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf 
spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf 
spark.yarn.isPython=true --proxy-user user1", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), 
"test.jar"));
+    FileUtils.deleteDirectory(localRepoPath.toFile());
+  }
+
+  @Test
+  public void testYarnClusterMode_3() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("master", "yarn");
+    properties.setProperty("spark.submit.deployMode", "cluster");
+    properties.setProperty("spark.files", "file_1");
+    properties.setProperty("spark.jars", "jar_1");
+
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark");
+    Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), 
context.getInterpreterSettingId());
+    FileUtils.deleteDirectory(localRepoPath.toFile());
+    Files.createDirectories(localRepoPath);
+
+    InterpreterClient client = launcher.launch(context);
+    assertTrue(client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 3);
+    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
+    assertEquals(" --master yarn --conf 
spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf 
spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf 
spark.yarn.isPython=true --proxy-user user1", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    FileUtils.deleteDirectory(localRepoPath.toFile());
   }
 }

Reply via email to