This is an automated email from the ASF dual-hosted git repository.

moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new b13651c  [ZEPPELIN-3840] Zeppelin on Kubernetes

b13651c is described below

commit b13651cedf15cff45e823738144ec2e565dc5807
Author: Lee moon soo <leemoon...@gmail.com>
AuthorDate: Fri Dec 7 10:45:35 2018 -0800

[ZEPPELIN-3840] Zeppelin on Kubernetes

### What type of PR is it?

This PR adds the ability to run Zeppelin on Kubernetes. It aims for

- Zero configuration to start Zeppelin on Kubernetes (and Spark on Kubernetes).
- Running everything on Kubernetes: Zeppelin, interpreters, Spark.
- High customizability, to adapt to various user configurations and extensions.

Key features are

- Provides a zeppelin-server.yaml file for `kubectl` to run the Zeppelin server.
- All interpreters automatically run as Pods.
- The Spark interpreter is automatically configured to use [Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html).
- A reverse proxy is configured to access the Spark UI.

To do

- [x] Document how the reverse proxy for the Spark UI works and how to configure a custom domain.
- [x] Document how to customize the zeppelin-server and interpreter yaml.
- [x] Document new configurations.
- [x] Document how to mount volumes for notebooks and configurations.

### How it works

#### Run Zeppelin Server on Kubernetes

`k8s/zeppelin-server.yaml` is provided to run Zeppelin Server with a few sidecars and configurations.
This file is easy to publish (users can consume it directly with `curl`) and highly customizable, while including everything necessary.

#### K8s Interpreter launcher

This PR adds a new module, `launcher-k8s-standard`, under the `zeppelin/zeppelin-plugins/launcher/k8s-standard/` directory.
This launcher is [automatically selected](https://github.com/apache/zeppelin/pull/3240/files#diff-82fddd2ffb77aaffc4b9cf7b5b1eaa79) when Zeppelin is running on Kubernetes.
The launcher handles both the Spark interpreter and all other interpreters.
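The automatic selection can be illustrated with a minimal sketch that resolves the `auto|local|k8s` run mode to a launcher. The sketch assumes that `auto` detects Kubernetes by checking for the service-account directory that Kubernetes mounts into a Pod. `k8s/zeppelin-server.yaml` enables this mount for the server Pod with `automountServiceAccountToken: true`.

The class, its methods and the local launcher name `StandardInterpreterLauncher` are assumptions made for illustration. The real detection logic in `ZeppelinConfiguration` may differ.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

/**
 * Hypothetical sketch: resolve zeppelin.run.mode ('auto|local|k8s') to a concrete mode.
 * Assumption: 'auto' treats the Kubernetes service-account mount as the in-Pod signal.
 */
class RunModeDetector {

  enum RunMode { LOCAL, K8S }

  // Directory where Kubernetes mounts the service-account token, CA cert and namespace
  // into a Pod when automountServiceAccountToken is enabled.
  static final Path SERVICE_ACCOUNT_DIR =
      Paths.get("/var/run/secrets/kubernetes.io/serviceaccount");

  /** Resolves the configured mode; the directory is a parameter so it can be tested off-cluster. */
  static RunMode resolve(String configured, Path serviceAccountDir) {
    String mode = configured == null ? "auto" : configured.trim().toLowerCase(Locale.ROOT);
    switch (mode) {
      case "local":
        return RunMode.LOCAL;
      case "k8s":
        return RunMode.K8S;
      case "auto":
        // Inside a Pod the service-account directory exists; on a plain host it does not.
        return Files.isDirectory(serviceAccountDir) ? RunMode.K8S : RunMode.LOCAL;
      default:
        throw new IllegalArgumentException("Unknown run mode: " + configured);
    }
  }

  /** Convenience overload that checks the standard in-Pod location. */
  static RunMode resolve(String configured) {
    return resolve(configured, SERVICE_ACCOUNT_DIR);
  }

  /** Maps a mode to a launcher class name (the local launcher name is an assumption). */
  static String launcherFor(RunMode mode) {
    return mode == RunMode.K8S
        ? "K8sStandardInterpreterLauncher"
        : "StandardInterpreterLauncher";
  }
}
```

Explicit `local` or `k8s` values always win over detection, so a developer can still force a mode.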
The launcher launches each interpreter as a Pod using the template [k8s/interpreter/100-interpreter-spec.yaml](https://github.com/apache/zeppelin/pull/3240/files#diff-d9ce62e2c992d32f0184d7edb862f3c4).
The filename has the `100-` prefix because the launcher consumes all files in the directory in alphabetical order on interpreter start/stop.
Users can drop more files here to extend or customize the interpreter, and the filename can be used to control the order.
The template is rendered by [jinjava](https://github.com/HubSpot/jinjava).

#### Spark interpreter

When the interpreter group is `spark`, K8sRemoteInterpreterProcess [sets the necessary Spark configuration](https://github.com/apache/zeppelin/pull/3240/files#diff-6d1d3084f55bdd519e39ede4a619e73dR297) automatically to use [Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html).
Users don't have to configure anything. It uses client mode.

#### Spark UI

We could make users manually configure port-forwarding (or something similar) to access the Spark UI, but that's not optimal.
It is best when the Spark UI is automatically accessible to anyone who has access to the Zeppelin UI, without any extra configuration.

To enable this, the Zeppelin server Pod has a reverse proxy as a sidecar. The proxy splits traffic between the Zeppelin server and the Spark UI running in the other Pod.
It assumes both `service.domain.com` and `*.service.domain.com` point to the nginx proxy address.
`service.domain.com` is directed to ZeppelinServer, and `*.service.domain.com` is directed to the interpreter Pod.

`<port>-<interpreter pod svc name>.service.domain.com` is the convention for accessing any application running in an interpreter Pod.
If the Spark interpreter Pod is running with the name `spark-axefeg` and the Spark UI is running on port 4040,

```
4040-spark-axefeg.service.domain.com
```

is the address to access the Spark UI.
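The naming convention above can be sketched as a function that maps a request host to an in-cluster upstream. It mirrors the nginx `server_name` regex and the `proxy_pass http://$svc_name.NAMESPACE.svc.cluster.local:$svc_port` target used by the sidecar in `k8s/zeppelin-server.yaml`. The class and methods are hypothetical and exist only for illustration.

Two deliberate differences from the nginx configuration:

- Java named groups cannot contain `_`, so `svc_port`/`svc_name` become `svcPort`/`svcName`.
- The sketch anchors the match to the whole host name.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of the "<port>-<interpreter pod svc name>.<service domain>" routing rule. */
class ServiceHostRouter {

  // Same shape as the nginx server_name regex: digits, '-', service name (no dots), '.', rest.
  private static final Pattern SERVICE_HOST =
      Pattern.compile("(?<svcPort>[0-9]+)-(?<svcName>[^.]*)\\.(?<domain>.*)");

  /**
   * Returns the in-cluster upstream URL for a Host name (without port), or empty when the
   * host does not follow the convention, i.e. traffic that belongs to the Zeppelin server.
   */
  static Optional<String> upstreamFor(String host, String namespace) {
    Matcher m = SERVICE_HOST.matcher(host);
    if (!m.matches()) {
      return Optional.empty();
    }
    return Optional.of(String.format("http://%s.%s.svc.cluster.local:%s",
        m.group("svcName"), namespace, m.group("svcPort")));
  }

  /** Builds the public host name for a port exposed by an interpreter Pod's Service. */
  static String publicHost(int port, String podServiceName, String serviceDomain) {
    return port + "-" + podServiceName + "." + serviceDomain;
  }
}
```

For example, `4040-spark-axefeg.service.domain.com` in namespace `default` maps to `http://spark-axefeg.default.svc.cluster.local:4040`. The bare `service.domain.com` does not match and stays with the default server block.

This routing works because each interpreter Pod gets a headless Service with the same name as the Pod.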
The default service domain is [local.zeppelin-project.org:8080](https://github.com/apache/zeppelin/pull/3240/files#diff-56ccb2e2c2617b27dbaae866d9431e51R22). Both `local.zeppelin-project.org` and `*.local.zeppelin-project.org` point to `127.0.0.1`, so it works with `kubectl port-forward`.

### What is the Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-3840

### How should this be tested?

Prepare a Kubernetes cluster with enough resources (cpus > 5, mem > 6g).
If you're using [minikube](https://github.com/kubernetes/minikube), check your capacity with the `kubectl describe node` command before starting.

You'll need to build a Zeppelin docker image and a Spark docker image to test. Please follow the guide in docs/quickstart/kubernetes.md.

To try it quickly without building docker images, use the pre-built images I have uploaded to Docker Hub: `moon/zeppelin:0.9.0-SNAPSHOT` and `moon/spark:2.4.0`.
Try the following command

```
ZEPPELIN_SERVER_YAML="curl -s https://raw.githubusercontent.com/Leemoonsoo/zeppelin/kubernetes/k8s/zeppelin-server.yaml"
$ZEPPELIN_SERVER_YAML | sed 's/apache\/zeppelin:0.9.0-SNAPSHOT/moon\/zeppelin:0.9.0-SNAPSHOT/' | sed 's/spark:2.4.0/moon\/spark:2.4.0/' | kubectl apply -f -
```

And port forward

```
kubectl port-forward zeppelin-server 8080:80
```

And browse http://localhost:8080

To clean up

```
$ZEPPELIN_SERVER_YAML | sed 's/apache\/zeppelin:0.9.0-SNAPSHOT/moon\/zeppelin:0.9.0-SNAPSHOT/' | sed 's/spark:2.4.0/moon\/spark:2.4.0/' | kubectl delete -f -
```

### Screenshots (if appropriate)

See this video https://youtu.be/7E4ZGn4pnTo

### Future work

- Per-interpreter docker image
- Blocking communication between interpreter Pods.
- The Spark interpreter Pod has a Role with CRUD on any pod/service in the same namespace. This should be restricted to only the Spark executor Pods.
- Per note interpreter mode by default when Zeppelin is running on Kubernetes

### Questions:

* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? yes

Author: Lee moon soo <leemoon...@gmail.com>
Author: Lee moon soo <m...@apache.org>

Closes #3240 from Leemoonsoo/kubernetes and squashes the following commits:

0100a36f2 [Lee moon soo] update how it works on docs, add some comments on yaml files
423412a93 [Lee moon soo] zeppelin.k8s.mode -> zeppelin.run.mode
4e7d8170d [Lee moon soo] localtest.me -> local.zeppelin-project.org
993a0e44e [Lee moon soo] document configurations
9ab6fc420 [Lee moon soo] address code review
22e090f61 [Lee moon soo] logger -> LOGGER
11960dd59 [Lee moon soo] update corresponding test as well
3b652a48e [Lee moon soo] Make spark executor set ownerreference correctly
1a3a07098 [Lee moon soo] Set ownerreference to Role and Rolebinding of interpreter
e2dc88a19 [Lee moon soo] suppress error log when wait target is already removed
fa36c18e3 [Lee moon soo] Make spark master configurable
b4f58a9a1 [Lee moon soo] sig term for quick termination
64a56b5c9 [Lee moon soo] Add docs
e9ce64fe7 [Lee moon soo] update dockerfile
ec09b8b88 [Lee moon soo] add test
3078bac55 [Lee moon soo] spark ui support
9341fcbfe [Lee moon soo] install kubectl and configure log4j in docker image
0f7c0d4e8 [Lee moon soo] add license
f30561189 [Lee moon soo] rename file
2b579ff12 [Lee moon soo] let user override namespace
f4166ad04 [Lee moon soo] make spark container image configurable
0d472ea52 [Lee moon soo] load properties and environment variables
b0e2c36c6 [Lee moon soo] Rbac role, rolebinding
2960dcb87 [Lee moon soo] configure namespace
a4072e6b9 [Lee moon soo] add signal handler
7a8736756 [Lee moon soo] configure spark on kubernetes
263d859d4 [Lee moon soo] use headless service for interpreter pod
7fe9823b1 [Lee moon soo] interpreter pod cascade delete on zeppelin-server delete
86e876435 [Lee moon soo] add services on RBAC
18b8f68cb [Lee moon soo] print spec file contents on debug log
0dea3836b [Lee moon soo] create and connect interpreter pod
9f1b7a169 [Lee moon soo] run kubernetes launcher
2fd2ac8c3 [Lee moon soo] kubernetes mode configuration
58f9f1909 [Lee moon soo] add rbac
36cf391a4 [Lee moon soo] correct plugin name
52bb6c7e1 [Lee moon soo] add k8s dir in package
5f602a65e [Lee moon soo] K8sRemoteInterpreterProcess
07489f76d [Lee moon soo] kubectl with exec
d2f3d5b7e [Lee moon soo] add k8s-standard launcher module
---
 conf/zeppelin-site.xml.template | 29 ++
 dev/change_zeppelin_version.sh | 3 +
 docs/_includes/themes/zeppelin/_navigation.html | 1 +
 docs/quickstart/kubernetes.md | 253 ++++++++++++++
 docs/setup/operation/configuration.md | 30 ++
 k8s/interpreter/100-interpreter-spec.yaml | 143 ++++++++
 k8s/zeppelin-server.yaml | 201 +++++++++++
 scripts/docker/zeppelin/bin/Dockerfile | 15 +-
 scripts/docker/zeppelin/bin/log4j.properties | 22 ++
 .../src/assemble/distribution.xml | 3 +
 .../zeppelin/conf/ZeppelinConfiguration.java | 46 +++
 .../remote/RemoteInterpreterServer.java | 15 +
 zeppelin-plugins/launcher/k8s-standard/pom.xml | 67 ++++
 .../launcher/K8sRemoteInterpreterProcess.java | 382 +++++++++++++++++++++
 .../interpreter/launcher/K8sSpecTemplate.java | 78 +++++
 .../launcher/K8sStandardInterpreterLauncher.java | 177 ++++++++++
 .../zeppelin/interpreter/launcher/Kubectl.java | 157 +++++++++
 .../launcher/K8sRemoteInterpreterProcessTest.java | 196 +++++++++++
 .../interpreter/launcher/K8sSpecTemplateTest.java | 140 ++++++++
 .../K8sStandardInterpreterLauncherTest.java | 78 +++++
 .../zeppelin/interpreter/launcher/KubectlTest.java | 105 ++++++
 zeppelin-plugins/pom.xml | 1 +
 .../zeppelin/interpreter/InterpreterSetting.java | 14 +-
 .../interpreter/RemoteInterpreterEventServer.java | 5 +-
 .../remote/RemoteInterpreterManagedProcess.java | 2 +-
 .../remote/RemoteInterpreterProcess.java | 5 +
 .../remote/RemoteInterpreterRunningProcess.java | 5 +
 27 files changed, 2164 insertions(+), 9 deletions(-)

diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 3920fb9..88b8d9a 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -584,4 +584,33 @@
   <description>Notebook cron folders</description>
 </property>
 -->
+
+<property>
+  <name>zeppelin.run.mode</name>
+  <value>auto</value>
+  <description>'auto|local|k8s'</description>
+</configuration>
+
+<property>
+  <name>zeppelin.k8s.portforward</name>
+  <value>false</value>
+  <description>Port forward to interpreter rpc port. Set 'true' only for local development when zeppelin.run.mode is 'k8s'</description>
+</configuration>
+
+<property>
+  <name>zeppelin.k8s.container.image</name>
+  <value>apache/zeppelin:0.9.0-SNAPSHOT</value>
+  <description>Docker image for interpreters</description>
 </configuration>
+
+<property>
+  <name>zeppelin.k8s.spark.container.image</name>
+  <value>apache/spark:latest</value>
+  <description>Docker image for Spark executors</description>
+</configuration>
+
+<property>
+  <name>zeppelin.k8s.template.dir</name>
+  <value>k8s</value>
+  <description>Kubernetes yaml spec files</description>
+</configuration>
\ No newline at end of file

diff --git a/dev/change_zeppelin_version.sh b/dev/change_zeppelin_version.sh
index c4ef7e6..0097e5b 100755
--- a/dev/change_zeppelin_version.sh
+++ b/dev/change_zeppelin_version.sh
@@ -62,6 +62,9 @@ sed -i '' 's/"version": "'"${FROM_VERSION}"'",/"version": "'"${TO_VERSION}"'",/g
 # Change version in Dockerfile
 sed -i '' 's/Z_VERSION="'"${FROM_VERSION}"'"/Z_VERSION="'"${TO_VERSION}"'"/g' scripts/docker/zeppelin/bin/Dockerfile
 
+# Change docker image version in configuration
+sed -i '' 's/zeppelin:'"${FROM_VERSION}"'/zeppelin:'"${TO_VERSION}"'/g' conf/zeppelin-site.xml.template
+
 # When preparing new dev version from release tag, doesn't need to change docs version
 if is_dev_version "${FROM_VERSION}" || ! is_dev_version "${TO_VERSION}"; then
   # When prepare new rc for the maintenance release

diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index d5f292d..1e9c345 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -25,6 +25,7 @@
             <ul class="dropdown-menu">
               <li class="title"><span>Getting Started</span></li>
               <li><a href="{{BASE_PATH}}/quickstart/install.html">Install</a></li>
+              <li><a href="{{BASE_PATH}}/quickstart/kubernetes.html">Kubernetes</a></li>
               <li><a href="{{BASE_PATH}}/quickstart/explore_ui.html">Explore UI</a></li>
               <li><a href="{{BASE_PATH}}/quickstart/tutorial.html">Tutorial</a></li>
               <li role="separator" class="divider"></li>

diff --git a/docs/quickstart/kubernetes.md b/docs/quickstart/kubernetes.md
new file mode 100644
index 0000000..f8f6a6a
--- /dev/null
+++ b/docs/quickstart/kubernetes.md
@@ -0,0 +1,253 @@
+---
+layout: page
+title: "Zeppelin on Kubernetes"
+description: "This page will guide you through running Apache Zeppelin on Kubernetes."
+group: quickstart
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+{% include JB/setup %}
+
+# Zeppelin on Kubernetes
+
+Zeppelin can run on clusters managed by [Kubernetes](https://kubernetes.io/). When Zeppelin runs in a Pod, it creates a Pod for each individual interpreter. The Spark interpreter is also automatically configured to use Spark on Kubernetes in client mode.
+
+Key benefits are
+
+ - Interpreter scale-out
+ - Spark interpreter automatically configures Spark on Kubernetes
+ - Customizable Kubernetes yaml files
+ - Spark UI access
+
+## Prerequisites
+
+ - Zeppelin >= 0.9.0 docker image
+ - Spark >= 2.4.0 docker image (when using the Spark interpreter)
+ - A running Kubernetes cluster with access configured to it using [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)
+ - [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/) configured in your cluster
+ - Enough CPU and memory in your Kubernetes cluster. We recommend 4 CPUs and 6 GB of memory to be able to start the Spark interpreter with a few executors.
+
+ - If you're using [minikube](https://kubernetes.io/docs/setup/minikube/), check your cluster capacity (`kubectl describe node`) and increase it if necessary
+
+   ```
+   $ minikube delete    # otherwise the configuration won't apply
+   $ minikube config set cpus <number>
+   $ minikube config set memory <number in MB>
+   $ minikube start
+   $ minikube config view
+   ```
+
+## Quickstart
+
+Get `zeppelin-server.yaml` from the GitHub repository or find it in the Zeppelin distribution package.
+
+```
+# Get it from the Zeppelin distribution package.
+$ ls <zeppelin-distribution>/k8s/zeppelin-server.yaml
+
+# or download it from GitHub
+$ curl -s -O https://raw.githubusercontent.com/apache/zeppelin/master/k8s/zeppelin-server.yaml
+```
+
+Start Zeppelin on the Kubernetes cluster,
+
+```
+kubectl apply -f zeppelin-server.yaml
+```
+
+Port-forward the Zeppelin server port,
+
+```
+kubectl port-forward zeppelin-server 8080:80
+```
+
+and browse [localhost:8080](http://localhost:8080).
+Try running some paragraphs and check (using `kubectl get pods`) that each interpreter runs as a Pod instead of a local process.
+
+To shut down,
+
+```
+kubectl delete -f zeppelin-server.yaml
+```
+
+## Spark Interpreter
+
+Build a Spark docker image to use the Spark interpreter.
+Download the Spark binary distribution and run the following command.
+Spark 2.4.0 or later is required.
+
+```
+# if you're using minikube, set docker-env
+$ eval $(minikube docker-env)
+
+# build docker image
+$ <spark-distribution>/bin/docker-image-tool.sh -m -t 2.4.0 build
+```
+
+Run `docker images` and check that `spark:2.4.0` was created.
+Configure `sparkContainerImage` of the `zeppelin-server-conf` ConfigMap in `zeppelin-server.yaml`.
+
+
+Create a note and configure the number of executors (default 1)
+
+```
+%spark.conf
+spark.executor.instances  5
+```
+
+Then start your Spark interpreter
+
+```
+%spark
+sc.parallelize(1 to 100).count
+...
+```
+When the `master` property of SparkInterpreter starts with `k8s://` (default `k8s://https://kubernetes.default.svc` when Zeppelin is started using zeppelin-server.yaml), Spark executors are automatically created in your Kubernetes cluster.
+The Spark UI is accessible by clicking `SPARK JOB` on the paragraph.
+
+See [Running Spark on Kubernetes](https://spark.apache.org/docs/latest/running-on-kubernetes.html) to learn more.
+
+
+## Build Zeppelin image manually
+
+To build your own Zeppelin image, first build the Zeppelin project with the `-Pbuild-distr` flag.
+
+```
+$ mvn package -DskipTests -Pbuild-distr <your flags>
+```
+
+The binary package will be created under the `zeppelin-distribution/target` directory. Move the created package file to the `scripts/docker/zeppelin/bin/` directory.
+
+```
+$ mv zeppelin-distribution/target/zeppelin-*.tar.gz scripts/docker/zeppelin/bin/
+```
+
+`scripts/docker/zeppelin/bin/Dockerfile` downloads the package from the internet. Modify the file to add the package from the local filesystem instead.
+
+```
+...
+
+# Find the following section and comment it out
+#RUN echo "$LOG_TAG Download Zeppelin binary" && \
+#    wget -O /tmp/zeppelin-${Z_VERSION}-bin-all.tgz http://archive.apache.org/dist/zeppelin/zeppelin-${Z_VERSION}/zeppelin-${Z_VERSION}-bin-all.tgz && \
+#    tar -zxvf /tmp/zeppelin-${Z_VERSION}-bin-all.tgz && \
+#    rm -rf /tmp/zeppelin-${Z_VERSION}-bin-all.tgz && \
+#    mv /zeppelin-${Z_VERSION}-bin-all ${Z_HOME}
+
+# Add the following lines right after the commented-out section above
+ADD zeppelin-${Z_VERSION}.tar.gz /
+RUN ln -s /zeppelin-${Z_VERSION} /zeppelin
+...
+```
+
+Then build the docker image.
+
+```
+# configure docker env, if you're using minikube
+$ eval $(minikube docker-env)
+
+# change directory
+$ cd scripts/docker/zeppelin/bin/
+
+# build image. Replace <tag>.
+$ docker build -t <tag> .
+```
+
+Finally, set the custom image `<tag>` you just created as the `image` and the `ZEPPELIN_K8S_CONTAINER_IMAGE` env variable of the `zeppelin-server` container spec in the `zeppelin-server.yaml` file.
+
+Currently, a single docker image is used for both the Zeppelin server and interpreter Pods. Therefore,
+
+| Pod | Number of instances | Image | Note |
+| --- | --- | --- | --- |
+| Zeppelin Server | 1 | Zeppelin docker image | User creates/deletes with kubectl command |
+| Zeppelin Interpreters | n | Zeppelin docker image | Zeppelin Server creates/deletes |
+| Spark executors | m | Spark docker image | Spark Interpreter creates/deletes |
+
+Currently, the Zeppelin docker image is quite large. The Zeppelin project plans to provide lightweight images for each individual interpreter in the future.
+
+
+## How it works
+
+### Zeppelin on Kubernetes
+
+`k8s/zeppelin-server.yaml` is provided to run Zeppelin Server with a few sidecars and configurations.
+Once Zeppelin Server is started inside Kubernetes, it automatically configures itself to use `K8sStandardInterpreterLauncher`.
+
+The launcher creates each interpreter in a Pod using templates located under the `k8s/interpreter/` directory.
+Templates in the directory are applied in alphabetical order. Templates are rendered by [jinjava](https://github.com/HubSpot/jinjava)
+and all interpreter properties are accessible inside the templates.
+
+### Spark on Kubernetes
+
+When the interpreter group is `spark`, Zeppelin automatically sets the necessary Spark configuration to use Spark on Kubernetes.
+It uses client mode, so the Spark interpreter Pod works as the Spark driver and Spark executors are launched in separate Pods.
+This auto configuration can be overridden by manually setting the `master` property of the Spark interpreter.
+
+
+### Accessing Spark UI (or Service running in interpreter Pod)
+
+The Zeppelin server Pod has a reverse proxy as a sidecar. The proxy splits traffic between the Zeppelin server and the Spark UI running in the other Pods.
+It assumes both `<your service domain>` and `*.<your service domain>` point to the nginx proxy address.
+`<your service domain>` is directed to ZeppelinServer, and `*.<your service domain>` is directed to interpreter Pods.
+
+`<port>-<interpreter pod svc name>.<your service domain>` is the convention for accessing any application running in an interpreter Pod.
+
+
+For example, suppose your service domain name is `local.zeppelin-project.org`, the Spark interpreter Pod is running with the name `spark-axefeg`, and the Spark UI is running on port 4040. Then
+
+```
+4040-spark-axefeg.local.zeppelin-project.org
+```
+
+is the address to access the Spark UI.
+
+The default service domain is `local.zeppelin-project.org:8080`. `local.zeppelin-project.org` and `*.local.zeppelin-project.org` are configured to resolve to `127.0.0.1`.
+This allows access to Zeppelin and the Spark UI with `kubectl port-forward zeppelin-server 8080:80`.
+
+
+If you'd like to use your own custom domain
+
+1. Configure [Ingress](https://kubernetes.io/docs/concepts/services-networking/ingress/) in the Kubernetes cluster for the `http` port of the service `zeppelin-server` defined in `k8s/zeppelin-server.yaml`.
+2. Configure DNS records so that your service domain and its wildcard subdomain point to the IP addresses of your Ingress.
+3. Modify `serviceDomain` of the `zeppelin-server-conf` ConfigMap in the `k8s/zeppelin-server.yaml` file.
+4. Apply the changes (e.g. `kubectl apply -f k8s/zeppelin-server.yaml`)
+
+
+## Persist /notebook and /conf directory
+
+Notebooks and configurations are not persisted by default. Please configure a volume and update `k8s/zeppelin-server.yaml`
+to use the volume to persist the /notebook and /conf directories if necessary.
+
+
+## Customization
+
+### Zeppelin Server Pod
+Edit `k8s/zeppelin-server.yaml` and apply.
+
+### Interpreter Pod
+Since interpreter Pods are created/deleted by ZeppelinServer using templates under the `k8s/interpreter` directory,
+to customize them,
+
+ 1. Prepare a `k8s/interpreter` directory with your customization (edit or create new yaml files) in a Kubernetes volume.
+ 2. Modify `k8s/zeppelin-server.yaml` and mount the prepared volume dir `k8s/interpreter` to `/zeppelin/k8s/interpreter/`.
+ 3. Apply the modified `k8s/zeppelin-server.yaml`.
+ 4. Running a paragraph will create an interpreter using the modified yaml files.
+
+
+## Future work
+
+ - Smaller interpreter docker image.
+ - Blocking communication between interpreter Pods.
+ - The Spark interpreter Pod has a Role with CRUD on any pod/service in the same namespace. This should be restricted to only the Spark executor Pods.
+ - Per note interpreter mode by default when Zeppelin is running on Kubernetes

diff --git a/docs/setup/operation/configuration.md b/docs/setup/operation/configuration.md
index a9a253c..afa4b2a 100644
--- a/docs/setup/operation/configuration.md
+++ b/docs/setup/operation/configuration.md
@@ -365,6 +365,36 @@ If both are defined, then the **environment variables** will take priority.
     <td>token</td>
     <td>GitHub remote name. Default is `origin`</td>
   </tr>
+  <tr>
+    <td><h6 class="properties">ZEPPELIN_RUN_MODE</h6></td>
+    <td><h6 class="properties">zeppelin.run.mode</h6></td>
+    <td>auto</td>
+    <td>Run mode. 'auto|local|k8s'. 'auto' autodetects the environment. 'local' runs interpreters as local processes. 'k8s' runs interpreters on a Kubernetes cluster</td>
+  </tr>
+  <tr>
+    <td><h6 class="properties">ZEPPELIN_K8S_PORTFORWARD</h6></td>
+    <td><h6 class="properties">zeppelin.k8s.portforward</h6></td>
+    <td>false</td>
+    <td>Port forward to interpreter rpc port. Set 'true' only for local development when zeppelin.run.mode is 'k8s'. Don't use 'true' in a production environment</td>
+  </tr>
+  <tr>
+    <td><h6 class="properties">ZEPPELIN_K8S_CONTAINER_IMAGE</h6></td>
+    <td><h6 class="properties">zeppelin.k8s.container.image</h6></td>
+    <td>apache/zeppelin:{{ site.ZEPPELIN_VERSION }}</td>
+    <td>Docker image for interpreters</td>
+  </tr>
+  <tr>
+    <td><h6 class="properties">ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE</h6></td>
+    <td><h6 class="properties">zeppelin.k8s.spark.container.image</h6></td>
+    <td>apache/spark:latest</td>
+    <td>Docker image for Spark executors</td>
+  </tr>
+  <tr>
+    <td><h6 class="properties">ZEPPELIN_K8S_TEMPLATE_DIR</h6></td>
+    <td><h6 class="properties">zeppelin.k8s.template.dir</h6></td>
+    <td>k8s</td>
+    <td>Directory of Kubernetes yaml spec files</td>
+  </tr>
 </table>

diff --git a/k8s/interpreter/100-interpreter-spec.yaml b/k8s/interpreter/100-interpreter-spec.yaml
new file mode 100644
index 0000000..c857ff2
--- /dev/null
+++ b/k8s/interpreter/100-interpreter-spec.yaml
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+kind: Pod
+apiVersion: v1
+metadata:
+  namespace: {{zeppelin.k8s.namespace}}
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  labels:
+    app: {{zeppelin.k8s.interpreter.pod.name}}
+    interpreterGroupId: {{zeppelin.k8s.interpreter.group.id}}
+    interpreterSettingName: {{zeppelin.k8s.interpreter.setting.name}}
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+spec:
+  {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+  automountServiceAccountToken: true
+  {% else %}
+  automountServiceAccountToken: false
+  {% endif %}
+  restartPolicy: Never
+  terminationGracePeriodSeconds: 30
+  containers:
+  - name: {{zeppelin.k8s.interpreter.container.name}}
+    image: {{zeppelin.k8s.interpreter.container.image}}
+    command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}} -r {{zeppelin.k8s.interpreter.rpc.portRange}} -c {{zeppelin.k8s.server.rpc.host}} -p {{zeppelin.k8s.server.rpc.portRange}} -i {{zeppelin.k8s.interpreter.group.id}} -l {{zeppelin.k8s.interpreter.localRepo}} -g {{zeppelin.k8s.interpreter.setting.name}}"]
+    lifecycle:
+      preStop:
+        exec:
+          # SIGTERM triggers a quick exit; gracefully terminate instead
+          command: ["sh", "-c", "ps -ef | grep org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer | grep -v grep | awk '{print $2}' | xargs kill"]
+    env:
+  {% for key, value in zeppelin.k8s.envs.items() %}
+    - name: {{key}}
+      value: {{value}}
+  {% endfor %}
+  {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+    volumeMounts:
+    - name: spark-home
+      mountPath: /spark
+  initContainers:
+  - name: spark-home-init
+    image: {{zeppelin.k8s.spark.container.image}}
+    command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
+    volumeMounts:
+    - name: spark-home
+      mountPath: /spark
+  volumes:
+  - name: spark-home
+    emptyDir: {}
+  {% endif %}
+---
+kind: Service
+apiVersion: v1
+metadata:
+  namespace: {{zeppelin.k8s.namespace}}
+  name: {{zeppelin.k8s.interpreter.pod.name}}  # keep the Service name the same as the Pod name.
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+spec:
+  clusterIP: None
+  ports:
+    - name: intp
+      port: 12321
+    {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+    - name: spark-driver
+      port: 22321
+    - name: spark-blockmanager
+      port: 22322
+    - name: spark-ui
+      port: 4040
+    {% endif %}
+  selector:
+    app: {{zeppelin.k8s.interpreter.pod.name}}
+{% if zeppelin.k8s.interpreter.group.name == "spark" %}
+---
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  namespace: {{zeppelin.k8s.namespace}}
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+rules:
+- apiGroups: [""]
+  resources: ["pods", "services"]
+  verbs: ["create", "get", "update", "list", "delete", "watch" ]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+subjects:
+- kind: ServiceAccount
+  name: default
+roleRef:
+  kind: Role
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  apiGroup: rbac.authorization.k8s.io
+{% endif %}
\ No newline at end of file

diff --git a/k8s/zeppelin-server.yaml b/k8s/zeppelin-server.yaml
new file mode 100644
index 0000000..dbee39c
--- /dev/null
+++ b/k8s/zeppelin-server.yaml
@@ -0,0 +1,201 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: zeppelin-server-conf
+data:
+  # 'serviceDomain' is the domain name used to access the Zeppelin UI.
+  # It should point to the IP address of the 'zeppelin-server' service.
+  #
+  # The wildcard subdomain needs to point to the same IP address to access services inside Pods (such as the Spark UI).
+  # i.e. if the service domain is 'local.zeppelin-project.org', DNS configuration should make 'local.zeppelin-project.org' and '*.local.zeppelin-project.org' point to the same address.
+  #
+  # The default value is 'local.zeppelin-project.org', which points to 127.0.0.1, so `kubectl port-forward zeppelin-server` gives a localhost address to connect to.
+ # If you have your ingress controller configured to connect to `zeppelin-server` service and have a domain name for it (with wildcard subdomain point the same address), you can replace serviceDomain field with your own domain. + serviceDomain: local.zeppelin-project.org:8080 + sparkContainerImage: spark:2.4.0 + nginx.conf: | + daemon off; + worker_processes auto; + events { + worker_connections 1024; + } + http { + map $http_upgrade $connection_upgrade { + default upgrade; + '' close; + } + + # first server block will be default. Proxy zeppelin server. + server { + listen 80; + location / { + proxy_pass http://localhost:8080; + proxy_set_header Host $host; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + proxy_redirect http://localhost $scheme://SERVICE_DOMAIN; + } + } + + # match request domain [port]-[service].[serviceDomain] + # proxy extra service such as spark-ui + server { + listen 80; + server_name "~(?<svc_port>[0-9]+)-(?<svc_name>[^.]*)\.(.*)"; + location / { + resolver 127.0.0.1:53 ipv6=off; + proxy_pass http://$svc_name.NAMESPACE.svc.cluster.local:$svc_port; + proxy_set_header Host $host; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + proxy_redirect http://localhost $scheme://SERVICE_DOMAIN; + + # redirect rule for spark ui. 
302 redirect responses omit the port number of the service domain + proxy_redirect ~(http:[/]+[0-9]+[-][^-]+[-][^.]+)[^/]+(\/jobs.*) $1.SERVICE_DOMAIN$2; + } + } + } +--- +kind: Pod +apiVersion: v1 +metadata: + name: zeppelin-server + labels: + app: zeppelin-server +spec: + automountServiceAccountToken: true + containers: + - name: zeppelin-server + image: apache/zeppelin:0.9.0-SNAPSHOT + command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/zeppelin.sh"] + lifecycle: + preStop: + exec: + # SIGTERM triggers a quick exit; gracefully terminate instead + command: ["sh", "-c", "ps -ef | grep org.apache.zeppelin.server.ZeppelinServer | grep -v grep | awk '{print $2}' | xargs kill"] + env: + - name: ZEPPELIN_K8S_CONTAINER_IMAGE + value: apache/zeppelin:0.9.0-SNAPSHOT + - name: ZEPPELIN_HOME + value: /zeppelin + - name: ZEPPELIN_SERVER_RPC_PORTRANGE + value: 12320:12320 + - name: POD_UID + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.uid + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE + valueFrom: + configMapKeyRef: + name: zeppelin-server-conf + key: sparkContainerImage + - name: SERVICE_DOMAIN + valueFrom: + configMapKeyRef: + name: zeppelin-server-conf + key: serviceDomain + - name: MASTER # default value of the master property for the Spark interpreter.
+ value: k8s://https://kubernetes.default.svc + # volumeMounts: + # - name: zeppelin-server-notebook-volume # configure this to persist notebooks + # mountPath: /zeppelin/notebook + # - name: zeppelin-server-conf # configure this to persist Zeppelin configuration + # mountPath: /zeppelin/conf + # - name: zeppelin-server-custom-k8s # configure this to mount a customized Kubernetes spec for interpreters + # mountPath: /zeppelin/k8s + - name: zeppelin-server-gateway + image: nginx:1.14.0 + command: ["/bin/sh", "-c"] + args: + - cp -f /tmp/conf/nginx.conf /etc/nginx/nginx.conf; + sed -i -e "s/SERVICE_DOMAIN/$(cat /tmp/conf/serviceDomain)/g" /etc/nginx/nginx.conf; + sed -i -e "s/NAMESPACE/$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)/g" /etc/nginx/nginx.conf; + cat /etc/nginx/nginx.conf; + /usr/sbin/nginx + volumeMounts: + - name: zeppelin-server-conf-volume + mountPath: /tmp/conf + lifecycle: + preStop: + exec: + # SIGTERM triggers a quick exit; gracefully terminate instead + command: ["/usr/sbin/nginx", "-s", "quit"] + - name: dnsmasq # nginx requires a DNS resolver for dynamic DNS resolution + image: "janeczku/go-dnsmasq:release-1.0.5" + args: + - --listen + - "127.0.0.1:53" + - --default-resolver + - --append-search-domains + - --hostsfile=/etc/hosts + - --verbose + volumes: + - name: zeppelin-server-conf-volume + configMap: + name: zeppelin-server-conf + items: + - key: nginx.conf + path: nginx.conf + - key: serviceDomain + path: serviceDomain +--- +kind: Service +apiVersion: v1 +metadata: + name: zeppelin-server # keep the Service name the same as the Pod name. +spec: + ports: + - name: http + port: 80 + - name: rpc # the port name is referenced in the code, so it shouldn't be changed.
+ port: 12320 + selector: + app: zeppelin-server +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: zeppelin-server-role +rules: +- apiGroups: [""] + resources: ["pods", "services"] + verbs: ["create", "get", "update", "patch", "list", "delete", "watch"] +- apiGroups: ["rbac.authorization.k8s.io"] + resources: ["roles", "rolebindings"] + verbs: ["bind", "create", "get", "update", "patch", "list", "delete", "watch"] +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: zeppelin-server-role-binding +subjects: +- kind: ServiceAccount + name: default +roleRef: + kind: Role + name: zeppelin-server-role + apiGroup: rbac.authorization.k8s.io diff --git a/scripts/docker/zeppelin/bin/Dockerfile b/scripts/docker/zeppelin/bin/Dockerfile index 4f6166a..080b05c 100644 --- a/scripts/docker/zeppelin/bin/Dockerfile +++ b/scripts/docker/zeppelin/bin/Dockerfile @@ -86,15 +86,24 @@ RUN echo "$LOG_TAG Install R related packages" && \ R -e "install.packages('Rcpp', repos='http://cran.us.r-project.org')" && \ Rscript -e "library('devtools'); library('Rcpp'); install_github('ramnathv/rCharts')" +# Install kubectl +RUN apt-get install -y apt-transport-https && \ + curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \ + echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list && \ + apt-get update && \ + apt-get install -y kubectl + +RUN echo "$LOG_TAG Cleanup" && \ + apt-get autoclean && \ + apt-get clean + RUN echo "$LOG_TAG Download Zeppelin binary" && \ wget -O /tmp/zeppelin-${Z_VERSION}-bin-all.tgz http://archive.apache.org/dist/zeppelin/zeppelin-${Z_VERSION}/zeppelin-${Z_VERSION}-bin-all.tgz && \ tar -zxvf /tmp/zeppelin-${Z_VERSION}-bin-all.tgz && \ rm -rf /tmp/zeppelin-${Z_VERSION}-bin-all.tgz && \ mv /zeppelin-${Z_VERSION}-bin-all ${Z_HOME} -RUN echo "$LOG_TAG Cleanup" && \ - apt-get autoclean && \ - apt-get clean +COPY log4j.properties 
${Z_HOME}/conf/ EXPOSE 8080 diff --git a/scripts/docker/zeppelin/bin/log4j.properties b/scripts/docker/zeppelin/bin/log4j.properties new file mode 100644 index 0000000..8daee59 --- /dev/null +++ b/scripts/docker/zeppelin/bin/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +log4j.rootLogger = INFO, stdout + +log4j.appender.stdout = org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout = org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n diff --git a/zeppelin-distribution/src/assemble/distribution.xml b/zeppelin-distribution/src/assemble/distribution.xml index 068f85b..378ecab 100644 --- a/zeppelin-distribution/src/assemble/distribution.xml +++ b/zeppelin-distribution/src/assemble/distribution.xml @@ -95,6 +95,9 @@ <directory>../plugins</directory> </fileSet> <fileSet> + <directory>../k8s</directory> + </fileSet> + <fileSet> <outputDirectory>/lib/interpreter</outputDirectory> <directory>../zeppelin-interpreter/target/lib</directory> </fileSet> diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 4e2b8c3..a2aac9e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -48,6 +48,11 @@ public class ZeppelinConfiguration extends XMLConfiguration { private Map<String, String> properties = new HashMap<>(); + public enum RUN_MODE { + LOCAL, + K8S + } + public ZeppelinConfiguration(URL url) throws ConfigurationException { setDelimiterParsingDisabled(true); load(url); @@ -665,6 +670,39 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT); } + public RUN_MODE getRunMode() { + String mode = getString(ConfVars.ZEPPELIN_RUN_MODE); + if ("auto".equalsIgnoreCase(mode)) { // auto detect + if (new File("/var/run/secrets/kubernetes.io").exists()) { + return RUN_MODE.K8S; + } else { + return RUN_MODE.LOCAL; + } + } else { + return RUN_MODE.valueOf(mode.toUpperCase()); + } + } + + public boolean getK8sPortForward() { + return 
getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD); + } + + public String getK8sKubectlCmd() { + return getString(ConfVars.ZEPPELIN_K8S_KUBECTL); + } + + public String getK8sContainerImage() { + return getString(ConfVars.ZEPPELIN_K8S_CONTAINER_IMAGE); + } + + public String getK8sSparkContainerImage() { + return getString(ConfVars.ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE); + } + + public String getK8sTemplatesDir() { + return getRelativeDir(ConfVars.ZEPPELIN_K8S_TEMPLATE_DIR); + } + public Map<String, String> dumpConfigurations(Predicate<String> predicate) { Map<String, String> properties = new HashMap<>(); @@ -816,6 +854,14 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000), ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000), + ZEPPELIN_RUN_MODE("zeppelin.run.mode", "auto"), // auto | local | k8s + + ZEPPELIN_K8S_PORTFORWARD("zeppelin.k8s.portforward", false), // use kubectl port-forward in case Zeppelin is running outside of Kubernetes + ZEPPELIN_K8S_KUBECTL("zeppelin.k8s.kubectl", "kubectl"), // kubectl command + ZEPPELIN_K8S_CONTAINER_IMAGE("zeppelin.k8s.container.image", "apache/zeppelin:" + Util.getVersion()), + ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE("zeppelin.k8s.spark.container.image", "apache/spark:latest"), + ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"), + ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""), ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"), ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""), diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 037b293..a607a6f 100644 ---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -73,6 +73,8 @@ import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.misc.Signal; +import sun.misc.SignalHandler; import java.io.IOException; import java.lang.reflect.Constructor; @@ -278,6 +280,19 @@ public class RemoteInterpreterServer extends Thread RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange); remoteInterpreterServer.start(); + + // add signal handler + Signal.handle(new Signal("TERM"), new SignalHandler() { + @Override + public void handle(Signal signal) { + try { + remoteInterpreterServer.shutdown(); + } catch (TException e) { + logger.error("Error on shutdown RemoteInterpreterServer", e); + } + } + }); + remoteInterpreterServer.join(); System.exit(0); } diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml new file mode 100644 index 0000000..56696b0 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zengine-plugins-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../../../zeppelin-plugins</relativePath> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>launcher-k8s-standard</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + <name>Zeppelin: Plugin Kubernetes StandardLauncher</name> + <description>Launcher implementation to run interpreters on Kubernetes</description> + + <properties> + <plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name> + </properties> + + <dependencies> + <dependency> + <groupId>com.hubspot.jinjava</groupId> + <artifactId>jinjava</artifactId> + <version>2.4.12</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java new file mode 100644 index 0000000..58e28ad --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -0,0 +1,382 @@ +package org.apache.zeppelin.interpreter.launcher; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { + private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class); + private static final int K8S_INTERPRETER_SERVICE_PORT = 12321; + private final Kubectl kubectl; + private final String interpreterGroupId; + private final String interpreterGroupName; + private final String interpreterSettingName; + private final File specTempaltes; + private final String containerImage; + private final Properties properties; + private final Map<String, String> envs; + private final String zeppelinServiceHost; + private final String zeppelinServiceRpcPort; + + private final Gson gson = new Gson(); + private final String podName; + private final boolean portForward; + private final String sparkImage; + private ExecuteWatchdog portForwardWatchdog; + private int podPort = K8S_INTERPRETER_SERVICE_PORT; + + private AtomicBoolean started = new AtomicBoolean(false); + + public K8sRemoteInterpreterProcess(
+ Kubectl kubectl, + File specTemplates, + String containerImage, + String interpreterGroupId, + String interpreterGroupName, + String interpreterSettingName, + Properties properties, + Map<String, String> envs, + String zeppelinServiceHost, + String zeppelinServiceRpcPort, + boolean portForward, + String sparkImage, + int connectTimeout + ) { + super(connectTimeout); + this.kubectl = kubectl; + this.specTempaltes = specTemplates; + this.containerImage = containerImage; + this.interpreterGroupId = interpreterGroupId; + this.interpreterGroupName = interpreterGroupName; + this.interpreterSettingName = interpreterSettingName; + this.properties = properties; + this.envs = new HashMap<>(envs); + this.zeppelinServiceHost = zeppelinServiceHost; + this.zeppelinServiceRpcPort = zeppelinServiceRpcPort; + this.portForward = portForward; + this.sparkImage = sparkImage; + this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6); + } + + + /** + * Get interpreter pod name + * @return interpreter pod name + */ + @VisibleForTesting + String getPodName() { + return podName; + } + + @Override + public String getInterpreterSettingName() { + return interpreterSettingName; + } + + @Override + public void start(String userName) throws IOException { + // create new pod + apply(specTempaltes, false); + kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000); + + if (portForward) { + podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + portForwardWatchdog = kubectl.portForward( + String.format("pod/%s", getPodName()), + new String[] { + String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT) + }); + } + + long startTime = System.currentTimeMillis(); + + // wait until the interpreter sends its 'started' message through Thrift RPC + synchronized (started) { + if (!started.get()) { + try { + started.wait(getConnectTimeout()); + } catch (InterruptedException e) { + LOGGER.error("Remote interpreter is not accessible"); + } + } + } +
+ if (!started.get()) { + LOGGER.info( + String.format("Interpreter pod creation timed out after %d seconds", + getConnectTimeout()/1000)); + } + + // wait until the interpreter's Thrift RPC server is ready + while (System.currentTimeMillis() - startTime < getConnectTimeout()) { + if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { + break; + } else { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } + } + + @Override + public void stop() { + // delete pod + try { + apply(specTempaltes, true); + } catch (IOException e) { + LOGGER.info("Error removing interpreter pod", e); + } + + try { + kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60); + } catch (IOException e) { + LOGGER.debug("Error waiting for pod deletion", e); + } + + + if (portForwardWatchdog != null) { + portForwardWatchdog.destroyProcess(); + } + } + + @Override + public String getHost() { + if (portForward) { + return "localhost"; + } else { + return getInterpreterPodDnsName(); + } + } + + @Override + public int getPort() { + return podPort; + } + + @Override + public boolean isRunning() { + try { + if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { + return true; + } + + String ret = kubectl.execAndGet(new String[]{ + "get", + String.format("pods/%s", getPodName()), + "-o", + "json" + }); + + if (ret == null) { + return false; + } + + Map<String, Object> pod = gson.fromJson(ret, new TypeToken<Map<String, Object>>() {}.getType()); + if (pod == null || !pod.containsKey("status")) { + return false; + } + + Map<String, Object> status = (Map<String, Object>) pod.get("status"); + if (status == null || !status.containsKey("phase")) { + return false; + } + + return "Running".equals(status.get("phase")) && started.get(); + } catch (Exception e) { + LOGGER.error("Can't get pod status", e); + return false; + } + } + + /** + * Apply spec file(s) in the path.
+ * @param path spec file, or directory of spec files, to apply + */ + void apply(File path, boolean delete) throws IOException { + if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) { + LOGGER.info("Skip " + path.getAbsolutePath()); + return; + } + if (path.isDirectory()) { + File[] files = path.listFiles(); + Arrays.sort(files); + if (delete) { + ArrayUtils.reverse(files); + } + + for (File f : files) { + apply(f, delete); + } + } else if (path.isFile()) { + LOGGER.info("Apply " + path.getAbsolutePath()); + K8sSpecTemplate specTemplate = new K8sSpecTemplate(); + specTemplate.loadProperties(getTemplateBindings()); + + String spec = specTemplate.render(path); + if (delete) { + kubectl.delete(spec); + } else { + kubectl.apply(spec); + } + } else { + LOGGER.error("Can't apply " + path.getAbsolutePath()); + } + } + + @VisibleForTesting + Properties getTemplateBindings() throws IOException { + Properties k8sProperties = new Properties(); + + // k8s template properties + k8sProperties.put("zeppelin.k8s.namespace", kubectl.getNamespace()); + k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName()); + k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase()); + k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage); + k8sProperties.put("zeppelin.k8s.interpreter.group.id", interpreterGroupId); + k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName); + k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName); + k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo"); + k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", String.format("%d:%d", getPort(), getPort())); + k8sProperties.put("zeppelin.k8s.server.rpc.host", zeppelinServiceHost); + k8sProperties.put("zeppelin.k8s.server.rpc.portRange", zeppelinServiceRpcPort); + if (ownerUID() != null && ownerName() != null) { + k8sProperties.put("zeppelin.k8s.server.uid", ownerUID());
+ k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName()); + } + + // environment variables + envs.put("SERVICE_DOMAIN", envs.getOrDefault("SERVICE_DOMAIN", System.getenv("SERVICE_DOMAIN"))); + envs.put("ZEPPELIN_HOME", envs.getOrDefault("ZEPPELIN_HOME", "/zeppelin")); + + if (isSpark()) { + int webUiPort = 4040; + k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage); + if (isSparkOnKubernetes(properties)) { + envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions()); + } + envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark")); + + // configure interpreter property "zeppelin.spark.uiWebUrl" if not defined, to expose the Spark UI through the reverse proxy + String webUrl = (String) properties.get("zeppelin.spark.uiWebUrl"); + if (webUrl == null || webUrl.trim().isEmpty()) { + properties.put("zeppelin.spark.uiWebUrl", + String.format("//%d-%s.%s", webUiPort, getPodName(), envs.get("SERVICE_DOMAIN"))); + } + } + + k8sProperties.put("zeppelin.k8s.envs", envs); + + // interpreter properties override the values above + k8sProperties.putAll(Maps.fromProperties(properties)); + return k8sProperties; + } + + @VisibleForTesting + boolean isSpark() { + return "spark".equalsIgnoreCase(interpreterGroupName); + } + + boolean isSparkOnKubernetes(Properties interpreterProperties) { + String propertySparkMaster = (String) interpreterProperties.getOrDefault("master", ""); + if (propertySparkMaster.startsWith("k8s://")) { + return true; + } else { + return false; + } + } + + @VisibleForTesting + String buildSparkSubmitOptions() { + StringBuilder options = new StringBuilder(); + + options.append(" --master k8s://https://kubernetes.default.svc"); + options.append(" --deploy-mode client"); + if (properties.containsKey("spark.driver.memory")) { + options.append(" --driver-memory " + properties.get("spark.driver.memory")); + } + options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
options.append(" --conf spark.executor.instances=1"); + options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName()); + options.append(" --conf spark.kubernetes.container.image=" + sparkImage); + options.append(" --conf spark.driver.bindAddress=0.0.0.0"); + options.append(" --conf spark.driver.host=" + getInterpreterPodDnsName()); + options.append(" --conf spark.driver.port=" + String.format("%d", getSparkDriverPort())); + options.append(" --conf spark.blockManager.port=" + String.format("%d", getSparkBlockmanagerPort())); + + return options.toString(); + } + + private String getInterpreterPodDnsName() { + return String.format("%s.%s.svc.cluster.local", + getPodName(), // service name and pod name is the same + kubectl.getNamespace()); + } + + /** + * See xxx-interpreter-pod.yaml + * @return + */ + @VisibleForTesting + int getSparkDriverPort() { + return 22321; + } + + /** + * See xxx-interpreter-pod.yaml + * @return + */ + @VisibleForTesting + int getSparkBlockmanagerPort() { + return 22322; + } + + + /** + * Get UID of owner (zeppelin-server pod) for garbage collection + * https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/ + */ + private String ownerUID() { + return System.getenv("POD_UID"); + } + + private String ownerName() { + return System.getenv("POD_NAME"); + } + + private String getRandomString(int length) { + char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + + StringBuilder sb = new StringBuilder(); + Random random = new Random(); + for (int i = 0; i < length; i++) { + char c = chars[random.nextInt(chars.length)]; + sb.append(c); + } + String randomStr = sb.toString(); + + return randomStr; + } + + @Override + public void processStarted(int port, String host) { + LOGGER.info("Interpreter pod created {}:{}", host, port); + synchronized (started) { + started.set(true); + started.notify(); + } + } +} diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java new file mode 100644 index 0000000..2ed2c13 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.interpreter.launcher; + +import com.hubspot.jinjava.Jinjava; +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +public class K8sSpecTemplate extends HashMap<String, Object> { + public String render(File templateFile) throws IOException { + String template = FileUtils.readFileToString(templateFile, Charset.defaultCharset()); + return render(template); + } + + public String render(String template) { + ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + Jinjava jinja = new Jinjava(); + return jinja.render(template, this); + } finally { + Thread.currentThread().setContextClassLoader(oldCl); + } + } + + public void loadProperties(Properties properties) { + Set<Entry<Object, Object>> entries = properties.entrySet(); + for (Entry entry : entries) { + String key = (String) entry.getKey(); + Object value = entry.getValue(); + + String[] keySplit = key.split("[.]"); + Map<String, Object> target = this; + for (int i = 0; i < keySplit.length - 1; i++) { + if (!target.containsKey(keySplit[i])) { + HashMap subEntry = new HashMap(); + target.put(keySplit[i], subEntry); + target = subEntry; + } else { + Object subEntry = target.get(keySplit[i]); + if (!(subEntry instanceof Map)) { + HashMap replace = new HashMap(); + replace.put("_", subEntry); + target.put(keySplit[i], replace); + } + target = (Map<String, Object>) target.get(keySplit[i]); + } + } + + if (target.get(keySplit[keySplit.length - 1]) instanceof Map) { + ((Map) target.get(keySplit[keySplit.length - 1])).put("_", value); + } else { + target.put(keySplit[keySplit.length - 1], value); + } + } + } +} diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java new file mode 100644 index 0000000..4f2ed91 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.launcher; + +import java.io.File; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Interpreter launcher which launches the interpreter process as a Pod on Kubernetes. + */ +public class K8sStandardInterpreterLauncher extends InterpreterLauncher { + + private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class); + private final Kubectl kubectl; + private InterpreterLaunchContext context; + + + public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException { + super(zConf, recoveryStorage); + kubectl = new Kubectl(zConf.getK8sKubectlCmd()); + kubectl.setNamespace(getNamespace()); + } + + @VisibleForTesting + K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage, Kubectl kubectl) { + super(zConf, recoveryStorage); + this.kubectl = kubectl; + } + + + /** + * Check whether Zeppelin is running inside Kubernetes. + * It should return the actual state regardless of ZeppelinConfiguration.getRunMode(). + * + * Normally, unless Zeppelin is running on Kubernetes, K8sStandardInterpreterLauncher shouldn't even be initialized. + * However, when ZeppelinConfiguration.getRunMode() is forced to 'k8s', InterpreterSetting.getLauncherPlugin() will try + * to use K8sStandardInterpreterLauncher. This is useful for development. It allows the Zeppelin server to run in your + * IDE while creating your interpreters in Kubernetes.
That way, code changes to the Zeppelin server or the Kubernetes YAML specs + * can be applied without rebuilding the Docker image. + * @return true if running inside a Kubernetes Pod + */ + boolean isRunningOnKubernetes() { + if (new File("/var/run/secrets/kubernetes.io").exists()) { + return true; + } else { + return false; + } + } + + /** + * Get current namespace + * @throws IOException + */ + String getNamespace() throws IOException { + if (isRunningOnKubernetes()) { + return readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", Charset.defaultCharset()).trim(); + } else { + return "default"; + } + } + + /** + * Get hostname. It should be the same as the Kubernetes Service name (and Pod name). + * @return the local hostname, or "localhost" if it can't be resolved + */ + String getHostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + return "localhost"; + } + } + + /** + * Get the Zeppelin server host DNS name. + * Returns <hostname>.<namespace>.svc.cluster.local + * @throws IOException + */ + private String getZeppelinServiceHost() throws IOException { + if (isRunningOnKubernetes()) { + return String.format("%s.%s.svc.cluster.local", + getHostname(), // service name and pod name should be the same + getNamespace()); + } else { + return context.getZeppelinServerHost(); + } + } + + /** + * Get the Zeppelin server RPC port. + * Reads the env variable "<HOSTNAME>_SERVICE_PORT_RPC", which Kubernetes sets for the Service port named 'rpc'. + */ + private String getZeppelinServiceRpcPort() { + String envServicePort = System.getenv( + String.format("%s_SERVICE_PORT_RPC", getHostname().replaceAll("[-.]", "_").toUpperCase())); + if (envServicePort != null) { + return envServicePort; + } else { + return Integer.toString(context.getZeppelinServerRPCPort()); + } + } + + @Override + public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); + this.context = context; + this.properties = context.getProperties(); + int connectTimeout = getConnectTimeout(); + + return new
K8sRemoteInterpreterProcess( + kubectl, + new File(zConf.getK8sTemplatesDir(), "interpreter"), + zConf.getK8sContainerImage(), + context.getInterpreterGroupId(), + context.getInterpreterSettingGroup(), + context.getInterpreterSettingName(), + properties, + buildEnvFromProperties(context), + getZeppelinServiceHost(), + getZeppelinServiceRpcPort(), + zConf.getK8sPortForward(), + zConf.getK8sSparkContainerImage(), + connectTimeout); + } + + protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { + Map<String, String> env = new HashMap<>(); + for (Object key : context.getProperties().keySet()) { + if (RemoteInterpreterUtils.isEnvString((String) key)) { + env.put((String) key, context.getProperties().getProperty((String) key)); + } + // TODO(zjffdu) move this to FlinkInterpreterLauncher + if (key.toString().equals("FLINK_HOME")) { + String flinkHome = context.getProperties().get(key).toString(); + env.put("FLINK_CONF_DIR", flinkHome + "/conf"); + env.put("FLINK_LIB_DIR", flinkHome + "/lib"); + } + } + env.put("INTERPRETER_GROUP_ID", context.getInterpreterGroupId()); + return env; + } + + String readFile(String path, Charset encoding) throws IOException { + byte[] encoded = Files.readAllBytes(Paths.get(path)); + return new String(encoded, encoding); + } +} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java new file mode 100644 index 0000000..2079d16 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.commons.exec.*; +import org.apache.commons.io.IOUtils; + +import java.io.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Kubectl { + private final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class); + private final String kubectlCmd; + private final Gson gson = new Gson(); + private String namespace; + + public Kubectl(String kubectlCmd) { + this.kubectlCmd = kubectlCmd; + } + + /** + * Override namespace. 
Otherwise use namespace provided in schema + * @param namespace + */ + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getNamespace() { + return namespace; + } + + public String apply(String spec) throws IOException { + return execAndGet(new String[]{"apply", "-f", "-"}, spec); + } + + public String delete(String spec) throws IOException { + return execAndGet(new String[]{"delete", "-f", "-"}, spec); + } + + public String wait(String resource, String waitFor, int timeoutSec) throws IOException { + try { + return execAndGet(new String[]{ + "wait", + resource, + String.format("--for=%s", waitFor), + String.format("--timeout=%ds", timeoutSec)}); + } catch (IOException e) { + if ("delete".equals(waitFor) && e.getMessage().contains("NotFound")) { + LOGGER.info("{} Not found. Maybe already deleted.", resource); + return ""; + } else { + throw e; + } + } + } + + public ExecuteWatchdog portForward(String resource, String [] ports) throws IOException { + DefaultExecutor executor = new DefaultExecutor(); + CommandLine cmd = new CommandLine(kubectlCmd); + cmd.addArguments("port-forward"); + cmd.addArguments(resource); + cmd.addArguments(ports); + + ExecuteWatchdog watchdog = new ExecuteWatchdog(-1); + executor.setWatchdog(watchdog); + + executor.execute(cmd, new ExecuteResultHandler() { + @Override + public void onProcessComplete(int i) { + LOGGER.info("Port-forward stopped"); + } + + @Override + public void onProcessFailed(ExecuteException e) { + LOGGER.debug("port-forward process exit", e); + } + }); + + return watchdog; + } + + String execAndGet(String [] args) throws IOException { + return execAndGet(args, ""); + } + + @VisibleForTesting + String execAndGet(String [] args, String stdin) throws IOException { + InputStream ins = IOUtils.toInputStream(stdin); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + ArrayList<String> argsToOverride = new 
ArrayList<>(Arrays.asList(args)); + + // set namespace + if (namespace != null) { + argsToOverride.add("--namespace=" + namespace); + } + + LOGGER.info("kubectl " + argsToOverride); + LOGGER.debug(stdin); + + try { + int exitCode = execute( + argsToOverride.toArray(new String[0]), + ins, + stdout, + stderr + ); + + if (exitCode == 0) { + String output = new String(stdout.toByteArray()); + return output; + } else { + String output = new String(stderr.toByteArray()); + throw new IOException(String.format("non zero return code (%d). %s", exitCode, output)); + } + } catch (Exception e) { + String output = new String(stderr.toByteArray()); + throw new IOException(output, e); + } + } + + public int execute(String [] args, InputStream stdin, OutputStream stdout, OutputStream stderr) throws IOException { + DefaultExecutor executor = new DefaultExecutor(); + CommandLine cmd = new CommandLine(kubectlCmd); + cmd.addArguments(args); + + ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000); + executor.setWatchdog(watchdog); + + PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr, stdin); + executor.setStreamHandler(streamHandler); + return executor.execute(cmd); + } +} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java new file mode 100644 index 0000000..ce2a2cb --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class K8sRemoteInterpreterProcessTest { + + @Test + public void testGetHostPort() { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); + HashMap<String, String> envs = new HashMap<String, String>(); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + "shared_process", + "sh", + "shell", + properties, + envs, + "zeppelin.server.hostname", + "12320", + false, + "spark-container:1.0", + 10); + + // when + String host = intp.getHost(); + int port = intp.getPort(); + + // then + assertEquals(String.format("%s.%s.svc.cluster.local", intp.getPodName(), kubectl.getNamespace()), host); + assertEquals(12321, port); + } + + @Test + public void testPredefinedPortNumbers() { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); +
HashMap<String, String> envs = new HashMap<String, String>(); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + "shared_process", + "sh", + "shell", + properties, + envs, + "zeppelin.server.hostname", + "12320", + false, + "spark-container:1.0", + 10); + + + // following values are hardcoded in k8s/interpreter/100-interpreter.yaml. + // when change those values, update the yaml file as well. + assertEquals(12321, intp.getPort()); + assertEquals(22321, intp.getSparkDriverPort()); + assertEquals(22322, intp.getSparkBlockmanagerPort()); + } + + @Test + public void testGetTemplateBindings() throws IOException { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); + properties.put("my.key1", "v1"); + HashMap<String, String> envs = new HashMap<String, String>(); + envs.put("MY_ENV1", "V1"); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + "shared_process", + "sh", + "shell", + properties, + envs, + "zeppelin.server.hostname", + "12320", + false, + "spark-container:1.0", + 10); + + // when + Properties p = intp.getTemplateBindings(); + + // then + assertEquals("default", p.get("zeppelin.k8s.namespace")); + assertEquals(intp.getPodName(), p.get("zeppelin.k8s.interpreter.pod.name")); + assertEquals("sh", p.get("zeppelin.k8s.interpreter.container.name")); + assertEquals("interpreter-container:1.0", p.get("zeppelin.k8s.interpreter.container.image")); + assertEquals("shared_process", p.get("zeppelin.k8s.interpreter.group.id")); + assertEquals("sh", p.get("zeppelin.k8s.interpreter.group.name")); + assertEquals("shell", p.get("zeppelin.k8s.interpreter.setting.name")); + assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo")); + assertEquals("12321:12321" , 
p.get("zeppelin.k8s.interpreter.rpc.portRange")); + assertEquals("zeppelin.server.hostname" , p.get("zeppelin.k8s.server.rpc.host")); + assertEquals("12320" , p.get("zeppelin.k8s.server.rpc.portRange")); + assertEquals("v1", p.get("my.key1")); + assertEquals("V1", envs.get("MY_ENV1")); + + envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs"); + assertEquals(true, envs.containsKey("SERVICE_DOMAIN")); + assertEquals(true, envs.containsKey("ZEPPELIN_HOME")); + } + + @Test + public void testGetTemplateBindingsForSpark() throws IOException { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); + properties.put("my.key1", "v1"); + properties.put("master", "k8s://http://api"); + HashMap<String, String> envs = new HashMap<String, String>(); + envs.put("MY_ENV1", "V1"); + envs.put("SPARK_SUBMIT_OPTIONS", "my options"); + envs.put("SERVICE_DOMAIN", "mydomain"); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + "shared_process", + "spark", + "myspark", + properties, + envs, + "zeppelin.server.hostname", + "12320", + false, + "spark-container:1.0", + 10); + + // when + Properties p = intp.getTemplateBindings(); + + // then + assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image")); + assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl")); + + envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs"); + assertTrue( envs.containsKey("SPARK_HOME")); + + String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); + assertTrue(sparkSubmitOptions.startsWith("my options ")); + assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace())); + assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); + 
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0")); + assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost())); + assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort())); + assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort())); + } +} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java new file mode 100644 index 0000000..daf3773 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.interpreter.launcher; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class K8sSpecTemplateTest { + @Test + public void testRender() { + // given template variables + K8sSpecTemplate template = new K8sSpecTemplate(); + template.put("name", "world"); + + // when + String spec = template.render("Hello {{name}}"); + + // then + assertEquals("Hello world", spec); + } + + @Test + public void testObject() { + K8sSpecTemplate template = new K8sSpecTemplate(); + template.put("k8s", ImmutableMap.of("key", "world")); + + // when + String spec = template.render("Hello {{k8s.key}}"); + + // then + assertEquals("Hello world", spec); + } + + @Test + public void testIterate() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + template.put("dict", ImmutableMap.of( + "k1", "v1", + "k2", "v2" + )); + + // when + String spec = template.render( + "{% for key, value in dict.items() %}" + + "key = {{key}}, value = {{value}}\n" + + "{% endfor %}" + ); + + // then + assertEquals( + "key = k1, value = v1\n" + + "key = k2, value = v2\n", spec); + } + + @Test + public void testLoadProperties() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s.intp.key1", "v1"); + p.put("k8s.intp.key2", "v2"); + p.put("k8s.key3", "v3"); + p.put("key4", "v4"); + + // when + template.loadProperties(p); + + // then + assertEquals("v4", template.get("key4")); + assertEquals("v3", ((Map) template.get("k8s")).get("key3")); + assertEquals("v2", ((Map) ((Map) template.get("k8s")).get("intp")).get("key2")); + assertEquals("v1", ((Map) ((Map) template.get("k8s")).get("intp")).get("key1")); + } + + @Test + public void testLoadPropertyOverrideString() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s", "v1"); + 
p.put("k8s.key1", "v2"); + + // when + template.loadProperties(p); + + // then + assertEquals("v1", ((Map) template.get("k8s")).get("_")); + assertEquals("v2", ((Map) template.get("k8s")).get("key1")); + } + + @Test + public void testLoadPropertyOverrideDict() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s.key1", "v2"); + p.put("k8s", "v1"); + + // when + template.loadProperties(p); + + // then + assertEquals("v1", ((Map) template.get("k8s")).get("_")); + assertEquals("v2", ((Map) template.get("k8s")).get("key1")); + } + + @Test + public void testLoadPropertyWithMap() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + Properties p = new Properties(); + p.put("k8s", ImmutableMap.of("k1", "v1")); + + // when + template.loadProperties(p); + + // then + assertEquals("v1", ((Map) template.get("k8s")).get("k1")); + } +} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java new file mode 100644 index 0000000..74805cd --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * In the future, test may use minikube on travis for end-to-end test + * https://github.com/LiliC/travis-minikube + * https://blog.travis-ci.com/2017-10-26-running-kubernetes-on-travis-ci-with-minikube + */ +public class K8sStandardInterpreterLauncherTest { + @Before + public void setUp() { + for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) { + System.clearProperty(confVar.getVarName()); + } + } + + @Test + public void testK8sLauncher() throws IOException { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl); + Properties properties = new Properties(); + properties.setProperty("ENV_1", "VALUE_1"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("CALLBACK_HOST", "zeppelin-server.default.svc.cluster.local"); + properties.setProperty("CALLBACK_PORT", "12320"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + 
InterpreterLaunchContext context = new InterpreterLaunchContext( + properties, + option, + null, + "user1", + "intpGroupId", + "groupId", + "sh", + "name", + 0, + "host"); + + // when + InterpreterClient client = launcher.launch(context); + + // then + assertTrue(client instanceof K8sRemoteInterpreterProcess); + } +} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java new file mode 100644 index 0000000..072cf94 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.commons.io.IOUtils; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class KubectlTest { + + @Test(expected = IOException.class) + public void testKubectlCommandNotExists() throws IOException { + // given + Kubectl kubectl = new Kubectl("invalidcommand"); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + + // when + kubectl.execute(new String[] {}, null, stdout, stderr); + + // then throw IOException + } + + @Test + public void testStdout() throws IOException { + // given + Kubectl kubectl = new Kubectl("echo"); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + + // when + kubectl.execute(new String[] {"hello"}, null, stdout, stderr); + + // then + assertEquals("hello\n", stdout.toString()); + assertEquals("", stderr.toString()); + } + + @Test + public void testStderr() throws IOException { + // given + Kubectl kubectl = new Kubectl("sh"); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + + // when + try { + kubectl.execute(new String[]{"-c", "yoyo"}, null, stdout, stderr); + } catch (IOException e) { + // expected: 'yoyo' is not a valid command, so sh exits with a non-zero code + } + + // then + assertEquals("", stdout.toString()); + assertTrue(0 < stderr.toString().length()); + } + + @Test + public void testStdin() throws IOException { + // given + Kubectl kubectl = new Kubectl("wc"); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + InputStream stdin = IOUtils.toInputStream("Hello"); + + // when + kubectl.execute(new String[]{"-c"}, stdin, stdout, stderr); + + // then + assertEquals("5",
stdout.toString().trim()); + assertEquals("", stderr.toString()); + } + + @Test + public void testExecSpecAndGet() throws IOException { + // given + Kubectl kubectl = new Kubectl("cat"); + String spec = "{'k1': 'v1', 'k2': 2}"; + + // when + String result = kubectl.execAndGet(new String[]{}, spec); + + // then + assertEquals(spec, result); + } +} diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml index b9dc388..fc7bd70 100644 --- a/zeppelin-plugins/pom.xml +++ b/zeppelin-plugins/pom.xml @@ -47,6 +47,7 @@ <module>notebookrepo/filesystem</module> <module>launcher/standard</module> + <module>launcher/k8s-standard</module> <module>launcher/spark</module> </modules> diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 4411674..95530b0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -649,13 +649,21 @@ public class InterpreterSetting { } public String getLauncherPlugin() { - if (group.equals("spark")) { - return "SparkInterpreterLauncher"; + if (isRunningOnKubernetes()) { + return "K8sStandardInterpreterLauncher"; } else { - return "StandardInterpreterLauncher"; + if (group.equals("spark")) { + return "SparkInterpreterLauncher"; + } else { + return "StandardInterpreterLauncher"; + } } } + private boolean isRunningOnKubernetes() { + return conf.getRunMode() == ZeppelinConfiguration.RUN_MODE.K8S; + } + public boolean isUserAuthorized(List<String> userAndRoles) { if (!option.permissionIsSet()) { return true; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index bd612d6..d7dd304 100644 --- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -48,6 +48,7 @@ import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourceId; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,8 +165,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " + registerInfo.getInterpreterGroupId()); } - ((RemoteInterpreterManagedProcess) interpreterProcess) - .processStarted(registerInfo.port, registerInfo.host); + + interpreterProcess.processStarted(registerInfo.port, registerInfo.host); } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index db6d263..a990808 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -189,7 +189,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } - // called by RemoteInterpreterServer to notify that RemoteInterpreter Process is started + @Override public void processStarted(int port, String host) { this.port = port; this.host = host; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index e8b3482..c768143 100644 --- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -137,4 +137,9 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { public interface RemoteFunction<T> { T call(Client client) throws Exception; } + + /** + * called by RemoteInterpreterEventServer to notify that RemoteInterpreter Process is started + */ + public abstract void processStarted(int port, String host); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index 69daa6f..19da682 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -88,4 +88,9 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { public boolean isRunning() { return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); } + + @Override + public void processStarted(int port, String host) { + // assume process is externally managed. nothing to do + } }
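A minimal sketch of how `getZeppelinServiceHost()` and `getZeppelinServiceRpcPort()` in `K8sStandardInterpreterLauncher` resolve the address that interpreter Pods call back to. It assumes, as the launcher's comments state, that the Zeppelin Service and Pod share the same name, and it relies on Kubernetes injecting `{SVCNAME}_SERVICE_PORT_{PORTNAME}` environment variables for named Service ports (here a port named `rpc`). `ZeppelinServiceEndpoint` is a hypothetical class, and the environment is passed in as a `Map` so the logic can be tried without a cluster.

```java
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical helper mirroring getZeppelinServiceHost() / getZeppelinServiceRpcPort().
 * Not part of Zeppelin.
 */
final class ZeppelinServiceEndpoint {

  private ZeppelinServiceEndpoint() {}

  /** In-cluster DNS name: <hostname>.<namespace>.svc.cluster.local. */
  static String serviceHost(String hostname, String namespace) {
    return String.format("%s.%s.svc.cluster.local", hostname, namespace);
  }

  /** Env var for the Service port named "rpc", assuming the Service is named after the hostname. */
  static String rpcPortEnvName(String hostname) {
    // '-' and '.' are not valid in env var names, so they become '_'.
    return hostname.replaceAll("[-.]", "_").toUpperCase(Locale.ROOT) + "_SERVICE_PORT_RPC";
  }

  /** Prefer the injected env var; otherwise fall back to the configured RPC port. */
  static String rpcPort(String hostname, Map<String, String> env, int fallbackPort) {
    String fromEnv = env.get(rpcPortEnvName(hostname));
    return fromEnv != null ? fromEnv : Integer.toString(fallbackPort);
  }

  public static void main(String[] args) {
    // Example values only; inside a Pod you would pass System.getenv() instead.
    Map<String, String> env = Map.of("ZEPPELIN_SERVER_SERVICE_PORT_RPC", "12320");
    System.out.println(serviceHost("zeppelin-server", "default")); // zeppelin-server.default.svc.cluster.local
    System.out.println(rpcPort("zeppelin-server", env, 9999));     // 12320
    System.out.println(rpcPort("my-laptop", env, 9999));           // 9999
  }
}
```

`Locale.ROOT` keeps the upper-casing independent of the JVM's default locale; the code in the diff calls plain `toUpperCase()`.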
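`K8sSpecTemplateTest` pins down how `K8sSpecTemplate.loadProperties()` turns dotted property keys into nested maps for the templates: `k8s.intp.key1` becomes `k8s -> intp -> key1`, a map-valued property is merged in, and a scalar that collides with nested keys is kept under `_` regardless of which property is loaded first. The sketch below is a hypothetical re-implementation written only from those test expectations. `DottedKeyNester` is not a Zeppelin class, and the real implementation may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical re-implementation of the key nesting that K8sSpecTemplateTest
 * expects from K8sSpecTemplate.loadProperties(). Not Zeppelin's actual code.
 */
@SuppressWarnings("unchecked")
final class DottedKeyNester {

  /** Key that keeps a scalar value when the same key also has nested children. */
  static final String SELF_KEY = "_";

  private DottedKeyNester() {}

  static Map<String, Object> nest(Properties props) {
    Map<String, Object> root = new HashMap<>();
    for (Map.Entry<Object, Object> e : props.entrySet()) {
      String[] parts = e.getKey().toString().split("\\.");
      Map<String, Object> node = root;
      // Walk (or create) one nested map per segment, except the last one.
      for (int i = 0; i < parts.length - 1; i++) {
        node = childMap(node, parts[i]);
      }
      put(node, parts[parts.length - 1], e.getValue());
    }
    return root;
  }

  /** Returns the map stored at key, promoting an existing scalar to "_". */
  private static Map<String, Object> childMap(Map<String, Object> node, String key) {
    Object existing = node.get(key);
    if (existing instanceof Map) {
      return (Map<String, Object>) existing;
    }
    Map<String, Object> child = new HashMap<>();
    if (existing != null) {
      child.put(SELF_KEY, existing);
    }
    node.put(key, child);
    return child;
  }

  private static void put(Map<String, Object> node, String key, Object value) {
    Object existing = node.get(key);
    if (value instanceof Map) {
      // Copy map values into a mutable child so later dotted keys can extend them.
      childMap(node, key).putAll((Map<String, Object>) value);
    } else if (existing instanceof Map) {
      // Nested keys already live here: keep the scalar alongside them.
      ((Map<String, Object>) existing).put(SELF_KEY, value);
    } else {
      node.put(key, value);
    }
  }

  public static void main(String[] args) {
    Properties p = new Properties();
    p.put("k8s.intp.key1", "v1");
    p.put("k8s", "v0");
    p.put("key4", "v4");
    // Prints the nested structure; HashMap iteration order is unspecified.
    System.out.println(nest(p));
  }
}
```

Because `Properties` is a `Hashtable`, iteration order is not guaranteed, which is why both collision paths (scalar first, nested key first) have to end in the same shape.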
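For the Spark interpreter, `testGetTemplateBindingsForSpark` expects the launcher to append Spark-on-Kubernetes settings to the user's `SPARK_SUBMIT_OPTIONS` and to point `zeppelin.spark.uiWebUrl` at the reverse proxy (`//4040-<pod>.<serviceDomain>`). The sketch below reproduces those values under stated assumptions: the `--conf key=value` form is assumed (the test only checks for `key=value` substrings), the ports 22321 and 22322 are the ones the tests say are hardcoded in `k8s/interpreter/100-interpreter.yaml`, and `SparkOnK8sOptions` and the Pod name `spark-abcdef` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of the Spark-on-Kubernetes options appended for the Spark
 * interpreter Pod. Values are inferred from K8sRemoteInterpreterProcessTest.
 */
final class SparkOnK8sOptions {

  // Must match the ports declared in k8s/interpreter/100-interpreter.yaml.
  static final int SPARK_DRIVER_PORT = 22321;
  static final int SPARK_BLOCKMANAGER_PORT = 22322;

  private SparkOnK8sOptions() {}

  /** The interpreter Pod acts as the Spark driver, reachable at its in-cluster DNS name. */
  static String podHost(String podName, String namespace) {
    return String.format("%s.%s.svc.cluster.local", podName, namespace);
  }

  /** Reverse-proxy URL for the Spark UI (port 4040 encoded in the host name). */
  static String sparkUiWebUrl(String podName, String serviceDomain) {
    return String.format("//4040-%s.%s", podName, serviceDomain);
  }

  static String submitOptions(String userOptions, String namespace, String podName, String sparkImage) {
    Map<String, String> conf = new LinkedHashMap<>(); // keeps insertion order
    conf.put("spark.kubernetes.namespace", namespace);
    conf.put("spark.kubernetes.driver.pod.name", podName);
    conf.put("spark.kubernetes.container.image", sparkImage);
    conf.put("spark.driver.host", podHost(podName, namespace));
    conf.put("spark.driver.port", Integer.toString(SPARK_DRIVER_PORT));
    conf.put("spark.blockManager.port", Integer.toString(SPARK_BLOCKMANAGER_PORT));
    String generated = conf.entrySet().stream()
        .map(e -> "--conf " + e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining(" "));
    // User-supplied options come first; generated ones are appended.
    return (userOptions == null || userOptions.isEmpty()) ? generated : userOptions + " " + generated;
  }

  public static void main(String[] args) {
    System.out.println(sparkUiWebUrl("spark-abcdef", "mydomain")); // //4040-spark-abcdef.mydomain
    System.out.println(submitOptions("my options", "default", "spark-abcdef", "spark-container:1.0"));
  }
}
```

The driver and block manager ports are constants rather than random because, per the comment in `testPredefinedPortNumbers`, the same numbers are hardcoded in the interpreter yaml; changing one side requires changing the other.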
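`Kubectl` sends every command through `execAndGet()`, which appends `--namespace=<ns>` when a namespace has been set, and `wait()` treats a `NotFound` error while waiting for deletion as success. The following sketch isolates only the argument assembly and that error check as pure functions, without running `kubectl`. `KubectlArgs`, its methods, and the Pod name `zeppelin-abc` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper mirroring how Kubectl builds its command-line arguments.
 * It only assembles argv lists; nothing is executed.
 */
final class KubectlArgs {

  private KubectlArgs() {}

  /** Appends --namespace=<ns> last, as execAndGet() does, when a namespace is set. */
  static List<String> withNamespace(String namespace, String... args) {
    List<String> argv = new ArrayList<>(Arrays.asList(args));
    if (namespace != null) {
      argv.add("--namespace=" + namespace);
    }
    return argv;
  }

  /** kubectl apply -f -  (the rendered spec is fed on stdin). */
  static List<String> apply(String namespace) {
    return withNamespace(namespace, "apply", "-f", "-");
  }

  /** kubectl wait <resource> --for=<condition> --timeout=<N>s */
  static List<String> waitFor(String namespace, String resource, String condition, int timeoutSec) {
    return withNamespace(namespace, "wait", resource,
        String.format("--for=%s", condition),
        String.format("--timeout=%ds", timeoutSec));
  }

  /** Waiting for deletion of a resource that is already gone is not an error. */
  static boolean isIgnorableWaitFailure(String condition, String errorOutput) {
    return "delete".equals(condition) && errorOutput != null && errorOutput.contains("NotFound");
  }

  public static void main(String[] args) {
    // [wait, pod/zeppelin-abc, --for=delete, --timeout=60s, --namespace=default]
    System.out.println(waitFor("default", "pod/zeppelin-abc", "delete", 60));
    // [apply, -f, -]
    System.out.println(apply(null));
  }
}
```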