This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new f7b2cea  [ZEPPELIN-4600] Socket connection between zeppelin server and 
interpreter is not closed properly for master branch
f7b2cea is described below

commit f7b2ceaecdb8d7b30c1fe3ae20a8d2304f325d2a
Author: tylerba-f <tyl...@126.com>
AuthorDate: Tue Feb 18 16:44:18 2020 +0800

    [ZEPPELIN-4600] Socket connection between zeppelin server and interpreter 
is not closed properly for master branch
    
    ### What is this PR for?
    Fix the stability problem of zengine during long-running (7-day) loop operation:
    release the thread pools and socket connections when the interpreter is 
closed
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4600
    
    ### How should this be tested?
    * Code build passed
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: tylerba-f <tyl...@126.com>
    Author: fanlin <60836719+tylerb...@users.noreply.github.com>
    
    Closes #3653 from tylerba-f/master and squashes the following commits:
    
    4f2cd0471 [fanlin] Merge branch 'master' into master
    c0ac49a0b [tylerba-f] [ZEPPELIN-4600]
---
 .../apache/zeppelin/interpreter/remote/ClientFactory.java    |  7 +++++++
 .../interpreter/remote/RemoteInterpreterManagedProcess.java  |  5 +++++
 .../interpreter/remote/RemoteInterpreterProcess.java         | 12 +++++++++++-
 3 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
index b2cb78f..9e5582b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
@@ -44,6 +44,13 @@ public class ClientFactory extends 
BasePooledObjectFactory<Client>{
     this.port = port;
   }
 
+  public void close() {
+    //Close transfer
+    for (TSocket eachTransfer: clientSocketMap.values()) {
+      eachTransfer.close();
+    }
+  }
+
   @Override
   public Client create() throws Exception {
     TSocket transport = new TSocket(host, port);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 46a179f..3c0f1e4 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -141,9 +141,14 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
         LOGGER.warn("ignore the exception when shutting down", e);
       }
 
+      // Shutdown connection
+      shutdown();
+
       this.interpreterProcessLauncher.stop();
     }
 
+
+
     interpreterProcessLauncher = null;
     LOGGER.info("Remote process terminated");
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index d378da4..441dc76 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -32,6 +32,7 @@ public abstract class RemoteInterpreterProcess implements 
InterpreterClient {
 
   private GenericObjectPool<Client> clientPool;
   private int connectTimeout;
+  private ClientFactory clientFactory = null;
 
   public RemoteInterpreterProcess(
       int connectTimeout) {
@@ -44,11 +45,20 @@ public abstract class RemoteInterpreterProcess implements 
InterpreterClient {
 
   public synchronized Client getClient() throws Exception {
     if (clientPool == null || clientPool.isClosed()) {
-      clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), 
getPort()));
+      clientFactory = new ClientFactory(getHost(), getPort());
+      clientPool = new GenericObjectPool<>(clientFactory);
     }
     return clientPool.borrowObject();
   }
 
+  public void shutdown() {
+
+    // Close client socket connection
+    if (clientFactory != null) {
+      clientFactory.close();
+    }
+  }
+
   private void releaseClient(Client client) {
     releaseClient(client, false);
   }

Reply via email to