This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ff2baf  [ZEPPELIN-5497] Use AutoCloseable interface to close all IO
8ff2baf is described below

commit 8ff2bafca66f334d6d67057347f85942a83d8b9b
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Wed Mar 3 15:56:08 2021 +0100

    [ZEPPELIN-5497] Use AutoCloseable interface to close all IO
    
    ### What is this PR for?
    
    Should close clientPool explicitly, otherwise it won't be garbage collected.
    Take over of #4208
    
    ### What type of PR is it?
     - Bug Fix
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5497
    
    ### How should this be tested?
    * via CI
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Philipp Dallig <philipp.dal...@gmail.com>
    
    Closes #4209 from Reamer/closeable_remote_client and squashes the following 
commits:
    
    55b4fd636 [Philipp Dallig] Use AutoCloseable interface to close all IO
---
 .../interpreter/remote/PooledRemoteClient.java     | 13 +++++++----
 .../interpreter/remote/RemoteClientFactory.java    |  4 +++-
 .../remote/RemoteInterpreterEventClient.java       | 27 +++++++++++++---------
 .../launcher/YarnRemoteInterpreterProcess.java     |  2 +-
 .../remote/ExecRemoteInterpreterProcess.java       |  2 +-
 .../remote/RemoteInterpreterManagedProcess.java    |  2 +-
 .../remote/RemoteInterpreterProcess.java           |  7 +++---
 .../remote/RemoteInterpreterRunningProcess.java    |  2 +-
 8 files changed, 36 insertions(+), 23 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
index 0117b95..cb53d36 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
  *
  * @param <T>
  */
-public class PooledRemoteClient<T extends TServiceClient> {
+public class PooledRemoteClient<T extends TServiceClient> implements 
AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PooledRemoteClient.class);
   private static final int RETRY_COUNT = 3;
@@ -52,14 +52,19 @@ public class PooledRemoteClient<T extends TServiceClient> {
   }
 
   public synchronized T getClient() throws Exception {
-    T t = clientPool.borrowObject(5_000);
-    return t;
+    return clientPool.borrowObject(5_000);
   }
 
-  public void shutdown() {
+  @Override
+  public void close() {
     // Close client socket connection
     if (remoteClientFactory != null) {
       remoteClientFactory.close();
+      this.remoteClientFactory = null;
+    }
+    if (this.clientPool != null) {
+      this.clientPool.close();
+      this.clientPool = null;
     }
   }
 
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
index 9c8656f..7f58424 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
@@ -28,7 +28,8 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * Factory class for creating thrift socket client.
  */
-public class RemoteClientFactory<T extends TServiceClient> extends 
BasePooledObjectFactory<T>{
+public class RemoteClientFactory<T extends TServiceClient> extends 
BasePooledObjectFactory<T>
+    implements AutoCloseable {
 
 
   private Set<T> clientSockets = ConcurrentHashMap.newKeySet();
@@ -38,6 +39,7 @@ public class RemoteClientFactory<T extends TServiceClient> 
extends BasePooledObj
     this.supplier = supplier;
   }
 
+  @Override
   public void close() {
     for (T clientSocket: clientSockets) {
       clientSocket.getInputProtocol().getTransport().close();
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index 8090123..174de3b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -56,7 +56,7 @@ import java.util.Map;
  * All the methods are synchronized because thrift client is not thread safe.
  */
 public class RemoteInterpreterEventClient implements ResourcePoolConnector,
-    AngularObjectRegistryListener {
+    AngularObjectRegistryListener, AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteInterpreterEventClient.class);
   private static final Gson GSON = new Gson();
 
@@ -145,7 +145,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
       ByteBuffer buffer = callRemoteFunction(client -> 
client.getResource(resourceId.toJson()));
       return Resource.deserializeObject(buffer);
     } catch (IOException | ClassNotFoundException e) {
-      LOGGER.warn("Fail to readResource: " + resourceId, e);
+      LOGGER.warn("Fail to readResource: {}", resourceId, e);
       return null;
     }
   }
@@ -287,7 +287,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to runParagraphs: " + event, e);
+      LOGGER.warn("Fail to runParagraphs: {}", event, e);
     }
   }
 
