Repository: zeppelin Updated Branches: refs/heads/master 8e96d8bd7 -> d9c4a5f0b
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java new file mode 100644 index 0000000..2ba7a76 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -0,0 +1,914 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.Job.Status; +import org.apache.zeppelin.scheduler.Scheduler; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +public class RemoteInterpreterTest { + + + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + private InterpreterGroup intpGroup; + private HashMap<String, String> env; + + @Before + public void setUp() throws Exception { + intpGroup = new InterpreterGroup(); + env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + } + + @After + public void tearDown() throws Exception { + intpGroup.close(); + } + + private RemoteInterpreter createMockInterpreterA(Properties p) { + return createMockInterpreterA(p, "note"); + } + + private RemoteInterpreter createMockInterpreterA(Properties p, String noteId) { + return new RemoteInterpreter( + p, + noteId, + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + } + + private RemoteInterpreter createMockInterpreterB(Properties p) { + return createMockInterpreterB(p, "note"); + } + + private RemoteInterpreter createMockInterpreterB(Properties p, String noteId) { + return new RemoteInterpreter( + p, + noteId, + MockInterpreterB.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + } + + @Test + public void testRemoteInterperterCall() throws TTransportException, IOException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = createMockInterpreterB(p); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + + RemoteInterpreterProcess process = intpA.getInterpreterProcess(); + process.equals(intpB.getInterpreterProcess()); + + assertFalse(process.isRunning()); + assertEquals(0, process.getNumIdleClient()); + assertEquals(0, process.referenceCount()); + + intpA.open(); // initializa all interpreters in the same group + assertTrue(process.isRunning()); + assertEquals(1, process.getNumIdleClient()); + assertEquals(1, process.referenceCount()); + + intpA.interpret("1", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + intpB.open(); + assertEquals(1, process.referenceCount()); + + intpA.close(); + assertEquals(0, process.referenceCount()); + intpB.close(); + assertEquals(0, process.referenceCount()); + + assertFalse(process.isRunning()); + + } + + @Test + public void testExecuteIncorrectPrecode() throws TTransportException, IOException { + Properties p = new Properties(); + p.put("zeppelin.MockInterpreterA.precode", "fail test"); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreterProcess process = intpA.getInterpreterProcess(); + + intpA.open(); + + InterpreterResult result = intpA.interpret("1", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + + + intpA.close(); + assertEquals(Code.ERROR, result.code()); + } + + @Test + public void testExecuteCorrectPrecode() throws TTransportException, IOException { + Properties p = new Properties(); + p.put("zeppelin.MockInterpreterA.precode", "2"); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreterProcess process = intpA.getInterpreterProcess(); + + intpA.open(); + + InterpreterResult result = intpA.interpret("1", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + + + intpA.close(); + assertEquals(Code.SUCCESS, result.code()); + assertEquals("1", result.message().get(0).getData()); + } + + @Test + public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { + Properties p = new Properties(); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + InterpreterResult ret = intpA.interpret("non numeric value", + new InterpreterContext( + "noteId", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + assertEquals(Code.ERROR, ret.code()); + } + + @Test + public void testRemoteSchedulerSharing() throws TTransportException, IOException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = new RemoteInterpreter( + p, + "note", + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = new RemoteInterpreter( + p, + "note", + MockInterpreterB.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + long start = System.currentTimeMillis(); + InterpreterResult ret = intpA.interpret("500", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + assertEquals("500", ret.message().get(0).getData()); + + ret = intpB.interpret("500", + new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + assertEquals("1000", ret.message().get(0).getData()); + long end = System.currentTimeMillis(); + assertTrue(end - start >= 1000); + + + intpA.close(); + intpB.close(); + } + + @Test + public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOException, InterruptedException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + final RemoteInterpreter intpB = createMockInterpreterB(p); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + long start = System.currentTimeMillis(); + Job jobA = new Job("jobA", null) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpA.interpret("500", + new InterpreterContext( + "note", + "jobA", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpA.getScheduler().submit(jobA); + + Job jobB = new Job("jobB", null) { + + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpB.interpret("500", + new InterpreterContext( + "note", + "jobB", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpB.getScheduler().submit(jobB); + // wait until both job finished + while (jobA.getStatus() != Status.FINISHED || + jobB.getStatus() != Status.FINISHED) { + Thread.sleep(100); + } + long end = System.currentTimeMillis(); + assertTrue(end - start >= 1000); + + assertEquals("1000", ((InterpreterResult) jobB.getReturn()).message().get(0).getData()); + + intpA.close(); + intpB.close(); + } + + @Test + public void testRunOrderPreserved() throws InterruptedException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + int concurrency = 3; + final List<InterpreterResultMessage> results = new LinkedList<>(); + + Scheduler scheduler = intpA.getScheduler(); + for (int i = 0; i < concurrency; i++) { + final String jobId = Integer.toString(i); + scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext( + "note", + jobId, + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + synchronized (results) { + results.addAll(ret.message()); + results.notify(); + } + return null; + } + + @Override + protected boolean jobAbort() { + return false; + } + + }); + } + + // wait for job finished + synchronized (results) { + while (results.size() != concurrency) { + results.wait(300); + } + } + + int i = 0; + for (InterpreterResultMessage result : results) { + assertEquals(Integer.toString(i++), result.getData()); + } + assertEquals(concurrency, i); + + intpA.close(); + } + + + @Test + public void testRunParallel() throws InterruptedException { + Properties p = new Properties(); + p.put("parallel", "true"); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + int concurrency = 4; + final int timeToSleep = 1000; + final List<InterpreterResultMessage> results = new LinkedList<>(); + long start = System.currentTimeMillis(); + + Scheduler scheduler = intpA.getScheduler(); + for (int i = 0; i < concurrency; i++) { + final String jobId = Integer.toString(i); + scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + String stmt = Integer.toString(timeToSleep); + InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext( + "note", + jobId, + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + + synchronized (results) { + results.addAll(ret.message()); + results.notify(); + } + return stmt; + } + + @Override + protected boolean jobAbort() { + return false; + } + + }); + } + + // wait for job finished + synchronized (results) { + while (results.size() != concurrency) { + results.wait(300); + } + } + + long end = System.currentTimeMillis(); + + assertTrue(end - start < timeToSleep * concurrency); + + intpA.close(); + } + + @Test + public void testInterpreterGroupResetBeforeProcessStarts() { + Properties p = new Properties(); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpA.setInterpreterGroup(intpGroup); + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + + intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); + RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); + + assertNotSame(processA.hashCode(), processB.hashCode()); + } + + @Test + public void testInterpreterGroupResetAfterProcessFinished() { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpA.setInterpreterGroup(intpGroup); + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.open(); + + processA.dereference(); // intpA.close(); + + intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); + RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); + + assertNotSame(processA.hashCode(), processB.hashCode()); + } + + @Test + public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + final RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Job jobA = new Job("jobA", null) { + private Object r; + + @Override + public Object getReturn() { + return r; + } + + @Override + public void setResult(Object results) { + this.r = results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpA.interpret("2000", + new InterpreterContext( + "note", + "jobA", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpA.getScheduler().submit(jobA); + + // wait for job started + while (intpA.getScheduler().getJobsRunning().size() == 0) { + Thread.sleep(100); + } + + // restart interpreter + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.close(); + + InterpreterGroup newInterpreterGroup = + new InterpreterGroup(intpA.getInterpreterGroup().getId()); + newInterpreterGroup.put("note", new LinkedList<Interpreter>()); + + intpA.setInterpreterGroup(newInterpreterGroup); + intpA.open(); + RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); + + assertNotSame(processA.hashCode(), processB.hashCode()); + + } + + @Test + public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { + Properties p = new Properties(); + intpGroup.put("note", new LinkedList<Interpreter>()); + + RemoteInterpreter intpA = createMockInterpreterA(p); + + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = createMockInterpreterB(p); + + intpGroup.get("note").add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + assertEquals(intpA.getScheduler(), intpB.getScheduler()); + } + + @Test + public void testMultiInterpreterSession() { + Properties p = new Properties(); + intpGroup.put("sessionA", new LinkedList<Interpreter>()); + intpGroup.put("sessionB", new LinkedList<Interpreter>()); + + RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA"); + intpGroup.get("sessionA").add(intpAsessionA); + intpAsessionA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA"); + intpGroup.get("sessionA").add(intpBsessionA); + intpBsessionA.setInterpreterGroup(intpGroup); + + intpAsessionA.open(); + intpBsessionA.open(); + + assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler()); + + RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB"); + intpGroup.get("sessionB").add(intpAsessionB); + intpAsessionB.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB"); + intpGroup.get("sessionB").add(intpBsessionB); + intpBsessionB.setInterpreterGroup(intpGroup); + + intpAsessionB.open(); + intpBsessionB.open(); + + assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler()); + assertNotEquals(intpAsessionA.getScheduler(), intpAsessionB.getScheduler()); + } + + @Test + public void should_push_local_angular_repo_to_remote() throws Exception { + //Given + final Client client = Mockito.mock(Client.class); + final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId", + MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 10 * 1000, null, + null, "anonymous", false); + final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null); + registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId"); + final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId"); + interpreterGroup.setAngularObjectRegistry(registry); + intr.setInterpreterGroup(interpreterGroup); + + final java.lang.reflect.Type registryType = new TypeToken<Map<String, + Map<String, AngularObject>>>() {}.getType(); + final Gson gson = new Gson(); + final String expected = gson.toJson(registry.getRegistry(), registryType); + + //When + intr.pushAngularObjectRegistryToRemote(client); + + //Then + Mockito.verify(client).angularRegistryPush(expected); + } + + @Test + public void testEnvStringPattern() { + assertFalse(RemoteInterpreterUtils.isEnvString(null)); + assertFalse(RemoteInterpreterUtils.isEnvString("")); + assertFalse(RemoteInterpreterUtils.isEnvString("abcDEF")); + assertFalse(RemoteInterpreterUtils.isEnvString("ABC-DEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABCDEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF")); + assertTrue(RemoteInterpreterUtils.isEnvString("ABC_DEF123")); + } + + @Test + public void testEnvronmentAndPropertySet() { + Properties p = new Properties(); + p.setProperty("MY_ENV1", "env value 1"); + p.setProperty("my.property.1", "property value 1"); + + RemoteInterpreter intp = new RemoteInterpreter( + p, + "note", + MockInterpreterEnv.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intp); + intp.setInterpreterGroup(intpGroup); + + intp.open(); + + InterpreterContext context = new InterpreterContext( + "noteId", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + + assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData()); + assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code()); + assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code()); + assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData()); + + intp.close(); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java new file mode 100644 index 0000000..975d6ea --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.junit.Test; + +public class RemoteInterpreterUtilsTest { + + @Test + public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException { + assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java new file mode 100644 index 0000000..81a9164 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +public class MockInterpreterA extends Interpreter { + + private String lastSt; + + public MockInterpreterA(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + try { + Thread.sleep(Long.parseLong(st)); + this.lastSt = st; + } catch (NumberFormatException | InterruptedException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(Code.SUCCESS, st); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + if (getProperty("parallel") != null && getProperty("parallel").equals("true")) { + return SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + this.hashCode(), 10); + } else { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java new file mode 100644 index 0000000..d4b26ad --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; + +public class MockInterpreterAngular extends Interpreter { + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterAngular(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String name = null; + if (stmt.length >= 2) { + name = stmt[1]; + } + String value = null; + if (stmt.length == 3) { + value = stmt[2]; + } + + AngularObjectRegistry registry = context.getAngularObjectRegistry(); + + if (cmd.equals("add")) { + registry.add(name, value, context.getNoteId(), null); + registry.get(name, context.getNoteId(), null).addWatcher(new AngularObjectWatcher + (null) { + + @Override + public void watch(Object oldObject, Object newObject, + InterpreterContext context) { + numWatch.incrementAndGet(); + } + + }); + } else if (cmd.equalsIgnoreCase("update")) { + registry.get(name, context.getNoteId(), null).set(value); + } else if (cmd.equals("remove")) { + registry.remove(name, context.getNoteId(), null); + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + logger.error("Exception in MockInterpreterAngular while interpret Thread.sleep", e); + } + + String msg = registry.getAll(context.getNoteId(), null).size() + " " + Integer.toString(numWatch + .get()); + return new InterpreterResult(Code.SUCCESS, msg); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java new file mode 100644 index 0000000..7103335 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.WrappedInterpreter; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; + +public class MockInterpreterB extends Interpreter { + + public MockInterpreterB(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + MockInterpreterA intpA = getInterpreterA(); + String intpASt = intpA.getLastStatement(); + long timeToSleep = Long.parseLong(st); + if (intpASt != null) { + timeToSleep += Long.parseLong(intpASt); + } + try { + Thread.sleep(timeToSleep); + } catch (NumberFormatException | InterruptedException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep)); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + public MockInterpreterA getInterpreterA() { + InterpreterGroup interpreterGroup = getInterpreterGroup(); + synchronized (interpreterGroup) { + for (List<Interpreter> interpreters : interpreterGroup.values()) { + boolean belongsToSameNoteGroup = false; + MockInterpreterA a = null; + for (Interpreter intp : interpreters) { + if (intp.getClassName().equals(MockInterpreterA.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + a = (MockInterpreterA) p; + } + + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + if (this == p) { + belongsToSameNoteGroup = true; + } + } + if (belongsToSameNoteGroup) { + return a; + } + } + } + return null; + } + + @Override + public Scheduler getScheduler() { + MockInterpreterA intpA = getInterpreterA(); + if (intpA != null) { + return intpA.getScheduler(); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java new file mode 100644 index 0000000..12e11f7 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.util.List; +import java.util.Properties; + + +public class MockInterpreterEnv extends Interpreter { + + public MockInterpreterEnv(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] cmd = st.split(" "); + if (cmd[0].equals("getEnv")) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getenv(cmd[1])); + } else if (cmd[0].equals("getProperty")){ + return new InterpreterResult(InterpreterResult.Code.SUCCESS, System.getProperty(cmd[1])); + } else { + return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]); + } + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java new file mode 100644 index 0000000..349315c --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote.mock; + +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * MockInterpreter to test outputstream + */ +public class MockInterpreterOutputStream extends Interpreter { + private String lastSt; + + public MockInterpreterOutputStream(Properties property) { + super(property); + } + + @Override + public void open() { + //new RuntimeException().printStackTrace(); + } + + @Override + public void close() { + } + + public String getLastStatement() { + return lastSt; + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] ret = st.split(":"); + try { + if (ret[1] != null) { + context.out.write(ret[1]); + } + } catch (IOException e) { + throw new InterpreterException(e); + } + return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), (ret.length > 2) ? + ret[2] : ""); + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java new file mode 100644 index 0000000..c4ff6ab --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote.mock; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.gson.Gson; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePool; + +public class MockInterpreterResourcePool extends Interpreter { + + AtomicInteger numWatch = new AtomicInteger(0); + + public MockInterpreterResourcePool(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + String[] stmt = st.split(" "); + String cmd = stmt[0]; + String noteId = null; + String paragraphId = null; + String name = null; + if (stmt.length >= 2) { + String[] npn = stmt[1].split(":"); + if (npn.length >= 3) { + noteId = npn[0]; + paragraphId = npn[1]; + name = npn[2]; + } else { + name = stmt[1]; + } + } + String value = null; + if (stmt.length >= 3) { + value = stmt[2]; + } + + ResourcePool resourcePool = context.getResourcePool(); + Object ret = null; + if (cmd.equals("put")) { + resourcePool.put(noteId, paragraphId, name, value); + } else if (cmd.equalsIgnoreCase("get")) { + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (resource != null) { + ret = resourcePool.get(noteId, paragraphId, name).get(); + } else { + ret = ""; + } + } else if (cmd.equals("remove")) { + ret = resourcePool.remove(noteId, paragraphId, name); + } else if (cmd.equals("getAll")) { + ret = resourcePool.getAll(); + } else if (cmd.equals("invoke")) { + Resource resource = resourcePool.get(noteId, paragraphId, name); + if (stmt.length >=4) { + Resource res = resource.invokeMethod(value, null, null, stmt[3]); + ret = res.get(); + } else { + ret = resource.invokeMethod(value, null, null); + } + } + + try { + Thread.sleep(500); // wait for watcher executed + } catch (InterruptedException e) { + } + + Gson gson = new Gson(); + return new InterpreterResult(Code.SUCCESS, gson.toJson(ret)); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java new file mode 100644 index 0000000..363ccf6 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import com.google.gson.Gson; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unittest for DistributedResourcePool + */ +public class DistributedResourcePoolTest { + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + private InterpreterGroup intpGroup1; + private InterpreterGroup intpGroup2; + private HashMap<String, String> env; + private RemoteInterpreter intp1; + private RemoteInterpreter intp2; + private InterpreterContext context; + private RemoteInterpreterEventPoller eventPoller1; + private RemoteInterpreterEventPoller eventPoller2; + + + @Before + public void setUp() throws Exception { + env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + Properties p = new Properties(); + + intp1 = new RemoteInterpreter( + p, + "note", + MockInterpreterResourcePool.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false + ); + + intpGroup1 = new InterpreterGroup("intpGroup1"); + intpGroup1.put("note", new LinkedList<Interpreter>()); + intpGroup1.get("note").add(intp1); + intp1.setInterpreterGroup(intpGroup1); + + intp2 = new RemoteInterpreter( + p, + "note", + MockInterpreterResourcePool.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false + ); + + intpGroup2 = new InterpreterGroup("intpGroup2"); + intpGroup2.put("note", new LinkedList<Interpreter>()); + intpGroup2.get("note").add(intp2); + intp2.setInterpreterGroup(intpGroup2); + + context = new InterpreterContext( + "note", + "id", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + null, + null, + new LinkedList<InterpreterContextRunner>(), + null); + + intp1.open(); + intp2.open(); + + eventPoller1 = new RemoteInterpreterEventPoller(null, null); + eventPoller1.setInterpreterGroup(intpGroup1); + eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess()); + + eventPoller2 = new RemoteInterpreterEventPoller(null, null); + eventPoller2.setInterpreterGroup(intpGroup2); + eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess()); + + eventPoller1.start(); + eventPoller2.start(); + } + + @After + public void tearDown() throws Exception { + eventPoller1.shutdown(); + intp1.close(); + intpGroup1.close(); + eventPoller2.shutdown(); + intp2.close(); + intpGroup2.close(); + } + + @Test + public void testRemoteDistributedResourcePool() { + Gson gson = new Gson(); + InterpreterResult ret; + intp1.interpret("put key1 value1", context); + intp2.interpret("put key2 value2", context); + + ret = intp1.interpret("getAll", context); + assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + ret = intp2.interpret("getAll", context); + assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + ret = intp1.interpret("get key1", context); + assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), String.class)); + + ret = intp1.interpret("get key2", context); + assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), String.class)); + } + + @Test + public void testDistributedResourcePool() { + final LocalResourcePool pool2 = new LocalResourcePool("pool2"); + final LocalResourcePool pool3 = new LocalResourcePool("pool3"); + + DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() { + @Override + public ResourceSet getAllResources() { + ResourceSet set = pool2.getAll(); + set.addAll(pool3.getAll()); + + ResourceSet remoteSet = new ResourceSet(); + Gson gson = new Gson(); + for (Resource s : set) { + RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class); + remoteResource.setResourcePoolConnector(this); + remoteSet.add(remoteResource); + } + return remoteSet; + } + + @Override + public Object readResource(ResourceId id) { + if (id.getResourcePoolId().equals(pool2.id())) { + return pool2.get(id.getName()).get(); + } + if (id.getResourcePoolId().equals(pool3.id())) { + return pool3.get(id.getName()).get(); + } + return null; + } + + @Override + public Object invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] params) { + return null; + } + + @Override + public Resource invokeMethod(ResourceId id, String methodName, Class[] paramTypes, Object[] + params, String returnResourceName) { + return null; + } + }); + + assertEquals(0, pool1.getAll().size()); + + + // test get() can get from pool + pool2.put("object1", "value2"); + assertEquals(1, pool1.getAll().size()); + assertTrue(pool1.get("object1").isRemote()); + assertEquals("value2", pool1.get("object1").get()); + + // test get() is locality aware + pool1.put("object1", "value1"); + assertEquals(1, pool2.getAll().size()); + assertEquals("value1", pool1.get("object1").get()); + + // test getAll() is locality aware + assertEquals("value1", pool1.getAll().get(0).get()); + assertEquals("value2", pool1.getAll().get(1).get()); + } + + @Test + public void testResourcePoolUtils() { + Gson gson = new Gson(); + InterpreterResult ret; + + // when create some resources + intp1.interpret("put note1:paragraph1:key1 value1", context); + intp1.interpret("put note1:paragraph2:key1 value2", context); + intp2.interpret("put note2:paragraph1:key1 value1", context); + intp2.interpret("put note2:paragraph2:key2 value2", context); + + + // then get all resources. + assertEquals(4, ResourcePoolUtils.getAllResources().size()); + + // when remove all resources from note1 + ResourcePoolUtils.removeResourcesBelongsToNote("note1"); + + // then resources should be removed. + assertEquals(2, ResourcePoolUtils.getAllResources().size()); + assertEquals("", gson.fromJson( + intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(), + String.class)); + assertEquals("", gson.fromJson( + intp1.interpret("get note1:paragraph2:key1", context).message().get(0).getData(), + String.class)); + + + // when remove all resources from note2:paragraph1 + ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1"); + + // then 1 + assertEquals(1, ResourcePoolUtils.getAllResources().size()); + assertEquals("value2", gson.fromJson( + intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(), + String.class)); + + } + + @Test + public void testResourceInvokeMethod() { + Gson gson = new Gson(); + InterpreterResult ret; + intp1.interpret("put key1 hey", context); + intp2.interpret("put key2 world", context); + + // invoke method in local resource pool + ret = intp1.interpret("invoke key1 length", context); + assertEquals("3", ret.message().get(0).getData()); + + // invoke method in remote resource pool + ret = intp1.interpret("invoke key2 length", context); + assertEquals("5", ret.message().get(0).getData()); + + // make sure no resources are automatically created + ret = intp1.interpret("getAll", context); + assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + // invoke method in local resource pool and save result + ret = intp1.interpret("invoke key1 length ret1", context); + assertEquals("3", ret.message().get(0).getData()); + + ret = intp1.interpret("getAll", context); + assertEquals(3, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + ret = intp1.interpret("get ret1", context); + assertEquals("3", gson.fromJson(ret.message().get(0).getData(), String.class)); + + // invoke method in remote resource pool and save result + ret = intp1.interpret("invoke key2 length ret2", context); + assertEquals("5", ret.message().get(0).getData()); + + ret = intp1.interpret("getAll", context); + assertEquals(4, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size()); + + ret = intp1.interpret("get ret2", context); + assertEquals("5", gson.fromJson(ret.message().get(0).getData(), String.class)); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java new file mode 100644 index 0000000..ebb5100 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Properties; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.scheduler.Job.Status; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { + + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + private SchedulerFactory schedulerSvc; + private static final int TICK_WAIT = 100; + private static final int MAX_WAIT_CYCLES = 100; + + @Before + public void setUp() throws Exception{ + schedulerSvc = new SchedulerFactory(); + } + + @After + public void tearDown(){ + + } + + @Test + public void test() throws Exception { + Properties p = new Properties(); + final InterpreterGroup intpGroup = new InterpreterGroup(); + Map<String, String> env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + "note", + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + this, + null, + "anonymous", + false); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", + intpA.getInterpreterProcess(), + 10); + + Job job = new Job("jobId", "jobName", null, 200) { + Object results; + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", new InterpreterContext( + "note", + "jobId", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + return "1000"; + } + + @Override + protected boolean jobAbort() { + return false; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + scheduler.submit(job); + + int cycles = 0; + while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + assertTrue(job.isRunning()); + + Thread.sleep(5*TICK_WAIT); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(1, scheduler.getJobsRunning().size()); + + cycles = 0; + while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + + assertTrue(job.isTerminated()); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(0, scheduler.getJobsRunning().size()); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + + @Test + public void testAbortOnPending() throws Exception { + Properties p = new Properties(); + final InterpreterGroup intpGroup = new InterpreterGroup(); + Map<String, String> env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + "note", + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + this, + null, + "anonymous", + false); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", + intpA.getInterpreterProcess(), + 10); + + Job job1 = new Job("jobId1", "jobName1", null, 200) { + Object results; + InterpreterContext context = new InterpreterContext( + "note", + "jobId1", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + + Job job2 = new Job("jobId2", "jobName2", null, 200) { + public Object results; + InterpreterContext context = new InterpreterContext( + "note", + "jobId2", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + + job2.setResult("result2"); + + scheduler.submit(job1); + scheduler.submit(job2); + + + int cycles = 0; + while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + assertTrue(job1.isRunning()); + assertTrue(job2.getStatus() == Status.PENDING); + + job2.abort(); + + cycles = 0; + while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + + assertNotNull(job1.getDateFinished()); + assertTrue(job1.isTerminated()); + assertNull(job2.getDateFinished()); + assertTrue(job2.isTerminated()); + assertEquals("result2", job2.getReturn()); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, int index, String output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { + + } + + @Override + public void onOutputClear(String noteId, String paragraphId) { + + } + + @Override + public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { + + } + + @Override + public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { + if (callback != null) { + callback.onFinished(new LinkedList<>()); + } + } + + @Override + public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { + } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos) { + } +}