This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new dfa12d4 [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient dfa12d4 is described below commit dfa12d4471079d242334176b3932805c54273182 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Dec 7 10:28:57 2020 +0800 [ZEPPELIN-5151]. connection pool size is not set in RemoteInterpreterEventClient ### What is this PR for? The connection pool size property is only set on the zeppelin-server side, but not on the interpreter process side. This PR fixes this issue by recreating RemoteInterpreterEventClient after it gets the zeppelin site configuration in the init method. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/projects/ZEPPELIN/issues/ZEPPELIN-5151 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3988 from zjffdu/ZEPPELIN-5151 and squashes the following commits: 1b49c6f9c [Jeff Zhang] [ZEPPELIN-5151]. 
connection pool size is not set in RemoteInterpreterEventClient (cherry picked from commit 04d86db047d46da37a6310969cf008c34ceb0f23) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../remote/RemoteInterpreterEventClient.java | 4 ++-- .../remote/RemoteInterpreterServer.java | 18 ++++++++++++------ .../interpreter/InterpreterSettingManager.java | 22 ++++++++++++---------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 3eed43d..c2f38a3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -64,7 +64,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, private PooledRemoteClient<RemoteInterpreterEventService.Client> remoteClient; private String intpGroupId; - public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort) { + public RemoteInterpreterEventClient(String intpEventHost, int intpEventPort, int connectionPoolSize) { this.remoteClient = new PooledRemoteClient<>(() -> { TSocket transport = new TSocket(intpEventHost, intpEventPort); try { @@ -74,7 +74,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector, } TProtocol protocol = new TBinaryProtocol(transport); return new RemoteInterpreterEventService.Client(protocol); - }); + }, connectionPoolSize); } public <R> R callRemoteFunction(PooledRemoteClient.RemoteFunction<R, RemoteInterpreterEventService.Client> func) { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 453a7e3..6b1e5d4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -166,11 +166,6 @@ public class RemoteInterpreterServer extends Thread this.intpEventServerPort = intpEventServerPort; this.port = RemoteInterpreterUtils.findAvailablePort(portRange); this.host = RemoteInterpreterUtils.findAvailableHostAddress(); - if (!isTest) { - LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port, - intpEventServerHost, intpEventServerPort); - intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort); - } } else { // DevInterpreter this.port = intpEventServerPort; @@ -227,6 +222,15 @@ public class RemoteInterpreterServer extends Thread } catch (Exception e) { throw new TException("Fail to create LifeCycleManager", e); } + + if (!isTest) { + int connectionPoolSize = + this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE); + LOGGER.info("Creating RemoteInterpreterEventClient with connection pool size: {}", + connectionPoolSize); + intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, + connectionPoolSize); + } } @Override @@ -496,7 +500,8 @@ public class RemoteInterpreterServer extends Thread LOGGER.info("Reconnect to this interpreter process from {}:{}", host, port); this.intpEventServerHost = host; this.intpEventServerPort = port; - intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort); + intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, + this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE)); 
intpEventClient.setIntpGroupId(interpreterGroupId); this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient); @@ -600,6 +605,7 @@ public class RemoteInterpreterServer extends Thread if (!Thread.currentThread().isInterrupted()) { RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId); try { + intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, 10); LOGGER.info("Registering interpreter process"); intpEventClient.registerInterpreterProcess(registerInfo); LOGGER.info("Registered interpreter process"); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index e578dfc..0a5241d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -1107,16 +1107,18 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven @Override public void onNoteRemove(Note note, AuthenticationInfo subject) throws IOException { // stop all associated interpreters - for (Paragraph paragraph : note.getParagraphs()) { - try { - Interpreter interpreter = paragraph.getBindedInterpreter(); - InterpreterSetting interpreterSetting = - ((ManagedInterpreterGroup) interpreter.getInterpreterGroup()).getInterpreterSetting(); - restart(interpreterSetting.getId(), subject.getUser(), note.getId()); - } catch (InterpreterNotFoundException e) { - - } catch (InterpreterException e) { - LOGGER.warn("Fail to stop interpreter setting", e); + if (note.getParagraphs() != null) { + for (Paragraph paragraph : note.getParagraphs()) { + try { + Interpreter interpreter = paragraph.getBindedInterpreter(); + InterpreterSetting interpreterSetting = + ((ManagedInterpreterGroup) 
interpreter.getInterpreterGroup()).getInterpreterSetting(); + restart(interpreterSetting.getId(), subject.getUser(), note.getId()); + } catch (InterpreterNotFoundException e) { + + } catch (InterpreterException e) { + LOGGER.warn("Fail to stop interpreter setting", e); + } } }