This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 73f9650d48 [ZEPPELIN-5760] fix passing configs to spark in k8s (#4398)
73f9650d48 is described below

commit 73f9650d4877b568e5077853e184343fffe10e65
Author: Michal Vince <vince.mic...@gmail.com>
AuthorDate: Thu Sep 29 10:10:10 2022 +0200

    [ZEPPELIN-5760] fix passing configs to spark in k8s (#4398)
    
    * passing static arguments to spark-submit command so driver can pick them up
    
    * fixed static names
    
    * removed duplicate driver memory setting
    
    * fixed driver extra java opts
    
    * extend test
    
    * use ZEPPELIN_SPARK_CONF env var to pass spark configurations
    
    * fix wildcard import
    
    * fix separator
    
    * remove redundant concatenation
    
    * - remove redundant concatenation
    - fix tests
---
 .../launcher/K8sRemoteInterpreterProcess.java      | 74 +++++++++++++++-------
 .../launcher/K8sRemoteInterpreterProcessTest.java  | 40 +++++++-----
 .../K8sStandardInterpreterLauncherTest.java        |  4 +-
 3 files changed, 77 insertions(+), 41 deletions(-)

diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index a39b0bbb61..08569bba97 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -25,12 +25,12 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
@@ -78,6 +78,8 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterManagedProcess
   private static final String SPARK_CONTAINER_IMAGE = 
"zeppelin.k8s.spark.container.image";
   private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN";
   private static final String ENV_ZEPPELIN_HOME = "ZEPPELIN_HOME";
+  private static final String SPARK_DRIVER_DEFAULTJAVAOPTS = 
"spark.driver.defaultJavaOptions";
+  private static final String SPARK_DRIVER_EXTRAJAVAOPTS = 
"spark.driver.extraJavaOptions";
 
   public K8sRemoteInterpreterProcess(
           KubernetesClient client,
@@ -319,9 +321,24 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterManagedProcess
     if (isSpark()) {
       int webUiPort = 4040;
       k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage);
+
+      // There is already initial value following --driver-java-options added 
in interpreter.sh
+      // so we need to pass spark.driver.defaultJavaOptions and 
spark.driver.extraJavaOptions
+      // as SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF env variable to build 
spark-submit command correctly.
+      StringJoiner driverExtraJavaOpts = new StringJoiner(" ");
+      if (properties.containsKey(SPARK_DRIVER_DEFAULTJAVAOPTS)) {
+        driverExtraJavaOpts.add((String) 
properties.remove(SPARK_DRIVER_DEFAULTJAVAOPTS));
+      }
+      if (properties.containsKey(SPARK_DRIVER_EXTRAJAVAOPTS)) {
+        driverExtraJavaOpts.add((String) 
properties.remove(SPARK_DRIVER_EXTRAJAVAOPTS));
+      }
+      if (driverExtraJavaOpts.length() > 0) {
+        k8sEnv.put("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF", 
driverExtraJavaOpts.toString());
+      }
+
       if (isSparkOnKubernetes(properties)) {
-        k8sEnv.put("SPARK_SUBMIT_OPTIONS",
-            getEnv().getOrDefault("SPARK_SUBMIT_OPTIONS", "") + 
buildSparkSubmitOptions(userName));
+        addSparkK8sProperties();
+        k8sEnv.put("ZEPPELIN_SPARK_CONF", prepareZeppelinSparkConf(userName));
       }
       k8sEnv.put("SPARK_HOME", getEnv().getOrDefault("SPARK_HOME", "/spark"));
 
@@ -403,28 +420,35 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterManagedProcess
   }
 
   @VisibleForTesting
-  String buildSparkSubmitOptions(String userName) {
-    StringBuilder options = new StringBuilder();
-
-    options.append(" --master k8s://https://kubernetes.default.svc");
-    options.append(" --deploy-mode client");
-    if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
-      options.append(" --driver-memory 
").append(properties.get(SPARK_DRIVER_MEMORY));
-    }
+  String prepareZeppelinSparkConf(String userName) {
+    StringJoiner sparkConfSJ = new StringJoiner("|");
     if (isUserImpersonated() && !StringUtils.containsIgnoreCase(userName, 
"anonymous")) {
-      options.append(" --proxy-user ").append(userName);
+      sparkConfSJ.add("--proxy-user");
+      sparkConfSJ.add(userName);
     }
-    options.append(" --conf 
spark.kubernetes.namespace=").append(getInterpreterNamespace());
-    options.append(" --conf spark.executor.instances=1");
-    options.append(" --conf 
spark.kubernetes.driver.pod.name=").append(getPodName());
-    String sparkContainerImage = properties.containsKey(SPARK_CONTAINER_IMAGE) 
? properties.getProperty(SPARK_CONTAINER_IMAGE) : sparkImage;
-    options.append(" --conf 
spark.kubernetes.container.image=").append(sparkContainerImage);
-    options.append(" --conf spark.driver.bindAddress=0.0.0.0");
-    options.append(" --conf 
spark.driver.host=").append(getInterpreterPodDnsName());
-    options.append(" --conf spark.driver.port=").append(getSparkDriverPort());
-    options.append(" --conf 
spark.blockManager.port=").append(getSparkBlockManagerPort());
-
-    return options.toString();
+
+    for (String key : properties.stringPropertyNames()) {
+      String propValue = properties.getProperty(key);
+      if (isSparkConf(key, propValue)) {
+        sparkConfSJ.add("--conf");
+        sparkConfSJ.add(key + "=" + propValue);
+      }
+    }
+
+    return sparkConfSJ.toString();
+  }
+
+  private void addSparkK8sProperties() {
+    properties.setProperty("spark.master", 
"k8s://https://kubernetes.default.svc");
+    properties.setProperty("spark.submit.deployMode", "client");
+    properties.setProperty("spark.kubernetes.namespace", 
getInterpreterNamespace());
+    properties.setProperty("spark.kubernetes.driver.pod.name", getPodName());
+    properties.setProperty("spark.kubernetes.container.image", 
properties.containsKey(SPARK_CONTAINER_IMAGE) ?
+            properties.getProperty(SPARK_CONTAINER_IMAGE) : sparkImage);
+    properties.setProperty("spark.driver.bindAddress", "0.0.0.0");
+    properties.setProperty("spark.driver.host", getInterpreterPodDnsName());
+    properties.setProperty("spark.driver.port", 
String.valueOf(getSparkDriverPort()));
+    properties.setProperty("spark.blockManager.port", 
String.valueOf(getSparkBlockManagerPort()));
   }
 
   private String getInterpreterPodDnsName() {
@@ -433,6 +457,10 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterManagedProcess
         getInterpreterNamespace());
   }
 
+  private boolean isSparkConf(String key, String value) {
+    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && 
!StringUtils.isEmpty(value);
+  }
+
   /**
    * See xxx-interpreter-pod.yaml
    * @return SparkDriverPort
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index df211eb796..82e56efcd8 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -138,6 +138,8 @@ public class K8sRemoteInterpreterProcessTest {
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     properties.put("spark.master", "k8s://http://api");
+    properties.put("spark.jars.ivy", "my_ivy_path");
+    properties.put("spark.driver.extraJavaOptions", "-Dextra_option");
     Map<String, String> envs = new HashMap<>();
     envs.put("MY_ENV1", "V1");
     envs.put("SPARK_SUBMIT_OPTIONS", "my options");
@@ -171,16 +173,21 @@ public class K8sRemoteInterpreterProcessTest {
 
     envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
     assertTrue( envs.containsKey("SPARK_HOME"));
+    assertTrue( envs.containsKey("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF"));
+    String driverExtraOptions = envs.get("SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF");
+    assertTrue(driverExtraOptions.contains("-Dextra_option"));
 
     String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
-    assertTrue(sparkSubmitOptions.startsWith("my options "));
-    
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
-    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" 
+ intp.getPodName()));
-    
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
-    assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + 
intp.getPodName() + ".default.svc"));
-    assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + 
intp.getSparkDriverPort()));
-    assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + 
intp.getSparkBlockManagerPort()));
-    assertFalse(sparkSubmitOptions.contains("--proxy-user"));
+    assertTrue(sparkSubmitOptions.startsWith("my options"));
+    String zeppelinSparkConf = envs.get("ZEPPELIN_SPARK_CONF");
+    
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.namespace=default"));
+    assertTrue(zeppelinSparkConf.contains("spark.kubernetes.driver.pod.name=" 
+ intp.getPodName()));
+    
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.container.image=spark-container:1.0"));
+    assertTrue(zeppelinSparkConf.contains("spark.driver.host=" + 
intp.getPodName() + ".default.svc"));
+    assertTrue(zeppelinSparkConf.contains("spark.driver.port=" + 
intp.getSparkDriverPort()));
+    assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + 
intp.getSparkBlockManagerPort()));
+    assertTrue(zeppelinSparkConf.contains("spark.jars.ivy=my_ivy_path"));
+    assertFalse(zeppelinSparkConf.contains("--proxy-user"));
     assertTrue(intp.isSpark());
   }
 
@@ -225,14 +232,15 @@ public class K8sRemoteInterpreterProcessTest {
     assertTrue( envs.containsKey("SPARK_HOME"));
 
     String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
-    assertTrue(sparkSubmitOptions.startsWith("my options "));
-    
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
-    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" 
+ intp.getPodName()));
-    
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
-    assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + 
intp.getPodName() + ".default.svc"));
-    assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + 
intp.getSparkDriverPort()));
-    assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + 
intp.getSparkBlockManagerPort()));
-    assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
+    assertTrue(sparkSubmitOptions.startsWith("my options"));
+    String zeppelinSparkConf = envs.get("ZEPPELIN_SPARK_CONF");
+    
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.namespace=default"));
+    assertTrue(zeppelinSparkConf.contains("spark.kubernetes.driver.pod.name=" 
+ intp.getPodName()));
+    
assertTrue(zeppelinSparkConf.contains("spark.kubernetes.container.image=spark-container:1.0"));
+    assertTrue(zeppelinSparkConf.contains("spark.driver.host=" + 
intp.getPodName() + ".default.svc"));
+    assertTrue(zeppelinSparkConf.contains("spark.driver.port=" + 
intp.getSparkDriverPort()));
+    assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + 
intp.getSparkBlockManagerPort()));
+    assertTrue(zeppelinSparkConf.contains("--proxy-user|mytestUser"));
     assertTrue(intp.isSpark());
   }
 
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
index 4afeb0ff5b..8433f23701 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
@@ -98,7 +98,7 @@ public class K8sStandardInterpreterLauncherTest {
     assertTrue(client instanceof K8sRemoteInterpreterProcess);
     K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
     assertTrue(process.isSpark());
-    
assertTrue(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user
 user1"));
+    
assertTrue(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user|user1"));
   }
 
   @Test
@@ -131,6 +131,6 @@ public class K8sStandardInterpreterLauncherTest {
     assertTrue(client instanceof K8sRemoteInterpreterProcess);
     K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
     assertTrue(process.isSpark());
-    
assertFalse(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user
 user1"));
+    
assertFalse(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user
 user1"));
   }
 }

Reply via email to