Repository: zeppelin Updated Branches: refs/heads/master ae35ec337 -> ed5349729
ZEPPELIN-1861 Support custom interpreter.sh script to run interpreter ### What is this PR for? Supporting a custom interpreter.sh for an individual interpreter. Some interpreters don't need the current, complicated interpreter.sh and want to use a custom RemoteInterpreterServer class to launch their interpreters. ### What type of PR is it? [Feature] ### Todos * [x] - Add a new way to run a custom interpreter.sh ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1861 ### How should this be tested? Users can add ``` "runner": { "linux": "interpreter.sh", "win": "interpreter.cmd" } ``` in their interpreter-setting.json. A relative path is resolved against `${INTERPRETER_DIR}/${INTERPRETER_NAME}`; an absolute path is used as-is. ### Screenshots (if appropriate) N/A ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jongyoul Lee <jongy...@gmail.com> Closes #1807 from jongyoul/ZEPPELIN-1861 and squashes the following commits: b692b3d [Jongyoul Lee] Fixed format 20000c5 [Jongyoul Lee] Fixed to support absolute path for interpreter runner 4d001e2 [Jongyoul Lee] Added test case for the basic workflow of InterpreterRunner dc8f74b [Jongyoul Lee] Added InterpreterRunner class for supporting custom runner path Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ed534972 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ed534972 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ed534972 Branch: refs/heads/master Commit: ed534972989b1ad0f6c07d25743c4c3d42ea237b Parents: ae35ec3 Author: Jongyoul Lee <jongy...@gmail.com> Authored: Tue Jan 3 18:49:22 2017 +0900 Committer: Jongyoul Lee <jongy...@apache.org> Committed: Thu Jan 5 12:02:25 2017 +0900 ---------------------------------------------------------------------- .../zeppelin/interpreter/Interpreter.java | 38 ++++---- 
.../zeppelin/interpreter/InterpreterRunner.java | 18 ++++ .../interpreter/remote/RemoteInterpreter.java | 93 +++++++------------- .../interpreter/InterpreterFactory.java | 88 +++++++++--------- .../interpreter/InterpreterSetting.java | 39 +++++--- .../interpreter/InterpreterFactoryTest.java | 70 +++++++++++++-- 6 files changed, 206 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ed534972/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java index 4045fc9..6a9cc2c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import com.google.gson.annotations.SerializedName; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.annotation.Experimental; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -44,7 +43,6 @@ import org.slf4j.LoggerFactory; * open(), close(), interpret() is three the most important method you need to implement. * cancel(), getProgress(), completion() is good to have * getFormType(), getScheduler() determine Zeppelin's behavior - * */ public abstract class Interpreter { @@ -66,16 +64,12 @@ public abstract class Interpreter { * Run code and return result, in synchronous way. 
* * @param st statements to run - * @param context - * @return */ @ZeppelinApi public abstract InterpreterResult interpret(String st, InterpreterContext context); /** * Optionally implement the canceling routine to abort interpret() method - * - * @param context */ @ZeppelinApi public abstract void cancel(InterpreterContext context); @@ -85,7 +79,7 @@ public abstract class Interpreter { * see http://zeppelin.apache.org/docs/dynamicform.html * * @return FormType.SIMPLE enables simple pattern replacement (eg. Hello ${name=world}), - * FormType.NATIVE handles form in API + * FormType.NATIVE handles form in API */ @ZeppelinApi public abstract FormType getFormType(); @@ -93,7 +87,6 @@ public abstract class Interpreter { /** * get interpret() method running process in percentage. * - * @param context * @return number between 0-100 */ @ZeppelinApi @@ -121,10 +114,8 @@ public abstract class Interpreter { * SchedulerFactory.singleton().createOrGetFIFOScheduler() * SchedulerFactory.singleton().createOrGetParallelScheduler() * - * - * @return return scheduler instance. - * This method can be called multiple times and have to return the same instance. - * Can not return null. + * @return return scheduler instance. This method can be called multiple times and have to return + * the same instance. Can not return null. 
*/ @ZeppelinApi public Scheduler getScheduler() { @@ -133,7 +124,7 @@ public abstract class Interpreter { public static Logger logger = LoggerFactory.getLogger(Interpreter.class); private InterpreterGroup interpreterGroup; - private URL [] classloaderUrls; + private URL[] classloaderUrls; protected Properties property; private String userName; @@ -208,6 +199,7 @@ public abstract class Interpreter { /** * General function to register hook event + * * @param noteId - Note to bind hook to * @param event The type of event to hook to (pre_exec, post_exec) * @param cmd The code to be executed by the interpreter on given event @@ -221,6 +213,7 @@ public abstract class Interpreter { /** * registerHook() wrapper for global scope + * * @param event The type of event to hook to (pre_exec, post_exec) * @param cmd The code to be executed by the interpreter on given event */ @@ -231,6 +224,7 @@ public abstract class Interpreter { /** * Get the hook code + * * @param noteId - Note to bind hook to * @param event The type of event to hook to (pre_exec, post_exec) */ @@ -243,6 +237,7 @@ public abstract class Interpreter { /** * getHook() wrapper for global scope + * * @param event The type of event to hook to (pre_exec, post_exec) */ @Experimental @@ -252,6 +247,7 @@ public abstract class Interpreter { /** * Unbind code from given hook event + * * @param noteId - Note to bind hook to * @param event The type of event to hook to (pre_exec, post_exec) */ @@ -264,13 +260,14 @@ public abstract class Interpreter { /** * unregisterHook() wrapper for global scope + * * @param event The type of event to hook to (pre_exec, post_exec) */ @Experimental public void unregisterHook(String event) { unregisterHook(null, event); } - + @ZeppelinApi public Interpreter getInterpreterInTheSameSessionByClassName(String className) { synchronized (interpreterGroup) { @@ -311,17 +308,16 @@ public abstract class Interpreter { * Represent registered interpreter class */ public static class 
RegisteredInterpreter { - //@SerializedName("interpreterGroup") + private String group; - //@SerializedName("interpreterName") private String name; - //@SerializedName("interpreterClassName") private String className; private boolean defaultInterpreter; private Map<String, InterpreterProperty> properties; private Map<String, Object> editor; private String path; private InterpreterOption option; + private InterpreterRunner runner; public RegisteredInterpreter(String name, String group, String className, Map<String, InterpreterProperty> properties) { @@ -382,6 +378,10 @@ public abstract class Interpreter { public InterpreterOption getOption() { return option; } + + public InterpreterRunner getRunner() { + return runner; + } } /** @@ -417,8 +417,8 @@ public abstract class Interpreter { public static void register(String name, String group, String className, boolean defaultInterpreter, Map<String, InterpreterProperty> properties) { logger.warn("Static initialization is deprecated for interpreter {}, You should change it " + - "to use interpreter-setting.json in your jar or " + - "interpreter/{interpreter}/interpreter-setting.json", name); + "to use interpreter-setting.json in your jar or " + + "interpreter/{interpreter}/interpreter-setting.json", name); register(new RegisteredInterpreter(name, group, className, defaultInterpreter, properties)); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ed534972/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java new file mode 100644 index 0000000..020564b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java @@ -0,0 +1,18 @@ +package org.apache.zeppelin.interpreter; + 
+import com.google.gson.annotations.SerializedName; + +/** + * Interpreter runner path + */ +public class InterpreterRunner { + + @SerializedName("linux") + private String linuxPath; + @SerializedName("win") + private String winPath; + + public String getPath() { + return System.getProperty("os.name").startsWith("Windows") ? winPath : linuxPath; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ed534972/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 50ff689..3fa5df7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -44,17 +44,18 @@ import com.google.gson.reflect.TypeToken; * Proxy for Interpreter instance that runs on separate process */ public class RemoteInterpreter extends Interpreter { + private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); + private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; private final ApplicationEventListener applicationEventListener; - Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); - Gson gson = new Gson(); + private Gson gson = new Gson(); private String interpreterRunner; private String interpreterPath; private String localRepoPath; private String className; private String sessionKey; - FormType formType; - boolean initialized; + private FormType formType; + private boolean initialized; private Map<String, String> env; private int connectTimeout; private int maxPoolSize; @@ -66,18 +67,10 @@ public class RemoteInterpreter extends Interpreter { /** * 
Remote interpreter and manage interpreter process */ - public RemoteInterpreter(Properties property, - String sessionKey, - String className, - String interpreterRunner, - String interpreterPath, - String localRepoPath, - int connectTimeout, - int maxPoolSize, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, - String userName, - Boolean isUserImpersonate) { + public RemoteInterpreter(Properties property, String sessionKey, String className, + String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout, + int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) { super(property); this.sessionKey = sessionKey; this.className = className; @@ -98,18 +91,10 @@ public class RemoteInterpreter extends Interpreter { /** * Connect to existing process */ - public RemoteInterpreter( - Properties property, - String sessionKey, - String className, - String host, - int port, - int connectTimeout, - int maxPoolSize, + public RemoteInterpreter(Properties property, String sessionKey, String className, String host, + int port, int connectTimeout, int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, - String userName, - Boolean isUserImpersonate) { + ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) { super(property); this.sessionKey = sessionKey; this.className = className; @@ -126,19 +111,11 @@ public class RemoteInterpreter extends Interpreter { // VisibleForTesting - public RemoteInterpreter( - Properties property, - String sessionKey, - String className, - String interpreterRunner, - String interpreterPath, - String localRepoPath, - Map<String, String> env, - int connectTimeout, + public RemoteInterpreter(Properties property, String sessionKey, String className, + String 
interpreterRunner, String interpreterPath, String localRepoPath, + Map<String, String> env, int connectTimeout, RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, - String userName, - Boolean isUserImpersonate) { + ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) { super(property); this.className = className; this.sessionKey = sessionKey; @@ -240,7 +217,7 @@ public class RemoteInterpreter extends Interpreter { property.put("zeppelin.interpreter.localRepo", localRepoPath); } client.createInterpreter(groupId, sessionKey, - getClassName(), (Map) property, userName); + getClassName(), (Map) property, userName); // Push angular object loaded from JSON file to remote interpreter if (!interpreterGroup.isAngularRegistryPushed()) { pushAngularObjectRegistryToRemote(client); @@ -259,7 +236,6 @@ public class RemoteInterpreter extends Interpreter { } - @Override public void open() { InterpreterGroup interpreterGroup = getInterpreterGroup(); @@ -347,7 +323,6 @@ public class RemoteInterpreter extends Interpreter { context.getConfig().clear(); context.getConfig().putAll(remoteConfig); - if (form == FormType.NATIVE) { GUI remoteGui = gson.fromJson(remoteResult.getGui(), GUI.class); currentGUI.clear(); @@ -394,7 +369,6 @@ public class RemoteInterpreter extends Interpreter { } } - @Override public FormType getFormType() { init(); @@ -480,9 +454,7 @@ public class RemoteInterpreter extends Interpreter { } else { return SchedulerFactory.singleton().createOrGetRemoteScheduler( RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(), - sessionKey, - interpreterProcess, - maxConcurrency); + sessionKey, interpreterProcess, maxConcurrency); } } @@ -491,16 +463,9 @@ public class RemoteInterpreter extends Interpreter { } private RemoteInterpreterContext convert(InterpreterContext ic) { - return new RemoteInterpreterContext( - ic.getNoteId(), - ic.getParagraphId(), - 
ic.getReplName(), - ic.getParagraphTitle(), - ic.getParagraphText(), - gson.toJson(ic.getAuthenticationInfo()), - gson.toJson(ic.getConfig()), - gson.toJson(ic.getGui()), - gson.toJson(ic.getRunners())); + return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(), + ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()), + gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners())); } private InterpreterResult convert(RemoteInterpreterResult result) { @@ -518,22 +483,21 @@ public class RemoteInterpreter extends Interpreter { * Push local angular object registry to * remote interpreter. This method should be * call ONLY inside the init() method - * @param client - * @throws TException */ void pushAngularObjectRegistryToRemote(Client client) throws TException { final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() - .getAngularObjectRegistry(); + .getAngularObjectRegistry(); if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry - .getRegistry(); + .getRegistry(); logger.info("Push local angular object registry from ZeppelinServer to" + - " remote interpreter group {}", this.getInterpreterGroup().getId()); + " remote interpreter group {}", this.getInterpreterGroup().getId()); final java.lang.reflect.Type registryType = new TypeToken<Map<String, - Map<String, AngularObject>>>() {}.getType(); + Map<String, AngularObject>>>() { + }.getType(); Gson gson = new Gson(); client.angularRegistryPush(gson.toJson(registry, registryType)); @@ -554,4 +518,9 @@ public class RemoteInterpreter extends Interpreter { } this.env.putAll(env); } + + //Only for test + public String getInterpreterRunner() { + return interpreterRunner; + } } 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ed534972/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index c620b35..f13a106 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter; +import com.google.common.base.Joiner; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -85,15 +86,15 @@ import org.apache.zeppelin.scheduler.Job.Status; * Manage interpreters. */ public class InterpreterFactory implements InterpreterGroupFactory { - private static Logger logger = LoggerFactory.getLogger(InterpreterFactory.class); - + private static final Logger logger = LoggerFactory.getLogger(InterpreterFactory.class); private static final String SHARED_SESSION = "shared_session"; private Map<String, URLClassLoader> cleanCl = Collections.synchronizedMap(new HashMap<String, URLClassLoader>()); private ZeppelinConfiguration conf; - @Deprecated private String[] interpreterClassList; + @Deprecated + private String[] interpreterClassList; private String[] interpreterGroupOrderList; /** @@ -199,7 +200,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { * - Support ThreadInterpreter */ URLClassLoader ccl = new URLClassLoader( - recursiveBuildLibList(interpreterDir.toFile()), cl); + recursiveBuildLibList(interpreterDir.toFile()), cl); for (String className : interpreterClassList) { try { // Load classes @@ -237,7 +238,8 @@ public class InterpreterFactory implements InterpreterGroupFactory { interpreterInfo = new 
InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(), r.getEditor()); - add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath()); + add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(), + r.getRunner()); } for (String settingId : interpreterSettingsRef.keySet()) { @@ -269,7 +271,6 @@ public class InterpreterFactory implements InterpreterGroupFactory { saveToFile(); } - for (String settingId : interpreterSettings.keySet()) { InterpreterSetting setting = interpreterSettings.get(settingId); logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId, @@ -294,7 +295,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption()); InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(), - infos, props, deps, option, o.getPath()); + infos, props, deps, option, o.getPath(), o.getInterpreterRunner()); setting.setInterpreterGroupFactory(this); return setting; } @@ -360,7 +361,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { InterpreterOption option = registeredInterpreter.getOption() == null ? 
defaultOption : registeredInterpreter.getOption(); add(registeredInterpreter.getGroup(), interpreterInfo, registeredInterpreter.getProperties(), - option, absolutePath); + option, absolutePath, registeredInterpreter.getRunner()); } } @@ -409,7 +410,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { interpreterSettingObject = interpreterSettingsRef.get(setting.getGroup()); if (interpreterSettingObject == null) { logger.warn("can't get InterpreterSetting " + - "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup()); + "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup()); continue; } depClassPath = interpreterSettingObject.getPath(); @@ -546,7 +547,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { } } - private void saveToFile() throws IOException { + void saveToFile() throws IOException { String jsonString; synchronized (interpreterSettings) { @@ -604,8 +605,9 @@ public class InterpreterFactory implements InterpreterGroupFactory { private boolean findDefaultInterpreter(List<InterpreterInfo> infos) { for (InterpreterInfo interpreterInfo : infos) { - if (interpreterInfo.isDefaultInterpreter()) + if (interpreterInfo.isDefaultInterpreter()) { return true; + } } return false; } @@ -629,20 +631,22 @@ public class InterpreterFactory implements InterpreterGroupFactory { } private InterpreterSetting add(String group, InterpreterInfo interpreterInfo, - Map<String, InterpreterProperty> interpreterProperties, InterpreterOption option, String path) + Map<String, InterpreterProperty> interpreterProperties, InterpreterOption option, String path, + InterpreterRunner runner) throws InterpreterException, IOException, RepositoryException { ArrayList<InterpreterInfo> infos = new ArrayList<>(); infos.add(interpreterInfo); - return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path); + return add(group, infos, new ArrayList<Dependency>(), option, 
interpreterProperties, path, + runner); } /** * @param group InterpreterSetting reference name - * @return */ public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos, List<Dependency> dependencies, InterpreterOption option, - Map<String, InterpreterProperty> interpreterProperties, String path) { + Map<String, InterpreterProperty> interpreterProperties, String path, + InterpreterRunner runner) { Preconditions.checkNotNull(group, "name should not be null"); Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null"); Preconditions.checkNotNull(dependencies, "dependencies should not be null"); @@ -689,12 +693,11 @@ public class InterpreterFactory implements InterpreterGroupFactory { } else { interpreterSetting = new InterpreterSetting(group, null, interpreterInfos, interpreterProperties, - dependencies, option, path); + dependencies, option, path, runner); interpreterSettingsRef.put(group, interpreterSetting); } } - if (dependencies.size() > 0) { loadInterpreterDependencies(interpreterSetting); } @@ -704,21 +707,17 @@ public class InterpreterFactory implements InterpreterGroupFactory { } /** - * * @param id interpreterGroup id. 
Combination of interpreterSettingId + noteId/userId/shared - * depends on interpreter mode - * @param option - * @return - * @throws InterpreterException - * @throws NullArgumentException + * depends on interpreter mode */ @Override public InterpreterGroup createInterpreterGroup(String id, InterpreterOption option) throws InterpreterException, NullArgumentException { //When called from REST API without option we receive NPE - if (option == null) + if (option == null) { throw new NullArgumentException("option"); + } AngularObjectRegistry angularObjectRegistry; @@ -788,6 +787,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { List<InterpreterInfo> interpreterInfos = interpreterSetting.getInterpreterInfos(); String path = interpreterSetting.getPath(); + InterpreterRunner runner = interpreterSetting.getInterpreterRunner(); Interpreter interpreter; for (InterpreterInfo info : interpreterInfos) { if (option.isRemote()) { @@ -797,7 +797,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { properties, user, option.isUserImpersonate); } else { interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(), - properties, interpreterSetting.getId(), user, option.isUserImpersonate()); + properties, interpreterSetting.getId(), user, option.isUserImpersonate(), runner); } } else { interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties); @@ -847,8 +847,6 @@ public class InterpreterFactory implements InterpreterGroupFactory { /** * Get interpreter settings - * - * @return */ public List<InterpreterSetting> get() { synchronized (interpreterSettings) { @@ -947,11 +945,6 @@ public class InterpreterFactory implements InterpreterGroupFactory { /** * Change interpreter property and restart - * - * @param id - * @param option - * @param properties - * @throws IOException */ public void setPropertyAndRestart(String id, InterpreterOption option, Properties properties, List<Dependency> dependencies) 
throws IOException { @@ -1069,7 +1062,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { URLClassLoader ccl = cleanCl.get(dirName); if (ccl == null) { // classloader fallback - ccl = URLClassLoader.newInstance(new URL[] {}, oldcl); + ccl = URLClassLoader.newInstance(new URL[]{}, oldcl); } boolean separateCL = true; @@ -1085,7 +1078,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { URLClassLoader cl; if (separateCL == true) { - cl = URLClassLoader.newInstance(new URL[] {}, ccl); + cl = URLClassLoader.newInstance(new URL[]{}, ccl); } else { cl = ccl; } @@ -1093,7 +1086,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className); Constructor<Interpreter> constructor = - replClass.getConstructor(new Class[] {Properties.class}); + replClass.getConstructor(new Class[]{Properties.class}); Interpreter repl = constructor.newInstance(property); repl.setClassloaderUrls(ccl.getURLs()); LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl)); @@ -1128,17 +1121,27 @@ public class InterpreterFactory implements InterpreterGroupFactory { return intp; } - private Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey, + Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey, String className, Properties property, String interpreterSettingId, - String userName, Boolean isUserImpersonate) { + String userName, Boolean isUserImpersonate, InterpreterRunner interpreterRunner) { int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId; int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); + String interpreterRunnerPath; + if (null != interpreterRunner) { + interpreterRunnerPath = interpreterRunner.getPath(); + Path p = 
Paths.get(interpreterRunnerPath); + if (!p.isAbsolute()) { + interpreterRunnerPath = Joiner.on(File.separator) + .join(interpreterPath, interpreterRunnerPath); + } + } else { + interpreterRunnerPath = conf.getInterpreterRemoteRunnerPath(); + } RemoteInterpreter remoteInterpreter = new RemoteInterpreter(property, interpreterSessionKey, className, - conf.getInterpreterRemoteRunnerPath(), - interpreterPath, localRepoPath, connectTimeout, maxPoolSize, + interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate); remoteInterpreter.addEnv(env); @@ -1149,8 +1152,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { * map interpreter ids into noteId * * @param noteId note id - * @param ids InterpreterSetting id list - * @throws IOException + * @param ids InterpreterSetting id list */ public void setInterpreters(String user, String noteId, List<String> ids) throws IOException { putNoteInterpreterSettingBinding(user, noteId, ids); @@ -1207,7 +1209,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { } logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + - "{}", key, noteId, user, setting.getName()); + "{}", key, noteId, user, setting.getName()); return key; } @@ -1374,7 +1376,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { } return urls; } else { - return new URL[] {path.toURI().toURL()}; + return new URL[]{path.toURI().toURL()}; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ed534972/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 1f3ddc5..2ebc6ef 
100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -38,6 +38,7 @@ import static org.apache.zeppelin.notebook.utility.IdHashes.generateId; * Interpreter settings */ public class InterpreterSetting { + private static final Logger logger = LoggerFactory.getLogger(InterpreterSetting.class); private static final String SHARED_PROCESS = "shared_process"; private String id; @@ -49,24 +50,29 @@ public class InterpreterSetting { /** * properties can be either Properties or Map<String, InterpreterProperty> * properties should be: - * - Properties when Interpreter instances are saved to `conf/interpreter.json` file - * - Map<String, InterpreterProperty> when Interpreters are registered - * : this is needed after https://github.com/apache/zeppelin/pull/1145 - * which changed the way of getting default interpreter setting AKA interpreterSettingsRef + * - Properties when Interpreter instances are saved to `conf/interpreter.json` file + * - Map<String, InterpreterProperty> when Interpreters are registered + * : this is needed after https://github.com/apache/zeppelin/pull/1145 + * which changed the way of getting default interpreter setting AKA interpreterSettingsRef * Note(mina): In order to simplify the implementation, I chose to change properties - * from Properties to Object instead of creating new classes. + * from Properties to Object instead of creating new classes. 
*/ private Object properties; private Status status; private String errorReason; - @SerializedName("interpreterGroup") private List<InterpreterInfo> interpreterInfos; + @SerializedName("interpreterGroup") + private List<InterpreterInfo> interpreterInfos; private final transient Map<String, InterpreterGroup> interpreterGroupRef = new HashMap<>(); private List<Dependency> dependencies; private InterpreterOption option; private transient String path; - @Deprecated private transient InterpreterGroupFactory interpreterGroupFactory; + @SerializedName("runner") + private InterpreterRunner interpreterRunner; + + @Deprecated + private transient InterpreterGroupFactory interpreterGroupFactory; private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock; private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock; @@ -79,7 +85,7 @@ public class InterpreterSetting { public InterpreterSetting(String id, String name, String group, List<InterpreterInfo> interpreterInfos, Object properties, List<Dependency> dependencies, - InterpreterOption option, String path) { + InterpreterOption option, String path, InterpreterRunner runner) { this(); this.id = id; this.name = name; @@ -90,11 +96,14 @@ public class InterpreterSetting { this.option = option; this.path = path; this.status = Status.READY; + this.interpreterRunner = runner; } public InterpreterSetting(String name, String group, List<InterpreterInfo> interpreterInfos, - Object properties, List<Dependency> dependencies, InterpreterOption option, String path) { - this(generateId(), name, group, interpreterInfos, properties, dependencies, option, path); + Object properties, List<Dependency> dependencies, InterpreterOption option, String path, + InterpreterRunner runner) { + this(generateId(), name, group, interpreterInfos, properties, dependencies, option, path, + runner); } /** @@ -104,7 +113,7 @@ public class InterpreterSetting { */ public InterpreterSetting(InterpreterSetting o) { 
this(generateId(), o.getName(), o.getGroup(), o.getInterpreterInfos(), o.getProperties(), - o.getDependencies(), o.getOption(), o.getPath()); + o.getDependencies(), o.getOption(), o.getPath(), o.getInterpreterRunner()); } public String getId() { @@ -287,4 +296,12 @@ public class InterpreterSetting { public Map<String, String> getInfos() { return infos; } + + public InterpreterRunner getInterpreterRunner() { + return interpreterRunner; + } + + public void setInterpreterRunner(InterpreterRunner interpreterRunner) { + this.interpreterRunner = interpreterRunner; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ed534972/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index d9b32c8..661459b 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -17,6 +17,8 @@ package org.apache.zeppelin.interpreter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.*; import java.nio.file.Files; import java.nio.file.Paths; @@ -54,7 +56,13 @@ import org.quartz.SchedulerException; import org.sonatype.aether.RepositoryException; import static org.junit.Assert.*; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + import org.mockito.Mock; public class InterpreterFactoryTest { @@ -171,12 +179,12 @@ public class InterpreterFactoryTest { List<String> all = 
factory.getDefaultInterpreterSettingList(); // add setting with null option & properties expected nullArgumentException.class try { - factory.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, ""); + factory.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null); } catch(NullArgumentException e) { assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage()); } try { - factory.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, ""); + factory.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null); } catch (NullArgumentException e){ assertEquals("Test null properties" , e.getMessage(),new NullArgumentException("properties").getMessage()); } @@ -236,10 +244,10 @@ public class InterpreterFactoryTest { final InterpreterInfo info2 = new InterpreterInfo("className2", "name1", true, null); factory.add("group1", new ArrayList<InterpreterInfo>() {{ add(info1); - }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1"); + }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null); factory.add("group2", new ArrayList<InterpreterInfo>(){{ add(info2); - }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2"); + }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2", null); final InterpreterSetting setting1 = factory.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new Properties()); final InterpreterSetting setting2 = factory.createNewSetting("test-group2", "group1", new ArrayList<Dependency>(), new 
InterpreterOption(true), new Properties()); @@ -259,7 +267,7 @@ public class InterpreterFactoryTest { final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null); factory.add("group1", new ArrayList<InterpreterInfo>(){{ add(info1); - }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1"); + }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null); InterpreterOption perUserInterpreterOption = new InterpreterOption(true, InterpreterOption.ISOLATED, InterpreterOption.SHARED); final InterpreterSetting setting1 = factory.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), perUserInterpreterOption, new Properties()); @@ -310,4 +318,56 @@ public class InterpreterFactoryTest { editor = factory.getEditorSetting("user1", note.getId(), "mock2"); assertEquals("text", editor.get("language")); } + + @Test + public void registerCustomInterpreterRunner() throws IOException { + InterpreterFactory spyFactory = spy(factory); + + doNothing().when(spyFactory).saveToFile(); + + ArrayList<InterpreterInfo> interpreterInfos1 = new ArrayList<>(); + interpreterInfos1.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap())); + + spyFactory.add("normalGroup1", interpreterInfos1, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, InterpreterProperty>newHashMap(), "/normalGroup1", null); + + spyFactory.createNewSetting("normalGroup1", "normalGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new Properties()); + + ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); + interpreterInfos2.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap())); + + InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class); + + when(mockInterpreterRunner.getPath()).thenReturn("custom-linux-path.sh"); + + spyFactory.add("customGroup1", 
interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, InterpreterProperty>newHashMap(), "/customGroup1", mockInterpreterRunner); + + spyFactory.createNewSetting("customGroup1", "customGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new Properties()); + + spyFactory.setInterpreters("anonymous", "noteCustome", spyFactory.getDefaultInterpreterSettingList()); + + spyFactory.getInterpreter("anonymous", "noteCustome", "customGroup1"); + + verify(mockInterpreterRunner, times(1)).getPath(); + } + + @Test + public void interpreterRunnerTest() { + InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class); + String testInterpreterRunner = "relativePath.sh"; + when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); // This test only for Linux + Interpreter i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), "settingId", "userName", false, mockInterpreterRunner); + String interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner(); + assertNotEquals(interpreterRunner, testInterpreterRunner); + + testInterpreterRunner = "/AbsolutePath.sh"; + when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); + i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), "settingId", "userName", false, mockInterpreterRunner); + interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner(); + assertEquals(interpreterRunner, testInterpreterRunner); + } + + @Test + public void interpreterRunnerAsAbsolutePathTest() { + + } }