This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 4e171d5  [ZEPPELIN-4600] Socket connection between zeppelin server and 
interpreter is not closed properly for branch 0.8
4e171d5 is described below

commit 4e171d505033bef9458163a37e4a55dc6512c280
Author: tylerba-f <tyl...@126.com>
AuthorDate: Sat Feb 15 16:16:48 2020 +0800

    [ZEPPELIN-4600] Socket connection between zeppelin server and interpreter 
is not closed properly for branch 0.8
    
    ### What is this PR for?
    Fix a stability problem in zengine during long-running (7-day) loops:
    release the thread pools and socket connections when the interpreter is
    closed. The same bug was found in versions 0.8.1 and 0.8.2.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4600
    
    ### How should this be tested?
    * A single task ran for 30 hours with a 7-second delay; the test passed.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: tylerba-f <tyl...@126.com>
    
    Closes #3650 from tylerba-f/branch-0.8 and squashes the following commits:
    
    467f3a377 [tylerba-f] [ZEPPELIN-4600] code style adjust
    34add36bf [tylerba-f] [ZEPPELIN-4600]
    306272a9a [tylerba-f] [ZEPPELIN-4600] null pointer error in a long-running process that uses the API to run a note
---
 .../apache/zeppelin/interpreter/remote/ClientFactory.java |  7 +++++++
 .../interpreter/remote/RemoteInterpreterEventPoller.java  |  3 +++
 .../remote/RemoteInterpreterManagedProcess.java           |  4 ++++
 .../interpreter/remote/RemoteInterpreterProcess.java      | 15 ++++++++++++---
 4 files changed, 26 insertions(+), 3 deletions(-)

diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
index b2cb78f..9e5582b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
@@ -44,6 +44,13 @@ public class ClientFactory extends 
BasePooledObjectFactory<Client>{
     this.port = port;
   }
 
+  public void close() {
+    //Close transfer
+    for (TSocket eachTransfer: clientSocketMap.values()) {
+      eachTransfer.close();
+    }
+  }
+
   @Override
   public Client create() throws Exception {
     TSocket transport = new TSocket(host, port);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index abda81e..d437079 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -261,6 +261,9 @@ public class RemoteInterpreterEventPoller extends Thread {
     }
     if (appendFuture != null) {
       appendFuture.cancel(true);
+      //Close thread pool
+      appendService.shutdown();
+
     }
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index b186e48..95abd94 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -229,6 +229,10 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess
       } catch (Exception e) {
         logger.warn("ignore the exception when shutting down");
       }
+
+      // Shutdown connection
+      shutdown();
+
       watchdog.destroyProcess();
     }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 08653ae..e282cee 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -19,13 +19,12 @@ package org.apache.zeppelin.interpreter.remote;
 import com.google.gson.Gson;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.thrift.TException;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Abstract class for interpreter process
  */
@@ -36,6 +35,7 @@ public abstract class RemoteInterpreterProcess implements 
InterpreterClient {
   private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
   private final InterpreterContextRunnerPool interpreterContextRunnerPool;
   private int connectTimeout;
+  private ClientFactory clientFactory = null;
 
   public RemoteInterpreterProcess(
       int connectTimeout) {
@@ -51,13 +51,22 @@ public abstract class RemoteInterpreterProcess implements 
InterpreterClient {
     this.remoteInterpreterEventPoller = eventPoller;
   }
 
+  public void shutdown() {
+
+    // Close client socket connection
+    if (clientFactory != null) {
+      clientFactory.close();
+    }
+  }
+
   public int getConnectTimeout() {
     return connectTimeout;
   }
 
   public synchronized Client getClient() throws Exception {
     if (clientPool == null || clientPool.isClosed()) {
-      clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), 
getPort()));
+      clientFactory = new ClientFactory(getHost(), getPort());
+      clientPool = new GenericObjectPool<>(clientFactory);
     }
     return clientPool.borrowObject();
   }

Reply via email to