@@ -298,8 +298,8 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to checkpointOutput of paragraph: " +
-              paragraphId + " of note: " + noteId, e);
+      LOGGER.warn("Fail to checkpointOutput of paragraph: {} of note: {}",
+              paragraphId, noteId, e);
     }
   }
 
@@ -313,7 +313,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to appendAppOutput: " + event, e);
+      LOGGER.warn("Fail to appendAppOutput: {}", event, e);
     }
   }
 
@@ -329,7 +329,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to updateAppOutput: " + event, e);
+      LOGGER.warn("Fail to updateAppOutput: {}", event, e);
     }
   }
 
@@ -342,7 +342,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to updateAppStatus: " + event, e);
+      LOGGER.warn("Fail to updateAppStatus: {}", event, e);
     }
   }
 
@@ -353,7 +353,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to onParaInfosReceived: " + infos, e);
+      LOGGER.warn("Fail to onParaInfosReceived: {}", infos, e);
     }
   }
 
@@ -365,7 +365,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to add AngularObject: " + angularObject, e);
+      LOGGER.warn("Fail to add AngularObject: {}", angularObject, e);
     }
   }
 
@@ -377,7 +377,7 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
         return null;
       });
     } catch (Exception e) {
-      LOGGER.warn("Fail to update AngularObject: " + angularObject, e);
+      LOGGER.warn("Fail to update AngularObject: {}", angularObject, e);
     }
   }
 
@@ -406,4 +406,9 @@ public class RemoteInterpreterEventClient implements 
ResourcePoolConnector,
       LOGGER.warn("Fail to updateParagraphConfig", e);
     }
   }
+
+  @Override
+  public void close() {
+    remoteClient.close();
+  }
 }
diff --git 
a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
 
b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
index b7ad5b5..61adfc4 100644
--- 
a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
@@ -610,7 +610,7 @@ public class YarnRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
       }
 
       // Shutdown connection
-      shutdown();
+      super.close();
     }
 
     yarnClient.stop();
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
index 85746c3..f7b85a2 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
@@ -136,7 +136,7 @@ public class ExecRemoteInterpreterProcess extends 
RemoteInterpreterManagedProces
       LOGGER.info("Remote exec process of interpreter group: {} is 
terminated", getInterpreterGroupId());
     } else {
       // Shutdown connection
-      shutdown();
+      super.close();
       LOGGER.warn("Try to stop a not running interpreter process of 
interpreter group: {}", getInterpreterGroupId());
     }
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index c2aca53..02cedb3 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -84,7 +84,7 @@ public abstract class RemoteInterpreterManagedProcess extends 
RemoteInterpreterP
         return null;
       });
       // Shutdown connection
-      shutdown();
+      super.close();
     } catch (Exception e) {
       LOGGER.warn("ignore the exception when shutting down", e);
     }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index df81822..95802a6 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -34,7 +34,7 @@ import java.util.Date;
 /**
  * Abstract class for interpreter process
  */
-public abstract class RemoteInterpreterProcess implements InterpreterClient {
+public abstract class RemoteInterpreterProcess implements InterpreterClient, 
AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteInterpreterProcess.class);
   private static final Gson GSON = new Gson();
 
@@ -72,9 +72,10 @@ public abstract class RemoteInterpreterProcess implements 
InterpreterClient {
     return startTime;
   }
 
-  public void shutdown() {
+  @Override
+  public void close() {
     if (remoteClient != null) {
-      remoteClient.shutdown();
+      remoteClient.close();
       remoteClient = null;
     }
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index 85ed68f..b2e055e 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -91,7 +91,7 @@ public class RemoteInterpreterRunningProcess extends 
RemoteInterpreterProcess {
         }
 
         // Shutdown connection
-        shutdown();
+        super.close();
         LOGGER.info("Remote process of interpreter group: {} is terminated.", 
getInterpreterGroupId());
       }
     }

Reply via email to