http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java new file mode 100644 index 0000000..ed8982b --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import java.util.*; + +import org.apache.thrift.TException; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +/** + * Proxy for Interpreter instance that runs on separate process + */ +public class RemoteInterpreter extends Interpreter { + private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); + + private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; + private final ApplicationEventListener applicationEventListener; + private Gson gson = new Gson(); + private String interpreterRunner; + private String interpreterPath; + private String localRepoPath; + private String className; + private String sessionKey; + private FormType formType; + private boolean initialized; + private Map<String, String> env; + private int connectTimeout; + private int maxPoolSize; + private String host; + private int port; + private String userName; + private Boolean isUserImpersonate; + private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT; + private String interpreterGroupName; + + /** + * Remote interpreter and manage interpreter process + */ + public RemoteInterpreter(Properties property, String sessionKey, String className, + String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout, + int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, + int outputLimit, String interpreterGroupName) { + super(property); + this.sessionKey = sessionKey; + this.className = className; + initialized = false; + this.interpreterRunner = interpreterRunner; + this.interpreterPath = interpreterPath; + this.localRepoPath = localRepoPath; + env = getEnvFromInterpreterProperty(property); + this.connectTimeout = connectTimeout; + this.maxPoolSize = maxPoolSize; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.applicationEventListener = appListener; + this.userName = userName; + this.isUserImpersonate = isUserImpersonate; + this.outputLimit = outputLimit; + this.interpreterGroupName = interpreterGroupName; + } + + + /** + * Connect to existing process + */ + public RemoteInterpreter(Properties property, String sessionKey, String className, String host, + int port, String localRepoPath, int connectTimeout, int maxPoolSize, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, + int outputLimit) { + super(property); + this.sessionKey = sessionKey; + this.className = className; + initialized = false; + this.host = host; + this.port = port; + this.localRepoPath = localRepoPath; + this.connectTimeout = connectTimeout; + this.maxPoolSize = maxPoolSize; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.applicationEventListener = appListener; + this.userName = userName; + this.isUserImpersonate = isUserImpersonate; + this.outputLimit = outputLimit; + } + + + // VisibleForTesting + public RemoteInterpreter(Properties property, String sessionKey, String className, + String interpreterRunner, String interpreterPath, String localRepoPath, + Map<String, String> env, int connectTimeout, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) { + super(property); + this.className = className; + this.sessionKey = sessionKey; + this.interpreterRunner = interpreterRunner; + this.interpreterPath = interpreterPath; + this.localRepoPath = localRepoPath; + env.putAll(getEnvFromInterpreterProperty(property)); + this.env = env; + this.connectTimeout = connectTimeout; + this.maxPoolSize = 10; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.applicationEventListener = appListener; + this.userName = userName; + this.isUserImpersonate = isUserImpersonate; + } + + private Map<String, String> getEnvFromInterpreterProperty(Properties property) { + Map<String, String> env = new HashMap<>(); + for (Object key : property.keySet()) { + if (RemoteInterpreterUtils.isEnvString((String) key)) { + env.put((String) key, property.getProperty((String) key)); + } + } + return env; + } + + @Override + public String getClassName() { + return className; + } + + private boolean connectToExistingProcess() { + return host != null && port > 0; + } + + public RemoteInterpreterProcess getInterpreterProcess() { + InterpreterGroup intpGroup = getInterpreterGroup(); + if (intpGroup == null) { + return null; + } + + synchronized (intpGroup) { + if (intpGroup.getRemoteInterpreterProcess() == null) { + RemoteInterpreterProcess remoteProcess; + if (connectToExistingProcess()) { + remoteProcess = new RemoteInterpreterRunningProcess( + connectTimeout, + remoteInterpreterProcessListener, + applicationEventListener, + host, + port); + } else { + // create new remote process + remoteProcess = new RemoteInterpreterManagedProcess( + interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout, + remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName); + } + + intpGroup.setRemoteInterpreterProcess(remoteProcess); + } + + return intpGroup.getRemoteInterpreterProcess(); + } + } + + public synchronized void init() { + if (initialized == true) { + return; + } + + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + + final InterpreterGroup interpreterGroup = getInterpreterGroup(); + + interpreterProcess.setMaxPoolSize( + Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize())); + String groupId = interpreterGroup.getId(); + + synchronized (interpreterProcess) { + Client client = null; + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + throw new InterpreterException(e1); + } + + boolean broken = false; + try { + logger.info("Create remote interpreter {}", getClassName()); + if (localRepoPath != null) { + property.put("zeppelin.interpreter.localRepo", localRepoPath); + } + + property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit)); + client.createInterpreter(groupId, sessionKey, + getClassName(), (Map) property, userName); + // Push angular object loaded from JSON file to remote interpreter + if (!interpreterGroup.isAngularRegistryPushed()) { + pushAngularObjectRegistryToRemote(client); + interpreterGroup.setAngularRegistryPushed(true); + } + + } catch (TException e) { + logger.error("Failed to create interpreter: {}", getClassName()); + throw new InterpreterException(e); + } finally { + // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken + interpreterProcess.releaseClient(client, broken); + } + } + initialized = true; + } + + + @Override + public void open() { + InterpreterGroup interpreterGroup = getInterpreterGroup(); + + synchronized (interpreterGroup) { + // initialize all interpreters in this interpreter group + List<Interpreter> interpreters = interpreterGroup.get(sessionKey); + // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, + // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it + // doesn't call open method, it's not open. It causes problem while running intp.close() + // In case of Spark, this method initializes all of interpreters and init() method increases + // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all + // other interpreters doesn't do anything because those LazyInterpreters aren't open. + // But for now, we have to initialise all of interpreters for some reasons. + // See Interpreter.getInterpreterInTheSameSessionByClassName(String) + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + if (!initialized) { + // reference per session + interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate); + } + for (Interpreter intp : new ArrayList<>(interpreters)) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + try { + ((RemoteInterpreter) p).init(); + } catch (InterpreterException e) { + logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", + p.getClassName()); + interpreters.remove(p); + } + } + } + } + + @Override + public void close() { + InterpreterGroup interpreterGroup = getInterpreterGroup(); + synchronized (interpreterGroup) { + // close all interpreters in this session + List<Interpreter> interpreters = interpreterGroup.get(sessionKey); + // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, + // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it + // doesn't call open method, it's not open. It causes problem while running intp.close() + // In case of Spark, this method initializes all of interpreters and init() method increases + // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all + // other interpreters doesn't do anything because those LazyInterpreters aren't open. + // But for now, we have to initialise all of interpreters for some reasons. + // See Interpreter.getInterpreterInTheSameSessionByClassName(String) + if (initialized) { + // dereference per session + getInterpreterProcess().dereference(); + } + for (Interpreter intp : new ArrayList<>(interpreters)) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + try { + ((RemoteInterpreter) p).closeInterpreter(); + } catch (InterpreterException e) { + logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", + p.getClassName()); + interpreters.remove(p); + } + } + } + } + + public void closeInterpreter() { + if (this.initialized == false) { + return; + } + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + Client client = null; + boolean broken = false; + try { + client = interpreterProcess.getClient(); + if (client != null) { + client.close(sessionKey, className); + } + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } catch (Exception e1) { + throw new InterpreterException(e1); + } finally { + if (client != null) { + interpreterProcess.releaseClient(client, broken); + } + this.initialized = false; + } + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + if (logger.isDebugEnabled()) { + logger.debug("st:\n{}", st); + } + + FormType form = getFormType(); + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + Client client = null; + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + throw new InterpreterException(e1); + } + + InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess + .getInterpreterContextRunnerPool(); + + List<InterpreterContextRunner> runners = context.getRunners(); + if (runners != null && runners.size() != 0) { + // assume all runners in this InterpreterContext have the same note id + String noteId = runners.get(0).getNoteId(); + + interpreterContextRunnerPool.clear(noteId); + interpreterContextRunnerPool.addAll(noteId, runners); + } + + boolean broken = false; + try { + + final GUI currentGUI = context.getGui(); + RemoteInterpreterResult remoteResult = client.interpret( + sessionKey, className, st, convert(context)); + + Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( + remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { + }.getType()); + context.getConfig().clear(); + context.getConfig().putAll(remoteConfig); + + if (form == FormType.NATIVE) { + GUI remoteGui = GUI.fromJson(remoteResult.getGui()); + currentGUI.clear(); + currentGUI.setParams(remoteGui.getParams()); + currentGUI.setForms(remoteGui.getForms()); + } else if (form == FormType.SIMPLE) { + final Map<String, Input> currentForms = currentGUI.getForms(); + final Map<String, Object> currentParams = currentGUI.getParams(); + final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); + final Map<String, Input> remoteForms = remoteGUI.getForms(); + final Map<String, Object> remoteParams = remoteGUI.getParams(); + currentForms.putAll(remoteForms); + currentParams.putAll(remoteParams); + } + + InterpreterResult result = convert(remoteResult); + return result; + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } finally { + interpreterProcess.releaseClient(client, broken); + } + } + + @Override + public void cancel(InterpreterContext context) { + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + Client client = null; + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + throw new InterpreterException(e1); + } + + boolean broken = false; + try { + client.cancel(sessionKey, className, convert(context)); + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } finally { + interpreterProcess.releaseClient(client, broken); + } + } + + @Override + public FormType getFormType() { + open(); + + if (formType != null) { + return formType; + } + + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + Client client = null; + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + throw new InterpreterException(e1); + } + + boolean broken = false; + try { + formType = FormType.valueOf(client.getFormType(sessionKey, className)); + return formType; + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } finally { + interpreterProcess.releaseClient(client, broken); + } + } + + @Override + public int getProgress(InterpreterContext context) { + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + if (interpreterProcess == null || !interpreterProcess.isRunning()) { + return 0; + } + + Client client = null; + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + throw new InterpreterException(e1); + } + + boolean broken = false; + try { + return client.getProgress(sessionKey, className, convert(context)); + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } finally { + interpreterProcess.releaseClient(client, broken); + } + } + + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + Client client = null; + try { + client = interpreterProcess.getClient(); + } catch (Exception e1) { + throw new InterpreterException(e1); + } + + boolean broken = false; + try { + List completion = client.completion(sessionKey, className, buf, cursor, + convert(interpreterContext)); + return completion; + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } finally { + interpreterProcess.releaseClient(client, broken); + } + } + + @Override + public Scheduler getScheduler() { + int maxConcurrency = maxPoolSize; + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + if (interpreterProcess == null) { + return null; + } else { + return SchedulerFactory.singleton().createOrGetRemoteScheduler( + RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(), + sessionKey, interpreterProcess, maxConcurrency); + } + } + + private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) { + return interpreterGroup.getId(); + } + + private RemoteInterpreterContext convert(InterpreterContext ic) { + return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(), + ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()), + gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners())); + } + + private InterpreterResult convert(RemoteInterpreterResult result) { + InterpreterResult r = new InterpreterResult( + InterpreterResult.Code.valueOf(result.getCode())); + + for (RemoteInterpreterResultMessage m : result.getMsg()) { + r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData()); + } + + return r; + } + + /** + * Push local angular object registry to + * remote interpreter. This method should be + * call ONLY inside the init() method + */ + void pushAngularObjectRegistryToRemote(Client client) throws TException { + final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() + .getAngularObjectRegistry(); + + if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { + final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry + .getRegistry(); + + logger.info("Push local angular object registry from ZeppelinServer to" + + " remote interpreter group {}", this.getInterpreterGroup().getId()); + + final java.lang.reflect.Type registryType = new TypeToken<Map<String, + Map<String, AngularObject>>>() { + }.getType(); + + Gson gson = new Gson(); + client.angularRegistryPush(gson.toJson(registry, registryType)); + } + } + + public Map<String, String> getEnv() { + return env; + } + + public void setEnv(Map<String, String> env) { + this.env = env; + } + + public void addEnv(Map<String, String> env) { + if (this.env == null) { + this.env = new HashMap<>(); + } + this.env.putAll(env); + } + + //Only for test + public String getInterpreterRunner() { + return interpreterRunner; + } +}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java new file mode 100644 index 0000000..1fb9b90 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.commons.exec.*; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +/** + * This class manages start / stop of remote interpreter process + */ +public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess + implements ExecuteResultHandler { + private static final Logger logger = LoggerFactory.getLogger( + RemoteInterpreterManagedProcess.class); + private final String interpreterRunner; + + private DefaultExecutor executor; + private ExecuteWatchdog watchdog; + boolean running = false; + private int port = -1; + private final String interpreterDir; + private final String localRepoDir; + private final String interpreterGroupName; + + private Map<String, String> env; + + public RemoteInterpreterManagedProcess( + String intpRunner, + String intpDir, + String localRepoDir, + Map<String, String> env, + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener, + String interpreterGroupName) { + super(new RemoteInterpreterEventPoller(listener, appListener), + connectTimeout); + this.interpreterRunner = intpRunner; + this.env = env; + this.interpreterDir = intpDir; + this.localRepoDir = localRepoDir; + this.interpreterGroupName = interpreterGroupName; + } + + RemoteInterpreterManagedProcess(String intpRunner, + String intpDir, + String localRepoDir, + Map<String, String> env, + RemoteInterpreterEventPoller remoteInterpreterEventPoller, + int connectTimeout, + String interpreterGroupName) { + super(remoteInterpreterEventPoller, + connectTimeout); + this.interpreterRunner = intpRunner; + this.env = env; + this.interpreterDir = intpDir; + this.localRepoDir = localRepoDir; + this.interpreterGroupName = interpreterGroupName; + } + + @Override + public String getHost() { + return "localhost"; + } + + @Override + public int getPort() { + return port; + } + + @Override + public void start(String userName, Boolean isUserImpersonate) { + // start server process + try { + port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + } catch (IOException e1) { + throw new InterpreterException(e1); + } + + CommandLine cmdLine = CommandLine.parse(interpreterRunner); + cmdLine.addArgument("-d", false); + cmdLine.addArgument(interpreterDir, false); + cmdLine.addArgument("-p", false); + cmdLine.addArgument(Integer.toString(port), false); + if (isUserImpersonate && !userName.equals("anonymous")) { + cmdLine.addArgument("-u", false); + cmdLine.addArgument(userName, false); + } + cmdLine.addArgument("-l", false); + cmdLine.addArgument(localRepoDir, false); + cmdLine.addArgument("-g", false); + cmdLine.addArgument(interpreterGroupName, false); + + executor = new DefaultExecutor(); + + ByteArrayOutputStream cmdOut = new ByteArrayOutputStream(); + ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger); + processOutput.setOutputStream(cmdOut); + + executor.setStreamHandler(new PumpStreamHandler(processOutput)); + watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); + executor.setWatchdog(watchdog); + + try { + Map procEnv = EnvironmentUtils.getProcEnvironment(); + procEnv.putAll(env); + + logger.info("Run interpreter process {}", cmdLine); + executor.execute(cmdLine, procEnv, this); + running = true; + } catch (IOException e) { + running = false; + throw new InterpreterException(e); + } + + + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < getConnectTimeout()) { + if (!running) { + try { + cmdOut.flush(); + } catch (IOException e) { + // nothing to do + } + throw new InterpreterException(new String(cmdOut.toByteArray())); + } + + try { + if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { + break; + } else { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + + "Thread.sleep", e); + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Remote interpreter not yet accessible at localhost:" + port); + } + } + } + processOutput.setOutputStream(null); + } + + public void stop() { + if (isRunning()) { + logger.info("kill interpreter process"); + watchdog.destroyProcess(); + } + + executor = null; + watchdog = null; + running = false; + logger.info("Remote process terminated"); + } + + @Override + public void onProcessComplete(int exitValue) { + logger.info("Interpreter process exited {}", exitValue); + running = false; + + } + + @Override + public void onProcessFailed(ExecuteException e) { + logger.info("Interpreter process failed {}", e); + running = false; + } + + public boolean isRunning() { + return running; + } + + private static class ProcessLogOutputStream extends LogOutputStream { + + private Logger logger; + OutputStream out; + + public ProcessLogOutputStream(Logger logger) { + this.logger = logger; + } + + @Override + protected void processLine(String s, int i) { + this.logger.debug(s); + } + + @Override + public void write(byte [] b) throws IOException { + super.write(b); + + if (out != null) { + synchronized (this) { + if (out != null) { + out.write(b); + } + } + } + } + + @Override + public void write(byte [] b, int offset, int len) throws IOException { + super.write(b, offset, len); + + if (out != null) { + synchronized (this) { + if (out != null) { + out.write(b, offset, len); + } + } + } + } + + public void setOutputStream(OutputStream out) { + synchronized (this) { + this.out = out; + } + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java new file mode 100644 index 0000000..bb176be --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class connects to existing process + */ +public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { + private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private final String host; + private final int port; + + public RemoteInterpreterRunningProcess( + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener, + String host, + int port + ) { + super(connectTimeout, listener, appListener); + this.host = host; + this.port = port; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getPort() { + return port; + } + + @Override + public void start(String userName, Boolean isUserImpersonate) { + // assume process is externally managed. nothing to do + } + + @Override + public void stop() { + // assume process is externally managed. nothing to do + } + + @Override + public boolean isRunning() { + return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java new file mode 100644 index 0000000..c8c64ea --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class AppendOutputRunnerTest { + + private static final int NUM_EVENTS = 10000; + private static final int NUM_CLUBBED_EVENTS = 100; + private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); + private static ScheduledFuture<?> future = null; + /* It is being accessed by multiple threads. + * While loop for 'loopForBufferCompletion' could + * run for-ever. + */ + private volatile static int numInvocations = 0; + + @After + public void afterEach() { + if (future != null) { + future.cancel(true); + } + } + + @Test + public void testSingleEvent() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String[][] buffer = {{"note", "para", "data\n"}}; + + loopForCompletingEvents(listener, 1, buffer); + verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); + verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n"); + } + + @Test + public void testMultipleEventsOfSameParagraph() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String note1 = "note1"; + String para1 = "para1"; + String[][] buffer = { + {note1, para1, "data1\n"}, + {note1, para1, "data2\n"}, + {note1, para1, "data3\n"} + }; + + loopForCompletingEvents(listener, 1, buffer); + verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); + verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n"); + } + + @Test + public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + String note1 = "note1"; + String note2 = "note2"; + String para1 = "para1"; + String para2 = "para2"; + String[][] buffer = { + {note1, para1, "data1\n"}, + {note1, para2, "data2\n"}, + {note2, para1, "data3\n"}, + {note2, para2, "data4\n"} + }; + loopForCompletingEvents(listener, 4, buffer); + + verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); + verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n"); + verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n"); + verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n"); + verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n"); + } + + @Test + public void testClubbedData() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + AppendOutputRunner runner = new AppendOutputRunner(listener); + future = service.scheduleWithFixedDelay(runner, 0, + AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); + Thread thread = new Thread(new BombardEvents(runner)); + thread.start(); + thread.join(); + Thread.sleep(1000); + + /* NUM_CLUBBED_EVENTS is a heuristic number. + * It has been observed that for 10,000 continuos event + * calls, 30-40 Web-socket calls are made. Keeping + * the unit-test to a pessimistic 100 web-socket calls. + */ + verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); + } + + @Test + public void testWarnLoggerForLargeData() throws InterruptedException { + RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class); + AppendOutputRunner runner = new AppendOutputRunner(listener); + String data = "data\n"; + int numEvents = 100000; + + for (int i=0; i<numEvents; i++) { + runner.appendBuffer("noteId", "paraId", 0, data); + } + + TestAppender appender = new TestAppender(); + Logger logger = Logger.getRootLogger(); + logger.addAppender(appender); + Logger.getLogger(RemoteInterpreterEventPoller.class); + + runner.run(); + List<LoggingEvent> log; + + int warnLogCounter; + LoggingEvent sizeWarnLogEntry = null; + do { + warnLogCounter = 0; + log = appender.getLog(); + for (LoggingEvent logEntry: log) { + if (Level.WARN.equals(logEntry.getLevel())) { + sizeWarnLogEntry = logEntry; + warnLogCounter += 1; + } + } + } while(warnLogCounter != 2); + + String loggerString = "Processing size for buffered append-output is high: " + + (data.length() * numEvents) + " characters."; + assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage())); + } + + private class BombardEvents implements Runnable { + + private final AppendOutputRunner runner; + + private BombardEvents(AppendOutputRunner runner) { + this.runner = runner; + } + + @Override + public void run() { + String noteId = "noteId"; + String paraId = "paraId"; + for (int i=0; i<NUM_EVENTS; i++) { + runner.appendBuffer(noteId, paraId, 0, "data\n"); + } + } + } + + private class TestAppender extends AppenderSkeleton { + private final List<LoggingEvent> log = new ArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + protected void append(final LoggingEvent loggingEvent) { + log.add(loggingEvent); + } + + @Override + public void close() { + } + + public List<LoggingEvent> getLog() { + return new ArrayList<>(log); + } + } + + private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) { + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + numInvocations += 1; + return null; + } + }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class)); + } + + private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, + int numTimes, String[][] buffer) { + numInvocations = 0; + prepareInvocationCounts(listener); + AppendOutputRunner runner = new AppendOutputRunner(listener); + for (String[] bufferElement: buffer) { + runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]); + } + future = service.scheduleWithFixedDelay(runner, 0, + AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); + long startTimeMs = System.currentTimeMillis(); + while(numInvocations != numTimes) { + if (System.currentTimeMillis() - startTimeMs > 2000) { + fail("Buffered events were not sent for 2 seconds"); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java new file mode 100644 index 0000000..f7404e3 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zeppelin.display.*; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class RemoteAngularObjectTest implements AngularObjectRegistryListener { + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + private InterpreterGroup intpGroup; + private HashMap<String, String> env; + private RemoteInterpreter intp; + private InterpreterContext context; + private RemoteAngularObjectRegistry localRegistry; + + private AtomicInteger onAdd; + private AtomicInteger onUpdate; + private AtomicInteger onRemove; + + @Before + public void setUp() throws Exception { + onAdd = new AtomicInteger(0); + onUpdate = new AtomicInteger(0); + onRemove = new AtomicInteger(0); + + intpGroup = new InterpreterGroup("intpId"); + localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup); + intpGroup.setAngularObjectRegistry(localRegistry); + env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + Properties p = new Properties(); + + intp = new RemoteInterpreter( + p, + "note", + MockInterpreterAngular.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false + ); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intp); + intp.setInterpreterGroup(intpGroup); + + context = new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + intp.open(); + } + + @After + public void tearDown() throws Exception { + intp.close(); + intpGroup.close(); + } + + @Test + public void testAngularObjectInterpreterSideCRUD() throws InterruptedException { + InterpreterResult ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + String[] result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + assertEquals("0", result[1]); // num watcher called + + // create object + ret = intp.interpret("add n1 v1", context); + Thread.sleep(500); + result = ret.message().get(0).getData().split(" "); + assertEquals("1", result[0]); // size of registry + assertEquals("0", result[1]); // num watcher called + assertEquals("v1", localRegistry.get("n1", "note", null).get()); + + // update object + ret = intp.interpret("update n1 v11", context); + result = ret.message().get(0).getData().split(" "); + Thread.sleep(500); + assertEquals("1", result[0]); // size of registry + assertEquals("1", result[1]); // num watcher called + assertEquals("v11", localRegistry.get("n1", "note", null).get()); + + // remove object + ret = intp.interpret("remove n1", context); + result = ret.message().get(0).getData().split(" "); + Thread.sleep(500); + assertEquals("0", result[0]); // size of registry + assertEquals("1", result[1]); // num watcher called + assertEquals(null, localRegistry.get("n1", "note", null)); + } + + @Test + public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException { + // test if angularobject removal from server side propagate to interpreter process's registry. + // will happen when notebook is removed. + + InterpreterResult ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + String[] result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + + // create object + ret = intp.interpret("add n1 v1", context); + Thread.sleep(500); + result = ret.message().get(0).getData().split(" "); + assertEquals("1", result[0]); // size of registry + assertEquals("v1", localRegistry.get("n1", "note", null).get()); + + // remove object in local registry. + localRegistry.removeAndNotifyRemoteProcess("n1", "note", null); + ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + } + + @Test + public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException { + // test if angularobject add from server side propagate to interpreter process's registry. + // will happen when zeppelin server loads notebook and restore the object into registry + + InterpreterResult ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + String[] result = ret.message().get(0).getData().split(" "); + assertEquals("0", result[0]); // size of registry + + // create object + localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null); + + // get from remote registry + ret = intp.interpret("get", context); + Thread.sleep(500); // waitFor eventpoller pool event + result = ret.message().get(0).getData().split(" "); + assertEquals("1", result[0]); // size of registry + } + + @Override + public void onAdd(String interpreterGroupId, AngularObject object) { + onAdd.incrementAndGet(); + } + + @Override + public void onUpdate(String interpreterGroupId, AngularObject object) { + onUpdate.incrementAndGet(); + } + + @Override + public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { + onRemove.incrementAndGet(); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java new file mode 100644 index 0000000..49aa7aa --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.junit.Test; + +import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteInterpreterEventPollerTest { + + @Test + public void shouldClearUnreadEventsOnShutdown() throws Exception { + RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess(); + RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null); + + eventPoller.setInterpreterProcess(interpreterProc); + eventPoller.shutdown(); + eventPoller.start(); + eventPoller.join(); + + assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType()); + } + + private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception { + RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent(); + RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, ""); + RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class); + RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class); + + when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents); + when(intProc.getClient()).thenReturn(client); + + return intProc; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java new file mode 100644 index 0000000..3f865cb --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + + +/** + * Test for remote interpreter output stream + */ +public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener { + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + private InterpreterGroup intpGroup; + private HashMap<String, String> env; + + @Before + public void setUp() throws Exception { + intpGroup = new InterpreterGroup(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + } + + @After + public void tearDown() throws Exception { + intpGroup.close(); + } + + private RemoteInterpreter createMockInterpreter() { + RemoteInterpreter intp = new RemoteInterpreter( + new Properties(), + "note", + MockInterpreterOutputStream.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + this, + null, + "anonymous", + false); + + intpGroup.get("note").add(intp); + intp.setInterpreterGroup(intpGroup); + return intp; + } + + private InterpreterContext createInterpreterContext() { + return new InterpreterContext( + "noteId", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + null, + new LinkedList<InterpreterContextRunner>(), null); + } + + @Test + public void testInterpreterResultOnly() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("staticresult", ret.message().get(0).getData()); + + ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("staticresult2", ret.message().get(0).getData()); + + ret = intp.interpret("ERROR::staticresult3", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("staticresult3", ret.message().get(0).getData()); + } + + @Test + public void testInterpreterOutputStreamOnly() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("streamresult", ret.message().get(0).getData()); + + ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext()); + assertEquals(InterpreterResult.Code.ERROR, ret.code()); + assertEquals("streamresult2", ret.message().get(0).getData()); + } + + @Test + public void testInterpreterResultOutputStreamMixed() { + RemoteInterpreter intp = createMockInterpreter(); + InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals("stream", ret.message().get(0).getData()); + assertEquals("static", ret.message().get(1).getData()); + } + + @Test + public void testOutputType() { + RemoteInterpreter intp = createMockInterpreter(); + + InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); + assertEquals("hello", ret.message().get(0).getData()); + + ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); + assertEquals("hello", ret.message().get(0).getData()); + + ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext()); + assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType()); + assertEquals("hello", ret.message().get(0).getData()); + assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType()); + assertEquals("world", ret.message().get(1).getData()); + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, int index, String output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { + + } + + @Override + public void onOutputClear(String noteId, String paragraphId) { + + } + + @Override + public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { + + } + + @Override + public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { + if (callback != null) { + callback.onFinished(new LinkedList<>()); + } + } + + @Override + public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { + + } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos) { + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java new file mode 100644 index 0000000..b85d7ef --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.*; + +import java.util.HashMap; +import java.util.Properties; + +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.interpreter.Constants; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.junit.Test; + +public class RemoteInterpreterProcessTest { + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + private static final int DUMMY_PORT=3678; + + @Test + public void testStartStop() { + InterpreterGroup intpGroup = new InterpreterGroup(); + RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( + INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(), + 10 * 1000, null, null,"fakeName"); + assertFalse(rip.isRunning()); + assertEquals(0, rip.referenceCount()); + assertEquals(1, rip.reference(intpGroup, "anonymous", false)); + assertEquals(2, rip.reference(intpGroup, "anonymous", false)); + assertEquals(true, rip.isRunning()); + assertEquals(1, rip.dereference()); + assertEquals(true, rip.isRunning()); + assertEquals(0, rip.dereference()); + assertEquals(false, rip.isRunning()); + } + + @Test + public void testClientFactory() throws Exception { + InterpreterGroup intpGroup = new InterpreterGroup(); + RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( + INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(), + mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName"); + rip.reference(intpGroup, "anonymous", false); + assertEquals(0, rip.getNumActiveClient()); + assertEquals(0, rip.getNumIdleClient()); + + Client client = rip.getClient(); + assertEquals(1, rip.getNumActiveClient()); + assertEquals(0, rip.getNumIdleClient()); + + rip.releaseClient(client); + assertEquals(0, rip.getNumActiveClient()); + assertEquals(1, rip.getNumIdleClient()); + + rip.dereference(); + } + + @Test + public void testStartStopRemoteInterpreter() throws TException, InterruptedException { + RemoteInterpreterServer server = new RemoteInterpreterServer(3678); + server.start(); + boolean running = false; + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 10 * 1000) { + if (server.isRunning()) { + running = true; + break; + } else { + Thread.sleep(200); + } + } + Properties properties = new Properties(); + properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678"); + properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost"); + InterpreterGroup intpGroup = mock(InterpreterGroup.class); + when(intpGroup.getProperty()).thenReturn(properties); + when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true); + + RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess( + INTERPRETER_SCRIPT, + "nonexists", + "fakeRepo", + new HashMap<String, String>(), + mock(RemoteInterpreterEventPoller.class) + , 10 * 1000, + "fakeName"); + assertFalse(rip.isRunning()); + assertEquals(0, rip.referenceCount()); + assertEquals(1, rip.reference(intpGroup, "anonymous", false)); + assertEquals(true, rip.isRunning()); + } + + + @Test + public void testPropagateError() throws TException, InterruptedException { + InterpreterGroup intpGroup = new InterpreterGroup(); + RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess( + "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(), + 10 * 1000, null, null, "fakeName"); + assertFalse(rip.isRunning()); + assertEquals(0, rip.referenceCount()); + try { + assertEquals(1, rip.reference(intpGroup, "anonymous", false)); + } catch (InterpreterException e) { + e.getMessage().contains("hello_world"); + } + assertEquals(0, rip.referenceCount()); + } +}