This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 8ff2baf [ZEPPELIN-5497] Use AutoCloseable interface to close all IO 8ff2baf is described below commit 8ff2bafca66f334d6d67057347f85942a83d8b9b Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Wed Mar 3 15:56:08 2021 +0100 [ZEPPELIN-5497] Use AutoCloseable interface to close all IO ### What is this PR for? Should close clientPool explicitly, otherwise it won't be garbage collected. Take over of #4208 ### What type of PR is it? - Bug Fix ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5497 ### How should this be tested? * via CI ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #4209 from Reamer/closeable_remote_client and squashes the following commits: 55b4fd636 [Philipp Dallig] Use AutoCloseable interface to close all IO --- .../interpreter/remote/PooledRemoteClient.java | 13 +++++++---- .../interpreter/remote/RemoteClientFactory.java | 4 +++- .../remote/RemoteInterpreterEventClient.java | 27 +++++++++++++--------- .../launcher/YarnRemoteInterpreterProcess.java | 2 +- .../remote/ExecRemoteInterpreterProcess.java | 2 +- .../remote/RemoteInterpreterManagedProcess.java | 2 +- .../remote/RemoteInterpreterProcess.java | 7 +++--- .../remote/RemoteInterpreterRunningProcess.java | 2 +- 8 files changed, 36 insertions(+), 23 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java index 0117b95..cb53d36 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory; * * @param <T> */ -public class PooledRemoteClient<T extends TServiceClient> { +public class PooledRemoteClient<T extends TServiceClient> implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(PooledRemoteClient.class); private static final int RETRY_COUNT = 3; @@ -52,14 +52,19 @@ public class PooledRemoteClient<T extends TServiceClient> { } public synchronized T getClient() throws Exception { - T t = clientPool.borrowObject(5_000); - return t; + return clientPool.borrowObject(5_000); } - public void shutdown() { + @Override + public void close() { // Close client socket connection if (remoteClientFactory != null) { remoteClientFactory.close(); + this.remoteClientFactory = null; + } + if (this.clientPool != null) { + this.clientPool.close(); + this.clientPool = null; } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java index 9c8656f..7f58424 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java @@ -28,7 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; /** * Factory class for creating thrift socket client. */ -public class RemoteClientFactory<T extends TServiceClient> extends BasePooledObjectFactory<T>{ +public class RemoteClientFactory<T extends TServiceClient> extends BasePooledObjectFactory<T> + implements AutoCloseable { private Set<T> clientSockets = ConcurrentHashMap.newKeySet(); @@ -38,6 +39,7 @@ public class RemoteClientFactory<T extends TServiceClient> extends BasePooledObj this.supplier = supplier; } + @Override public void close() { for (T clientSocket: clientSockets) { clientSocket.getInputProtocol().getTransport().close(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 8090123..174de3b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -56,7 +56,7 @@ import java.util.Map; * All the methods are synchronized because thrift client is not thread safe. */ public class RemoteInterpreterEventClient implements ResourcePoolConnector, - AngularObjectRegistryListener { + AngularObjectRegistryListener, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventClient.class); private static final Gson GSON = new Gson(); @@ -145,7 +145,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, ByteBuffer buffer = callRemoteFunction(client -> client.getResource(resourceId.toJson())); return Resource.deserializeObject(buffer); } catch (IOException | ClassNotFoundException e) { - LOGGER.warn("Fail to readResource: " + resourceId, e); + LOGGER.warn("Fail to readResource: {}", resourceId, e); return null; } } @@ -287,7 +287,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to runParagraphs: " + event, e); + LOGGER.warn("Fail to runParagraphs: {}", event, e); } } @@ -298,8 +298,8 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to checkpointOutput of paragraph: " + - paragraphId + " of note: " + noteId, e); + LOGGER.warn("Fail to checkpointOutput of paragraph: {} of note: {}", + paragraphId, noteId, e); } } @@ -313,7 +313,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to appendAppOutput: " + event, e); + LOGGER.warn("Fail to appendAppOutput: {}", event, e); } } @@ -329,7 +329,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to updateAppOutput: " + event, e); + LOGGER.warn("Fail to updateAppOutput: {}", event, e); } } @@ -342,7 +342,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to updateAppStatus: " + event, e); + LOGGER.warn("Fail to updateAppStatus: {}", event, e); } } @@ -353,7 +353,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to onParaInfosReceived: " + infos, e); + LOGGER.warn("Fail to onParaInfosReceived: {}", infos, e); } } @@ -365,7 +365,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to add AngularObject: " + angularObject, e); + LOGGER.warn("Fail to add AngularObject: {}", angularObject, e); } } @@ -377,7 +377,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, return null; }); } catch (Exception e) { - LOGGER.warn("Fail to update AngularObject: " + angularObject, e); + LOGGER.warn("Fail to update AngularObject: {}", angularObject, e); } } @@ -406,4 +406,9 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, LOGGER.warn("Fail to updateParagraphConfig", e); } } + + @Override + public void close() { + remoteClient.close(); + } } diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java index b7ad5b5..61adfc4 100644 --- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java @@ -610,7 +610,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess { } // Shutdown connection - shutdown(); + super.close(); } yarnClient.stop(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java index 85746c3..f7b85a2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java @@ -136,7 +136,7 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces LOGGER.info("Remote exec process of interpreter group: {} is terminated", getInterpreterGroupId()); } else { // Shutdown connection - shutdown(); + super.close(); LOGGER.warn("Try to stop a not running interpreter process of interpreter group: {}", getInterpreterGroupId()); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index c2aca53..02cedb3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -84,7 +84,7 @@ public abstract class RemoteInterpreterManagedProcess extends RemoteInterpreterP return null; }); // Shutdown connection - shutdown(); + super.close(); } catch (Exception e) { LOGGER.warn("ignore the exception when shutting down", e); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index df81822..95802a6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -34,7 +34,7 @@ import java.util.Date; /** * Abstract class for interpreter process */ -public abstract class RemoteInterpreterProcess implements InterpreterClient { +public abstract class RemoteInterpreterProcess implements InterpreterClient, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterProcess.class); private static final Gson GSON = new Gson(); @@ -72,9 +72,10 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { return startTime; } - public void shutdown() { + @Override + public void close() { if (remoteClient != null) { - remoteClient.shutdown(); + remoteClient.close(); remoteClient = null; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index 85ed68f..b2e055e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -91,7 +91,7 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { } // Shutdown connection - shutdown(); + super.close(); LOGGER.info("Remote process of interpreter group: {} is terminated.", getInterpreterGroupId()); } }