This is an automated email from the ASF dual-hosted git repository.

alexott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new a282e78  [ZEPPELIN-4821] Spark interpreter uses incorrect property name
a282e78 is described below

commit a282e7861da444c65091b1c56cbded48a69e5609
Author: Alex Ott <alex...@apache.org>
AuthorDate: Sun May 17 12:03:57 2020 +0200

    [ZEPPELIN-4821] Spark interpreter uses incorrect property name

    ### What is this PR for?
    The Spark interpreter used the incorrect property name `master` for the Spark master, while much of the code depended on `spark.master`; it is better to use a single name everywhere.

    ### What type of PR is it?
    Bug Fix

    ### What is the Jira issue?
    * ZEPPELIN-4821

    ### How should this be tested?
    * https://travis-ci.org/github/alexott/zeppelin/builds/699582429

    Author: Alex Ott <alex...@apache.org>

    Closes #3773 from alexott/ZEPPELIN-4821 and squashes the following commits:

    82b8321f1 [Alex Ott] [ZEPPELIN-4821] Spark interpreter uses incorrect property name
---
 conf/zeppelin-env.cmd.template                     | 40 +++++------
 conf/zeppelin-env.sh.template                      | 78 +++++++++++-----------
 docs/interpreter/spark.md                          |  4 +-
 docs/quickstart/kubernetes.md                      |  4 +-
 docs/setup/deployment/cdh.md                       |  3 +-
 docs/setup/deployment/flink_and_spark_cluster.md   |  2 +-
 docs/setup/deployment/spark_cluster_mode.md        |  6 +-
 docs/setup/deployment/yarn_install.md              |  2 +-
 k8s/zeppelin-server.yaml                           |  2 +-
 .... Spark Interpreter Introduction_2F8KN6TKK.zpln |  2 +-
 .../spark_mesos/entrypoint.sh                      |  2 +-
 .../apache/zeppelin/spark/IPySparkInterpreter.java |  5 +-
 .../apache/zeppelin/spark/SparkInterpreter.java    | 15 ++++-
 .../apache/zeppelin/spark/SparkRInterpreter.java   |  7 ++
 .../zeppelin/spark/SparkStringConstants.java       | 27 ++++++++
 .../src/main/resources/interpreter-setting.json    |  4 +-
 .../zeppelin/spark/IPySparkInterpreterTest.java    |  5 +-
 .../zeppelin/spark/KotlinSparkInterpreterTest.java |  4 +-
 .../spark/PySparkInterpreterMatplotlibTest.java    |  4 +-
 .../zeppelin/spark/PySparkInterpreterTest.java     |  6 +-
 .../zeppelin/spark/SparkIRInterpreterTest.java     |  4 +-
 .../zeppelin/spark/SparkInterpreterTest.java       | 24 +++----
 .../zeppelin/spark/SparkRInterpreterTest.java      |  8 +--
 .../zeppelin/spark/SparkShinyInterpreterTest.java  |  4 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java    |  4 +-
 .../zeppelin/spark/SparkScala211Interpreter.scala  |  2 +-
 .../zeppelin/spark/SparkScala212Interpreter.scala  |  2 +-
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala |  6 +-
 .../zeppelin/integration/SparkIntegrationTest.java | 29 ++++----
 .../integration/ZeppelinSparkClusterTest.java      | 11 ++-
 .../launcher/K8sRemoteInterpreterProcess.java      |  2 +-
 .../launcher/K8sRemoteInterpreterProcessTest.java  |  6 +-
 .../launcher/SparkInterpreterLauncher.java         | 54 +++++++------
 .../launcher/SparkInterpreterLauncherTest.java     | 38 ++++++-----
 34 files changed, 235 insertions(+), 181 deletions(-)

diff --git a/conf/zeppelin-env.cmd.template b/conf/zeppelin-env.cmd.template
index ee3c798..54cf8d5 100644
--- a/conf/zeppelin-env.cmd.template
+++ b/conf/zeppelin-env.cmd.template
@@ -17,35 +17,35 @@ REM limitations under the License.
 REM
 
 REM set JAVA_HOME=
-REM set MASTER=                  REM Spark master url. eg. spark://master_addr:7077. Leave empty if you want to use local mode.
-REM set ZEPPELIN_JAVA_OPTS       REM Additional jvm options. for example, set ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16"
-REM set ZEPPELIN_MEM             REM Zeppelin jvm mem options Default -Xms1024m -Xmx1024m -XX:MaxMetaspaceSize=512m
-REM set ZEPPELIN_INTP_MEM        REM zeppelin interpreter process jvm mem options. Default -Xmx1024m -Xms1024m -XX:MaxMetaspaceSize=512m
-REM set ZEPPELIN_INTP_JAVA_OPTS  REM zeppelin interpreter process jvm options.
-REM set ZEPPELIN_JMX_ENABLE      REM Enable JMX feature by defining it like "true"
-REM set ZEPPELIN_JMX_PORT        REM Port number which JMX uses. If not set, JMX won't be enabled
+REM set SPARK_MASTER=            REM Spark master url. eg. spark://master_addr:7077. Leave empty if you want to use local mode.
+REM set ZEPPELIN_JAVA_OPTS       REM Additional jvm options. for example, set ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16"
+REM set ZEPPELIN_MEM             REM Zeppelin jvm mem options Default -Xms1024m -Xmx1024m -XX:MaxMetaspaceSize=512m
+REM set ZEPPELIN_INTP_MEM        REM zeppelin interpreter process jvm mem options. Default -Xmx1024m -Xms1024m -XX:MaxMetaspaceSize=512m
+REM set ZEPPELIN_INTP_JAVA_OPTS  REM zeppelin interpreter process jvm options.
+REM set ZEPPELIN_JMX_ENABLE      REM Enable JMX feature by defining it like "true"
+REM set ZEPPELIN_JMX_PORT        REM Port number which JMX uses. If not set, JMX won't be enabled
 
-REM set ZEPPELIN_LOG_DIR                    REM Where log files are stored. PWD by default.
-REM set ZEPPELIN_PID_DIR                    REM The pid files are stored. /tmp by default.
-REM set ZEPPELIN_WAR_TEMPDIR                REM The location of jetty temporary directory.
-REM set ZEPPELIN_NOTEBOOK_DIR               REM Where notebook saved
-REM set ZEPPELIN_NOTEBOOK_HOMESCREEN        REM Id of notebook to be displayed in homescreen. ex) 2A94M5J1Z
-REM set ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE   REM hide homescreen notebook from list when this value set to "true". default "false"
+REM set ZEPPELIN_LOG_DIR                    REM Where log files are stored. PWD by default.
+REM set ZEPPELIN_PID_DIR                    REM The pid files are stored. /tmp by default.
+REM set ZEPPELIN_WAR_TEMPDIR                REM The location of jetty temporary directory.
+REM set ZEPPELIN_NOTEBOOK_DIR               REM Where notebook saved
+REM set ZEPPELIN_NOTEBOOK_HOMESCREEN        REM Id of notebook to be displayed in homescreen. ex) 2A94M5J1Z
+REM set ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE   REM hide homescreen notebook from list when this value set to "true". default "false"
 
 REM set ZEPPELIN_NOTEBOOK_S3_BUCKET         REM Bucket where notebook saved
 REM set ZEPPELIN_NOTEBOOK_S3_USER           REM User in bucket where notebook saved. For example bucket/user/notebook/2A94M5J1Z/note.json
 REM set ZEPPELIN_NOTEBOOK_S3_ENDPOINT       REM Endpoint of the bucket
 REM set ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID     REM AWS KMS key ID
 REM set ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION REM AWS KMS key region
 REM set ZEPPELIN_NOTEBOOK_S3_SSE            REM Server-side encryption enabled for notebooks
 
-REM set ZEPPELIN_IDENT_STRING               REM A string representing this instance of zeppelin. $USER by default.
-REM set ZEPPELIN_NICENESS                   REM The scheduling priority for daemons. Defaults to 0.
+REM set ZEPPELIN_IDENT_STRING               REM A string representing this instance of zeppelin. $USER by default.
+REM set ZEPPELIN_NICENESS                   REM The scheduling priority for daemons. Defaults to 0.
 
 REM set ZEPPELIN_INTERPRETER_LOCALREPO         REM Local repository for interpreter's additional dependency loading
 REM set ZEPPELIN_INTERPRETER_DEP_MVNREPO       REM Maven principal repository for interpreter's additional dependency loading
 REM set ZEPPELIN_HELIUM_NODE_INSTALLER_URL     REM Remote Node installer url for Helium dependency loader
 REM set ZEPPELIN_HELIUM_NPM_INSTALLER_URL      REM Remote Npm installer url for Helium dependency loader
 REM set ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL  REM Remote Yarn package installer url for Helium dependency loader
-REM set ZEPPELIN_NOTEBOOK_STORAGE           REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
-REM set ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC      REM If there are multiple notebook storages, should we treat the first one as the only source of truth?
+REM set ZEPPELIN_NOTEBOOK_STORAGE           REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
+REM set ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC      REM If there are multiple notebook storages, should we treat the first one as the only source of truth?
 
 REM Spark interpreter configuration
@@ -62,10 +62,10 @@ REM without SPARK_HOME defined, Zeppelin still able to run spark interpreter pro
 REM however, it is not encouraged when you can define SPARK_HOME
 REM
 REM Options read in YARN client mode
-REM set HADOOP_CONF_DIR          REM yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR.
+REM set HADOOP_CONF_DIR          REM yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR.
 
 REM Pyspark (supported with Spark 1.2.1 and above)
 REM To configure pyspark, you need to set spark distribution's path to 'spark.home' property in Interpreter setting screen in Zeppelin GUI
-REM set PYSPARK_PYTHON           REM path to the python command. must be the same path on the driver(Zeppelin) and all workers.
+REM set PYSPARK_PYTHON           REM path to the python command. must be the same path on the driver(Zeppelin) and all workers.
 REM set PYTHONPATH
 
 REM Spark interpreter options
@@ -77,6 +77,6 @@ REM set ZEPPELIN_SPARK_MAXRESULT      REM Max number of Spark SQL result to dis
 
 REM ZeppelinHub connection configuration
 REM
-REM set ZEPPELINHUB_API_ADDRESS  REM Refers to the address of the ZeppelinHub service in use
+REM set ZEPPELINHUB_API_ADDRESS  REM Refers to the address of the ZeppelinHub service in use
 REM set ZEPPELINHUB_API_TOKEN    REM Refers to the Zeppelin instance token of the user
 REM set ZEPPELINHUB_USER_KEY     REM Optional, when using Zeppelin with authentication.
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index ee03635..bb24e62 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -17,50 +17,50 @@
 #
 
 # export JAVA_HOME=
-# export MASTER=                            # Spark master url. eg. spark://master_addr:7077. Leave empty if you want to use local mode.
-# export ZEPPELIN_ADDR                      # Bind address (default 127.0.0.1)
-# export ZEPPELIN_PORT                      # port number to listen (default 8080)
-# export ZEPPELIN_LOCAL_IP                  # Zeppelin's thrift server ip address, if not specified, one random IP address will be choosen.
-# export ZEPPELIN_JAVA_OPTS                 # Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16"
-# export ZEPPELIN_MEM                       # Zeppelin jvm mem options Default -Xms1024m -Xmx1024m -XX:MaxMetaspaceSize=512m
-# export ZEPPELIN_INTP_MEM                  # zeppelin interpreter process jvm mem options. Default -Xms1024m -Xmx1024m -XX:MaxMetaspaceSize=512m
-# export ZEPPELIN_INTP_JAVA_OPTS            # zeppelin interpreter process jvm options.
-# export ZEPPELIN_SSL_PORT                  # ssl port (used when ssl environment variable is set to true)
-# export ZEPPELIN_JMX_ENABLE                # Enable JMX feature by defining "true"
-# export ZEPPELIN_JMX_PORT                  # Port number which JMX uses. If not set, JMX won't be enabled
-
-# export ZEPPELIN_LOG_DIR                   # Where log files are stored. PWD by default.
-# export ZEPPELIN_PID_DIR                   # The pid files are stored. ${ZEPPELIN_HOME}/run by default.
-# export ZEPPELIN_WAR_TEMPDIR               # The location of jetty temporary directory.
-# export ZEPPELIN_NOTEBOOK_DIR              # Where notebook saved
-# export ZEPPELIN_NOTEBOOK_HOMESCREEN       # Id of notebook to be displayed in homescreen. ex) 2A94M5J1Z
-# export ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE  # hide homescreen notebook from list when this value set to "true". default "false"
-
-# export ZEPPELIN_NOTEBOOK_S3_BUCKET        # Bucket where notebook saved
-# export ZEPPELIN_NOTEBOOK_S3_ENDPOINT      # Endpoint of the bucket
-# export ZEPPELIN_NOTEBOOK_S3_USER          # User in bucket where notebook saved. For example bucket/user/notebook/2A94M5J1Z/note.json
-# export ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID    # AWS KMS key ID
-# export ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION # AWS KMS key region
-# export ZEPPELIN_NOTEBOOK_S3_SSE           # Server-side encryption enabled for notebooks
+# export SPARK_MASTER=                      # Spark master url. eg. spark://master_addr:7077. Leave empty if you want to use local mode.
+# export ZEPPELIN_ADDR                      # Bind address (default 127.0.0.1)
+# export ZEPPELIN_PORT                      # port number to listen (default 8080)
+# export ZEPPELIN_LOCAL_IP                  # Zeppelin's thrift server ip address, if not specified, one random IP address will be choosen.
+# export ZEPPELIN_JAVA_OPTS                 # Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16"
+# export ZEPPELIN_MEM                       # Zeppelin jvm mem options Default -Xms1024m -Xmx1024m -XX:MaxMetaspaceSize=512m
+# export ZEPPELIN_INTP_MEM                  # zeppelin interpreter process jvm mem options. Default -Xms1024m -Xmx1024m -XX:MaxMetaspaceSize=512m
+# export ZEPPELIN_INTP_JAVA_OPTS            # zeppelin interpreter process jvm options.
+# export ZEPPELIN_SSL_PORT                  # ssl port (used when ssl environment variable is set to true)
+# export ZEPPELIN_JMX_ENABLE                # Enable JMX feature by defining "true"
+# export ZEPPELIN_JMX_PORT                  # Port number which JMX uses. If not set, JMX won't be enabled
+
+# export ZEPPELIN_LOG_DIR                   # Where log files are stored. PWD by default.
+# export ZEPPELIN_PID_DIR                   # The pid files are stored. ${ZEPPELIN_HOME}/run by default.
+# export ZEPPELIN_WAR_TEMPDIR               # The location of jetty temporary directory.
+# export ZEPPELIN_NOTEBOOK_DIR              # Where notebook saved
+# export ZEPPELIN_NOTEBOOK_HOMESCREEN       # Id of notebook to be displayed in homescreen. ex) 2A94M5J1Z
+# export ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE  # hide homescreen notebook from list when this value set to "true". default "false"
+
+# export ZEPPELIN_NOTEBOOK_S3_BUCKET        # Bucket where notebook saved
+# export ZEPPELIN_NOTEBOOK_S3_ENDPOINT      # Endpoint of the bucket
+# export ZEPPELIN_NOTEBOOK_S3_USER          # User in bucket where notebook saved. For example bucket/user/notebook/2A94M5J1Z/note.json
+# export ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID    # AWS KMS key ID
+# export ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION # AWS KMS key region
+# export ZEPPELIN_NOTEBOOK_S3_SSE           # Server-side encryption enabled for notebooks
 
 # export ZEPPELIN_NOTEBOOK_GCS_STORAGE_DIR  # GCS "directory" (prefix) under which notebooks are saved. E.g. gs://example-bucket/path/to/dir
 # export GOOGLE_APPLICATION_CREDENTIALS     # Provide a service account key file for GCS and BigQuery API calls (overrides application default credentials)
 
-# export ZEPPELIN_NOTEBOOK_MONGO_URI        # MongoDB connection URI used to connect to a MongoDB database server. Default "mongodb://localhost"
-# export ZEPPELIN_NOTEBOOK_MONGO_DATABASE   # Database name to store notebook. Default "zeppelin"
-# export ZEPPELIN_NOTEBOOK_MONGO_COLLECTION # Collection name to store notebook. Default "notes"
-# export ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT # If "true" import local notes under ZEPPELIN_NOTEBOOK_DIR on startup. Default "false"
+# export ZEPPELIN_NOTEBOOK_MONGO_URI        # MongoDB connection URI used to connect to a MongoDB database server. Default "mongodb://localhost"
+# export ZEPPELIN_NOTEBOOK_MONGO_DATABASE   # Database name to store notebook. Default "zeppelin"
+# export ZEPPELIN_NOTEBOOK_MONGO_COLLECTION # Collection name to store notebook. Default "notes"
+# export ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT # If "true" import local notes under ZEPPELIN_NOTEBOOK_DIR on startup. Default "false"
 
-# export ZEPPELIN_IDENT_STRING              # A string representing this instance of zeppelin. $USER by default.
-# export ZEPPELIN_NICENESS                  # The scheduling priority for daemons. Defaults to 0.
+# export ZEPPELIN_IDENT_STRING              # A string representing this instance of zeppelin. $USER by default.
+# export ZEPPELIN_NICENESS                  # The scheduling priority for daemons. Defaults to 0.
 
 # export ZEPPELIN_INTERPRETER_LOCALREPO         # Local repository for interpreter's additional dependency loading
 # export ZEPPELIN_INTERPRETER_DEP_MVNREPO       # Remote principal repository for interpreter's additional dependency loading
 # export ZEPPELIN_HELIUM_NODE_INSTALLER_URL     # Remote Node installer url for Helium dependency loader
 # export ZEPPELIN_HELIUM_NPM_INSTALLER_URL      # Remote Npm installer url for Helium dependency loader
 # export ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL  # Remote Yarn package installer url for Helium dependency loader
-# export ZEPPELIN_NOTEBOOK_STORAGE          # Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
-# export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC     # If there are multiple notebook storages, should we treat the first one as the only source of truth?
-# export ZEPPELIN_NOTEBOOK_PUBLIC           # Make notebook public by default when created, private otherwise
+# export ZEPPELIN_NOTEBOOK_STORAGE          # Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
+# export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC     # If there are multiple notebook storages, should we treat the first one as the only source of truth?
+# export ZEPPELIN_NOTEBOOK_PUBLIC           # Make notebook public by default when created, private otherwise
 
 # export DOCKER_TIME_ZONE                   # Set to the same time zone as the zeppelin server. E.g, "America/New_York" or "Asia/Shanghai"
@@ -84,10 +84,10 @@ ## however, it is not encouraged when you can define SPARK_HOME
 ##
 # Options read in YARN client mode
-# export HADOOP_CONF_DIR                    # yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR.
+# export HADOOP_CONF_DIR                    # yarn-site.xml is located in configuration directory in HADOOP_CONF_DIR.
 
 # Pyspark (supported with Spark 1.2.1 and above)
 # To configure pyspark, you need to set spark distribution's path to 'spark.home' property in Interpreter setting screen in Zeppelin GUI
-# export PYSPARK_PYTHON                     # path to the python command. must be the same path on the driver(Zeppelin) and all workers.
+# export PYSPARK_PYTHON                     # path to the python command. must be the same path on the driver(Zeppelin) and all workers.
 # export PYTHONPATH
 
 ## Spark interpreter options ##
@@ -106,9 +106,9 @@ # export HBASE_CONF_DIR=  # (optional) Alternatively, configuration directory can be set to point to the directory that has hbase-site.xml
 
 #### ZeppelinHub connection configuration ####
-# export ZEPPELINHUB_API_ADDRESS            # Refers to the address of the ZeppelinHub service in use
-# export ZEPPELINHUB_API_TOKEN              # Refers to the Zeppelin instance token of the user
-# export ZEPPELINHUB_USER_KEY               # Optional, when using Zeppelin with authentication.
+# export ZEPPELINHUB_API_ADDRESS            # Refers to the address of the ZeppelinHub service in use
+# export ZEPPELINHUB_API_TOKEN              # Refers to the Zeppelin instance token of the user
+# export ZEPPELINHUB_USER_KEY               # Optional, when using Zeppelin with authentication.
 
 #### Zeppelin impersonation configuration
 # export ZEPPELIN_IMPERSONATE_CMD           # Optional, when user want to run interpreter as end web user. eg) 'sudo -H -u ${ZEPPELIN_IMPERSONATE_USER} bash -c '
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 1f0aed7..990f466 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -81,7 +81,7 @@ You can also set other Spark properties which are not listed in the table. For a
     <td>Location of spark distribution</td>
   <tr>
   <tr>
-    <td>master</td>
+    <td>spark.master</td>
     <td>local[*]</td>
     <td>Spark master uri. <br/> e.g. spark://master_host:7077</td>
   <tr>
@@ -248,7 +248,7 @@ configuration with code together for more flexibility. e.g.
 </center>
 
 ### Set master in Interpreter menu
-After starting Zeppelin, go to **Interpreter** menu and edit **master** property in your Spark interpreter setting. The value may vary depending on your Spark cluster deployment type.
+After starting Zeppelin, go to **Interpreter** menu and edit **spark.master** property in your Spark interpreter setting. The value may vary depending on your Spark cluster deployment type.
 
 For example,
diff --git a/docs/quickstart/kubernetes.md b/docs/quickstart/kubernetes.md
index 7b64e7f..cd45912 100644
--- a/docs/quickstart/kubernetes.md
+++ b/docs/quickstart/kubernetes.md
@@ -113,7 +113,7 @@ And then start your spark interpreter
 sc.parallelize(1 to 100).count
 ...
 ```
-While `master` property of SparkInterpreter starts with `k8s://` (default `k8s://https://kubernetes.default.svc` when Zeppelin started using zeppelin-server.yaml), Spark executors will be automatically created in your Kubernetes cluster.
+While `spark.master` property of SparkInterpreter starts with `k8s://` (default `k8s://https://kubernetes.default.svc` when Zeppelin started using zeppelin-server.yaml), Spark executors will be automatically created in your Kubernetes cluster.
 Spark UI is accessible by clicking `SPARK JOB` on the Paragraph.
 Check [here](https://spark.apache.org/docs/latest/running-on-kubernetes.html) to know more about Running Spark on Kubernetes.
@@ -192,7 +192,7 @@ and all interpreter properties are accessible inside the templates.
 When interpreter group is `spark`, Zeppelin sets necessary spark configuration automatically to use Spark on Kubernetes.
 It uses client mode, so Spark interpreter Pod works as a Spark driver, spark executors are launched in separate Pods.
-This auto configuration can be overrided by manually setting `master` property of Spark interpreter.
+This auto configuration can be overrided by manually setting `spark.master` property of Spark interpreter.
 
 ### Accessing Spark UI (or Service running in interpreter Pod)
diff --git a/docs/setup/deployment/cdh.md b/docs/setup/deployment/cdh.md
index d35292e..20f819b 100644
--- a/docs/setup/deployment/cdh.md
+++ b/docs/setup/deployment/cdh.md
@@ -76,14 +76,13 @@ To verify the application is running well, check the web UI for HDFS on `http://
 Set following configurations to `conf/zeppelin-env.sh`.
 
 ```bash
-export MASTER=yarn-client
 export HADOOP_CONF_DIR=[your_hadoop_conf_path]
 export SPARK_HOME=[your_spark_home_path]
 ```
 
 `HADOOP_CONF_DIR`(Hadoop configuration path) is defined in `/scripts/docker/spark-cluster-managers/cdh/hdfs_conf`.
 
-Don't forget to set Spark `master` as `yarn-client` in Zeppelin **Interpreters** setting page like below.
+Don't forget to set Spark `spark.master` as `yarn-client` in Zeppelin **Interpreters** setting page like below.
 
 <img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/zeppelin_yarn_conf.png" />
diff --git a/docs/setup/deployment/flink_and_spark_cluster.md b/docs/setup/deployment/flink_and_spark_cluster.md
index 5094840..c793651 100644
--- a/docs/setup/deployment/flink_and_spark_cluster.md
+++ b/docs/setup/deployment/flink_and_spark_cluster.md
@@ -394,7 +394,7 @@ Open a web browser and go to the Zeppelin web-ui at http://yourip:8080.
 Now go back to the Zeppelin web-ui at http://`yourip`:8080 and this time click on *anonymous* at the top right, which will open a drop-down menu, select *Interpreters* to enter interpreter configuration.
 
 In the Spark section, click the edit button in the top right corner to make the property values editable (looks like a pencil).
-The only field that needs to be edited in the Spark interpreter is the master field. Change this value from `local[*]` to the URL you used to start the slave, mine was `spark://ubuntu:7077`.
+The only field that needs to be edited in the Spark interpreter is the `spark.master` field. Change this value from `local[*]` to the URL you used to start the slave, mine was `spark://ubuntu:7077`.
 
 Click *Save* to update the parameters, and click *OK* when it asks you about restarting the interpreter.
diff --git a/docs/setup/deployment/spark_cluster_mode.md b/docs/setup/deployment/spark_cluster_mode.md
index 94102bf..f6d4de8 100644
--- a/docs/setup/deployment/spark_cluster_mode.md
+++ b/docs/setup/deployment/spark_cluster_mode.md
@@ -130,14 +130,13 @@ You can also check each application web UI for HDFS on `http://<hostname>:50070/
 Set following configurations to `conf/zeppelin-env.sh`.
 
 ```bash
-export MASTER=yarn-client
 export HADOOP_CONF_DIR=[your_hadoop_conf_path]
 export SPARK_HOME=[your_spark_home_path]
 ```
 
 `HADOOP_CONF_DIR`(Hadoop configuration path) is defined in `/scripts/docker/spark-cluster-managers/spark_yarn_cluster/hdfs_conf`.
 
-Don't forget to set Spark `master` as `yarn-client` in Zeppelin **Interpreters** setting page like below.
+Don't forget to set Spark `spark.master` as `yarn-client` in Zeppelin **Interpreters** setting page like below.
 
 <img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/zeppelin_yarn_conf.png" />
@@ -193,13 +192,12 @@ You can also check each application web UI for Mesos on `http://<hostname>:5050/
 ### 4. Configure Spark interpreter in Zeppelin
 
 ```bash
-export MASTER=mesos://127.0.1.1:5050
 export MESOS_NATIVE_JAVA_LIBRARY=[PATH OF libmesos.so]
 export SPARK_HOME=[PATH OF SPARK HOME]
 ```
 
-Don't forget to set Spark `master` as `mesos://127.0.1.1:5050` in Zeppelin **Interpreters** setting page like below.
+Don't forget to set Spark `spark.master` as `mesos://127.0.1.1:5050` in Zeppelin **Interpreters** setting page like below.
 
 <img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/zeppelin_mesos_conf.png" />
diff --git a/docs/setup/deployment/yarn_install.md b/docs/setup/deployment/yarn_install.md
index fd39c84..b130272 100644
--- a/docs/setup/deployment/yarn_install.md
+++ b/docs/setup/deployment/yarn_install.md
@@ -136,7 +136,7 @@ It was assumed that 1.6.0 version of Spark is installed at /usr/lib/spark. Look
     <th>Remarks</th>
   </tr>
   <tr>
-    <td>master</td>
+    <td>spark.master</td>
     <td>yarn-client</td>
     <td>In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.</td>
   </tr>
diff --git a/k8s/zeppelin-server.yaml b/k8s/zeppelin-server.yaml
index aefd07c..11023d2 100644
--- a/k8s/zeppelin-server.yaml
+++ b/k8s/zeppelin-server.yaml
@@ -33,7 +33,7 @@ data:
   ZEPPELIN_HOME: /zeppelin
   ZEPPELIN_SERVER_RPC_PORTRANGE: 12320:12320
   # default value of 'master' property for spark interpreter.
-  MASTER: k8s://https://kubernetes.default.svc
+  SPARK_MASTER: k8s://https://kubernetes.default.svc
   # default value of 'SPARK_HOME' property for spark interpreter.
   SPARK_HOME: /spark
---
diff --git a/notebook/Spark Tutorial/1. Spark Interpreter Introduction_2F8KN6TKK.zpln b/notebook/Spark Tutorial/1. Spark Interpreter Introduction_2F8KN6TKK.zpln
index 28dd67c..d085d9f 100644
--- a/notebook/Spark Tutorial/1. Spark Interpreter Introduction_2F8KN6TKK.zpln
+++ b/notebook/Spark Tutorial/1. Spark Interpreter Introduction_2F8KN6TKK.zpln
@@ -2,7 +2,7 @@
   "paragraphs": [
     {
       "title": "",
-      "text": "%md\n\n# Introduction\n\nThis tutorial is for how to use Spark Interpreter in Zeppelin.\n\n1. Specify `SPARK_HOME` in interpreter setting. If you don\u0027t specify `SPARK_HOME`, Zeppelin will use the embedded spark which can only run in local mode. And some advanced features may not work in this embedded spark.\n2. Specify `master` for spark execution mode.\n * `local[*]` - Driver and Executor would both run in the same host of zeppelin server. It is only for testing [...]
+      "text": "%md\n\n# Introduction\n\nThis tutorial is for how to use Spark Interpreter in Zeppelin.\n\n1. Specify `SPARK_HOME` in interpreter setting. If you don\u0027t specify `SPARK_HOME`, Zeppelin will use the embedded spark which can only run in local mode. And some advanced features may not work in this embedded spark.\n2. Specify `spark.master` for spark execution mode.\n * `local[*]` - Driver and Executor would both run in the same host of zeppelin server. It is only for te [...]
"user": "anonymous", "dateUpdated": "2020-05-04 13:44:39.482", "config": { diff --git a/scripts/docker/spark-cluster-managers/spark_mesos/entrypoint.sh b/scripts/docker/spark-cluster-managers/spark_mesos/entrypoint.sh index 2f9572b..d8306ba 100755 --- a/scripts/docker/spark-cluster-managers/spark_mesos/entrypoint.sh +++ b/scripts/docker/spark-cluster-managers/spark_mesos/entrypoint.sh @@ -20,7 +20,7 @@ export SPARK_MASTER_WEBUI_PORT=8080 export SPARK_WORKER_PORT=8888 export SPARK_WORKER_WEBUI_PORT=8081 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server/ -export MASTER=mesos://127.0.1.1:5050 +export SPARK_MASTER=mesos://127.0.1.1:5050 export MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so # spark configuration diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java index c7d449f..b2a1bc1 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java @@ -23,7 +23,6 @@ import org.apache.zeppelin.interpreter.ZeppelinContext; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.python.IPythonInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +61,8 @@ public class IPySparkInterpreter extends IPythonInterpreter { SparkConf conf = sparkInterpreter.getSparkContext().getConf(); // only set PYTHONPATH in embedded, local or yarn-client mode. // yarn-cluster will setup PYTHONPATH automatically. - if (!conf.contains("spark.submit.deployMode") || - !conf.get("spark.submit.deployMode").equals("cluster")) { + if (!conf.contains(SparkStringConstants.SUBMIT_DEPLOY_MODE_PROP_NAME) || + !conf.get(SparkStringConstants.SUBMIT_DEPLOY_MODE_PROP_NAME).equals("cluster")) { setAdditionalPythonPath(PythonUtils.sparkPythonPath()); } setUseBuiltinPy4j(false); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 0d69acc..fc0a2c1 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -89,11 +89,19 @@ public class SparkInterpreter extends AbstractInterpreter { } if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL") && entry.getValue().toString().equals("true")) { - conf.set("spark.scheduler.mode", "FAIR"); + conf.set(SparkStringConstants.SCHEDULER_MODE_PROP_NAME, "FAIR"); } } // use local mode for embedded spark mode when spark.master is not found - conf.setIfMissing("spark.master", "local"); + if (!conf.contains(SparkStringConstants.MASTER_PROP_NAME)) { + if (conf.contains("master")) { + conf.set(SparkStringConstants.MASTER_PROP_NAME, conf.get("master")); + } else { + String masterEnv = System.getenv(SparkStringConstants.MASTER_ENV_NAME); + conf.set(SparkStringConstants.MASTER_PROP_NAME, + masterEnv == null ? 
SparkStringConstants.DEFAULT_MASTER_VALUE : masterEnv); + } + } this.innerInterpreter = loadSparkScalaInterpreter(conf); this.innerInterpreter.open(); @@ -198,6 +206,9 @@ public class SparkInterpreter extends AbstractInterpreter { } public ZeppelinContext getZeppelinContext() { + if (this.innerInterpreter == null) { + LOGGER.error("innerInterpreter is null!"); + } return this.innerInterpreter.getZeppelinContext(); } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 951ddd0..bcd9288 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -73,6 +74,12 @@ public class SparkRInterpreter extends RInterpreter { this.sparkVersion = new SparkVersion(sc.version()); this.isSpark1 = sparkVersion.getMajorVersion() == 1; + LOGGER.info("SparkRInterpreter: SPARK_HOME={}", sc.getConf().getenv("SPARK_HOME")); + Arrays.stream(sc.getConf().getAll()) + .forEach(x -> LOGGER.info("SparkRInterpreter: conf, {}={}", x._1, x._2)); + properties.entrySet().stream().forEach(x -> + LOGGER.info("SparkRInterpreter: prop, {}={}", x.getKey(), x.getValue())); + ZeppelinRContext.setSparkContext(sc); ZeppelinRContext.setJavaSparkContext(jsc); if (!isSpark1) { diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkStringConstants.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkStringConstants.java new file mode 100644 index 0000000..08f62db --- /dev/null +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkStringConstants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.spark; + +public class SparkStringConstants { + public static final String MASTER_PROP_NAME = "spark.master"; + public static final String MASTER_ENV_NAME = "SPARK_MASTER"; + public static final String SCHEDULER_MODE_PROP_NAME = "spark.scheduler.mode"; + public static final String APP_NAME_PROP_NAME = "spark.app.name"; + public static final String SUBMIT_DEPLOY_MODE_PROP_NAME = "spark.submit.deployMode"; + public static final String DEFAULT_MASTER_VALUE = "local[*]"; +} diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json index 1556d33..dfe09d6 100644 --- a/spark/interpreter/src/main/resources/interpreter-setting.json +++ b/spark/interpreter/src/main/resources/interpreter-setting.json @@ -12,8 +12,8 @@ "description": "Location of spark distribution", "type": "string" }, - "master": { - "envName": "MASTER", + "spark.master": { + "envName": "SPARK_MASTER", "propertyName": "spark.master", "defaultValue": "local[*]", "description": "Spark master uri. local | yarn-client | yarn-cluster | spark master address of standalone mode, ex) spark://master_host:7077", diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java index 40ab851..0c160b3 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java @@ -55,10 +55,9 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest { @Override protected Properties initIntpProperties() { Properties p = new Properties(); - p.setProperty("spark.master", "local[4]"); - p.setProperty("master", "local[4]"); + p.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local[4]"); p.setProperty("spark.submit.deployMode", "client"); - p.setProperty("spark.app.name", "Zeppelin Test"); + p.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "Zeppelin Test"); p.setProperty("zeppelin.spark.useHiveContext", "false"); p.setProperty("zeppelin.spark.maxResult", "3"); p.setProperty("zeppelin.spark.importImplicit", "true"); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/KotlinSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/KotlinSparkInterpreterTest.java index 464c8f2..536142e 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/KotlinSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/KotlinSparkInterpreterTest.java @@ -68,8 +68,8 @@ public class KotlinSparkInterpreterTest { public static Properties getSparkTestProperties(TemporaryFolder tmpDir) throws IOException { Properties p = new Properties(); - p.setProperty("master", "local[*]"); - p.setProperty("spark.app.name", "Zeppelin Test"); + p.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local[*]"); + p.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "Zeppelin Test"); p.setProperty("zeppelin.spark.useHiveContext", "true"); p.setProperty("zeppelin.spark.maxResult", "1000"); p.setProperty("zeppelin.spark.importImplicit", "true"); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java index 4f5c020..a9aa283 100644 --- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java @@ -93,8 +93,8 @@ public class PySparkInterpreterMatplotlibTest { private static Properties getPySparkTestProperties() throws IOException { Properties p = new Properties(); - p.setProperty("spark.master", "local[*]"); - p.setProperty("spark.app.name", "Zeppelin Test"); + p.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local[*]"); + p.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "Zeppelin Test"); p.setProperty("zeppelin.spark.useHiveContext", "true"); p.setProperty("zeppelin.spark.maxResult", "1000"); p.setProperty("zeppelin.spark.importImplicit", "true"); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 7e48666..ab24315 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -46,8 +46,8 @@ public class PySparkInterpreterTest extends PythonInterpreterTest { @Override public void setUp() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "Zeppelin Test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "Zeppelin Test"); properties.setProperty("zeppelin.spark.useHiveContext", "false"); properties.setProperty("zeppelin.spark.maxResult", "3"); properties.setProperty("zeppelin.spark.importImplicit", "true"); @@ -104,7 +104,7 @@ public class PySparkInterpreterTest extends PythonInterpreterTest { intpGroup = new InterpreterGroup(); Properties properties = new Properties(); - properties.setProperty("spark.app.name", "Zeppelin Test"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "Zeppelin Test"); properties.setProperty("spark.pyspark.python", "invalid_python"); properties.setProperty("zeppelin.python.useIPython", "false"); properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1"); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkIRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkIRInterpreterTest.java index f0808f2..11f5594 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkIRInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkIRInterpreterTest.java @@ -57,8 +57,8 @@ public class SparkIRInterpreterTest extends IRInterpreterTest { @Before public void setUp() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); properties.setProperty("spark.r.backendConnectionTimeout", "10"); properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false"); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java 
index 711635f..be03985 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -69,8 +69,8 @@ public class SparkInterpreterTest { @Test public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl/{{applicationId}}"); // disable color output for easy testing @@ -394,8 +394,8 @@ public class SparkInterpreterTest { @Test public void testDisableReplOutput() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); properties.setProperty("zeppelin.spark.printREPLOutput", "false"); // disable color output for easy testing @@ -465,8 +465,8 @@ public class SparkInterpreterTest { @Test public void testSchedulePool() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); properties.setProperty("spark.scheduler.mode", "FAIR"); // disable color output for easy testing @@ -494,8 +494,8 @@ public class SparkInterpreterTest { @Test public void testDisableSparkUI_1() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); properties.setProperty("spark.ui.enabled", "false"); // disable color output for easy testing @@ -519,8 +519,8 @@ public class SparkInterpreterTest { @Test public void testDisableSparkUI_2() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); properties.setProperty("zeppelin.spark.ui.hidden", "true"); // disable color output for easy testing @@ -543,8 +543,8 @@ public class SparkInterpreterTest { @Test public void testScopedMode() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + 
properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); // disable color output for easy testing properties.setProperty("zeppelin.spark.scala.color", "false"); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java index 8c2b799..f87caa0 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java @@ -51,8 +51,8 @@ public class SparkRInterpreterTest { @Before public void setUp() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); properties.setProperty("zeppelin.R.knitr", "true"); properties.setProperty("spark.r.backendConnectionTimeout", "10"); @@ -155,8 +155,8 @@ public class SparkRInterpreterTest { Properties properties = new Properties(); properties.setProperty("zeppelin.R.cmd", "invalid_r"); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); InterpreterGroup interpreterGroup = new InterpreterGroup(); Interpreter sparkRInterpreter = new LazyOpenInterpreter(new SparkRInterpreter(properties)); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java index 86a591c..14f572b 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java @@ -49,8 +49,8 @@ public class SparkShinyInterpreterTest extends ShinyInterpreterTest { @Before public void setUp() throws InterpreterException { Properties properties = new Properties(); - properties.setProperty("master", "local[*]"); - properties.setProperty("spark.app.name", "test"); + properties.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local[*]"); + properties.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); InterpreterContext context = getInterpreterContext(); InterpreterContext.set(context); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index dcab8d3..740cc59 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -48,8 +48,8 @@ public class SparkSqlInterpreterTest { @BeforeClass public static void setUp() throws Exception { Properties p = new Properties(); - p.setProperty("spark.master", "local[4]"); - p.setProperty("spark.app.name", "test"); + p.setProperty(SparkStringConstants.MASTER_PROP_NAME, "local[4]"); + p.setProperty(SparkStringConstants.APP_NAME_PROP_NAME, "test"); p.setProperty("zeppelin.spark.maxResult", "10"); 
p.setProperty("zeppelin.spark.concurrentSQL", "true"); p.setProperty("zeppelin.spark.sql.stacktrace", "true"); diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index 64c6502..cb5a016 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -53,7 +53,7 @@ class SparkScala211Interpreter(override val conf: SparkConf, override def open(): Unit = { super.open() - if (conf.get("spark.master", "local") == "yarn-client") { + if (sparkMaster == "yarn-client") { System.setProperty("SPARK_YARN_MODE", "true") } // Only Spark1 requires to create http server, Spark2 removes HttpServer class. diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala index 6d90026..2b04a1d 100644 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -51,7 +51,7 @@ class SparkScala212Interpreter(override val conf: SparkConf, override def open(): Unit = { super.open() - if (conf.get("spark.master", "local") == "yarn-client") { + if (sparkMaster == "yarn-client") { System.setProperty("SPARK_YARN_MODE", "true") } // Only Spark1 requires to create http server, Spark2 removes HttpServer class. diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index f05acb0..e5212d9 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -77,6 +77,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, protected val interpreterOutput: InterpreterOutputStream + protected val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME, + SparkStringConstants.DEFAULT_MASTER_VALUE) protected def open(): Unit = { /* Required for scoped mode. 
@@ -186,7 +188,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, protected def close(): Unit = { // delete stagingDir for yarn mode - if (conf.get("spark.master").startsWith("yarn")) { + if (sparkMaster.startsWith("yarn")) { val hadoopConf = new YarnConfiguration() val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) { new Path(conf.get("spark.yarn.stagingDir")) @@ -357,7 +359,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, private def useYarnProxyURLIfNeeded() { if (properties.getProperty("spark.webui.yarn.useProxy", "false").toBoolean) { - if (sc.getConf.get("spark.master").startsWith("yarn")) { + if (sparkMaster.startsWith("yarn")) { val appId = sc.applicationId val yarnClient = YarnClient.createYarnClient val yarnConf = new YarnConfiguration() diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index e0a4990..dd86529 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -44,6 +44,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.EnumSet; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -101,34 +102,34 @@ public abstract class SparkIntegrationTest { InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build(); InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); String detectedSparkVersion = interpreterResult.message().get(0).getData(); assertTrue(detectedSparkVersion +" doesn't contain " + this.sparkVersion, detectedSparkVersion.contains(this.sparkVersion)); interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); - assertTrue(interpreterResult.message().get(0).getData().contains("45")); + assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("45")); // test jars & packages can be loaded correctly interpreterResult = sparkInterpreter.interpret("import org.apache.zeppelin.interpreter.integration.DummyClass\n" + "import com.maxmind.geoip2._", context); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); // test PySparkInterpreter Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("spark.pyspark", new ExecutionContext("user1", "note1", "test")); interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); // test IPySparkInterpreter Interpreter ipySparkInterpreter = 
interpreterFactory.getInterpreter("spark.ipyspark", new ExecutionContext("user1", "note1", "test")); interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); // test SparkSQLInterpreter Interpreter sqlInterpreter = interpreterFactory.getInterpreter("spark.sql", new ExecutionContext("user1", "note1", "test")); interpreterResult = sqlInterpreter.interpret("select count(1) as c from test", context); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); - assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType()); - assertEquals("c\n2\n", interpreterResult.message().get(0).getData()); + assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(interpreterResult.toString(), InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType()); + assertEquals(interpreterResult.toString(), "c\n2\n", interpreterResult.message().get(0).getData()); // test SparkRInterpreter Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContext("user1", "note1", "test")); @@ -138,14 +139,14 @@ public abstract class SparkIntegrationTest { interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context); } assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); - assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType()); - assertTrue(interpreterResult.message().get(0).getData().contains("eruptions waiting")); + assertEquals(interpreterResult.toString(), InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType()); + assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("eruptions waiting")); } @Test public void testLocalMode() throws IOException, YarnException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); - sparkInterpreterSetting.setProperty("master", "local[*]"); + sparkInterpreterSetting.setProperty("spark.master", "local[*]"); sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome); sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath()); sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false"); @@ -166,7 +167,7 @@ public abstract class SparkIntegrationTest { @Test public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); - sparkInterpreterSetting.setProperty("master", "yarn-client"); + sparkInterpreterSetting.setProperty("spark.master", "yarn-client"); sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath()); sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome); sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath()); @@ -211,7 +212,7 @@ public abstract class SparkIntegrationTest { @Test public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, 
XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); - sparkInterpreterSetting.setProperty("master", "yarn-cluster"); + sparkInterpreterSetting.setProperty("spark.master", "yarn-cluster"); sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath()); sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome); sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath()); diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java index 932dd82..599727f 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java @@ -63,6 +63,7 @@ import static org.junit.Assert.assertTrue; public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinSparkClusterTest.class); + public static final String SPARK_MASTER_PROPERTY_NAME = "spark.master"; //This is for only run setupSparkInterpreter one time for each spark version, otherwise //each test method will run setupSparkInterpreter which will cost a long time and may cause travis @@ -93,14 +94,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { Map<String, InterpreterProperty> sparkProperties = (Map<String, InterpreterProperty>) sparkIntpSetting.getProperties(); LOG.info("SPARK HOME detected " + sparkHome); - if (System.getenv("SPARK_MASTER") != null) { - sparkProperties.put("master", - new InterpreterProperty("master", System.getenv("SPARK_MASTER"))); - } else { - sparkProperties.put("master", new InterpreterProperty("master", "local[2]")); - } + String masterEnv = System.getenv("SPARK_MASTER"); + sparkProperties.put(SPARK_MASTER_PROPERTY_NAME, + new InterpreterProperty(SPARK_MASTER_PROPERTY_NAME, masterEnv == null ? 
"local[2]" : masterEnv)); sparkProperties.put("SPARK_HOME", new InterpreterProperty("SPARK_HOME", sparkHome)); - sparkProperties.put("spark.master", new InterpreterProperty("spark.master", "local[2]")); sparkProperties.put("spark.cores.max", new InterpreterProperty("spark.cores.max", "2")); sparkProperties.put("zeppelin.spark.useHiveContext", diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index f07389e..5735c31 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -350,7 +350,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } boolean isSparkOnKubernetes(Properties interpreteProperties) { - String propertySparkMaster = (String) interpreteProperties.getOrDefault("master", ""); + String propertySparkMaster = (String) interpreteProperties.getOrDefault("spark.master", ""); return propertySparkMaster.startsWith("k8s://"); } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index 3718fa0..26467a0 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -153,7 +153,7 @@ public class K8sRemoteInterpreterProcessTest { Properties properties = new Properties(); properties.put("my.key1", "v1"); - properties.put("master", "k8s://http://api"); + properties.put("spark.master", "k8s://http://api"); HashMap<String, String> envs = new HashMap<String, String>(); envs.put("MY_ENV1", "V1"); envs.put("SPARK_SUBMIT_OPTIONS", "my options"); @@ -206,7 +206,7 @@ public class K8sRemoteInterpreterProcessTest { Properties properties = new Properties(); properties.put("my.key1", "v1"); - properties.put("master", "k8s://http://api"); + properties.put("spark.master", "k8s://http://api"); HashMap<String, String> envs = new HashMap<String, String>(); envs.put("MY_ENV1", "V1"); envs.put("SPARK_SUBMIT_OPTIONS", "my options"); @@ -258,7 +258,7 @@ public class K8sRemoteInterpreterProcessTest { Properties properties = new Properties(); properties.put("my.key1", "v1"); - properties.put("master", "k8s://http://api"); + properties.put("spark.master", "k8s://http://api"); HashMap<String, String> envs = new HashMap<String, String>(); envs.put("MY_ENV1", "V1"); envs.put("SPARK_SUBMIT_OPTIONS", "my options"); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 51776ba..2d8fde5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -27,6 +27,7 @@ import java.nio.file.Path; import 
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 3718fa0..26467a0 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -153,7 +153,7 @@ public class K8sRemoteInterpreterProcessTest {
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
-    properties.put("master", "k8s://http://api");
+    properties.put("spark.master", "k8s://http://api");
     HashMap<String, String> envs = new HashMap<String, String>();
     envs.put("MY_ENV1", "V1");
     envs.put("SPARK_SUBMIT_OPTIONS", "my options");
@@ -206,7 +206,7 @@ public class K8sRemoteInterpreterProcessTest {
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
-    properties.put("master", "k8s://http://api");
+    properties.put("spark.master", "k8s://http://api");
     HashMap<String, String> envs = new HashMap<String, String>();
     envs.put("MY_ENV1", "V1");
     envs.put("SPARK_SUBMIT_OPTIONS", "my options");
@@ -258,7 +258,7 @@ public class K8sRemoteInterpreterProcessTest {
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
-    properties.put("master", "k8s://http://api");
+    properties.put("spark.master", "k8s://http://api");
     HashMap<String, String> envs = new HashMap<String, String>();
     envs.put("MY_ENV1", "V1");
     envs.put("SPARK_SUBMIT_OPTIONS", "my options");
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 51776ba..2d8fde5 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -51,6 +52,9 @@ import java.util.Properties;
 public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
+  public static final String SPARK_MASTER_KEY = "spark.master";
+  private static final String DEFAULT_MASTER = "local[*]";
+  Optional<String> sparkMaster = Optional.empty();
 
   public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
     super(zConf, recoveryStorage);
@@ -60,13 +64,17 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
   public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) throws IOException {
     Map<String, String> env = super.buildEnvFromProperties(context);
     Properties sparkProperties = new Properties();
-    String sparkMaster = getSparkMaster(properties);
+    String spMaster = getSparkMaster();
+    if (spMaster != null) {
+      sparkProperties.put(SPARK_MASTER_KEY, spMaster);
+    }
     for (String key : properties.stringPropertyNames()) {
-      if (RemoteInterpreterUtils.isEnvString(key) && !StringUtils.isBlank(properties.getProperty(key))) {
-        env.put(key, properties.getProperty(key));
+      String propValue = properties.getProperty(key);
+      if (RemoteInterpreterUtils.isEnvString(key) && !StringUtils.isBlank(propValue)) {
+        env.put(key, propValue);
       }
-      if (isSparkConf(key, properties.getProperty(key))) {
-        sparkProperties.setProperty(key, properties.getProperty(key));
+      if (isSparkConf(key, propValue)) {
+        sparkProperties.setProperty(key, propValue);
       }
     }
@@ -82,9 +90,6 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
     }
 
     StringBuilder sparkConfBuilder = new StringBuilder();
-    if (sparkMaster != null) {
-      sparkConfBuilder.append(" --master " + sparkMaster);
-    }
     if (isYarnMode() && getDeployMode().equals("cluster")) {
       if (sparkProperties.containsKey("spark.files")) {
         sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," +
@@ -294,7 +299,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
     String sparkHome = getEnv("SPARK_HOME");
     File sparkRBasePath = null;
     if (sparkHome == null) {
-      if (!getSparkMaster(properties).startsWith("local")) {
+      if (!getSparkMaster().startsWith("local")) {
         throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
             " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
             " interpreter setting");
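The dropped --master branch above is safe because spark-submit accepts the master URL either as "--master <url>" or as "--conf spark.master=<url>"; once spark.master is seeded into sparkProperties, the generic --conf emission covers it. A simplified sketch of that assembly (the loop is condensed from the launcher code, names shortened):

    import java.util.Properties;

    public class SparkConfAssemblyExample {

      public static void main(String[] args) {
        Properties sparkProperties = new Properties();
        sparkProperties.setProperty("spark.master", "yarn-client");
        sparkProperties.setProperty("spark.files", "file_1");

        // No dedicated "--master" append any more: spark.master travels with all
        // other Spark settings as an ordinary --conf key=value pair.
        StringBuilder sparkConfBuilder = new StringBuilder();
        for (String name : sparkProperties.stringPropertyNames()) {
          sparkConfBuilder.append(" --conf ").append(name)
              .append("=").append(sparkProperties.getProperty(name));
        }
        System.out.println(sparkConfBuilder);
      }
    }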
@@ -317,31 +322,36 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
   }
 
   /**
+   * Returns cached Spark Master value if it's present, or calculate it
+   *
    * Order to look for spark master
    * 1. master in interpreter setting
    * 2. spark.master interpreter setting
    * 3. use local[*]
-   * @param properties
-   * @return
+   * @return Spark Master string
    */
-  private String getSparkMaster(Properties properties) {
-    String master = properties.getProperty("master");
-    if (master == null) {
-      master = properties.getProperty("spark.master");
+  private String getSparkMaster() {
+    if (!sparkMaster.isPresent()) {
+      String master = properties.getProperty(SPARK_MASTER_KEY);
       if (master == null) {
-        master = "local[*]";
+        master = properties.getProperty("master");
+        if (master == null) {
+          String masterEnv = System.getenv("SPARK_MASTER");
+          master = (masterEnv == null ? DEFAULT_MASTER : masterEnv);
+        }
+        properties.put(SPARK_MASTER_KEY, master);
       }
+      sparkMaster = Optional.of(master);
     }
-    return master;
+    return sparkMaster.get();
   }
 
   private String getDeployMode() {
-    String master = getSparkMaster(properties);
-    if (master.equals("yarn-client")) {
+    if (getSparkMaster().equals("yarn-client")) {
       return "client";
-    } else if (master.equals("yarn-cluster")) {
+    } else if (getSparkMaster().equals("yarn-cluster")) {
       return "cluster";
-    } else if (master.startsWith("local")) {
+    } else if (getSparkMaster().startsWith("local")) {
       return "client";
     } else {
       String deployMode = properties.getProperty("spark.submit.deployMode");
@@ -357,7 +367,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
   }
 
   private boolean isYarnMode() {
-    return getSparkMaster(properties).startsWith("yarn");
+    return getSparkMaster().startsWith("yarn");
   }
 }
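To make the new precedence concrete, here is the getSparkMaster logic extracted into a self-contained class; note the lookup now tries spark.master before the legacy master name, then the SPARK_MASTER environment variable, and finally local[*], and the result is memoized in an Optional and written back into the properties so getDeployMode and isYarnMode see one consistent value. A sketch under those assumptions (wrapper class and main are illustrative only):

    import java.util.Optional;
    import java.util.Properties;

    public class SparkMasterResolverExample {

      private static final String SPARK_MASTER_KEY = "spark.master";
      private static final String DEFAULT_MASTER = "local[*]";

      private final Properties properties;
      private Optional<String> sparkMaster = Optional.empty();

      public SparkMasterResolverExample(Properties properties) {
        this.properties = properties;
      }

      // 1. spark.master property, 2. legacy "master" property,
      // 3. SPARK_MASTER environment variable, 4. local[*]; cached after first call.
      public String getSparkMaster() {
        if (!sparkMaster.isPresent()) {
          String master = properties.getProperty(SPARK_MASTER_KEY);
          if (master == null) {
            master = properties.getProperty("master");
            if (master == null) {
              String masterEnv = System.getenv("SPARK_MASTER");
              master = masterEnv == null ? DEFAULT_MASTER : masterEnv;
            }
            // Normalize: later readers only need to look at spark.master.
            properties.put(SPARK_MASTER_KEY, master);
          }
          sparkMaster = Optional.of(master);
        }
        return sparkMaster.get();
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("master", "yarn-cluster");  // only the legacy name is set
        SparkMasterResolverExample resolver = new SparkMasterResolverExample(props);
        System.out.println(resolver.getSparkMaster());          // yarn-cluster
        System.out.println(props.getProperty("spark.master"));  // normalized copy
      }
    }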
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index b4b2889..ab2ebae 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -91,7 +91,7 @@ public class SparkInterpreterLauncherTest {
     properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("ENV_1", "");
     properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "local[*]");
+    properties.setProperty("spark.master", "local[*]");
     properties.setProperty("spark.files", "file_1");
     properties.setProperty("spark.jars", "jar_1");
@@ -107,7 +108,8 @@
     assertTrue(interpreterProcess.getEnv().size() >= 2);
     assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
     assertFalse(interpreterProcess.getEnv().containsKey("ENV_1"));
-    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --master local[*] --conf spark.files=file_1 --conf spark.jars=jar_1"),
+    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.files=file_1" +
+        " --conf spark.jars=jar_1 --conf spark.master=local[*]"),
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
@@ -118,7 +119,7 @@
     Properties properties = new Properties();
     properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn-client");
+    properties.setProperty("spark.master", "yarn-client");
     properties.setProperty("spark.files", "file_1");
     properties.setProperty("spark.jars", "jar_1");
@@ -137,9 +138,9 @@
     String sparkJars = "jar_1";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1";
-    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --master yarn-client --conf spark.yarn.dist.archives=" + sparkrZip +
+    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
         " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
-        " --conf spark.yarn.isPython=true"),
+        " --conf spark.yarn.isPython=true --conf spark.master=yarn-client"),
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
@@ -150,7 +151,7 @@
     Properties properties = new Properties();
     properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn");
+    properties.setProperty("spark.master", "yarn");
     properties.setProperty("spark.submit.deployMode", "client");
     properties.setProperty("spark.files", "file_1");
     properties.setProperty("spark.jars", "jar_1");
@@ -170,10 +171,10 @@
     String sparkJars = "jar_1";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1";
-    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip +
+    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
         " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
         " --conf spark.submit.deployMode=client" +
-        " --conf spark.yarn.isPython=true"),
+        " --conf spark.yarn.isPython=true --conf spark.master=yarn"),
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
@@ -184,7 +185,7 @@
     Properties properties = new Properties();
     properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn-cluster");
+    properties.setProperty("spark.master", "yarn-cluster");
     properties.setProperty("spark.files", "file_1");
     properties.setProperty("spark.jars", "jar_1");
@@ -206,11 +207,12 @@
         zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
-    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --master yarn-cluster --conf spark.yarn.dist.archives=" + sparkrZip +
+    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
         " --conf spark.yarn.maxAppAttempts=1" +
         " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
         " --conf spark.yarn.isPython=true" +
-        " --conf spark.yarn.submit.waitAppCompletion=false"),
+        " --conf spark.yarn.submit.waitAppCompletion=false" +
+        " --conf spark.master=yarn-cluster"),
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
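The yarn variants exercised in these tests exist because Spark accepts two spellings: the legacy aliases yarn-client/yarn-cluster (deprecated since Spark 2.0) and the modern split into spark.master=yarn plus spark.submit.deployMode. A rough sketch of the mapping, mirroring getDeployMode from the launcher above (the "client" fallback below is a simplification):

    import java.util.Properties;

    public class DeployModeExample {

      // "yarn-client"/"yarn-cluster" are pre-2.x aliases; the modern spelling
      // separates the resource manager (spark.master) from the deploy mode.
      static String deployMode(Properties props) {
        String master = props.getProperty("spark.master", "local[*]");
        if (master.equals("yarn-client") || master.startsWith("local")) {
          return "client";
        } else if (master.equals("yarn-cluster")) {
          return "cluster";
        }
        return props.getProperty("spark.submit.deployMode", "client");
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("spark.master", "yarn");
        props.setProperty("spark.submit.deployMode", "cluster");
        System.out.println(deployMode(props));  // cluster
      }
    }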
@@ -221,7 +223,7 @@
     Properties properties = new Properties();
     properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn");
+    properties.setProperty("spark.master", "yarn");
     properties.setProperty("spark.submit.deployMode", "cluster");
     properties.setProperty("spark.files", "file_1");
     properties.setProperty("spark.jars", "jar_1");
@@ -250,11 +252,12 @@
         zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
-    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip +
+    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
         " --conf spark.yarn.maxAppAttempts=1" +
         " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
         " --conf spark.submit.deployMode=cluster --conf spark.yarn.isPython=true" +
-        " --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1"),
+        " --conf spark.yarn.submit.waitAppCompletion=false --conf spark.master=yarn" +
+        " --proxy-user user1"),
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
     Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar"));
     FileUtils.deleteDirectory(localRepoPath.toFile());
@@ -267,7 +270,7 @@
     Properties properties = new Properties();
     properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
-    properties.setProperty("master", "yarn");
+    properties.setProperty("spark.master", "yarn");
     properties.setProperty("spark.submit.deployMode", "cluster");
     properties.setProperty("spark.files", "{}");
     properties.setProperty("spark.jars", "jar_1");
@@ -296,11 +299,12 @@
     String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
     // escape special characters
     String sparkFiles = "{}," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
-    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip +
+    assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
         " --conf spark.yarn.maxAppAttempts=1" +
         " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
         " --conf spark.submit.deployMode=cluster --conf spark.yarn.isPython=true" +
-        " --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1"),
+        " --conf spark.yarn.submit.waitAppCompletion=false" +
+        " --conf spark.master=yarn --proxy-user user1"),
         interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
     FileUtils.deleteDirectory(localRepoPath.toFile());
   }