This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new e4eaa5e [ZEPPELIN-5070] Improve start/shutdown and signal handling e4eaa5e is described below commit e4eaa5ee974465b24c2f3bf89ce14040d3559f7b Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Wed Oct 21 16:55:22 2020 +0200 [ZEPPELIN-5070] Improve start/shutdown and signal handling ### What is this PR for? This PR touches the start and stop procedures of all interpreters. - improved start script with [shellcheck](https://www.shellcheck.net/) recommendations - Use `exec [...]` instead of `eval [...] &`, which means that the Java interpreter process is not a fork of the shell script -> no trap handling required in the start script -> signals land in the JVM - Correct escaping is done by the start-script - remove anonymous threads in `RemoteInterpreterServer.java` and give the threads descriptive names - Use TServer's stopTimeoutVal and stopTimeoutUnit for faster shutdown - remove special K8s shutdown handling - unregister interpreter during unpredictable shutdown ### What type of PR is it? - Improvement ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5070 ### How should this be tested? * Travis-CI: https://travis-ci.com/github/Reamer/zeppelin/builds/198263926 ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? 
No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #3925 from Reamer/signal_start_stop_handling and squashes the following commits: c4da96114 [Philipp Dallig] Add logback.xml for S3 tests to disable debug log messages 6fbb9735e [Philipp Dallig] Escaping is done in start-script bc2f7dc46 [Philipp Dallig] Start ThriftServer in Run-Thread RemoteInterpreterServer-Thread and close the TServerSocket 33d951c3b [Philipp Dallig] Use args instead of command and start zeppelin-interpreter application in tini 2626af28c [Philipp Dallig] Remove special signal handling in K8s 683345de7 [Philipp Dallig] Use exec instead of eval to start interpreter application, no need to fork b5c9bb0c0 [Philipp Dallig] Use glob instead of find - SC2044 7766f79c5 [Philipp Dallig] Use { ..; } instead of (..) to avoid subshell overhead. - SC2235 2194adade [Philipp Dallig] Declare and assign separately to avoid masking return values - SC2155 d28abe167 [Philipp Dallig] Remove quotes from logfiles 143412181 [Philipp Dallig] Double quote to prevent globbing and word splitting - SC2086 3597c0fd8 [Philipp Dallig] remove unnecessary function calls or move to new $() style ee3cbdad3 [Philipp Dallig] Use "-n" instead of "! 
-z" SC2236 1a8e3a81b [Philipp Dallig] Improve py4j pattern 13bbfecd4 [Philipp Dallig] cleanup and style changes 282f16732 [Philipp Dallig] Indicate a force shutting down with an other status code bce7cc6cb [Philipp Dallig] Add stopTimeoutVal and stopTimeoutUnit to TServer 3d9c36487 [Philipp Dallig] Remove sun.misc.Signal and sun.misc.SignalHandler, which prints compilation warnings 05201c40b [Philipp Dallig] Add a RegisterRunnable --- bin/common.sh | 27 +- bin/interpreter.sh | 79 ++--- k8s/interpreter/100-interpreter-spec.yaml | 22 +- scripts/docker/zeppelin-interpreter/Dockerfile | 3 +- .../remote/RemoteInterpreterServer.java | 332 ++++++++++++--------- .../interpreter/remote/RemoteInterpreterUtils.java | 36 +-- .../zeppelin/interpreter/remote/YarnUtils.java | 4 +- .../zeppelin/interpreter/util/ProcessLauncher.java | 11 +- .../remote/RemoteInterpreterUtilsTest.java | 15 +- .../notebookrepo/s3/src/test/resources/logback.xml | 28 ++ .../interpreter/RemoteInterpreterEventServer.java | 23 +- .../launcher/SparkInterpreterLauncher.java | 8 +- .../launcher/SparkInterpreterLauncherTest.java | 24 +- .../org/apache/zeppelin/notebook/NotebookTest.java | 2 + 14 files changed, 319 insertions(+), 295 deletions(-) diff --git a/bin/common.sh b/bin/common.sh index efb2ebe..6b8a4bc 100644 --- a/bin/common.sh +++ b/bin/common.sh @@ -16,8 +16,8 @@ # limitations under the License. # -if [ -L ${BASH_SOURCE-$0} ]; then - FWDIR=$(dirname $(readlink "${BASH_SOURCE-$0}")) +if [ -L "${BASH_SOURCE-$0}" ]; then + FWDIR=$(dirname "$(readlink "${BASH_SOURCE-$0}")") else FWDIR=$(dirname "${BASH_SOURCE-$0}") fi @@ -25,7 +25,8 @@ fi if [[ -z "${ZEPPELIN_HOME}" ]]; then # Make ZEPPELIN_HOME look cleaner in logs by getting rid of the # extra ../ - export ZEPPELIN_HOME="$(cd "${FWDIR}/.."; pwd)" + ZEPPELIN_HOME="$(cd "${FWDIR}/.." 
|| exit; pwd)" + export ZEPPELIN_HOME fi if [[ -z "${ZEPPELIN_CONF_DIR}" ]]; then @@ -44,7 +45,8 @@ if [[ -z "${ZEPPELIN_WAR}" ]]; then if [[ -d "${ZEPPELIN_HOME}/zeppelin-web/dist" ]]; then export ZEPPELIN_WAR="${ZEPPELIN_HOME}/zeppelin-web/dist" else - export ZEPPELIN_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-[0-9]*.war") + ZEPPELIN_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-[0-9]*.war") + export ZEPPELIN_WAR fi fi @@ -52,7 +54,8 @@ if [[ -z "${ZEPPELIN_ANGULAR_WAR}" ]]; then if [[ -d "${ZEPPELIN_HOME}/zeppelin-web/dist" ]]; then export ZEPPELIN_ANGULAR_WAR="${ZEPPELIN_HOME}/zeppelin-web-angular/dist/zeppelin" else - export ZEPPELIN_ANGULAR_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-angular*.war") + ZEPPELIN_ANGULAR_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-angular*.war") + export ZEPPELIN_ANGULAR_WAR fi fi @@ -70,7 +73,7 @@ function check_java_version() { JVM_VERSION=$(echo "$jvmver"|sed -e 's|^1\.\([0-9][0-9]*\)\..*$|\1|') fi - if [ "$JVM_VERSION" -lt 8 ] || ([ "$JVM_VERSION" -eq 8 ] && [ "${jvmver#*_}" -lt 151 ]) ; then + if [ "$JVM_VERSION" -lt 8 ] || { [ "$JVM_VERSION" -eq 8 ] && [ "${jvmver#*_}" -lt 151 ]; } ; then echo "Apache Zeppelin requires either Java 8 update 151 or newer" exit 1; fi @@ -78,7 +81,7 @@ function check_java_version() { function addEachJarInDir(){ if [[ -d "${1}" ]]; then - for jar in $(find -L "${1}" -maxdepth 1 -name '*jar'); do + for jar in "${1}"/*.jar ; do ZEPPELIN_CLASSPATH="$jar:$ZEPPELIN_CLASSPATH" done fi @@ -86,7 +89,7 @@ function addEachJarInDir(){ function addEachJarInDirRecursive(){ if [[ -d "${1}" ]]; then - for jar in $(find -L "${1}" -type f -name '*jar'); do + for jar in "${1}"/**/*.jar ; do ZEPPELIN_CLASSPATH="$jar:$ZEPPELIN_CLASSPATH" done fi @@ -94,7 +97,7 @@ function addEachJarInDirRecursive(){ function addEachJarInDirRecursiveForIntp(){ if [[ -d "${1}" ]]; then - for jar in ${1}/*.jar; do + for jar in "${1}"/*.jar; do ZEPPELIN_INTP_CLASSPATH="$jar:${ZEPPELIN_INTP_CLASSPATH}" 
done fi @@ -120,7 +123,7 @@ function getZeppelinVersion(){ fi addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib" CLASSPATH+=":${ZEPPELIN_CLASSPATH}" - $ZEPPELIN_RUNNER -cp $CLASSPATH $ZEPPELIN_COMMANDLINE_MAIN -v + $ZEPPELIN_RUNNER -cp "${CLASSPATH}" "${ZEPPELIN_COMMANDLINE_MAIN}" -v exit 0 } @@ -149,9 +152,9 @@ export JAVA_OPTS JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}" if [[ -n "${ZEPPELIN_IN_DOCKER}" ]]; then - JAVA_INTP_OPTS+=" -Dlog4j.configuration='file://${ZEPPELIN_CONF_DIR}/log4j_docker.properties' -Dlog4j.configurationFile='file://${ZEPPELIN_CONF_DIR}/log4j2_docker.properties'" + JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j_docker.properties -Dlog4j.configurationFile=file://${ZEPPELIN_CONF_DIR}/log4j2_docker.properties" elif [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then - JAVA_INTP_OPTS+=" -Dlog4j.configuration='file://${ZEPPELIN_CONF_DIR}/log4j.properties' -Dlog4j.configurationFile='file://${ZEPPELIN_CONF_DIR}/log4j2.properties'" + JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties -Dlog4j.configurationFile=file://${ZEPPELIN_CONF_DIR}/log4j2.properties" else JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties" fi diff --git a/bin/interpreter.sh b/bin/interpreter.sh index d1d6315..2eac09d 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -35,7 +35,7 @@ if [ -f /proc/self/cgroup ] && [ -n "$(command -v getent)" ]; then set +e uidentry="$(getent passwd "$myuid")" set -e - + # If there is no passwd entry for the container UID, attempt to create one if [ -z "$uidentry" ] ; then if [ -w /etc/passwd ] ; then @@ -129,22 +129,22 @@ ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/zeppelin-interpreter-${INTERPRETER_GROUP_I if [[ -z "$ZEPPELIN_IMPERSONATE_CMD" ]]; then if [[ "${INTERPRETER_ID}" != "spark" || "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" == "false" ]]; then - ZEPPELIN_IMPERSONATE_RUN_CMD=`echo "ssh 
${ZEPPELIN_IMPERSONATE_USER}@localhost" ` + ZEPPELIN_IMPERSONATE_RUN_CMD=("ssh" "${ZEPPELIN_IMPERSONATE_USER}@localhost") fi else ZEPPELIN_IMPERSONATE_RUN_CMD=$(eval "echo ${ZEPPELIN_IMPERSONATE_CMD} ") fi -if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then +if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]]; then ZEPPELIN_LOGFILE+="${ZEPPELIN_IMPERSONATE_USER}-" fi ZEPPELIN_LOGFILE+="${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.log" -JAVA_INTP_OPTS+=" -Dzeppelin.log.file='${ZEPPELIN_LOGFILE}'" +JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}" if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}" - $(mkdir -p "${ZEPPELIN_LOG_DIR}") + mkdir -p "${ZEPPELIN_LOG_DIR}" fi # set spark related env variables @@ -152,16 +152,15 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then # run kinit if [[ -n "${ZEPPELIN_SERVER_KERBEROS_KEYTAB}" ]] && [[ -n "${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL}" ]]; then - kinit -kt ${ZEPPELIN_SERVER_KERBEROS_KEYTAB} ${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL} + kinit -kt "${ZEPPELIN_SERVER_KERBEROS_KEYTAB}" "${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL}" fi if [[ -n "${SPARK_HOME}" ]]; then export SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit" - SPARK_APP_JAR="$(ls ${ZEPPELIN_HOME}/interpreter/spark/spark-interpreter*.jar)" + SPARK_APP_JAR="$(ls "${ZEPPELIN_HOME}"/interpreter/spark/spark-interpreter*.jar)" # This will evantually passes SPARK_APP_JAR to classpath of SparkIMain ZEPPELIN_INTP_CLASSPATH+=":${SPARK_APP_JAR}" - pattern="$SPARK_HOME/python/lib/py4j-*-src.zip" - py4j=($pattern) + py4j=("${SPARK_HOME}"/python/lib/py4j-*-src.zip) # pick the first match py4j zip - there should only be one export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH" export PYTHONPATH="${py4j[0]}:$PYTHONPATH" @@ -178,8 +177,7 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then addJarInDirForIntp "${INTERPRETER_DIR}/dep" - pattern="${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-*-src.zip" - py4j=($pattern) + 
py4j=("${ZEPPELIN_HOME}"/interpreter/spark/pyspark/py4j-*-src.zip) # pick the first match py4j zip - there should only be one PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${py4j[0]}" @@ -246,14 +244,14 @@ elif [[ "${INTERPRETER_ID}" == "flink" ]]; then ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}" # Don't use `hadoop classpath` if flink-hadoop-shaded in in lib folder flink_hadoop_shaded_jar=$(find "${FLINK_HOME}/lib" -name 'flink-shaded-hadoop-*.jar') - if [[ ! -z "$flink_hadoop_shaded_jar" ]]; then + if [[ -n "$flink_hadoop_shaded_jar" ]]; then echo "" else if [[ ! ( -x "$(command -v hadoop)" ) && ( "${ZEPPELIN_INTERPRETER_LAUNCHER}" != "yarn" ) ]]; then echo 'Error: hadoop is not in PATH when HADOOP_CONF_DIR is specified and no flink-shaded-hadoop jar ' exit 1 fi - ZEPPELIN_INTP_CLASSPATH+=":`hadoop classpath`" + ZEPPELIN_INTP_CLASSPATH+=":$(hadoop classpath)" fi export HADOOP_CONF_DIR=${HADOOP_CONF_DIR} else @@ -271,59 +269,26 @@ fi addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}" -if [[ ! 
-z "$ZEPPELIN_IMPERSONATE_USER" ]]; then +if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]]; then if [[ "${INTERPRETER_ID}" != "spark" || "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" == "false" ]]; then - suid="$(id -u ${ZEPPELIN_IMPERSONATE_USER})" + suid="$(id -u "${ZEPPELIN_IMPERSONATE_USER}")" if [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then - INTERPRETER_RUN_COMMAND=${ZEPPELIN_IMPERSONATE_RUN_CMD}" '" + INTERPRETER_RUN_COMMAND+=("${ZEPPELIN_IMPERSONATE_RUN_CMD[@]}") if [[ -f "${ZEPPELIN_CONF_DIR}/zeppelin-env.sh" ]]; then - INTERPRETER_RUN_COMMAND+=" source "${ZEPPELIN_CONF_DIR}'/zeppelin-env.sh;' + INTERPRETER_RUN_COMMAND+=("source" "${ZEPPELIN_CONF_DIR}/zeppelin-env.sh;") fi fi fi fi if [[ -n "${SPARK_SUBMIT}" ]]; then - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} \"${INTP_GROUP_ID}\" ${INTP_PORT}` -else - INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} \"${INTP_GROUP_ID}\" ${INTP_PORT}` -fi - - -if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then - INTERPRETER_RUN_COMMAND+="'" -fi - -echo "Interpreter launch command: $INTERPRETER_RUN_COMMAND" -eval $INTERPRETER_RUN_COMMAND & -pid=$! 
- -if [[ -z "${pid}" ]]; then - exit 1; + IFS=' ' read -r -a SPARK_SUBMIT_OPTIONS_ARRAY <<< "${SPARK_SUBMIT_OPTIONS}" + IFS=' ' read -r -a ZEPPELIN_SPARK_CONF_ARRAY <<< "${ZEPPELIN_SPARK_CONF}" + INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}") else - echo ${pid} > "${ZEPPELIN_PID}" + IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}" + IFS=' ' read -r -a ZEPPELIN_INTP_MEM_ARRAY <<< "${ZEPPELIN_INTP_MEM}" + INTERPRETER_RUN_COMMAND+=("${ZEPPELIN_RUNNER}" "${JAVA_INTP_OPTS_ARRAY[@]}" "${ZEPPELIN_INTP_MEM_ARRAY[@]}" "-cp" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "${ZEPPELIN_SERVER}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}") fi - -trap 'shutdown_hook;' SIGTERM SIGINT SIGQUIT -function shutdown_hook() { - local count - count=0 - echo "trying to shutdown..." 
- while [[ "${count}" -lt 10 ]]; do - $(kill ${pid} > /dev/null 2> /dev/null) - if kill -0 ${pid} > /dev/null 2>&1; then - sleep 3 - let "count+=1" - else - break - fi - if [[ "${count}" == "5" ]]; then - $(kill -9 ${pid} > /dev/null 2> /dev/null) - fi - done -} - -wait - -rm -f "${ZEPPELIN_PID}" > /dev/null 2> /dev/null +exec "${INTERPRETER_RUN_COMMAND[@]}" diff --git a/k8s/interpreter/100-interpreter-spec.yaml b/k8s/interpreter/100-interpreter-spec.yaml index 76f1dea..116b0df 100644 --- a/k8s/interpreter/100-interpreter-spec.yaml +++ b/k8s/interpreter/100-interpreter-spec.yaml @@ -43,12 +43,22 @@ spec: containers: - name: {{zeppelin.k8s.interpreter.container.name}} image: {{zeppelin.k8s.interpreter.container.image}} - command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}} -r {{zeppelin.k8s.interpreter.rpc.portRange}} -c {{zeppelin.k8s.server.rpc.service}} -p {{zeppelin.k8s.server.rpc.portRange}} -i {{zeppelin.k8s.interpreter.group.id}} -l {{zeppelin.k8s.interpreter.localRepo}} -g {{zeppelin.k8s.interpreter.setting.name}}"] - lifecycle: - preStop: - exec: - # SIGTERM triggers a quick exit; gracefully terminate instead - command: ["sh", "-c", "ps -ef | grep org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer | grep -v grep | awk '{print $2}' | xargs kill"] + args: + - "$(ZEPPELIN_HOME)/bin/interpreter.sh" + - "-d" + - "$(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}}" + - "-r" + - "{{zeppelin.k8s.interpreter.rpc.portRange}}" + - "-c" + - "{{zeppelin.k8s.server.rpc.service}}" + - "-p" + - "{{zeppelin.k8s.server.rpc.portRange}}" + - "-i" + - "{{zeppelin.k8s.interpreter.group.id}}" + - "-l" + - "{{zeppelin.k8s.interpreter.localRepo}}/{{zeppelin.k8s.interpreter.setting.name}}" + - "-g" + - "{{zeppelin.k8s.interpreter.setting.name}}" env: {% for key, value in zeppelin.k8s.envs.items() %} - name: {{key}} diff --git a/scripts/docker/zeppelin-interpreter/Dockerfile 
b/scripts/docker/zeppelin-interpreter/Dockerfile index 090d1cc..0c9d956 100644 --- a/scripts/docker/zeppelin-interpreter/Dockerfile +++ b/scripts/docker/zeppelin-interpreter/Dockerfile @@ -26,7 +26,7 @@ ENV VERSION="${version}" \ RUN set -ex && \ apt-get -y update && \ - DEBIAN_FRONTEND=noninteractive apt-get install -y openjdk-8-jre-headless wget && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y openjdk-8-jre-headless wget tini && \ # Cleanup rm -rf /var/lib/apt/lists/* && \ apt-get autoclean && \ @@ -80,4 +80,5 @@ RUN mkdir -p "${Z_HOME}/logs" "${Z_HOME}/run" "${Z_HOME}/local-repo" && \ chmod -R 775 "${Z_HOME}/logs" "${Z_HOME}/run" "${Z_HOME}/local-repo" USER 1000 +ENTRYPOINT [ "/usr/bin/tini", "--" ] WORKDIR ${Z_HOME} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 2bf9adf..453a7e3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TTransportException; import org.apache.zeppelin.cluster.ClusterManagerClient; import org.apache.zeppelin.cluster.meta.ClusterMeta; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -73,9 +74,6 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.misc.Signal; -import sun.misc.SignalHandler; - import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -109,6 
+107,8 @@ public class RemoteInterpreterServer extends Thread private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterServer.class); + private static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000; + private String interpreterGroupId; private InterpreterGroup interpreterGroup; private AngularObjectRegistry angularObjectRegistry; @@ -130,10 +130,6 @@ public class RemoteInterpreterServer extends Thread private final Map<String, RunningApplication> runningApplications = Collections.synchronizedMap(new HashMap<String, RunningApplication>()); - private Map<String, Object> remoteWorksResponsePool; - - private static final long DEFAULT_SHUTDOWN_TIMEOUT = 2000; - // Hold information for manual progress update private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>(); @@ -150,6 +146,8 @@ public class RemoteInterpreterServer extends Thread // cluster manager client private ClusterManagerClient clusterManagerClient; + private static Thread shutdownThread; + public RemoteInterpreterServer(String intpEventServerHost, int intpEventServerPort, String interpreterGroupId, @@ -166,6 +164,8 @@ public class RemoteInterpreterServer extends Thread if (null != intpEventServerHost) { this.intpEventServerHost = intpEventServerHost; this.intpEventServerPort = intpEventServerPort; + this.port = RemoteInterpreterUtils.findAvailablePort(portRange); + this.host = RemoteInterpreterUtils.findAvailableHostAddress(); if (!isTest) { LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port, intpEventServerHost, intpEventServerPort); @@ -177,69 +177,30 @@ public class RemoteInterpreterServer extends Thread } this.isTest = isTest; this.interpreterGroupId = interpreterGroupId; - RemoteInterpreterService.Processor<RemoteInterpreterServer> processor = - new RemoteInterpreterService.Processor<>(this); - TServerSocket serverTransport; - if (null == intpEventServerHost) { - // Dev Interpreter - serverTransport = new 
TServerSocket(intpEventServerPort); - } else { - serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange); - this.port = serverTransport.getServerSocket().getLocalPort(); - this.host = RemoteInterpreterUtils.findAvailableHostAddress(); - LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port); - } - server = new TThreadPoolServer( - new TThreadPoolServer.Args(serverTransport).processor(processor)); - remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>()); - } @Override public void run() { - if (null != intpEventServerHost && !isTest) { - new Thread(new Runnable() { - boolean interrupted = false; - - @Override - public void run() { - while (!interrupted && !server.isServing()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - interrupted = true; - } - } - if (!interrupted) { - RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId); - try { - LOGGER.info("Registering interpreter process"); - intpEventClient.registerInterpreterProcess(registerInfo); - LOGGER.info("Registered interpreter process"); - } catch (Exception e) { - LOGGER.error("Error while registering interpreter: {}, cause: {}", registerInfo, e); - try { - shutdown(); - } catch (TException e1) { - LOGGER.warn("Exception occurs while shutting down", e1); - } - } - } - - if (launcherEnv != null && "yarn".endsWith(launcherEnv)) { - try { - YarnUtils.register(host, port); - ScheduledExecutorService yarnHeartbeat = ExecutorFactory.singleton() - .createOrGetScheduled("RM-Heartbeat", 1); - yarnHeartbeat.scheduleAtFixedRate(YarnUtils::heartbeat, 0, 1, TimeUnit.MINUTES); - } catch (Exception e) { - LOGGER.error("Fail to register yarn app", e); - } - } - } - }).start(); + RemoteInterpreterService.Processor<RemoteInterpreterServer> processor = + new RemoteInterpreterService.Processor<>(this); + try (TServerSocket tSocket = new TServerSocket(port)){ + server = new TThreadPoolServer( + new 
TThreadPoolServer.Args(tSocket) + .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT) + .stopTimeoutUnit(TimeUnit.MILLISECONDS) + .processor(processor)); + + if (null != intpEventServerHost && !isTest) { + Thread registerThread = new Thread(new RegisterRunnable()); + registerThread.setName("RegisterThread"); + registerThread.start(); + } + LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port); + server.serve(); + } catch (TTransportException e) { + LOGGER.error("Failure in TTransport", e); } - server.serve(); + LOGGER.info("RemoteInterpreterServer-Thread finished"); } @Override @@ -270,7 +231,6 @@ public class RemoteInterpreterServer extends Thread @Override public void shutdown() throws TException { - // unRegisterInterpreterProcess should be a sync operation (outside of shutdown thread), // otherwise it would cause data mismatch between zeppelin server & interpreter process. // e.g. zeppelin server start a new interpreter process, while previous interpreter process @@ -283,62 +243,16 @@ public class RemoteInterpreterServer extends Thread LOGGER.error("Fail to unregister remote interpreter process", e); } } - - Thread shutDownThread = new Thread(() -> { - LOGGER.info("Shutting down..."); - // delete interpreter cluster meta - deleteClusterMeta(); - - if (interpreterGroup != null) { - synchronized (interpreterGroup) { - for (List<Interpreter> session : interpreterGroup.values()) { - for (Interpreter interpreter : session) { - try { - interpreter.close(); - } catch (InterpreterException e) { - LOGGER.warn("Fail to close interpreter", e); - } - } - } - } + if (shutdownThread != null) { + // no need to call shutdownhook twice + if (Runtime.getRuntime().removeShutdownHook(shutdownThread)) { + LOGGER.debug("ShutdownHook removed, because of a regular shutdown"); + } else { + LOGGER.warn("The ShutdownHook could not be removed"); } - if (!isTest) { - SchedulerFactory.singleton().destroy(); - ExecutorFactory.singleton().shutdownAll(); - } - - if 
("yarn".equals(launcherEnv)) { - try { - YarnUtils.unregister(true, ""); - } catch (Exception e) { - LOGGER.error("Fail to unregister yarn app", e); - } - } - - server.stop(); - - // server.stop() does not always finish server.serve() loop - // sometimes server.serve() is hanging even after server.stop() call. - // this case, need to force kill the process - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < DEFAULT_SHUTDOWN_TIMEOUT && - server.isServing()) { - try { - Thread.sleep(300); - } catch (InterruptedException e) { - LOGGER.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e); - } - } - - if (server.isServing()) { - LOGGER.info("Force shutting down"); - System.exit(0); - } - - LOGGER.info("Shutting down"); - }, "Shutdown-Thread"); + } + Thread shutDownThread = new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_CALL); shutDownThread.start(); } @@ -387,21 +301,21 @@ public class RemoteInterpreterServer extends Thread new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange); remoteInterpreterServer.start(); - // add signal handler - Signal.handle(new Signal("TERM"), new SignalHandler() { - @Override - public void handle(Signal signal) { - try { - LOGGER.info("Receive TERM Signal"); - remoteInterpreterServer.shutdown(); - } catch (TException e) { - LOGGER.error("Error on shutdown RemoteInterpreterServer", e); - } - } - }); + /* + * Registration of a ShutdownHook in case of an unpredictable system call + * Examples: STRG+C, SIGTERM via kill + */ + shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK); + Runtime.getRuntime().addShutdownHook(shutdownThread); remoteInterpreterServer.join(); LOGGER.info("RemoteInterpreterServer thread is finished"); + + /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully. 
+ * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd + * should be part of the next release and solve the problem. + * We may have other threads that are not terminated successfully. + */ System.exit(0); } @@ -425,20 +339,6 @@ public class RemoteInterpreterServer extends Thread clusterManagerClient.putClusterMeta(INTP_PROCESS_META, interpreterGroupId, meta); } - private void deleteClusterMeta() { - if (!zConf.isClusterMode()){ - return; - } - - try { - // delete interpreter cluster meta - clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, interpreterGroupId); - Thread.sleep(300); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - @Override public void createInterpreter(String interpreterGroupId, String sessionId, String className, Map<String, String> properties, String userName) throws TException { @@ -683,6 +583,144 @@ public class RemoteInterpreterServer extends Thread context.getNoteGui()); } + class RegisterRunnable implements Runnable { + + @Override + public void run() { + LOGGER.info("Start registration"); + // wait till the server is serving + while (!Thread.currentThread().isInterrupted() && server != null && !server.isServing()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.info("InterruptedException received", e); + Thread.currentThread().interrupt(); + } + } + if (!Thread.currentThread().isInterrupted()) { + RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId); + try { + LOGGER.info("Registering interpreter process"); + intpEventClient.registerInterpreterProcess(registerInfo); + LOGGER.info("Registered interpreter process"); + } catch (Exception e) { + LOGGER.error("Error while registering interpreter: {}, cause: {}", registerInfo, e); + try { + shutdown(); + } catch (TException e1) { + LOGGER.warn("Exception occurs while shutting down", e1); + } + } + } + + if (launcherEnv != null && "yarn".endsWith(launcherEnv)) { + try { + 
YarnUtils.register(host, port); + ScheduledExecutorService yarnHeartbeat = ExecutorFactory.singleton() + .createOrGetScheduled("RM-Heartbeat", 1); + yarnHeartbeat.scheduleAtFixedRate(YarnUtils::heartbeat, 0, 1, TimeUnit.MINUTES); + } catch (Exception e) { + LOGGER.error("Fail to register yarn app", e); + } + } + LOGGER.info("Registration finished"); + } + } + + class ShutdownThread extends Thread { + private final String cause; + + public static final String CAUSE_SHUTDOWN_HOOK = "ShutdownHook"; + public static final String CAUSE_SHUTDOWN_CALL = "ShutdownCall"; + + public ShutdownThread(String cause) { + super("ShutdownThread"); + this.cause = cause; + } + + @Override + public void run() { + LOGGER.info("Shutting down..."); + LOGGER.info("Shutdown initialized by {}", cause); + // delete interpreter cluster meta + deleteClusterMeta(); + + if (interpreterGroup != null) { + synchronized (interpreterGroup) { + for (List<Interpreter> session : interpreterGroup.values()) { + for (Interpreter interpreter : session) { + try { + interpreter.close(); + } catch (InterpreterException e) { + LOGGER.warn("Fail to close interpreter", e); + } + } + } + } + } + if (!isTest) { + SchedulerFactory.singleton().destroy(); + ExecutorFactory.singleton().shutdownAll(); + } + + if ("yarn".equals(launcherEnv)) { + try { + YarnUtils.unregister(true, ""); + } catch (Exception e) { + LOGGER.error("Fail to unregister yarn app", e); + } + } + // Try to unregister the interpreter process in case the interpreter process exit unpredictable via ShutdownHook + if (intpEventClient != null && CAUSE_SHUTDOWN_HOOK.equals(cause)) { + try { + LOGGER.info("Unregister interpreter process"); + intpEventClient.unRegisterInterpreterProcess(); + } catch (Exception e) { + LOGGER.error("Fail to unregister remote interpreter process", e); + } + } + + server.stop(); + + // server.stop() does not always finish server.serve() loop + // sometimes server.serve() is hanging even after server.stop() call. 
+ // this case, need to force kill the process + + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < (DEFAULT_SHUTDOWN_TIMEOUT + 100) && + server.isServing()) { + try { + Thread.sleep(300); + } catch (InterruptedException e) { + LOGGER.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e); + Thread.currentThread().interrupt(); + } + } + + if (server.isServing()) { + LOGGER.info("Force shutting down"); + System.exit(1); + } + + LOGGER.info("Shutting down"); + } + + private void deleteClusterMeta() { + if (zConf == null || !zConf.isClusterMode()){ + return; + } + + try { + // delete interpreter cluster meta + clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, interpreterGroupId); + Thread.sleep(300); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + } + } + } + class InterpretJobListener implements JobListener { @Override @@ -1385,6 +1423,4 @@ public class RemoteInterpreterServer extends Thread this.paragraphId = paragraphId; } } - - ; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 5a0a3ab..e30b325 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -18,8 +18,6 @@ package org.apache.zeppelin.interpreter.remote; import org.apache.commons.lang3.StringUtils; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,31 +44,18 @@ public class RemoteInterpreterUtils { } public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException { - int port; - try 
(ServerSocket socket = new ServerSocket(0);) { - port = socket.getLocalPort(); + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); } - return port; } - /** - * start:end - * - * @param portRange - * @return - * @throws IOException - */ - public static TServerSocket createTServerSocket(String portRange) - throws IOException { - - TServerSocket tSocket = null; + public static int findAvailablePort(String portRange) throws IOException { // ':' is the default value which means no constraints on the portRange if (StringUtils.isBlank(portRange) || portRange.equals(":")) { - try { - tSocket = new TServerSocket(0); - return tSocket; - } catch (TTransportException e) { - throw new IOException("Fail to create TServerSocket", e); + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new IOException("Failed to allocate a automatic port", e); } } // valid user registered port https://en.wikipedia.org/wiki/Registered_port @@ -84,10 +69,9 @@ public class RemoteInterpreterUtils { end = Integer.parseInt(ports[1]); } for (int i = start; i <= end; ++i) { - try { - tSocket = new TServerSocket(i); - return tSocket; - } catch (Exception e) { + try (ServerSocket socket = new ServerSocket(i)) { + return socket.getLocalPort(); + } catch (IOException e) { // ignore this } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java index 40d3dec..655bb4f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; */ public class YarnUtils { - private static Logger LOGGER = LoggerFactory.getLogger(YarnUtils.class); + private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnUtils.class); private static AMRMClient<ContainerRequest> amClient = AMRMClient.createAMRMClient(); private static Configuration conf = new YarnConfiguration(); @@ -46,7 +46,7 @@ public class YarnUtils { } public static void register(String host, int port) throws Exception { - LOGGER.info("Registering yarn app at " + host + ":" + port); + LOGGER.info("Registering yarn app at {}:{}", host, port); try { amClient.registerApplicationMaster(host, port, null); } catch (YarnException e) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java index 9f54e2c..abe6d0a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java @@ -96,7 +96,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { LOGGER.info("Process is launched: {}", commandLine); } catch (IOException e) { this.processOutput.stopCatchLaunchOutput(); - LOGGER.error("Fail to launch process: " + commandLine, e); + LOGGER.error("Fail to launch process: {}", commandLine, e); transition(State.TERMINATED); errorMessage = e.getMessage(); } @@ -106,7 +106,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { public void transition(State state) { this.state = state; - LOGGER.info("Process state is transitioned to " + state); + LOGGER.info("Process state is transitioned to {}", state); } public void onTimeout() { @@ -121,7 +121,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { @Override public void onProcessComplete(int exitValue) { - LOGGER.warn("Process is exited with exit value " + exitValue); + LOGGER.warn("Process is exited with exit value {}", exitValue); if (exitValue == 0) { transition(State.COMPLETED); } else { @@ -131,7 +131,8 
@@ public abstract class ProcessLauncher implements ExecuteResultHandler { @Override public void onProcessFailed(ExecuteException e) { - LOGGER.warn("Process is failed due to " + e); + LOGGER.warn("Process with cmd {} is failed due to", commandLine, e); + errorMessage = ExceptionUtils.getStackTrace(e); transition(State.TERMINATED); } @@ -187,7 +188,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler { if (s.startsWith("Interpreter launch command")) { LOGGER.info(s); } else { - LOGGER.debug("Process Output: " + s); + LOGGER.debug("Process Output: {}", s); } if (catchLaunchOutput) { launchOutput.append(s + "\n"); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java index 68981da..1d6fb52 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java @@ -26,23 +26,18 @@ import static org.junit.Assert.assertTrue; public class RemoteInterpreterUtilsTest { @Test - public void testCreateTServerSocket() throws IOException { - assertTrue(RemoteInterpreterUtils.createTServerSocket(":") - .getServerSocket().getLocalPort() > 0); + public void testfindAvailablePort() throws IOException { + assertTrue(RemoteInterpreterUtils.findAvailablePort(":") > 0); String portRange = ":30000"; - assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange) - .getServerSocket().getLocalPort() <= 30000); + assertTrue(RemoteInterpreterUtils.findAvailablePort(portRange) <= 30000); portRange = "30000:"; - assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange) - .getServerSocket().getLocalPort() >= 30000); + assertTrue(RemoteInterpreterUtils.findAvailablePort(portRange) >= 30000); portRange = "30000:40000"; - int port = 
RemoteInterpreterUtils.createTServerSocket(portRange) - .getServerSocket().getLocalPort(); + int port = RemoteInterpreterUtils.findAvailablePort(portRange); assertTrue(port >= 30000 && port <= 40000); } - } diff --git a/zeppelin-plugins/notebookrepo/s3/src/test/resources/logback.xml b/zeppelin-plugins/notebookrepo/s3/src/test/resources/logback.xml new file mode 100644 index 0000000..0078a20 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/s3/src/test/resources/logback.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> + </encoder> + </appender> + <root level="INFO"> + <appender-ref ref="STDOUT" /> + </root> +</configuration> diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index aacccc8..b0be31a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -22,6 +22,7 @@ import com.google.gson.reflect.TypeToken; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TTransportException; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.helium.ApplicationEventListener; @@ -97,21 +98,19 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi Thread startingThread = new Thread() { @Override public void run() { - TServerSocket tSocket = null; - try { - tSocket = RemoteInterpreterUtils.createTServerSocket(portRange); + try (TServerSocket tSocket = new TServerSocket(RemoteInterpreterUtils.findAvailablePort(portRange))){ port = tSocket.getServerSocket().getLocalPort(); host = RemoteInterpreterUtils.findAvailableHostAddress(); - } catch (IOException e1) { - throw new RuntimeException(e1); + LOGGER.info("InterpreterEventServer is starting at {}:{}", host, port); + RemoteInterpreterEventService.Processor<RemoteInterpreterEventServer> processor = + new RemoteInterpreterEventService.Processor<>(RemoteInterpreterEventServer.this); + thriftServer = new TThreadPoolServer( + new 
TThreadPoolServer.Args(tSocket).processor(processor)); + thriftServer.serve(); + } catch (IOException | TTransportException e ) { + throw new RuntimeException("Fail to create TServerSocket", e); } - - LOGGER.info("InterpreterEventServer is starting at {}:{}", host, port); - RemoteInterpreterEventService.Processor processor = - new RemoteInterpreterEventService.Processor(RemoteInterpreterEventServer.this); - thriftServer = new TThreadPoolServer( - new TThreadPoolServer.Args(tSocket).processor(processor)); - thriftServer.serve(); + LOGGER.info("ThriftServer-Thread finished"); } }; startingThread.start(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 7de01db..dba9b03 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -144,7 +144,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { && jar.toFile().getName().endsWith(".jar")) .map(jar -> jar.toAbsolutePath().toString()) .collect(Collectors.toList()); - if (interpreterJars.size() == 0) { + if (interpreterJars.isEmpty()) { throw new IOException("zeppelin-interpreter-shaded jar is not found"); } else if (interpreterJars.size() > 1) { throw new IOException("more than 1 zeppelin-interpreter-shaded jars are found: " @@ -171,7 +171,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { sparkConfBuilder.append(" --proxy-user " + context.getUserName()); } - env.put("ZEPPELIN_SPARK_CONF", escapeSpecialCharacter(sparkConfBuilder.toString())); + env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); // set these env in the order of // 1. 
interpreter-setting @@ -215,10 +215,10 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { env.put("ZEPPELIN_INTP_CLASSPATH", driverExtraClassPath); } } else { - LOGGER.warn("spark-defaults.conf doesn't exist: " + sparkDefaultFile.getAbsolutePath()); + LOGGER.warn("spark-defaults.conf doesn't exist: {}", sparkDefaultFile.getAbsolutePath()); } - LOGGER.debug("buildEnvFromProperties: " + env); + LOGGER.debug("buildEnvFromProperties: {}", env); return env; } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index 4a7189b..5195710 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -107,8 +107,8 @@ public class SparkInterpreterLauncherTest { assertTrue(interpreterProcess.getEnv().size() >= 2); assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME")); assertFalse(interpreterProcess.getEnv().containsKey("ENV_1")); - assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.files=file_1" + - " --conf spark.jars=jar_1 --conf spark.app.name=intpGroupId --conf spark.master=local[*]"), + assertEquals(" --conf spark.files=file_1" + + " --conf spark.jars=jar_1 --conf spark.app.name=intpGroupId --conf spark.master=local[*]", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @@ -138,9 +138,9 @@ public class SparkInterpreterLauncherTest { String sparkJars = "jar_1"; String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1"; - assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip + + assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip + " --conf spark.files=" + sparkFiles + " 
--conf spark.jars=" + sparkJars + - " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn-client"), + " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn-client", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @@ -171,10 +171,10 @@ public class SparkInterpreterLauncherTest { String sparkJars = "jar_1"; String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1"; - assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip + + assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + " --conf spark.submit.deployMode=client" + - " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn"), + " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @@ -207,13 +207,13 @@ public class SparkInterpreterLauncherTest { zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar"; String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; - assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip + + assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip + " --conf spark.yarn.maxAppAttempts=1" + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + " --conf spark.yarn.isPython=true" + " --conf spark.yarn.submit.waitAppCompletion=false" + " --conf spark.app.name=intpGroupId" + - " --conf spark.master=yarn-cluster"), + " --conf spark.master=yarn-cluster", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); } @@ -253,14 +253,14 @@ public class SparkInterpreterLauncherTest { zeppelinHome + 
"/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar"; String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; - assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip + + assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip + " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId" + " --conf spark.yarn.maxAppAttempts=1" + " --conf spark.master=yarn" + " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + " --conf spark.submit.deployMode=cluster" + " --conf spark.yarn.submit.waitAppCompletion=false" + - " --proxy-user user1"), + " --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); FileUtils.deleteDirectory(localRepoPath.toFile()); @@ -302,7 +302,7 @@ public class SparkInterpreterLauncherTest { String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; // escape special characters String sparkFiles = "{}," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; - assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip + + assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip + " --conf spark.yarn.isPython=true" + " --conf spark.app.name=intpGroupId" + " --conf spark.yarn.maxAppAttempts=1" + @@ -310,7 +310,7 @@ public class SparkInterpreterLauncherTest { " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars + " --conf spark.submit.deployMode=cluster" + " --conf spark.yarn.submit.waitAppCompletion=false" + - " --proxy-user user1"), + " --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); FileUtils.deleteDirectory(localRepoPath.toFile()); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 20056ec..9554dcf 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -85,6 +85,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo private StatusChangedListener afterStatusChangedListener; private QuartzSchedulerService schedulerService; + @Override @Before public void setUp() throws Exception { System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC.getVarName(), "true"); @@ -105,6 +106,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo schedulerService.waitForFinishInit(); } + @Override @After public void tearDown() throws Exception { super.tearDown();