This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.8 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.8 by this push: new 4e171d5 [ZEPPELIN-4600] Socket connection between zeppelin server and interpreter is not closed properly for branch 0.8 4e171d5 is described below commit 4e171d505033bef9458163a37e4a55dc6512c280 Author: tylerba-f <tyl...@126.com> AuthorDate: Sat Feb 15 16:16:48 2020 +0800 [ZEPPELIN-4600] Socket connection between zeppelin server and interpreter is not closed properly for branch 0.8 ### What is this PR for? Fix the stability problem of zengin for long time (7day) loop working , release the thread pools and socket connection when the interprether is closed the same bug was found in the version 0.81 and 0.82 ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-4600 ### How should this be tested? * Single tast works 30 hours with 7 second delay, test passed. ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update?No * Is there breaking changes for older versions?No * Does this needs documentation?No Author: tylerba-f <tyl...@126.com> Closes #3650 from tylerba-f/branch-0.8 and squashes the following commits: 467f3a377 [tylerba-f] [ZEPPELIN-4600] code style adjust 34add36bf [tylerba-f] [ZEPPELIN-4600] 306272a9a [tylerba-f] [ZEPPELIN-4600] nullpoint error in the long time process use api to run a note --- .../apache/zeppelin/interpreter/remote/ClientFactory.java | 7 +++++++ .../interpreter/remote/RemoteInterpreterEventPoller.java | 3 +++ .../remote/RemoteInterpreterManagedProcess.java | 4 ++++ .../interpreter/remote/RemoteInterpreterProcess.java | 15 ++++++++++++--- 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java index b2cb78f..9e5582b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java @@ -44,6 +44,13 @@ public class ClientFactory extends BasePooledObjectFactory<Client>{ this.port = port; } + public void close() { + //Close transfer + for (TSocket eachTransfer: clientSocketMap.values()) { + eachTransfer.close(); + } + } + @Override public Client create() throws Exception { TSocket transport = new TSocket(host, port); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index abda81e..d437079 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -261,6 +261,9 @@ public class RemoteInterpreterEventPoller extends Thread { } if (appendFuture != null) { appendFuture.cancel(true); + //Close thread pool + appendService.shutdown(); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index b186e48..95abd94 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -229,6 +229,10 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } catch (Exception e) { logger.warn("ignore the exception when shutting down"); } + + // Shutdown connection + shutdown(); + watchdog.destroyProcess(); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 08653ae..e282cee 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -19,13 +19,12 @@ package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.thrift.TException; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.launcher.InterpreterClient; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Abstract class for interpreter process */ @@ -36,6 +35,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { private RemoteInterpreterEventPoller remoteInterpreterEventPoller; private final InterpreterContextRunnerPool interpreterContextRunnerPool; private int connectTimeout; + private ClientFactory clientFactory = null; public RemoteInterpreterProcess( int connectTimeout) { @@ -51,13 +51,22 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { this.remoteInterpreterEventPoller = eventPoller; } + public void shutdown() { + + // Close client socket connection + if (clientFactory != null) { + clientFactory.close(); + } + } + public int getConnectTimeout() { return connectTimeout; } public synchronized Client getClient() throws Exception { if (clientPool == null || clientPool.isClosed()) { - clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); + clientFactory = new ClientFactory(getHost(), getPort()); + clientPool = new GenericObjectPool<>(clientFactory); } return clientPool.borrowObject(); }