[MINOR] Move remoteinterpreter into zengine ### What is this PR for? RemoteInterpreter is only used on the server side, so zeppelin-interpreter doesn't have to include this class. Moving this class helps reduce the interpreter binary size and lets us change RemoteInterpreter without adding more dependencies if we want.
### What type of PR is it? [Refactoring] ### Todos * [x] - Move RemoteInterpreter and related files out of zeppelin-interpreter module ### What is the Jira issue? N/A ### How should this be tested? N/A ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jongyoul Lee <jongy...@gmail.com> Closes #2320 from jongyoul/minor/move-remoteinterpreter-into-zengine and squashes the following commits: 80979913c [Jongyoul Lee] Removed author tag e1425dfa8 [Jongyoul Lee] Adopted DummyInterpreter 99c093229 [Jongyoul Lee] Made DummyInterpreter 5ac8dfbbd [Jongyoul Lee] Moved RemoteInterpreterServer to zeppelin-interpreter 0a881c1b3 [Jongyoul Lee] Removed unused package imported Removed unnecessary classes imported b7e0b9436 [Jongyoul Lee] moved some files related remote interpreter and fix some minor things 7e8721592 [Jongyoul Lee] move some files of remote packages from zeppelin-interpreter to zeppelin-zengine Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d9c4a5f0 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d9c4a5f0 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d9c4a5f0 Branch: refs/heads/master Commit: d9c4a5f0b6c3355753b50f466199cbe551cbd89a Parents: 8e96d8b Author: Jongyoul Lee <jongy...@gmail.com> Authored: Sat May 6 02:28:31 2017 +0900 Committer: Jongyoul Lee <jongy...@apache.org> Committed: Sat May 6 22:46:36 2017 +0900 ---------------------------------------------------------------------- .../remote/RemoteAngularObjectRegistry.java | 133 --- .../interpreter/remote/RemoteInterpreter.java | 585 ------------ .../remote/RemoteInterpreterManagedProcess.java | 247 ----- .../remote/RemoteInterpreterRunningProcess.java | 67 -- .../remote/RemoteInterpreterServer.java | 2 +- .../remote/RemoteInterpreterUtils.java | 9 +- 
.../zeppelin/resource/ResourcePoolUtils.java | 1 - .../zeppelin/scheduler/RemoteScheduler.java | 1 - .../zeppelin/interpreter/DummyInterpreter.java | 43 + .../zeppelin/interpreter/InterpreterTest.java | 7 +- .../remote/AppendOutputRunnerTest.java | 236 ----- .../remote/RemoteAngularObjectTest.java | 201 ---- .../RemoteInterpreterEventPollerTest.java | 55 -- .../RemoteInterpreterOutputTestStream.java | 191 ---- .../remote/RemoteInterpreterProcessTest.java | 131 --- .../remote/RemoteInterpreterTest.java | 914 ------------------- .../remote/RemoteInterpreterUtilsTest.java | 34 - .../remote/mock/MockInterpreterA.java | 94 -- .../remote/mock/MockInterpreterAngular.java | 113 --- .../remote/mock/MockInterpreterB.java | 126 --- .../remote/mock/MockInterpreterEnv.java | 80 -- .../mock/MockInterpreterOutputStream.java | 90 -- .../mock/MockInterpreterResourcePool.java | 128 --- .../resource/DistributedResourcePoolTest.java | 303 ------ .../zeppelin/scheduler/RemoteSchedulerTest.java | 364 -------- .../remote/RemoteAngularObjectRegistry.java | 133 +++ .../interpreter/remote/RemoteInterpreter.java | 577 ++++++++++++ .../remote/RemoteInterpreterManagedProcess.java | 247 +++++ .../remote/RemoteInterpreterRunningProcess.java | 67 ++ .../remote/AppendOutputRunnerTest.java | 236 +++++ .../remote/RemoteAngularObjectTest.java | 201 ++++ .../RemoteInterpreterEventPollerTest.java | 55 ++ .../RemoteInterpreterOutputTestStream.java | 191 ++++ .../remote/RemoteInterpreterProcessTest.java | 131 +++ .../remote/RemoteInterpreterTest.java | 914 +++++++++++++++++++ .../remote/RemoteInterpreterUtilsTest.java | 34 + .../remote/mock/MockInterpreterA.java | 94 ++ .../remote/mock/MockInterpreterAngular.java | 113 +++ .../remote/mock/MockInterpreterB.java | 126 +++ .../remote/mock/MockInterpreterEnv.java | 80 ++ .../mock/MockInterpreterOutputStream.java | 90 ++ .../mock/MockInterpreterResourcePool.java | 128 +++ .../resource/DistributedResourcePoolTest.java | 303 ++++++ 
.../zeppelin/scheduler/RemoteSchedulerTest.java | 364 ++++++++ 44 files changed, 4139 insertions(+), 4100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java deleted file mode 100644 index 0ac7116..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.interpreter.remote; - -import java.util.List; - -import org.apache.thrift.TException; -import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -/** - * Proxy for AngularObjectRegistry that exists in remote interpreter process - */ -public class RemoteAngularObjectRegistry extends AngularObjectRegistry { - Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); - private InterpreterGroup interpreterGroup; - - public RemoteAngularObjectRegistry(String interpreterId, - AngularObjectRegistryListener listener, - InterpreterGroup interpreterGroup) { - super(interpreterId, listener); - this.interpreterGroup = interpreterGroup; - } - - private RemoteInterpreterProcess getRemoteInterpreterProcess() { - return interpreterGroup.getRemoteInterpreterProcess(); - } - - /** - * When ZeppelinServer side code want to add angularObject to the registry, - * this method should be used instead of add() - * @param name - * @param o - * @param noteId - * @return - */ - public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String - paragraphId) { - Gson gson = new Gson(); - RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); - if (!remoteInterpreterProcess.isRunning()) { - return super.add(name, o, noteId, paragraphId, true); - } - - Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); - return super.add(name, o, noteId, paragraphId, true); - } catch (TException e) { - broken = true; - logger.error("Error", 
e); - } catch (Exception e) { - logger.error("Error", e); - } finally { - if (client != null) { - remoteInterpreterProcess.releaseClient(client, broken); - } - } - return null; - } - - /** - * When ZeppelinServer side code want to remove angularObject from the registry, - * this method should be used instead of remove() - * @param name - * @param noteId - * @param paragraphId - * @return - */ - public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String - paragraphId) { - RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); - if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { - return super.remove(name, noteId, paragraphId); - } - - Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - client.angularObjectRemove(name, noteId, paragraphId); - return super.remove(name, noteId, paragraphId); - } catch (TException e) { - broken = true; - logger.error("Error", e); - } catch (Exception e) { - logger.error("Error", e); - } finally { - if (client != null) { - remoteInterpreterProcess.releaseClient(client, broken); - } - } - return null; - } - - public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { - List<AngularObject> all = getAll(noteId, paragraphId); - for (AngularObject ao : all) { - removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId); - } - } - - @Override - protected AngularObject createNewAngularObject(String name, Object o, String noteId, String - paragraphId) { - return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup, - getAngularObjectListener()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java deleted file mode 100644 index 123ad75..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ /dev/null @@ -1,585 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.interpreter.remote; - -import java.util.*; - -import org.apache.thrift.TException; -import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.display.Input; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - -/** - * Proxy for Interpreter instance that runs on separate process - */ -public class RemoteInterpreter extends Interpreter { - private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); - - private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; - private final ApplicationEventListener applicationEventListener; - private Gson gson = new Gson(); - private String interpreterRunner; - private String interpreterPath; - private String localRepoPath; - private String className; - private String sessionKey; - private FormType formType; - private boolean initialized; - private Map<String, String> env; - private int connectTimeout; - private int maxPoolSize; - private String host; - private int port; - private String userName; - private Boolean isUserImpersonate; - private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT; - private String interpreterGroupName; - - /** - * Remote interpreter 
and manage interpreter process - */ - public RemoteInterpreter(Properties property, String sessionKey, String className, - String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout, - int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, - int outputLimit, String interpreterGroupName) { - super(property); - this.sessionKey = sessionKey; - this.className = className; - initialized = false; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - this.localRepoPath = localRepoPath; - env = getEnvFromInterpreterProperty(property); - this.connectTimeout = connectTimeout; - this.maxPoolSize = maxPoolSize; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - this.outputLimit = outputLimit; - this.interpreterGroupName = interpreterGroupName; - } - - - /** - * Connect to existing process - */ - public RemoteInterpreter(Properties property, String sessionKey, String className, String host, - int port, String localRepoPath, int connectTimeout, int maxPoolSize, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, - int outputLimit) { - super(property); - this.sessionKey = sessionKey; - this.className = className; - initialized = false; - this.host = host; - this.port = port; - this.localRepoPath = localRepoPath; - this.connectTimeout = connectTimeout; - this.maxPoolSize = maxPoolSize; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - this.outputLimit = outputLimit; - } - - - // VisibleForTesting - public 
RemoteInterpreter(Properties property, String sessionKey, String className, - String interpreterRunner, String interpreterPath, String localRepoPath, - Map<String, String> env, int connectTimeout, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) { - super(property); - this.className = className; - this.sessionKey = sessionKey; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - this.localRepoPath = localRepoPath; - env.putAll(getEnvFromInterpreterProperty(property)); - this.env = env; - this.connectTimeout = connectTimeout; - this.maxPoolSize = 10; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - } - - private Map<String, String> getEnvFromInterpreterProperty(Properties property) { - Map<String, String> env = new HashMap<>(); - for (Object key : property.keySet()) { - if (isEnvString((String) key)) { - env.put((String) key, property.getProperty((String) key)); - } - } - return env; - } - - static boolean isEnvString(String key) { - if (key == null || key.length() == 0) { - return false; - } - - return key.matches("^[A-Z_0-9]*"); - } - - @Override - public String getClassName() { - return className; - } - - private boolean connectToExistingProcess() { - return host != null && port > 0; - } - - public RemoteInterpreterProcess getInterpreterProcess() { - InterpreterGroup intpGroup = getInterpreterGroup(); - if (intpGroup == null) { - return null; - } - - synchronized (intpGroup) { - if (intpGroup.getRemoteInterpreterProcess() == null) { - RemoteInterpreterProcess remoteProcess; - if (connectToExistingProcess()) { - remoteProcess = new RemoteInterpreterRunningProcess( - connectTimeout, - remoteInterpreterProcessListener, - applicationEventListener, - host, - port); - } else { - // 
create new remote process - remoteProcess = new RemoteInterpreterManagedProcess( - interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout, - remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName); - } - - intpGroup.setRemoteInterpreterProcess(remoteProcess); - } - - return intpGroup.getRemoteInterpreterProcess(); - } - } - - public synchronized void init() { - if (initialized == true) { - return; - } - - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - - final InterpreterGroup interpreterGroup = getInterpreterGroup(); - - interpreterProcess.setMaxPoolSize( - Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize())); - String groupId = interpreterGroup.getId(); - - synchronized (interpreterProcess) { - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - logger.info("Create remote interpreter {}", getClassName()); - if (localRepoPath != null) { - property.put("zeppelin.interpreter.localRepo", localRepoPath); - } - - property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit)); - client.createInterpreter(groupId, sessionKey, - getClassName(), (Map) property, userName); - // Push angular object loaded from JSON file to remote interpreter - if (!interpreterGroup.isAngularRegistryPushed()) { - pushAngularObjectRegistryToRemote(client); - interpreterGroup.setAngularRegistryPushed(true); - } - - } catch (TException e) { - logger.error("Failed to create interpreter: {}", getClassName()); - throw new InterpreterException(e); - } finally { - // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken - interpreterProcess.releaseClient(client, broken); - } - } - initialized = true; - } - - - @Override - public void open() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - - synchronized (interpreterGroup) { - // 
initialize all interpreters in this interpreter group - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); - // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, - // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it - // doesn't call open method, it's not open. It causes problem while running intp.close() - // In case of Spark, this method initializes all of interpreters and init() method increases - // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all - // other interpreters doesn't do anything because those LazyInterpreters aren't open. - // But for now, we have to initialise all of interpreters for some reasons. - // See Interpreter.getInterpreterInTheSameSessionByClassName(String) - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (!initialized) { - // reference per session - interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate); - } - for (Interpreter intp : new ArrayList<>(interpreters)) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - try { - ((RemoteInterpreter) p).init(); - } catch (InterpreterException e) { - logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", - p.getClassName()); - interpreters.remove(p); - } - } - } - } - - @Override - public void close() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - synchronized (interpreterGroup) { - // close all interpreters in this session - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); - // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, - // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it - // doesn't call open method, it's not open. 
It causes problem while running intp.close() - // In case of Spark, this method initializes all of interpreters and init() method increases - // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all - // other interpreters doesn't do anything because those LazyInterpreters aren't open. - // But for now, we have to initialise all of interpreters for some reasons. - // See Interpreter.getInterpreterInTheSameSessionByClassName(String) - if (initialized) { - // dereference per session - getInterpreterProcess().dereference(); - } - for (Interpreter intp : new ArrayList<>(interpreters)) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - try { - ((RemoteInterpreter) p).closeInterpreter(); - } catch (InterpreterException e) { - logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", - p.getClassName()); - interpreters.remove(p); - } - } - } - } - - public void closeInterpreter() { - if (this.initialized == false) { - return; - } - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - boolean broken = false; - try { - client = interpreterProcess.getClient(); - if (client != null) { - client.close(sessionKey, className); - } - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } catch (Exception e1) { - throw new InterpreterException(e1); - } finally { - if (client != null) { - interpreterProcess.releaseClient(client, broken); - } - this.initialized = false; - } - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - if (logger.isDebugEnabled()) { - logger.debug("st:\n{}", st); - } - - FormType form = getFormType(); - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new 
InterpreterException(e1); - } - - InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess - .getInterpreterContextRunnerPool(); - - List<InterpreterContextRunner> runners = context.getRunners(); - if (runners != null && runners.size() != 0) { - // assume all runners in this InterpreterContext have the same note id - String noteId = runners.get(0).getNoteId(); - - interpreterContextRunnerPool.clear(noteId); - interpreterContextRunnerPool.addAll(noteId, runners); - } - - boolean broken = false; - try { - - final GUI currentGUI = context.getGui(); - RemoteInterpreterResult remoteResult = client.interpret( - sessionKey, className, st, convert(context)); - - Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( - remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { - }.getType()); - context.getConfig().clear(); - context.getConfig().putAll(remoteConfig); - - if (form == FormType.NATIVE) { - GUI remoteGui = GUI.fromJson(remoteResult.getGui()); - currentGUI.clear(); - currentGUI.setParams(remoteGui.getParams()); - currentGUI.setForms(remoteGui.getForms()); - } else if (form == FormType.SIMPLE) { - final Map<String, Input> currentForms = currentGUI.getForms(); - final Map<String, Object> currentParams = currentGUI.getParams(); - final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); - final Map<String, Input> remoteForms = remoteGUI.getForms(); - final Map<String, Object> remoteParams = remoteGUI.getParams(); - currentForms.putAll(remoteForms); - currentParams.putAll(remoteParams); - } - - InterpreterResult result = convert(remoteResult); - return result; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public void cancel(InterpreterContext context) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); 
- } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - client.cancel(sessionKey, className, convert(context)); - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public FormType getFormType() { - open(); - - if (formType != null) { - return formType; - } - - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - formType = FormType.valueOf(client.getFormType(sessionKey, className)); - return formType; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public int getProgress(InterpreterContext context) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (interpreterProcess == null || !interpreterProcess.isRunning()) { - return 0; - } - - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - return client.getProgress(sessionKey, className, convert(context)); - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - List completion = client.completion(sessionKey, 
className, buf, cursor, - convert(interpreterContext)); - return completion; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public Scheduler getScheduler() { - int maxConcurrency = maxPoolSize; - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (interpreterProcess == null) { - return null; - } else { - return SchedulerFactory.singleton().createOrGetRemoteScheduler( - RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(), - sessionKey, interpreterProcess, maxConcurrency); - } - } - - private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) { - return interpreterGroup.getId(); - } - - private RemoteInterpreterContext convert(InterpreterContext ic) { - return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(), - ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()), - gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners())); - } - - private InterpreterResult convert(RemoteInterpreterResult result) { - InterpreterResult r = new InterpreterResult( - InterpreterResult.Code.valueOf(result.getCode())); - - for (RemoteInterpreterResultMessage m : result.getMsg()) { - r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData()); - } - - return r; - } - - /** - * Push local angular object registry to - * remote interpreter. 
This method should be - * call ONLY inside the init() method - */ - void pushAngularObjectRegistryToRemote(Client client) throws TException { - final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() - .getAngularObjectRegistry(); - - if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { - final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry - .getRegistry(); - - logger.info("Push local angular object registry from ZeppelinServer to" + - " remote interpreter group {}", this.getInterpreterGroup().getId()); - - final java.lang.reflect.Type registryType = new TypeToken<Map<String, - Map<String, AngularObject>>>() { - }.getType(); - - Gson gson = new Gson(); - client.angularRegistryPush(gson.toJson(registry, registryType)); - } - } - - public Map<String, String> getEnv() { - return env; - } - - public void setEnv(Map<String, String> env) { - this.env = env; - } - - public void addEnv(Map<String, String> env) { - if (this.env == null) { - this.env = new HashMap<>(); - } - this.env.putAll(env); - } - - //Only for test - public String getInterpreterRunner() { - return interpreterRunner; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java deleted file mode 100644 index 1fb9b90..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.commons.exec.*; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; - -/** - * This class manages start / stop of remote interpreter process - */ -public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess - implements ExecuteResultHandler { - private static final Logger logger = LoggerFactory.getLogger( - RemoteInterpreterManagedProcess.class); - private final String interpreterRunner; - - private DefaultExecutor executor; - private ExecuteWatchdog watchdog; - boolean running = false; - private int port = -1; - private final String interpreterDir; - private final String localRepoDir; - private final String interpreterGroupName; - - private Map<String, String> env; - - public RemoteInterpreterManagedProcess( - String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - int connectTimeout, - RemoteInterpreterProcessListener listener, - 
ApplicationEventListener appListener, - String interpreterGroupName) { - super(new RemoteInterpreterEventPoller(listener, appListener), - connectTimeout); - this.interpreterRunner = intpRunner; - this.env = env; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.interpreterGroupName = interpreterGroupName; - } - - RemoteInterpreterManagedProcess(String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - RemoteInterpreterEventPoller remoteInterpreterEventPoller, - int connectTimeout, - String interpreterGroupName) { - super(remoteInterpreterEventPoller, - connectTimeout); - this.interpreterRunner = intpRunner; - this.env = env; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.interpreterGroupName = interpreterGroupName; - } - - @Override - public String getHost() { - return "localhost"; - } - - @Override - public int getPort() { - return port; - } - - @Override - public void start(String userName, Boolean isUserImpersonate) { - // start server process - try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - } catch (IOException e1) { - throw new InterpreterException(e1); - } - - CommandLine cmdLine = CommandLine.parse(interpreterRunner); - cmdLine.addArgument("-d", false); - cmdLine.addArgument(interpreterDir, false); - cmdLine.addArgument("-p", false); - cmdLine.addArgument(Integer.toString(port), false); - if (isUserImpersonate && !userName.equals("anonymous")) { - cmdLine.addArgument("-u", false); - cmdLine.addArgument(userName, false); - } - cmdLine.addArgument("-l", false); - cmdLine.addArgument(localRepoDir, false); - cmdLine.addArgument("-g", false); - cmdLine.addArgument(interpreterGroupName, false); - - executor = new DefaultExecutor(); - - ByteArrayOutputStream cmdOut = new ByteArrayOutputStream(); - ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger); - processOutput.setOutputStream(cmdOut); - - 
executor.setStreamHandler(new PumpStreamHandler(processOutput)); - watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); - executor.setWatchdog(watchdog); - - try { - Map procEnv = EnvironmentUtils.getProcEnvironment(); - procEnv.putAll(env); - - logger.info("Run interpreter process {}", cmdLine); - executor.execute(cmdLine, procEnv, this); - running = true; - } catch (IOException e) { - running = false; - throw new InterpreterException(e); - } - - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < getConnectTimeout()) { - if (!running) { - try { - cmdOut.flush(); - } catch (IOException e) { - // nothing to do - } - throw new InterpreterException(new String(cmdOut.toByteArray())); - } - - try { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { - break; - } else { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + - "Thread.sleep", e); - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Remote interpreter not yet accessible at localhost:" + port); - } - } - } - processOutput.setOutputStream(null); - } - - public void stop() { - if (isRunning()) { - logger.info("kill interpreter process"); - watchdog.destroyProcess(); - } - - executor = null; - watchdog = null; - running = false; - logger.info("Remote process terminated"); - } - - @Override - public void onProcessComplete(int exitValue) { - logger.info("Interpreter process exited {}", exitValue); - running = false; - - } - - @Override - public void onProcessFailed(ExecuteException e) { - logger.info("Interpreter process failed {}", e); - running = false; - } - - public boolean isRunning() { - return running; - } - - private static class ProcessLogOutputStream extends LogOutputStream { - - private Logger logger; - OutputStream out; - - public ProcessLogOutputStream(Logger logger) { - this.logger = 
logger; - } - - @Override - protected void processLine(String s, int i) { - this.logger.debug(s); - } - - @Override - public void write(byte [] b) throws IOException { - super.write(b); - - if (out != null) { - synchronized (this) { - if (out != null) { - out.write(b); - } - } - } - } - - @Override - public void write(byte [] b, int offset, int len) throws IOException { - super.write(b, offset, len); - - if (out != null) { - synchronized (this) { - if (out != null) { - out.write(b, offset, len); - } - } - } - } - - public void setOutputStream(OutputStream out) { - synchronized (this) { - this.out = out; - } - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java deleted file mode 100644 index bb176be..0000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class connects to existing process - */ -public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { - private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); - private final String host; - private final int port; - - public RemoteInterpreterRunningProcess( - int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener, - String host, - int port - ) { - super(connectTimeout, listener, appListener); - this.host = host; - this.port = port; - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getPort() { - return port; - } - - @Override - public void start(String userName, Boolean isUserImpersonate) { - // assume process is externally managed. nothing to do - } - - @Override - public void stop() { - // assume process is externally managed. 
nothing to do - } - - @Override - public boolean isRunning() { - return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 8f40ec4..50881ca 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -218,7 +218,7 @@ public class RemoteInterpreterServer private void setSystemProperty(Properties properties) { for (Object key : properties.keySet()) { - if (!RemoteInterpreter.isEnvString((String) key)) { + if (!RemoteInterpreterUtils.isEnvString((String) key)) { String value = properties.getProperty((String) key); if (value == null || value.isEmpty()) { System.clearProperty((String) key); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 8308222..4ee6690 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -17,7 +17,6 @@ package 
org.apache.zeppelin.interpreter.remote; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,4 +72,12 @@ public class RemoteInterpreterUtils { } return settingId; } + + public static boolean isEnvString(String key) { + if (key == null || key.length() == 0) { + return false; + } + + return key.matches("^[A-Z_0-9]*"); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 1a7f606..a55cdf9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -19,7 +19,6 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index a4ab00e..f9ddc4e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -20,7 +20,6 @@ package org.apache.zeppelin.scheduler; import org.apache.thrift.TException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.apache.zeppelin.scheduler.Job.Status; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java new file mode 100644 index 0000000..a7a6eb9 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java @@ -0,0 +1,43 @@ +package org.apache.zeppelin.interpreter; + +import java.util.Properties; + +/** + * + */ +public class DummyInterpreter extends Interpreter { + + public DummyInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + return null; + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return null; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java 
---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java index a9ac1fc..4141e95 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java @@ -19,7 +19,6 @@ package org.apache.zeppelin.interpreter; import java.util.Properties; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.Test; @@ -31,7 +30,7 @@ public class InterpreterTest { public void testDefaultProperty() { Properties p = new Properties(); p.put("p1", "v1"); - MockInterpreterA intp = new MockInterpreterA(p); + Interpreter intp = new DummyInterpreter(p); assertEquals(1, intp.getProperty().size()); assertEquals("v1", intp.getProperty().get("p1")); @@ -42,7 +41,7 @@ public class InterpreterTest { public void testOverriddenProperty() { Properties p = new Properties(); p.put("p1", "v1"); - MockInterpreterA intp = new MockInterpreterA(p); + Interpreter intp = new DummyInterpreter(p); Properties overriddenProperty = new Properties(); overriddenProperty.put("p1", "v2"); intp.setProperty(overriddenProperty); @@ -74,7 +73,7 @@ public class InterpreterTest { Properties p = new Properties(); p.put("p1", "replName #{noteId}, #{paragraphTitle}, #{paragraphId}, #{paragraphText}, #{replName}, #{noteId}, #{user}," + " #{authenticationInfo}"); - MockInterpreterA intp = new MockInterpreterA(p); + Interpreter intp = new DummyInterpreter(p); intp.setUserName(user); String actual = intp.getProperty("p1"); InterpreterContext.remove(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java 
---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java deleted file mode 100644 index c8c64ea..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.atMost; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.LoggingEvent; -import org.junit.After; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class AppendOutputRunnerTest { - - private static final int NUM_EVENTS = 10000; - private static final int NUM_CLUBBED_EVENTS = 100; - private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - private static ScheduledFuture<?> future = null; - /* It is being accessed by multiple threads. - * While loop for 'loopForBufferCompletion' could - * run for-ever. 
- */ - private volatile static int numInvocations = 0; - - @After - public void afterEach() { - if (future != null) { - future.cancel(true); - } - } - - @Test - public void testSingleEvent() throws InterruptedException { - RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - String[][] buffer = {{"note", "para", "data\n"}}; - - loopForCompletingEvents(listener, 1, buffer); - verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); - verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n"); - } - - @Test - public void testMultipleEventsOfSameParagraph() throws InterruptedException { - RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - String note1 = "note1"; - String para1 = "para1"; - String[][] buffer = { - {note1, para1, "data1\n"}, - {note1, para1, "data2\n"}, - {note1, para1, "data3\n"} - }; - - loopForCompletingEvents(listener, 1, buffer); - verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); - verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n"); - } - - @Test - public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException { - RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - String note1 = "note1"; - String note2 = "note2"; - String para1 = "para1"; - String para2 = "para2"; - String[][] buffer = { - {note1, para1, "data1\n"}, - {note1, para2, "data2\n"}, - {note2, para1, "data3\n"}, - {note2, para2, "data4\n"} - }; - loopForCompletingEvents(listener, 4, buffer); - - verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); - verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n"); - verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n"); - verify(listener, 
times(1)).onOutputAppend(note2, para1, 0, "data3\n"); - verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n"); - } - - @Test - public void testClubbedData() throws InterruptedException { - RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - AppendOutputRunner runner = new AppendOutputRunner(listener); - future = service.scheduleWithFixedDelay(runner, 0, - AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); - Thread thread = new Thread(new BombardEvents(runner)); - thread.start(); - thread.join(); - Thread.sleep(1000); - - /* NUM_CLUBBED_EVENTS is a heuristic number. - * It has been observed that for 10,000 continuos event - * calls, 30-40 Web-socket calls are made. Keeping - * the unit-test to a pessimistic 100 web-socket calls. - */ - verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); - } - - @Test - public void testWarnLoggerForLargeData() throws InterruptedException { - RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); - AppendOutputRunner runner = new AppendOutputRunner(listener); - String data = "data\n"; - int numEvents = 100000; - - for (int i=0; i<numEvents; i++) { - runner.appendBuffer("noteId", "paraId", 0, data); - } - - TestAppender appender = new TestAppender(); - Logger logger = Logger.getRootLogger(); - logger.addAppender(appender); - Logger.getLogger(RemoteInterpreterEventPoller.class); - - runner.run(); - List<LoggingEvent> log; - - int warnLogCounter; - LoggingEvent sizeWarnLogEntry = null; - do { - warnLogCounter = 0; - log = appender.getLog(); - for (LoggingEvent logEntry: log) { - if (Level.WARN.equals(logEntry.getLevel())) { - sizeWarnLogEntry = logEntry; - warnLogCounter += 1; - } - } - } while(warnLogCounter != 2); - - String loggerString = "Processing size for buffered append-output is high: " + - (data.length() * numEvents) + " characters."; - 
assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage())); - } - - private class BombardEvents implements Runnable { - - private final AppendOutputRunner runner; - - private BombardEvents(AppendOutputRunner runner) { - this.runner = runner; - } - - @Override - public void run() { - String noteId = "noteId"; - String paraId = "paraId"; - for (int i=0; i<NUM_EVENTS; i++) { - runner.appendBuffer(noteId, paraId, 0, "data\n"); - } - } - } - - private class TestAppender extends AppenderSkeleton { - private final List<LoggingEvent> log = new ArrayList<>(); - - @Override - public boolean requiresLayout() { - return false; - } - - @Override - protected void append(final LoggingEvent loggingEvent) { - log.add(loggingEvent); - } - - @Override - public void close() { - } - - public List<LoggingEvent> getLog() { - return new ArrayList<>(log); - } - } - - private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) { - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - numInvocations += 1; - return null; - } - }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); - } - - private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, - int numTimes, String[][] buffer) { - numInvocations = 0; - prepareInvocationCounts(listener); - AppendOutputRunner runner = new AppendOutputRunner(listener); - for (String[] bufferElement: buffer) { - runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]); - } - future = service.scheduleWithFixedDelay(runner, 0, - AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); - long startTimeMs = System.currentTimeMillis(); - while(numInvocations != numTimes) { - if (System.currentTimeMillis() - startTimeMs > 2000) { - fail("Buffered events were not sent for 2 seconds"); - } - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java deleted file mode 100644 index f7404e3..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.zeppelin.display.*; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class RemoteAngularObjectTest implements AngularObjectRegistryListener { - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - - private InterpreterGroup intpGroup; - private HashMap<String, String> env; - private RemoteInterpreter intp; - private InterpreterContext context; - private RemoteAngularObjectRegistry localRegistry; - - private AtomicInteger onAdd; - private AtomicInteger onUpdate; - private AtomicInteger onRemove; - - @Before - public void setUp() throws Exception { - onAdd = new AtomicInteger(0); - onUpdate = new AtomicInteger(0); - onRemove = new AtomicInteger(0); - - intpGroup = new InterpreterGroup("intpId"); - localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup); - intpGroup.setAngularObjectRegistry(localRegistry); - env = new HashMap<>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - - Properties p = new Properties(); - - intp = new RemoteInterpreter( - p, - "note", - MockInterpreterAngular.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false - ); - - intpGroup.put("note", new LinkedList<Interpreter>()); - intpGroup.get("note").add(intp); - 
intp.setInterpreterGroup(intpGroup); - - context = new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null); - - intp.open(); - } - - @After - public void tearDown() throws Exception { - intp.close(); - intpGroup.close(); - } - - @Test - public void testAngularObjectInterpreterSideCRUD() throws InterruptedException { - InterpreterResult ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - String[] result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - assertEquals("0", result[1]); // num watcher called - - // create object - ret = intp.interpret("add n1 v1", context); - Thread.sleep(500); - result = ret.message().get(0).getData().split(" "); - assertEquals("1", result[0]); // size of registry - assertEquals("0", result[1]); // num watcher called - assertEquals("v1", localRegistry.get("n1", "note", null).get()); - - // update object - ret = intp.interpret("update n1 v11", context); - result = ret.message().get(0).getData().split(" "); - Thread.sleep(500); - assertEquals("1", result[0]); // size of registry - assertEquals("1", result[1]); // num watcher called - assertEquals("v11", localRegistry.get("n1", "note", null).get()); - - // remove object - ret = intp.interpret("remove n1", context); - result = ret.message().get(0).getData().split(" "); - Thread.sleep(500); - assertEquals("0", result[0]); // size of registry - assertEquals("1", result[1]); // num watcher called - assertEquals(null, localRegistry.get("n1", "note", null)); - } - - @Test - public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException { - // test if angularobject removal from server side propagate to interpreter process's registry. 
- // will happen when notebook is removed. - - InterpreterResult ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - String[] result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - - // create object - ret = intp.interpret("add n1 v1", context); - Thread.sleep(500); - result = ret.message().get(0).getData().split(" "); - assertEquals("1", result[0]); // size of registry - assertEquals("v1", localRegistry.get("n1", "note", null).get()); - - // remove object in local registry. - localRegistry.removeAndNotifyRemoteProcess("n1", "note", null); - ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - } - - @Test - public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException { - // test if angularobject add from server side propagate to interpreter process's registry. 
- // will happen when zeppelin server loads notebook and restore the object into registry - - InterpreterResult ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - String[] result = ret.message().get(0).getData().split(" "); - assertEquals("0", result[0]); // size of registry - - // create object - localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null); - - // get from remote registry - ret = intp.interpret("get", context); - Thread.sleep(500); // waitFor eventpoller pool event - result = ret.message().get(0).getData().split(" "); - assertEquals("1", result[0]); // size of registry - } - - @Override - public void onAdd(String interpreterGroupId, AngularObject object) { - onAdd.incrementAndGet(); - } - - @Override - public void onUpdate(String interpreterGroupId, AngularObject object) { - onUpdate.incrementAndGet(); - } - - @Override - public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { - onRemove.incrementAndGet(); - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java deleted file mode 100644 index 49aa7aa..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; -import org.junit.Test; - -import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class RemoteInterpreterEventPollerTest { - - @Test - public void shouldClearUnreadEventsOnShutdown() throws Exception { - RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess(); - RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null); - - eventPoller.setInterpreterProcess(interpreterProc); - eventPoller.shutdown(); - eventPoller.start(); - eventPoller.join(); - - assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType()); - } - - private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception { - RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent(); - RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, ""); - RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class); - RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class); - - when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, 
noMoreEvents); - when(intProc.getClient()).thenReturn(client); - - return intProc; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java deleted file mode 100644 index 3f865cb..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; - - -/** - * Test for remote interpreter output stream - */ -public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener { - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - private InterpreterGroup intpGroup; - private HashMap<String, String> env; - - @Before - public void setUp() throws Exception { - intpGroup = new InterpreterGroup(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - env = new HashMap<>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - } - - @After - public void tearDown() throws Exception { - intpGroup.close(); - } - - private RemoteInterpreter createMockInterpreter() { - RemoteInterpreter intp = new RemoteInterpreter( - new Properties(), - "note", - MockInterpreterOutputStream.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - this, - null, - "anonymous", - false); - - intpGroup.get("note").add(intp); - intp.setInterpreterGroup(intpGroup); - return intp; - } - - private InterpreterContext createInterpreterContext() { - return new InterpreterContext( - "noteId", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new 
AngularObjectRegistry(intpGroup.getId(), null), - null, - new LinkedList<InterpreterContextRunner>(), null); - } - - @Test - public void testInterpreterResultOnly() { - RemoteInterpreter intp = createMockInterpreter(); - InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("staticresult", ret.message().get(0).getData()); - - ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("staticresult2", ret.message().get(0).getData()); - - ret = intp.interpret("ERROR::staticresult3", createInterpreterContext()); - assertEquals(InterpreterResult.Code.ERROR, ret.code()); - assertEquals("staticresult3", ret.message().get(0).getData()); - } - - @Test - public void testInterpreterOutputStreamOnly() { - RemoteInterpreter intp = createMockInterpreter(); - InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("streamresult", ret.message().get(0).getData()); - - ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext()); - assertEquals(InterpreterResult.Code.ERROR, ret.code()); - assertEquals("streamresult2", ret.message().get(0).getData()); - } - - @Test - public void testInterpreterResultOutputStreamMixed() { - RemoteInterpreter intp = createMockInterpreter(); - InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); - assertEquals("stream", ret.message().get(0).getData()); - assertEquals("static", ret.message().get(1).getData()); - } - - @Test - public void testOutputType() { - RemoteInterpreter intp = createMockInterpreter(); - - InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext()); - 
assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); - assertEquals("hello", ret.message().get(0).getData()); - - ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext()); - assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); - assertEquals("hello", ret.message().get(0).getData()); - - ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext()); - assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); - assertEquals("hello", ret.message().get(0).getData()); - assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType()); - assertEquals("world", ret.message().get(1).getData()); - } - - @Override - public void onOutputAppend(String noteId, String paragraphId, int index, String output) { - - } - - @Override - public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { - - } - - @Override - public void onOutputClear(String noteId, String paragraphId) { - - } - - @Override - public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { - - } - - @Override - public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { - if (callback != null) { - callback.onFinished(new LinkedList<>()); - } - } - - @Override - public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { - - } - - @Override - public void onParaInfosReceived(String noteId, String paragraphId, - String interpreterSettingId, Map<String, String> metaInfos) { - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java deleted file mode 100644 index b85d7ef..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.*; - -import java.util.HashMap; -import java.util.Properties; - -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; -import org.apache.zeppelin.interpreter.Constants; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.junit.Test; - -public class RemoteInterpreterProcessTest { - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? 
- "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - private static final int DUMMY_PORT=3678; - - @Test - public void testStartStop() { - InterpreterGroup intpGroup = new InterpreterGroup(); - RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( - INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(), - 10 * 1000, null, null,"fakeName"); - assertFalse(rip.isRunning()); - assertEquals(0, rip.referenceCount()); - assertEquals(1, rip.reference(intpGroup, "anonymous", false)); - assertEquals(2, rip.reference(intpGroup, "anonymous", false)); - assertEquals(true, rip.isRunning()); - assertEquals(1, rip.dereference()); - assertEquals(true, rip.isRunning()); - assertEquals(0, rip.dereference()); - assertEquals(false, rip.isRunning()); - } - - @Test - public void testClientFactory() throws Exception { - InterpreterGroup intpGroup = new InterpreterGroup(); - RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( - INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(), - mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName"); - rip.reference(intpGroup, "anonymous", false); - assertEquals(0, rip.getNumActiveClient()); - assertEquals(0, rip.getNumIdleClient()); - - Client client = rip.getClient(); - assertEquals(1, rip.getNumActiveClient()); - assertEquals(0, rip.getNumIdleClient()); - - rip.releaseClient(client); - assertEquals(0, rip.getNumActiveClient()); - assertEquals(1, rip.getNumIdleClient()); - - rip.dereference(); - } - - @Test - public void testStartStopRemoteInterpreter() throws TException, InterruptedException { - RemoteInterpreterServer server = new RemoteInterpreterServer(3678); - server.start(); - boolean running = false; - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < 10 * 1000) { - if (server.isRunning()) { - running = true; - break; - } else { - Thread.sleep(200); - } - } - Properties properties = new 
Properties(); - properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678"); - properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost"); - InterpreterGroup intpGroup = mock(InterpreterGroup.class); - when(intpGroup.getProperty()).thenReturn(properties); - when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true); - - RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess( - INTERPRETER_SCRIPT, - "nonexists", - "fakeRepo", - new HashMap<String, String>(), - mock(RemoteInterpreterEventPoller.class) - , 10 * 1000, - "fakeName"); - assertFalse(rip.isRunning()); - assertEquals(0, rip.referenceCount()); - assertEquals(1, rip.reference(intpGroup, "anonymous", false)); - assertEquals(true, rip.isRunning()); - } - - - @Test - public void testPropagateError() throws TException, InterruptedException { - InterpreterGroup intpGroup = new InterpreterGroup(); - RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( - "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(), - 10 * 1000, null, null, "fakeName"); - assertFalse(rip.isRunning()); - assertEquals(0, rip.referenceCount()); - try { - assertEquals(1, rip.reference(intpGroup, "anonymous", false)); - } catch (InterpreterException e) { - e.getMessage().contains("hello_world"); - } - assertEquals(0, rip.referenceCount()); - } -}