http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java deleted file mode 100644 index ffcb8d5..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ /dev/null @@ -1,914 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.thrift.transport.TTransportException; -import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.Job.Status; -import org.apache.zeppelin.scheduler.Scheduler; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - -public class RemoteInterpreterTest { - - - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - - private InterpreterGroup intpGroup; - private HashMap<String, String> env; - - @Before - public void setUp() throws Exception { - intpGroup = new InterpreterGroup(); - env = new HashMap<>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - } - - @After - public void tearDown() throws Exception { - intpGroup.close(); - } - - private RemoteInterpreter createMockInterpreterA(Properties p) { - return createMockInterpreterA(p, "note"); - } - - private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) { - return new RemoteInterpreter( - p, - noteId, - MockInterpreterA.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false); - } - - private RemoteInterpreter createMockInterpreterB(Properties p) { - return createMockInterpreterB(p, "note"); - } - - private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) { - return new RemoteInterpreter( - p, - noteId, - MockInterpreterB.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false); - } - - @Test - public void testRemoteInterperterCall() throws TTransportException, IOException { - Properties p = new Properties(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - - intpA.setInterpreterGroup(intpGroup); - - RemoteInterpreter intpB = createMockInterpreterB(p); - - intpGroup.get("note").add(intpB); - intpB.setInterpreterGroup(intpGroup); - - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - process.equals(intpB.getInterpreterProcess()); - - assertFalse(process.isRunning()); - assertEquals(0, process.getNumIdleClient()); - assertEquals(0, process.referenceCount()); - - intpA.open(); // initializa all interpreters in the same group - assertTrue(process.isRunning()); - assertEquals(1, process.getNumIdleClient()); - assertEquals(1, process.referenceCount()); - - intpA.interpret("1", - new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - - intpB.open(); - assertEquals(1, process.referenceCount()); - - intpA.close(); - assertEquals(0, process.referenceCount()); - intpB.close(); - assertEquals(0, process.referenceCount()); - - assertFalse(process.isRunning()); - - } - - @Test - public void testExecuteIncorrectPrecode() throws TTransportException, IOException { - Properties p = new Properties(); - p.put("zeppelin.MockInterpreterA.precode", "fail test"); - intpGroup.put("note", new LinkedList<Interpreter>()); - - RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - - intpA.setInterpreterGroup(intpGroup); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - - intpA.open(); - - InterpreterResult result = intpA.interpret("1", - new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - - - - intpA.close(); - assertEquals(Code.ERROR, result.code()); - } - - @Test - public void testExecuteCorrectPrecode() throws TTransportException, IOException { - Properties p = new Properties(); - p.put("zeppelin.MockInterpreterA.precode", "2"); - intpGroup.put("note", new LinkedList<Interpreter>()); - - RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - - intpA.setInterpreterGroup(intpGroup); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - - intpA.open(); - - InterpreterResult result = intpA.interpret("1", - new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - - - - intpA.close(); - assertEquals(Code.SUCCESS, result.code()); - assertEquals("1", result.message().get(0).getData()); - } - - @Test - public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { - Properties p = new Properties(); - - RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.put("note", new LinkedList<Interpreter>()); - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - InterpreterResult ret = intpA.interpret("non numeric value", - new InterpreterContext( - "noteId", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - - assertEquals(Code.ERROR, ret.code()); - } - - @Test - public void testRemoteSchedulerSharing() throws TTransportException, IOException { - Properties p = new Properties(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - RemoteInterpreter intpA = new RemoteInterpreter( - p, - "note", - MockInterpreterA.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false); - - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - RemoteInterpreter intpB = new RemoteInterpreter( - p, - "note", - MockInterpreterB.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false); - - intpGroup.get("note").add(intpB); - intpB.setInterpreterGroup(intpGroup); - - intpA.open(); - intpB.open(); - - long start = System.currentTimeMillis(); - InterpreterResult ret = intpA.interpret("500", - new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - assertEquals("500", ret.message().get(0).getData()); - - ret = intpB.interpret("500", - new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - assertEquals("1000", ret.message().get(0).getData()); - long end = System.currentTimeMillis(); - assertTrue(end - start >= 1000); - - - intpA.close(); - intpB.close(); - } - - @Test - public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { - Properties p = new Properties(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - final RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - final RemoteInterpreter intpB = createMockInterpreterB(p); - - intpGroup.get("note").add(intpB); - intpB.setInterpreterGroup(intpGroup); - - intpA.open(); - intpB.open(); - - long start = System.currentTimeMillis(); - Job jobA = new Job("jobA", null) { - private Object r; - - @Override - public Object getReturn() { - return r; - } - - @Override - public void setResult(Object results) { - this.r = results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - return intpA.interpret("500", - new InterpreterContext( - "note", - "jobA", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - } - - @Override - protected boolean jobAbort() { - return false; - } - - }; - intpA.getScheduler().submit(jobA); - - Job jobB = new Job("jobB", null) { - - private Object r; - - @Override - public Object getReturn() { - return r; - } - - @Override - public void setResult(Object results) { - this.r = results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - return intpB.interpret("500", - new InterpreterContext( - "note", - "jobB", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - } - - @Override - protected boolean jobAbort() { - return false; - } - - }; - intpB.getScheduler().submit(jobB); - // wait until both job finished - while (jobA.getStatus() != Status.FINISHED || - jobB.getStatus() != Status.FINISHED) { - Thread.sleep(100); - } - long end = System.currentTimeMillis(); - assertTrue(end - start >= 1000); - - assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message().get(0).getData()); - - intpA.close(); - intpB.close(); - } - - @Test - public void testRunOrderPreserved() throws InterruptedException { - Properties p = new Properties(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - final RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - int concurrency = 3; - final List<InterpreterResultMessage> results = new LinkedList<>(); - - Scheduler scheduler = intpA.getScheduler(); - for (int i = 0; i < concurrency; i++) { - final String jobId = Integer.toString(i); - scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { - private Object r; - - @Override - public Object getReturn() { - return r; - } - - @Override - public void setResult(Object results) { - this.r = results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext( - "note", - jobId, - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - - synchronized (results) { - results.addAll(ret.message()); - results.notify(); - } - return null; - } - - @Override - protected boolean jobAbort() { - return false; - } - - }); - } - - // wait for job finished - synchronized (results) { - while (results.size() != concurrency) { - results.wait(300); - } - } - - int i = 0; - for (InterpreterResultMessage result : results) { - assertEquals(Integer.toString(i++), result.getData()); - } - assertEquals(concurrency, i); - - intpA.close(); - } - - - @Test - public void testRunParallel() throws InterruptedException { - Properties p = new Properties(); - p.put("parallel", "true"); - intpGroup.put("note", new LinkedList<Interpreter>()); - - final RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - int concurrency = 4; - final int timeToSleep = 1000; - final List<InterpreterResultMessage> results = new LinkedList<>(); - long start = System.currentTimeMillis(); - - Scheduler scheduler = intpA.getScheduler(); - for (int i = 0; i < concurrency; i++) { - final String jobId = Integer.toString(i); - scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) { - private Object r; - - @Override - public Object getReturn() { - return r; - } - - @Override - public void setResult(Object results) { - this.r = results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - String stmt = Integer.toString(timeToSleep); - InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext( - "note", - jobId, - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - - synchronized (results) { - results.addAll(ret.message()); - results.notify(); - } - return stmt; - } - - @Override - protected boolean jobAbort() { - return false; - } - - }); - } - - // wait for job finished - synchronized (results) { - while (results.size() != concurrency) { - results.wait(300); - } - } - - long end = System.currentTimeMillis(); - - assertTrue(end - start < timeToSleep * concurrency); - - intpA.close(); - } - - @Test - public void testInterpreterGroupResetBeforeProcessStarts() { - Properties p = new Properties(); - - RemoteInterpreter intpA = createMockInterpreterA(p); - - intpA.setInterpreterGroup(intpGroup); - RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); - - intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); - RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); - - assertNotSame(processA.hashCode(), processB.hashCode()); - } - - @Test - public void testInterpreterGroupResetAfterProcessFinished() { - Properties p = new Properties(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - RemoteInterpreter intpA = createMockInterpreterA(p); - - intpA.setInterpreterGroup(intpGroup); - RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); - intpA.open(); - - processA.dereference(); // intpA.close(); - - intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); - RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); - - assertNotSame(processA.hashCode(), processB.hashCode()); - } - - @Test - public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { - Properties p = new Properties(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - final RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - Job jobA = new Job("jobA", null) { - private Object r; - - @Override - public Object getReturn() { - return r; - } - - @Override - public void setResult(Object results) { - this.r = results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - return intpA.interpret("2000", - new InterpreterContext( - "note", - "jobA", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - } - - @Override - protected boolean jobAbort() { - return false; - } - - }; - intpA.getScheduler().submit(jobA); - - // wait for job started - while (intpA.getScheduler().getJobsRunning().size() == 0) { - Thread.sleep(100); - } - - // restart interpreter - RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); - intpA.close(); - - InterpreterGroup newInterpreterGroup = - new InterpreterGroup(intpA.getInterpreterGroup().getId()); - newInterpreterGroup.put("note", new LinkedList<Interpreter>()); - - intpA.setInterpreterGroup(newInterpreterGroup); - intpA.open(); - RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); - - assertNotSame(processA.hashCode(), processB.hashCode()); - - } - - @Test - public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { - Properties p = new Properties(); - intpGroup.put("note", new LinkedList<Interpreter>()); - - RemoteInterpreter intpA = createMockInterpreterA(p); - - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - RemoteInterpreter intpB = createMockInterpreterB(p); - - intpGroup.get("note").add(intpB); - intpB.setInterpreterGroup(intpGroup); - - intpA.open(); - intpB.open(); - - assertEquals(intpA.getScheduler(), intpB.getScheduler()); - } - - @Test - public void testMultiInterpreterSession() { - Properties p = new Properties(); - intpGroup.put("sessionA", new LinkedList<Interpreter>()); - intpGroup.put("sessionB", new LinkedList<Interpreter>()); - - RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA"); - intpGroup.get("sessionA").add(intpAsessionA); - intpAsessionA.setInterpreterGroup(intpGroup); - - RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA"); - intpGroup.get("sessionA").add(intpBsessionA); - intpBsessionA.setInterpreterGroup(intpGroup); - - intpAsessionA.open(); - intpBsessionA.open(); - - assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler()); - - RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB"); - intpGroup.get("sessionB").add(intpAsessionB); - intpAsessionB.setInterpreterGroup(intpGroup); - - RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB"); - intpGroup.get("sessionB").add(intpBsessionB); - intpBsessionB.setInterpreterGroup(intpGroup); - - intpAsessionB.open(); - intpBsessionB.open(); - - assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler()); - assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler()); - } - - @Test - public void should_push_local_angular_repo_to_remote() throws Exception { - //Given - final Client client = Mockito.mock(Client.class); - final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId", - MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null, - null, "anonymous", false); - final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null); - registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId"); - final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId"); - interpreterGroup.setAngularObjectRegistry(registry); - intr.setInterpreterGroup(interpreterGroup); - - final java.lang.reflect.Type registryType = new TypeToken<Map<String, - Map<String, AngularObject>>>() {}.getType(); - final Gson gson = new Gson(); - final String expected = gson.toJson(registry.getRegistry(), registryType); - - //When - intr.pushAngularObjectRegistryToRemote(client); - - //Then - Mockito.verify(client).angularRegistryPush(expected); - } - - @Test - public void testEnvStringPattern() { - assertFalse(RemoteInterpreter.isEnvString(null)); - assertFalse(RemoteInterpreter.isEnvString("")); - assertFalse(RemoteInterpreter.isEnvString("abcDEF")); - assertFalse(RemoteInterpreter.isEnvString("ABC-DEF")); - assertTrue(RemoteInterpreter.isEnvString("ABCDEF")); - assertTrue(RemoteInterpreter.isEnvString("ABC_DEF")); - assertTrue(RemoteInterpreter.isEnvString("ABC_DEF123")); - } - - @Test - public void testEnvronmentAndPropertySet() { - Properties p = new Properties(); - p.setProperty("MY_ENV1", "env value 1"); - p.setProperty("my.property.1", "property value 1"); - - RemoteInterpreter intp = new RemoteInterpreter( - p, - "note", - MockInterpreterEnv.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false); - - intpGroup.put("note", new LinkedList<Interpreter>()); - intpGroup.get("note").add(intp); - intp.setInterpreterGroup(intpGroup); - - intp.open(); - - InterpreterContext context = new InterpreterContext( - "noteId", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null); - - - assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData()); - assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code()); - assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code()); - assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData()); - - intp.close(); - } - -}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java deleted file mode 100644 index 975d6ea..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; - -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.junit.Test; - -public class RemoteInterpreterUtilsTest { - - @Test - public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { - assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java deleted file mode 100644 index 81a9164..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import java.util.List; -import java.util.Properties; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -public class MockInterpreterA extends Interpreter { - - private String lastSt; - - public MockInterpreterA(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - public String getLastStatement() { - return lastSt; - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - try { - Thread.sleep(Long.parseLong(st)); - this.lastSt = st; - } catch (NumberFormatException | InterruptedException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(Code.SUCCESS, st); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - @Override - public Scheduler getScheduler() { - if (getProperty("parallel") != null && getProperty("parallel").equals("true")) { - return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10); - } else { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java deleted file mode 100644 index d4b26ad..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.AngularObjectWatcher; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; - -public class MockInterpreterAngular extends Interpreter { - - AtomicInteger numWatch = new AtomicInteger(0); - - public MockInterpreterAngular(Properties property) { - super(property); - } - - @Override - public void open() { - } - - @Override - public void close() { - - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] stmt = st.split(" "); - String cmd = stmt[0]; - String name = null; - if (stmt.length >= 2) { - name = stmt[1]; - } - String value = null; - if (stmt.length == 3) { - value = stmt[2]; - } - - AngularObjectRegistry registry = context.getAngularObjectRegistry(); - - if (cmd.equals("add")) { - registry.add(name, value, context.getNoteId(), null); - registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher - (null) { - - @Override - public void watch(Object oldObject, Object newObject, - InterpreterContext context) { - numWatch.incrementAndGet(); - } - - }); - } else if (cmd.equalsIgnoreCase("update")) { - registry.get(name, context.getNoteId(), null).set(value); - } else if (cmd.equals("remove")) { - registry.remove(name, context.getNoteId(), null); - } - - try { - Thread.sleep(500); // wait for watcher executed - } catch (InterruptedException e) { - logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e); - } - - String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch - .get()); - return new InterpreterResult(Code.SUCCESS, msg); - } - - @Override - public void cancel(InterpreterContext context) { - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java deleted file mode 100644 index 7103335..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import java.util.List; -import java.util.Properties; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.WrappedInterpreter; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; - -public class MockInterpreterB extends Interpreter { - - public MockInterpreterB(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - MockInterpreterA intpA = getInterpreterA(); - String intpASt = intpA.getLastStatement(); - long timeToSleep = Long.parseLong(st); - if (intpASt != null) { - timeToSleep += Long.parseLong(intpASt); - } - try { - Thread.sleep(timeToSleep); - } catch (NumberFormatException | InterruptedException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep)); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - public MockInterpreterA getInterpreterA() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - synchronized (interpreterGroup) { - for (List<Interpreter> interpreters : interpreterGroup.values()) { - boolean belongsToSameNoteGroup = false; - MockInterpreterA a = null; - for (Interpreter intp : interpreters) { - if (intp.getClassName().equals(MockInterpreterA.class.getName())) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - a = (MockInterpreterA) p; - } - - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - if (this == p) { - belongsToSameNoteGroup = true; - } - } - if (belongsToSameNoteGroup) { - return a; - } - } - } - return null; - } - - @Override - public Scheduler getScheduler() { - MockInterpreterA intpA = getInterpreterA(); - if (intpA != null) { - return intpA.getScheduler(); - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java deleted file mode 100644 index 12e11f7..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.remote.mock; - -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -import java.util.List; -import java.util.Properties; - - -public class MockInterpreterEnv extends Interpreter { - - public MockInterpreterEnv(Properties property) { - super(property); - } - - @Override - public void open() { - } - - @Override - public void close() { - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] cmd = st.split(" "); - if (cmd[0].equals("getEnv")) { - return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1])); - } else if (cmd[0].equals("getProperty")){ - return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1])); - } else { - return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]); - } - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } -} - http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java deleted file mode 100644 index 349315c..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.remote.mock; - -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -/** - * MockInterpreter to test outputstream - */ -public class MockInterpreterOutputStream extends Interpreter { - private String lastSt; - - public MockInterpreterOutputStream(Properties property) { - super(property); - } - - @Override - public void open() { - //new RuntimeException().printStackTrace(); - } - - @Override - public void close() { - } - - public String getLastStatement() { - return lastSt; - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] ret = st.split(":"); - try { - if (ret[1] != null) { - context.out.write(ret[1]); - } - } catch (IOException e) { - throw new InterpreterException(e); - } - return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ? - ret[2] : ""); - } - - @Override - public void cancel(InterpreterContext context) { - - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } - - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java deleted file mode 100644 index c4ff6ab..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote.mock; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import com.google.gson.Gson; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.AngularObjectWatcher; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.resource.Resource; -import org.apache.zeppelin.resource.ResourcePool; - -public class MockInterpreterResourcePool extends Interpreter { - - AtomicInteger numWatch = new AtomicInteger(0); - - public MockInterpreterResourcePool(Properties property) { - super(property); - } - - @Override - public void open() { - } - - @Override - public void close() { - - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - String[] stmt = st.split(" "); - String cmd = stmt[0]; - String noteId = null; - String paragraphId = null; - String name = null; - if (stmt.length >= 2) { - String[] npn = stmt[1].split(":"); - if (npn.length >= 3) { - noteId = npn[0]; - paragraphId = npn[1]; - name = npn[2]; - } else { - name = stmt[1]; - } - } - String value = null; - if (stmt.length >= 3) { - value = stmt[2]; - } - - ResourcePool resourcePool = context.getResourcePool(); - Object ret = null; - if (cmd.equals("put")) { - resourcePool.put(noteId, paragraphId, name, value); - } else if (cmd.equalsIgnoreCase("get")) { - Resource resource = resourcePool.get(noteId, paragraphId, name); - if (resource != null) { - ret = resourcePool.get(noteId, paragraphId, name).get(); - } else { - ret = ""; - } - } else if (cmd.equals("remove")) { - ret = resourcePool.remove(noteId, paragraphId, name); - } else if (cmd.equals("getAll")) { - ret = resourcePool.getAll(); - } else if (cmd.equals("invoke")) { - Resource resource = resourcePool.get(noteId, paragraphId, name); - if (stmt.length >=4) { - Resource res = resource.invokeMethod(value, null, null, stmt[3]); - ret = res.get(); - } else { - ret = resource.invokeMethod(value, null, null); - } - } - - try { - Thread.sleep(500); // wait for watcher executed - } catch (InterruptedException e) { - } - - Gson gson = new Gson(); - return new InterpreterResult(Code.SUCCESS, gson.toJson(ret)); - } - - @Override - public void cancel(InterpreterContext context) { - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java deleted file mode 100644 index 363ccf6..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.resource; - -import com.google.gson.Gson; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Unittest for DistributedResourcePool - */ -public class DistributedResourcePoolTest { - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - private InterpreterGroup intpGroup1; - private InterpreterGroup intpGroup2; - private HashMap<String, String> env; - private RemoteInterpreter intp1; - private RemoteInterpreter intp2; - private InterpreterContext context; - private RemoteInterpreterEventPoller eventPoller1; - private RemoteInterpreterEventPoller eventPoller2; - - - @Before - public void setUp() throws Exception { - env = new HashMap<>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - - Properties p = new Properties(); - - intp1 = new RemoteInterpreter( - p, - "note", - MockInterpreterResourcePool.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false - ); - - intpGroup1 = new InterpreterGroup("intpGroup1"); - intpGroup1.put("note", new LinkedList<Interpreter>()); - intpGroup1.get("note").add(intp1); - intp1.setInterpreterGroup(intpGroup1); - - intp2 = new RemoteInterpreter( - p, - "note", - MockInterpreterResourcePool.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - null, - null, - "anonymous", - false - ); - - intpGroup2 = new InterpreterGroup("intpGroup2"); - intpGroup2.put("note", new LinkedList<Interpreter>()); - intpGroup2.get("note").add(intp2); - intp2.setInterpreterGroup(intpGroup2); - - context = new InterpreterContext( - "note", - "id", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - null, - null, - new LinkedList<InterpreterContextRunner>(), - null); - - intp1.open(); - intp2.open(); - - eventPoller1 = new RemoteInterpreterEventPoller(null, null); - eventPoller1.setInterpreterGroup(intpGroup1); - eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess()); - - eventPoller2 = new RemoteInterpreterEventPoller(null, null); - eventPoller2.setInterpreterGroup(intpGroup2); - eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess()); - - eventPoller1.start(); - eventPoller2.start(); - } - - @After - public void tearDown() throws Exception { - eventPoller1.shutdown(); - intp1.close(); - intpGroup1.close(); - eventPoller2.shutdown(); - intp2.close(); - intpGroup2.close(); - } - - @Test - public void testRemoteDistributedResourcePool() { - Gson gson = new Gson(); - InterpreterResult ret; - intp1.interpret("put key1 value1", context); - intp2.interpret("put key2 value2", context); - - ret = intp1.interpret("getAll", context); - assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); - - ret = intp2.interpret("getAll", context); - assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); - - ret = intp1.interpret("get key1", context); - assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), String.class)); - - ret = intp1.interpret("get key2", context); - assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), String.class)); - } - - @Test - public void testDistributedResourcePool() { - final LocalResourcePool pool2 = new LocalResourcePool("pool2"); - final LocalResourcePool pool3 = new LocalResourcePool("pool3"); - - DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() { - @Override - public ResourceSet getAllResources() { - ResourceSet set = pool2.getAll(); - set.addAll(pool3.getAll()); - - ResourceSet remoteSet = new ResourceSet(); - Gson gson = new Gson(); - for (Resource s : set) { - RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class); - remoteResource.setResourcePoolConnector(this); - remoteSet.add(remoteResource); - } - return remoteSet; - } - - @Override - public Object readResource(ResourceId id) { - if (id.getResourcePoolId().equals(pool2.id())) { - return pool2.get(id.getName()).get(); - } - if (id.getResourcePoolId().equals(pool3.id())) { - return pool3.get(id.getName()).get(); - } - return null; - } - - @Override - public Object invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] params) { - return null; - } - - @Override - public Resource invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] - params, String returnResourceName) { - return null; - } - }); - - assertEquals(0, pool1.getAll().size()); - - - // test get() can get from pool - pool2.put("object1", "value2"); - assertEquals(1, pool1.getAll().size()); - assertTrue(pool1.get("object1").isRemote()); - assertEquals("value2", pool1.get("object1").get()); - - // test get() is locality aware - pool1.put("object1", "value1"); - assertEquals(1, pool2.getAll().size()); - assertEquals("value1", pool1.get("object1").get()); - - // test getAll() is locality aware - assertEquals("value1", pool1.getAll().get(0).get()); - assertEquals("value2", pool1.getAll().get(1).get()); - } - - @Test - public void testResourcePoolUtils() { - Gson gson = new Gson(); - InterpreterResult ret; - - // when create some resources - intp1.interpret("put note1:paragraph1:key1 value1", context); - intp1.interpret("put note1:paragraph2:key1 value2", context); - intp2.interpret("put note2:paragraph1:key1 value1", context); - intp2.interpret("put note2:paragraph2:key2 value2", context); - - - // then get all resources. - assertEquals(4, ResourcePoolUtils.getAllResources().size()); - - // when remove all resources from note1 - ResourcePoolUtils.removeResourcesBelongsToNote("note1"); - - // then resources should be removed. - assertEquals(2, ResourcePoolUtils.getAllResources().size()); - assertEquals("", gson.fromJson( - intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(), - String.class)); - assertEquals("", gson.fromJson( - intp1.interpret("get note1:paragraph2:key1", context).message().get(0).getData(), - String.class)); - - - // when remove all resources from note2:paragraph1 - ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1"); - - // then 1 - assertEquals(1, ResourcePoolUtils.getAllResources().size()); - assertEquals("value2", gson.fromJson( - intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(), - String.class)); - - } - - @Test - public void testResourceInvokeMethod() { - Gson gson = new Gson(); - InterpreterResult ret; - intp1.interpret("put key1 hey", context); - intp2.interpret("put key2 world", context); - - // invoke method in local resource pool - ret = intp1.interpret("invoke key1 length", context); - assertEquals("3", ret.message().get(0).getData()); - - // invoke method in remote resource pool - ret = intp1.interpret("invoke key2 length", context); - assertEquals("5", ret.message().get(0).getData()); - - // make sure no resources are automatically created - ret = intp1.interpret("getAll", context); - assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); - - // invoke method in local resource pool and save result - ret = intp1.interpret("invoke key1 length ret1", context); - assertEquals("3", ret.message().get(0).getData()); - - ret = intp1.interpret("getAll", context); - assertEquals(3, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); - - ret = intp1.interpret("get ret1", context); - assertEquals("3", gson.fromJson(ret.message().get(0).getData(), String.class)); - - // invoke method in remote resource pool and save result - ret = intp1.interpret("invoke key2 length ret2", context); - assertEquals("5", ret.message().get(0).getData()); - - ret = intp1.interpret("getAll", context); - assertEquals(4, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); - - ret = intp1.interpret("get ret2", context); - assertEquals("5", gson.fromJson(ret.message().get(0).getData(), String.class)); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java deleted file mode 100644 index ebb5100..0000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scheduler; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Properties; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; -import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.scheduler.Job.Status; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { - - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; - private SchedulerFactory schedulerSvc; - private static final int TICK_WAIT = 100; - private static final int MAX_WAIT_CYCLES = 100; - - @Before - public void setUp() throws Exception{ - schedulerSvc = new SchedulerFactory(); - } - - @After - public void tearDown(){ - - } - - @Test - public void test() throws Exception { - Properties p = new Properties(); - final InterpreterGroup intpGroup = new InterpreterGroup(); - Map<String, String> env = new HashMap<>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - "note", - MockInterpreterA.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - this, - null, - "anonymous", - false); - - intpGroup.put("note", new LinkedList<Interpreter>()); - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", - intpA.getInterpreterProcess(), - 10); - - Job job = new Job("jobId", "jobName", null, 200) { - Object results; - @Override - public Object getReturn() { - return results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - intpA.interpret("1000", new InterpreterContext( - "note", - "jobId", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null)); - return "1000"; - } - - @Override - protected boolean jobAbort() { - return false; - } - - @Override - public void setResult(Object results) { - this.results = results; - } - }; - scheduler.submit(job); - - int cycles = 0; - while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - assertTrue(job.isRunning()); - - Thread.sleep(5*TICK_WAIT); - assertEquals(0, scheduler.getJobsWaiting().size()); - assertEquals(1, scheduler.getJobsRunning().size()); - - cycles = 0; - while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - - assertTrue(job.isTerminated()); - assertEquals(0, scheduler.getJobsWaiting().size()); - assertEquals(0, scheduler.getJobsRunning().size()); - - intpA.close(); - schedulerSvc.removeScheduler("test"); - } - - @Test - public void testAbortOnPending() throws Exception { - Properties p = new Properties(); - final InterpreterGroup intpGroup = new InterpreterGroup(); - Map<String, String> env = new HashMap<>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - - final RemoteInterpreter intpA = new RemoteInterpreter( - p, - "note", - MockInterpreterA.class.getName(), - new File(INTERPRETER_SCRIPT).getAbsolutePath(), - "fake", - "fakeRepo", - env, - 10 * 1000, - this, - null, - "anonymous", - false); - - intpGroup.put("note", new LinkedList<Interpreter>()); - intpGroup.get("note").add(intpA); - intpA.setInterpreterGroup(intpGroup); - - intpA.open(); - - Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", - intpA.getInterpreterProcess(), - 10); - - Job job1 = new Job("jobId1", "jobName1", null, 200) { - Object results; - InterpreterContext context = new InterpreterContext( - "note", - "jobId1", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null); - - @Override - public Object getReturn() { - return results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - intpA.interpret("1000", context); - return "1000"; - } - - @Override - protected boolean jobAbort() { - if (isRunning()) { - intpA.cancel(context); - } - return true; - } - - @Override - public void setResult(Object results) { - this.results = results; - } - }; - - Job job2 = new Job("jobId2", "jobName2", null, 200) { - public Object results; - InterpreterContext context = new InterpreterContext( - "note", - "jobId2", - null, - "title", - "text", - new AuthenticationInfo(), - new HashMap<String, Object>(), - new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - new LocalResourcePool("pool1"), - new LinkedList<InterpreterContextRunner>(), null); - - @Override - public Object getReturn() { - return results; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - intpA.interpret("1000", context); - return "1000"; - } - - @Override - protected boolean jobAbort() { - if (isRunning()) { - intpA.cancel(context); - } - return true; - } - - @Override - public void setResult(Object results) { - this.results = results; - } - }; - - job2.setResult("result2"); - - scheduler.submit(job1); - scheduler.submit(job2); - - - int cycles = 0; - while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - assertTrue(job1.isRunning()); - assertTrue(job2.getStatus() == Status.PENDING); - - job2.abort(); - - cycles = 0; - while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) { - Thread.sleep(TICK_WAIT); - cycles++; - } - - assertNotNull(job1.getDateFinished()); - assertTrue(job1.isTerminated()); - assertNull(job2.getDateFinished()); - assertTrue(job2.isTerminated()); - assertEquals("result2", job2.getReturn()); - - intpA.close(); - schedulerSvc.removeScheduler("test"); - } - - @Override - public void onOutputAppend(String noteId, String paragraphId, int index, String output) { - - } - - @Override - public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { - - } - - @Override - public void onOutputClear(String noteId, String paragraphId) { - - } - - @Override - public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { - - } - - @Override - public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { - if (callback != null) { - callback.onFinished(new LinkedList<>()); - } - } - - @Override - public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { - } - - @Override - public void onParaInfosReceived(String noteId, String paragraphId, - String interpreterSettingId, Map<String, String> metaInfos) { - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java new file mode 100644 index 0000000..0ac7116 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +/** + * Proxy for AngularObjectRegistry that exists in remote interpreter process + */ +public class RemoteAngularObjectRegistry extends AngularObjectRegistry { + Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); + private InterpreterGroup interpreterGroup; + + public RemoteAngularObjectRegistry(String interpreterId, + AngularObjectRegistryListener listener, + InterpreterGroup interpreterGroup) { + super(interpreterId, listener); + this.interpreterGroup = interpreterGroup; + } + + private RemoteInterpreterProcess getRemoteInterpreterProcess() { + return interpreterGroup.getRemoteInterpreterProcess(); + } + + /** + * When ZeppelinServer side code want to add angularObject to the registry, + * this method should be used instead of add() + * @param name + * @param o + * @param noteId + * @return + */ + public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String + paragraphId) { + Gson gson = new Gson(); + RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); + if (!remoteInterpreterProcess.isRunning()) { + return super.add(name, o, noteId, paragraphId, true); + } + + Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); + return super.add(name, o, noteId, paragraphId, true); + } catch (TException e) { + broken = true; + logger.error("Error", e); + } catch (Exception e) { + logger.error("Error", e); + } finally { + if (client != null) { + remoteInterpreterProcess.releaseClient(client, broken); + } + } + return null; + } + + /** + * When ZeppelinServer side code want to remove angularObject from the registry, + * this method should be used instead of remove() + * @param name + * @param noteId + * @param paragraphId + * @return + */ + public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String + paragraphId) { + RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { + return super.remove(name, noteId, paragraphId); + } + + Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + client.angularObjectRemove(name, noteId, paragraphId); + return super.remove(name, noteId, paragraphId); + } catch (TException e) { + broken = true; + logger.error("Error", e); + } catch (Exception e) { + logger.error("Error", e); + } finally { + if (client != null) { + remoteInterpreterProcess.releaseClient(client, broken); + } + } + return null; + } + + public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { + List<AngularObject> all = getAll(noteId, paragraphId); + for (AngularObject ao : all) { + removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId); + } + } + + @Override + protected AngularObject createNewAngularObject(String name, Object o, String noteId, String + paragraphId) { + return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup, + getAngularObjectListener()); + } +}