This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new f7b2cea [ZEPPELIN-4600] Socket connection between zeppelin server and interpreter is not closed properly for master branch f7b2cea is described below commit f7b2ceaecdb8d7b30c1fe3ae20a8d2304f325d2a Author: tylerba-f <tyl...@126.com> AuthorDate: Tue Feb 18 16:44:18 2020 +0800 [ZEPPELIN-4600] Socket connection between zeppelin server and interpreter is not closed properly for master branch ### What is this PR for? Fix the stability problem of zengin for long time (7day) loop working , release the thread pools and socket connection when the interprether is closed ### What type of PR is it? [Bug Fix ] ### Todos * [ ] - Task ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-4600 ### How should this be tested? * Code build passed ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update?No * Is there breaking changes for older versions?No * Does this needs documentation?No Author: tylerba-f <tyl...@126.com> Author: fanlin <60836719+tylerb...@users.noreply.github.com> Closes #3653 from tylerba-f/master and squashes the following commits: 4f2cd0471 [fanlin] Merge branch 'master' into master c0ac49a0b [tylerba-f] [ZEPPELIN-4600] --- .../apache/zeppelin/interpreter/remote/ClientFactory.java | 7 +++++++ .../interpreter/remote/RemoteInterpreterManagedProcess.java | 5 +++++ .../interpreter/remote/RemoteInterpreterProcess.java | 12 +++++++++++- 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java index b2cb78f..9e5582b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java @@ -44,6 +44,13 @@ public class ClientFactory extends BasePooledObjectFactory<Client>{ this.port = port; } + public void close() { + //Close transfer + for (TSocket eachTransfer: clientSocketMap.values()) { + eachTransfer.close(); + } + } + @Override public Client create() throws Exception { TSocket transport = new TSocket(host, port); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 46a179f..3c0f1e4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -141,9 +141,14 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { LOGGER.warn("ignore the exception when shutting down", e); } + // Shutdown connection + shutdown(); + this.interpreterProcessLauncher.stop(); } + + interpreterProcessLauncher = null; LOGGER.info("Remote process terminated"); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index d378da4..441dc76 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -32,6 +32,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { private GenericObjectPool<Client> clientPool; private int connectTimeout; + private ClientFactory clientFactory = null; public RemoteInterpreterProcess( int connectTimeout) { @@ -44,11 +45,20 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { public synchronized Client getClient() throws Exception { if (clientPool == null || clientPool.isClosed()) { - clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); + clientFactory = new ClientFactory(getHost(), getPort()); + clientPool = new GenericObjectPool<>(clientFactory); } return clientPool.borrowObject(); } + public void shutdown() { + + // Close client socket connection + if (clientFactory != null) { + clientFactory.close(); + } + } + private void releaseClient(Client client) { releaseClient(client, false); }