Repository: zeppelin Updated Branches: refs/heads/branch-0.8 27f4b5917 -> 1b868e8e9
ZEPPELIN-3707. In yarn cluster mode, zeppelin does not allow specifying additional artifacts to be uploaded via distributed cache. Put all the additional files into `spark.files` instead of passing them via `--files`, because `--files` overrides `spark.files`. [Bug Fix] * [ ] - Task * https://issues.apache.org/jira/browse/ZEPPELIN-3707 * CI pass * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3135 from zjffdu/ZEPPELIN-3707 and squashes the following commits: c31a1008a [Jeff Zhang] ZEPPELIN-3707. In yarn cluster mode, zeppelin does not allow specifying additional artifacts to be uploaded via distributed cache (cherry picked from commit 839db5ec58fd5d852706e10f7a05bb756c8e75b9) Signed-off-by: Jeff Zhang <zjf...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1b868e8e Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1b868e8e Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1b868e8e Branch: refs/heads/branch-0.8 Commit: 1b868e8e98a2480a6155e257aa441982bbab4c2b Parents: 27f4b59 Author: Jeff Zhang <zjf...@apache.org> Authored: Sun Aug 12 11:04:53 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Tue Aug 14 21:33:22 2018 +0800 ---------------------------------------------------------------------- .../launcher/SparkInterpreterLauncher.java | 7 ++- .../launcher/SparkInterpreterLauncherTest.java | 48 +++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b868e8e/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 688d95f..b6e901b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -65,7 +65,12 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher { sparkConfBuilder.append(" --master " + sparkMaster); } if (isYarnMode() && getDeployMode().equals("cluster")) { - sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties"); + if (sparkProperties.containsKey("spark.files")) { + sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," + + zConf.getConfDir() + "/log4j_yarn_cluster.properties"); + } else { + sparkProperties.put("spark.files", zConf.getConfDir() + "/log4j_yarn_cluster.properties"); + } } for (String name : sparkProperties.stringPropertyNames()) { sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b868e8e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index ef5a04e..e8df174 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -17,12 +17,16 @@ package org.apache.zeppelin.interpreter.launcher; 
+import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; import org.junit.Test; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.Path; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -129,7 +133,7 @@ public class SparkInterpreterLauncherTest { assertTrue(interpreterProcess.getEnv().size() >= 3); assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); - assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + assertEquals(" --master yarn-cluster --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @Test @@ -147,6 +151,11 @@ public class SparkInterpreterLauncherTest { InterpreterOption option = new InterpreterOption(); option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark"); + Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); + FileUtils.deleteDirectory(localRepoPath.toFile()); + Files.createDirectories(localRepoPath); + Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); + InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; @@ -157,6 +166,41 @@ 
public class SparkInterpreterLauncherTest { assertTrue(interpreterProcess.getEnv().size() >= 3); assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); - assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + assertEquals(" --master yarn --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); + FileUtils.deleteDirectory(localRepoPath.toFile()); + } + + @Test + public void testYarnClusterMode_3() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("master", "yarn"); + properties.setProperty("spark.submit.deployMode", "cluster"); + properties.setProperty("spark.files", "file_1"); + properties.setProperty("spark.jars", "jar_1"); + + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark"); + Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); + FileUtils.deleteDirectory(localRepoPath.toFile()); + Files.createDirectories(localRepoPath); + + InterpreterClient client = 
launcher.launch(context); + assertTrue(client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("spark", interpreterProcess.getInterpreterSettingName()); + assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); + assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 3); + assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); + assertEquals(" --master yarn --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + FileUtils.deleteDirectory(localRepoPath.toFile()); } }