http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
deleted file mode 100644
index ffcb8d5..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-public class RemoteInterpreterTest {
-
-
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-
-  private InterpreterGroup intpGroup;
-  private HashMap<String, String> env;
-
-  @Before
-  public void setUp() throws Exception {
-    intpGroup = new InterpreterGroup();
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    intpGroup.close();
-  }
-
-  private RemoteInterpreter createMockInterpreterA(Properties p) {
-    return createMockInterpreterA(p, "note");
-  }
-
-  private RemoteInterpreter createMockInterpreterA(Properties p, String 
noteId) {
-    return new RemoteInterpreter(
-        p,
-        noteId,
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-  }
-
-  private RemoteInterpreter createMockInterpreterB(Properties p) {
-    return createMockInterpreterB(p, "note");
-  }
-
-  private RemoteInterpreter createMockInterpreterB(Properties p, String 
noteId) {
-    return new RemoteInterpreter(
-        p,
-        noteId,
-        MockInterpreterB.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-  }
-
-  @Test
-  public void testRemoteInterperterCall() throws TTransportException, 
IOException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = createMockInterpreterB(p);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    process.equals(intpB.getInterpreterProcess());
-
-    assertFalse(process.isRunning());
-    assertEquals(0, process.getNumIdleClient());
-    assertEquals(0, process.referenceCount());
-
-    intpA.open(); // initializa all interpreters in the same group
-    assertTrue(process.isRunning());
-    assertEquals(1, process.getNumIdleClient());
-    assertEquals(1, process.referenceCount());
-
-    intpA.interpret("1",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-    intpB.open();
-    assertEquals(1, process.referenceCount());
-
-    intpA.close();
-    assertEquals(0, process.referenceCount());
-    intpB.close();
-    assertEquals(0, process.referenceCount());
-
-    assertFalse(process.isRunning());
-
-  }
-
-  @Test
-  public void testExecuteIncorrectPrecode() throws TTransportException, 
IOException {
-    Properties p = new Properties();
-    p.put("zeppelin.MockInterpreterA.precode", "fail test");
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-
-    intpA.open();
-    
-    InterpreterResult result = intpA.interpret("1",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-
-
-    intpA.close();
-    assertEquals(Code.ERROR, result.code());
-  }
-
-  @Test
-  public void testExecuteCorrectPrecode() throws TTransportException, 
IOException {
-    Properties p = new Properties();
-    p.put("zeppelin.MockInterpreterA.precode", "2");
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-
-    intpA.open();
-
-    InterpreterResult result = intpA.interpret("1",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-
-
-    intpA.close();
-    assertEquals(Code.SUCCESS, result.code());
-    assertEquals("1", result.message().get(0).getData());
-  }
-
-  @Test
-  public void testRemoteInterperterErrorStatus() throws TTransportException, 
IOException {
-    Properties p = new Properties();
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    InterpreterResult ret = intpA.interpret("non numeric value",
-        new InterpreterContext(
-            "noteId",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-
-    assertEquals(Code.ERROR, ret.code());
-  }
-
-  @Test
-  public void testRemoteSchedulerSharing() throws TTransportException, 
IOException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterB.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    long start = System.currentTimeMillis();
-    InterpreterResult ret = intpA.interpret("500",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-    assertEquals("500", ret.message().get(0).getData());
-
-    ret = intpB.interpret("500",
-        new InterpreterContext(
-            "note",
-            "id",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-    assertEquals("1000", ret.message().get(0).getData());
-    long end = System.currentTimeMillis();
-    assertTrue(end - start >= 1000);
-
-
-    intpA.close();
-    intpB.close();
-  }
-
-  @Test
-  public void testRemoteSchedulerSharingSubmit() throws TTransportException, 
IOException, InterruptedException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    final RemoteInterpreter intpB = createMockInterpreterB(p);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    long start = System.currentTimeMillis();
-    Job jobA = new Job("jobA", null) {
-      private Object r;
-
-      @Override
-      public Object getReturn() {
-        return r;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.r = results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpA.interpret("500",
-            new InterpreterContext(
-                "note",
-                "jobA",
-                null,
-                "title",
-                "text",
-                new AuthenticationInfo(),
-                new HashMap<String, Object>(),
-                new GUI(),
-                new AngularObjectRegistry(intpGroup.getId(), null),
-                new LocalResourcePool("pool1"),
-                new LinkedList<InterpreterContextRunner>(), null));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpA.getScheduler().submit(jobA);
-
-    Job jobB = new Job("jobB", null) {
-
-      private Object r;
-
-      @Override
-      public Object getReturn() {
-        return r;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.r = results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpB.interpret("500",
-            new InterpreterContext(
-                "note",
-                "jobB",
-                null,
-                "title",
-                "text",
-                new AuthenticationInfo(),
-                new HashMap<String, Object>(),
-                new GUI(),
-                new AngularObjectRegistry(intpGroup.getId(), null),
-                new LocalResourcePool("pool1"),
-                new LinkedList<InterpreterContextRunner>(), null));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpB.getScheduler().submit(jobB);
-    // wait until both job finished
-    while (jobA.getStatus() != Status.FINISHED ||
-           jobB.getStatus() != Status.FINISHED) {
-      Thread.sleep(100);
-    }
-    long end = System.currentTimeMillis();
-    assertTrue(end - start >= 1000);
-
-    assertEquals("1000", ((InterpreterResult) 
jobB.getReturn()).message().get(0).getData());
-
-    intpA.close();
-    intpB.close();
-  }
-
-  @Test
-  public void testRunOrderPreserved() throws InterruptedException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    int concurrency = 3;
-    final List<InterpreterResultMessage> results = new LinkedList<>();
-
-    Scheduler scheduler = intpA.getScheduler();
-    for (int i = 0; i < concurrency; i++) {
-      final String jobId = Integer.toString(i);
-      scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) {
-        private Object r;
-
-        @Override
-        public Object getReturn() {
-          return r;
-        }
-
-        @Override
-        public void setResult(Object results) {
-          this.r = results;
-        }
-
-        @Override
-        public int progress() {
-          return 0;
-        }
-
-        @Override
-        public Map<String, Object> info() {
-          return null;
-        }
-
-        @Override
-        protected Object jobRun() throws Throwable {
-          InterpreterResult ret = intpA.interpret(getJobName(), new 
InterpreterContext(
-              "note",
-              jobId,
-              null,
-              "title",
-              "text",
-              new AuthenticationInfo(),
-              new HashMap<String, Object>(),
-              new GUI(),
-              new AngularObjectRegistry(intpGroup.getId(), null),
-              new LocalResourcePool("pool1"),
-              new LinkedList<InterpreterContextRunner>(), null));
-
-          synchronized (results) {
-            results.addAll(ret.message());
-            results.notify();
-          }
-          return null;
-        }
-
-        @Override
-        protected boolean jobAbort() {
-          return false;
-        }
-
-      });
-    }
-
-    // wait for job finished
-    synchronized (results) {
-      while (results.size() != concurrency) {
-        results.wait(300);
-      }
-    }
-
-    int i = 0;
-    for (InterpreterResultMessage result : results) {
-      assertEquals(Integer.toString(i++), result.getData());
-    }
-    assertEquals(concurrency, i);
-
-    intpA.close();
-  }
-
-
-  @Test
-  public void testRunParallel() throws InterruptedException {
-    Properties p = new Properties();
-    p.put("parallel", "true");
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    int concurrency = 4;
-    final int timeToSleep = 1000;
-    final List<InterpreterResultMessage> results = new LinkedList<>();
-    long start = System.currentTimeMillis();
-
-    Scheduler scheduler = intpA.getScheduler();
-    for (int i = 0; i < concurrency; i++) {
-      final String jobId = Integer.toString(i);
-      scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) {
-        private Object r;
-
-        @Override
-        public Object getReturn() {
-          return r;
-        }
-
-        @Override
-        public void setResult(Object results) {
-          this.r = results;
-        }
-
-        @Override
-        public int progress() {
-          return 0;
-        }
-
-        @Override
-        public Map<String, Object> info() {
-          return null;
-        }
-
-        @Override
-        protected Object jobRun() throws Throwable {
-          String stmt = Integer.toString(timeToSleep);
-          InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
-              "note",
-              jobId,
-              null,
-              "title",
-              "text",
-              new AuthenticationInfo(),
-              new HashMap<String, Object>(),
-              new GUI(),
-              new AngularObjectRegistry(intpGroup.getId(), null),
-              new LocalResourcePool("pool1"),
-              new LinkedList<InterpreterContextRunner>(), null));
-
-          synchronized (results) {
-            results.addAll(ret.message());
-            results.notify();
-          }
-          return stmt;
-        }
-
-        @Override
-        protected boolean jobAbort() {
-          return false;
-        }
-
-      });
-    }
-
-    // wait for job finished
-    synchronized (results) {
-      while (results.size() != concurrency) {
-        results.wait(300);
-      }
-    }
-
-    long end = System.currentTimeMillis();
-
-    assertTrue(end - start < timeToSleep * concurrency);
-
-    intpA.close();
-  }
-
-  @Test
-  public void testInterpreterGroupResetBeforeProcessStarts() {
-    Properties p = new Properties();
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpA.setInterpreterGroup(intpGroup);
-    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
-
-    intpA.setInterpreterGroup(new 
InterpreterGroup(intpA.getInterpreterGroup().getId()));
-    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
-    assertNotSame(processA.hashCode(), processB.hashCode());
-  }
-
-  @Test
-  public void testInterpreterGroupResetAfterProcessFinished() {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpA.setInterpreterGroup(intpGroup);
-    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
-    intpA.open();
-
-    processA.dereference();    // intpA.close();
-
-    intpA.setInterpreterGroup(new 
InterpreterGroup(intpA.getInterpreterGroup().getId()));
-    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
-    assertNotSame(processA.hashCode(), processB.hashCode());
-  }
-
-  @Test
-  public void testInterpreterGroupResetDuringProcessRunning() throws 
InterruptedException {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    final RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    Job jobA = new Job("jobA", null) {
-      private Object r;
-
-      @Override
-      public Object getReturn() {
-        return r;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.r = results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        return intpA.interpret("2000",
-            new InterpreterContext(
-                "note",
-                "jobA",
-                null,
-                "title",
-                "text",
-                new AuthenticationInfo(),
-                new HashMap<String, Object>(),
-                new GUI(),
-                new AngularObjectRegistry(intpGroup.getId(), null),
-                new LocalResourcePool("pool1"),
-                new LinkedList<InterpreterContextRunner>(), null));
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-    };
-    intpA.getScheduler().submit(jobA);
-
-    // wait for job started
-    while (intpA.getScheduler().getJobsRunning().size() == 0) {
-      Thread.sleep(100);
-    }
-
-    // restart interpreter
-    RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
-    intpA.close();
-
-    InterpreterGroup newInterpreterGroup =
-        new InterpreterGroup(intpA.getInterpreterGroup().getId());
-    newInterpreterGroup.put("note", new LinkedList<Interpreter>());
-
-    intpA.setInterpreterGroup(newInterpreterGroup);
-    intpA.open();
-    RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
-
-    assertNotSame(processA.hashCode(), processB.hashCode());
-
-  }
-
-  @Test
-  public void 
testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
-    Properties p = new Properties();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpA = createMockInterpreterA(p);
-
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpB = createMockInterpreterB(p);
-
-    intpGroup.get("note").add(intpB);
-    intpB.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-    intpB.open();
-
-    assertEquals(intpA.getScheduler(), intpB.getScheduler());
-  }
-
-  @Test
-  public void testMultiInterpreterSession() {
-    Properties p = new Properties();
-    intpGroup.put("sessionA", new LinkedList<Interpreter>());
-    intpGroup.put("sessionB", new LinkedList<Interpreter>());
-
-    RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
-    intpGroup.get("sessionA").add(intpAsessionA);
-    intpAsessionA.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
-    intpGroup.get("sessionA").add(intpBsessionA);
-    intpBsessionA.setInterpreterGroup(intpGroup);
-
-    intpAsessionA.open();
-    intpBsessionA.open();
-
-    assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
-
-    RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
-    intpGroup.get("sessionB").add(intpAsessionB);
-    intpAsessionB.setInterpreterGroup(intpGroup);
-
-    RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
-    intpGroup.get("sessionB").add(intpBsessionB);
-    intpBsessionB.setInterpreterGroup(intpGroup);
-
-    intpAsessionB.open();
-    intpBsessionB.open();
-
-    assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
-    assertNotEquals(intpAsessionA.getScheduler(), 
intpAsessionB.getScheduler());
-  }
-
-  @Test
-  public void should_push_local_angular_repo_to_remote() throws Exception {
-    //Given
-    final Client client = Mockito.mock(Client.class);
-    final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), 
"noteId",
-        MockInterpreterA.class.getName(), "runner", "path", "localRepo", env, 
10 * 1000, null,
-        null, "anonymous", false);
-    final AngularObjectRegistry registry = new AngularObjectRegistry("spark", 
null);
-    registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
-    final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
-    interpreterGroup.setAngularObjectRegistry(registry);
-    intr.setInterpreterGroup(interpreterGroup);
-
-    final java.lang.reflect.Type registryType = new TypeToken<Map<String,
-                Map<String, AngularObject>>>() {}.getType();
-    final Gson gson = new Gson();
-    final String expected = gson.toJson(registry.getRegistry(), registryType);
-
-    //When
-    intr.pushAngularObjectRegistryToRemote(client);
-
-    //Then
-    Mockito.verify(client).angularRegistryPush(expected);
-  }
-
-  @Test
-  public void testEnvStringPattern() {
-    assertFalse(RemoteInterpreter.isEnvString(null));
-    assertFalse(RemoteInterpreter.isEnvString(""));
-    assertFalse(RemoteInterpreter.isEnvString("abcDEF"));
-    assertFalse(RemoteInterpreter.isEnvString("ABC-DEF"));
-    assertTrue(RemoteInterpreter.isEnvString("ABCDEF"));
-    assertTrue(RemoteInterpreter.isEnvString("ABC_DEF"));
-    assertTrue(RemoteInterpreter.isEnvString("ABC_DEF123"));
-  }
-
-  @Test
-  public void testEnvronmentAndPropertySet() {
-    Properties p = new Properties();
-    p.setProperty("MY_ENV1", "env value 1");
-    p.setProperty("my.property.1", "property value 1");
-
-    RemoteInterpreter intp = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterEnv.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intp);
-    intp.setInterpreterGroup(intpGroup);
-
-    intp.open();
-
-    InterpreterContext context = new InterpreterContext(
-        "noteId",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("pool1"),
-        new LinkedList<InterpreterContextRunner>(), null);
-
-
-    assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", 
context).message().get(0).getData());
-    assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", 
context).code());
-    assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", 
context).code());
-    assertEquals("property value 1", intp.interpret("getProperty 
my.property.1", context).message().get(0).getData());
-
-    intp.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
deleted file mode 100644
index 975d6ea..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.junit.Test;
-
-public class RemoteInterpreterUtilsTest {
-
-  @Test
-  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws 
IOException {
-    
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() 
> 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
deleted file mode 100644
index 81a9164..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterA.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-public class MockInterpreterA extends Interpreter {
-
-  private String lastSt;
-
-  public MockInterpreterA(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  public String getLastStatement() {
-    return lastSt;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    try {
-      Thread.sleep(Long.parseLong(st));
-      this.lastSt = st;
-    } catch (NumberFormatException | InterruptedException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(Code.SUCCESS, st);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    if (getProperty("parallel") != null && 
getProperty("parallel").equals("true")) {
-      return 
SchedulerFactory.singleton().createOrGetParallelScheduler("interpreter_" + 
this.hashCode(), 10);
-    } else {
-      return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
deleted file mode 100644
index d4b26ad..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterAngular.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-
-public class MockInterpreterAngular extends Interpreter {
-
-  AtomicInteger numWatch = new AtomicInteger(0);
-
-  public MockInterpreterAngular(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] stmt = st.split(" ");
-    String cmd = stmt[0];
-    String name = null;
-    if (stmt.length >= 2) {
-      name = stmt[1];
-    }
-    String value = null;
-    if (stmt.length == 3) {
-      value = stmt[2];
-    }
-
-    AngularObjectRegistry registry = context.getAngularObjectRegistry();
-
-    if (cmd.equals("add")) {
-      registry.add(name, value, context.getNoteId(), null);
-      registry.get(name, context.getNoteId(), null).addWatcher(new 
AngularObjectWatcher
-              (null) {
-
-        @Override
-        public void watch(Object oldObject, Object newObject,
-            InterpreterContext context) {
-          numWatch.incrementAndGet();
-        }
-
-      });
-    } else if (cmd.equalsIgnoreCase("update")) {
-      registry.get(name, context.getNoteId(), null).set(value);
-    } else if (cmd.equals("remove")) {
-      registry.remove(name, context.getNoteId(), null);
-    }
-
-    try {
-      Thread.sleep(500); // wait for watcher executed
-    } catch (InterruptedException e) {
-      logger.error("Exception in MockInterpreterAngular while interpret 
Thread.sleep", e);
-    }
-
-    String msg = registry.getAll(context.getNoteId(), null).size() + " " + 
Integer.toString(numWatch
-            .get());
-    return new InterpreterResult(Code.SUCCESS, msg);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
deleted file mode 100644
index 7103335..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-
-public class MockInterpreterB extends Interpreter {
-
-  public MockInterpreterB(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    MockInterpreterA intpA = getInterpreterA();
-    String intpASt = intpA.getLastStatement();
-    long timeToSleep = Long.parseLong(st);
-    if (intpASt != null) {
-      timeToSleep += Long.parseLong(intpASt);
-    }
-    try {
-      Thread.sleep(timeToSleep);
-    } catch (NumberFormatException | InterruptedException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(Code.SUCCESS, Long.toString(timeToSleep));
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  public MockInterpreterA getInterpreterA() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    synchronized (interpreterGroup) {
-      for (List<Interpreter> interpreters : interpreterGroup.values()) {
-        boolean belongsToSameNoteGroup = false;
-        MockInterpreterA a = null;
-        for (Interpreter intp : interpreters) {
-          if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
-            Interpreter p = intp;
-            while (p instanceof WrappedInterpreter) {
-              p = ((WrappedInterpreter) p).getInnerInterpreter();
-            }
-            a = (MockInterpreterA) p;
-          }
-
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          if (this == p) {
-            belongsToSameNoteGroup = true;
-          }
-        }
-        if (belongsToSameNoteGroup) {
-          return a;
-        }
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    MockInterpreterA intpA = getInterpreterA();
-    if (intpA != null) {
-      return intpA.getScheduler();
-    }
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
deleted file mode 100644
index 12e11f7..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterEnv.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-
-public class MockInterpreterEnv extends Interpreter {
-
-  public MockInterpreterEnv(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] cmd = st.split(" ");
-    if (cmd[0].equals("getEnv")) {
-      return new InterpreterResult(InterpreterResult.Code.SUCCESS, 
System.getenv(cmd[1]));
-    } else if (cmd[0].equals("getProperty")){
-      return new InterpreterResult(InterpreterResult.Code.SUCCESS, 
System.getProperty(cmd[1]));
-    } else {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, cmd[0]);
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
deleted file mode 100644
index 349315c..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterOutputStream.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * MockInterpreter to test outputstream
- */
-public class MockInterpreterOutputStream extends Interpreter {
-  private String lastSt;
-
-  public MockInterpreterOutputStream(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    //new RuntimeException().printStackTrace();
-  }
-
-  @Override
-  public void close() {
-  }
-
-  public String getLastStatement() {
-    return lastSt;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] ret = st.split(":");
-    try {
-      if (ret[1] != null) {
-        context.out.write(ret[1]);
-      }
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-    return new InterpreterResult(InterpreterResult.Code.valueOf(ret[0]), 
(ret.length > 2) ?
-            ret[2] : "");
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + 
this.hashCode());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
deleted file mode 100644
index c4ff6ab..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote.mock;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.gson.Gson;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePool;
-
-public class MockInterpreterResourcePool extends Interpreter {
-
-  AtomicInteger numWatch = new AtomicInteger(0);
-
-  public MockInterpreterResourcePool(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    String[] stmt = st.split(" ");
-    String cmd = stmt[0];
-    String noteId = null;
-    String paragraphId = null;
-    String name = null;
-    if (stmt.length >= 2) {
-      String[] npn = stmt[1].split(":");
-      if (npn.length >= 3) {
-        noteId = npn[0];
-        paragraphId = npn[1];
-        name = npn[2];
-      } else {
-        name = stmt[1];
-      }
-    }
-    String value = null;
-    if (stmt.length >= 3) {
-      value = stmt[2];
-    }
-
-    ResourcePool resourcePool = context.getResourcePool();
-    Object ret = null;
-    if (cmd.equals("put")) {
-      resourcePool.put(noteId, paragraphId, name, value);
-    } else if (cmd.equalsIgnoreCase("get")) {
-      Resource resource = resourcePool.get(noteId, paragraphId, name);
-      if (resource != null) {
-        ret = resourcePool.get(noteId, paragraphId, name).get();
-      } else {
-        ret = "";
-      }
-    } else if (cmd.equals("remove")) {
-      ret = resourcePool.remove(noteId, paragraphId, name);
-    } else if (cmd.equals("getAll")) {
-      ret = resourcePool.getAll();
-    } else if (cmd.equals("invoke")) {
-      Resource resource = resourcePool.get(noteId, paragraphId, name);
-      if (stmt.length >=4) {
-        Resource res = resource.invokeMethod(value, null, null, stmt[3]);
-        ret = res.get();
-      } else {
-        ret = resource.invokeMethod(value, null, null);
-      }
-    }
-
-    try {
-      Thread.sleep(500); // wait for watcher executed
-    } catch (InterruptedException e) {
-    }
-
-    Gson gson = new Gson();
-    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
deleted file mode 100644
index 363ccf6..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.resource;
-
-import com.google.gson.Gson;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Unittest for DistributedResourcePool
- */
-public class DistributedResourcePoolTest {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private InterpreterGroup intpGroup1;
-  private InterpreterGroup intpGroup2;
-  private HashMap<String, String> env;
-  private RemoteInterpreter intp1;
-  private RemoteInterpreter intp2;
-  private InterpreterContext context;
-  private RemoteInterpreterEventPoller eventPoller1;
-  private RemoteInterpreterEventPoller eventPoller2;
-
-
-  @Before
-  public void setUp() throws Exception {
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
-
-    Properties p = new Properties();
-
-    intp1 = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterResourcePool.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false
-    );
-
-    intpGroup1 = new InterpreterGroup("intpGroup1");
-    intpGroup1.put("note", new LinkedList<Interpreter>());
-    intpGroup1.get("note").add(intp1);
-    intp1.setInterpreterGroup(intpGroup1);
-
-    intp2 = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterResourcePool.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",        
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false
-    );
-
-    intpGroup2 = new InterpreterGroup("intpGroup2");
-    intpGroup2.put("note", new LinkedList<Interpreter>());
-    intpGroup2.get("note").add(intp2);
-    intp2.setInterpreterGroup(intpGroup2);
-
-    context = new InterpreterContext(
-        "note",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        null,
-        null,
-        new LinkedList<InterpreterContextRunner>(),
-        null);
-
-    intp1.open();
-    intp2.open();
-
-    eventPoller1 = new RemoteInterpreterEventPoller(null, null);
-    eventPoller1.setInterpreterGroup(intpGroup1);
-    
eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
-
-    eventPoller2 = new RemoteInterpreterEventPoller(null, null);
-    eventPoller2.setInterpreterGroup(intpGroup2);
-    
eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
-
-    eventPoller1.start();
-    eventPoller2.start();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    eventPoller1.shutdown();
-    intp1.close();
-    intpGroup1.close();
-    eventPoller2.shutdown();
-    intp2.close();
-    intpGroup2.close();
-  }
-
-  @Test
-  public void testRemoteDistributedResourcePool() {
-    Gson gson = new Gson();
-    InterpreterResult ret;
-    intp1.interpret("put key1 value1", context);
-    intp2.interpret("put key2 value2", context);
-
-    ret = intp1.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
-
-    ret = intp2.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
-
-    ret = intp1.interpret("get key1", context);
-    assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), 
String.class));
-
-    ret = intp1.interpret("get key2", context);
-    assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), 
String.class));
-  }
-
-  @Test
-  public void testDistributedResourcePool() {
-    final LocalResourcePool pool2 = new LocalResourcePool("pool2");
-    final LocalResourcePool pool3 = new LocalResourcePool("pool3");
-
-    DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new 
ResourcePoolConnector() {
-      @Override
-      public ResourceSet getAllResources() {
-        ResourceSet set = pool2.getAll();
-        set.addAll(pool3.getAll());
-
-        ResourceSet remoteSet = new ResourceSet();
-        Gson gson = new Gson();
-        for (Resource s : set) {
-          RemoteResource remoteResource = gson.fromJson(gson.toJson(s), 
RemoteResource.class);
-          remoteResource.setResourcePoolConnector(this);
-          remoteSet.add(remoteResource);
-        }
-        return remoteSet;
-      }
-
-      @Override
-      public Object readResource(ResourceId id) {
-        if (id.getResourcePoolId().equals(pool2.id())) {
-          return pool2.get(id.getName()).get();
-        }
-        if (id.getResourcePoolId().equals(pool3.id())) {
-          return pool3.get(id.getName()).get();
-        }
-        return null;
-      }
-
-      @Override
-      public Object invokeMethod(ResourceId id, String methodName, Class[] 
paramTypes, Object[] params) {
-        return null;
-      }
-
-      @Override
-      public Resource invokeMethod(ResourceId id, String methodName, Class[] 
paramTypes, Object[]
-          params, String returnResourceName) {
-        return null;
-      }
-    });
-
-    assertEquals(0, pool1.getAll().size());
-
-
-    // test get() can get from pool
-    pool2.put("object1", "value2");
-    assertEquals(1, pool1.getAll().size());
-    assertTrue(pool1.get("object1").isRemote());
-    assertEquals("value2", pool1.get("object1").get());
-
-    // test get() is locality aware
-    pool1.put("object1", "value1");
-    assertEquals(1, pool2.getAll().size());
-    assertEquals("value1", pool1.get("object1").get());
-
-    // test getAll() is locality aware
-    assertEquals("value1", pool1.getAll().get(0).get());
-    assertEquals("value2", pool1.getAll().get(1).get());
-  }
-
-  @Test
-  public void testResourcePoolUtils() {
-    Gson gson = new Gson();
-    InterpreterResult ret;
-
-    // when create some resources
-    intp1.interpret("put note1:paragraph1:key1 value1", context);
-    intp1.interpret("put note1:paragraph2:key1 value2", context);
-    intp2.interpret("put note2:paragraph1:key1 value1", context);
-    intp2.interpret("put note2:paragraph2:key2 value2", context);
-
-
-    // then get all resources.
-    assertEquals(4, ResourcePoolUtils.getAllResources().size());
-
-    // when remove all resources from note1
-    ResourcePoolUtils.removeResourcesBelongsToNote("note1");
-
-    // then resources should be removed.
-    assertEquals(2, ResourcePoolUtils.getAllResources().size());
-    assertEquals("", gson.fromJson(
-        intp1.interpret("get note1:paragraph1:key1", 
context).message().get(0).getData(),
-        String.class));
-    assertEquals("", gson.fromJson(
-        intp1.interpret("get note1:paragraph2:key1", 
context).message().get(0).getData(),
-        String.class));
-
-
-    // when remove all resources from note2:paragraph1
-    ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
-
-    // then 1
-    assertEquals(1, ResourcePoolUtils.getAllResources().size());
-    assertEquals("value2", gson.fromJson(
-        intp1.interpret("get note2:paragraph2:key2", 
context).message().get(0).getData(),
-        String.class));
-
-  }
-
-  @Test
-  public void testResourceInvokeMethod() {
-    Gson gson = new Gson();
-    InterpreterResult ret;
-    intp1.interpret("put key1 hey", context);
-    intp2.interpret("put key2 world", context);
-
-    // invoke method in local resource pool
-    ret = intp1.interpret("invoke key1 length", context);
-    assertEquals("3", ret.message().get(0).getData());
-
-    // invoke method in remote resource pool
-    ret = intp1.interpret("invoke key2 length", context);
-    assertEquals("5", ret.message().get(0).getData());
-
-    // make sure no resources are automatically created
-    ret = intp1.interpret("getAll", context);
-    assertEquals(2, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
-
-    // invoke method in local resource pool and save result
-    ret = intp1.interpret("invoke key1 length ret1", context);
-    assertEquals("3", ret.message().get(0).getData());
-
-    ret = intp1.interpret("getAll", context);
-    assertEquals(3, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
-
-    ret = intp1.interpret("get ret1", context);
-    assertEquals("3", gson.fromJson(ret.message().get(0).getData(), 
String.class));
-
-    // invoke method in remote resource pool and save result
-    ret = intp1.interpret("invoke key2 length ret2", context);
-    assertEquals("5", ret.message().get(0).getData());
-
-    ret = intp1.interpret("getAll", context);
-    assertEquals(4, gson.fromJson(ret.message().get(0).getData(), 
ResourceSet.class).size());
-
-    ret = intp1.interpret("get ret2", context);
-    assertEquals("5", gson.fromJson(ret.message().get(0).getData(), 
String.class));
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
deleted file mode 100644
index ebb5100..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scheduler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
-
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private SchedulerFactory schedulerSvc;
-  private static final int TICK_WAIT = 100;
-  private static final int MAX_WAIT_CYCLES = 100;
-
-  @Before
-  public void setUp() throws Exception{
-    schedulerSvc = new SchedulerFactory();
-  }
-
-  @After
-  public void tearDown(){
-
-  }
-
-  @Test
-  public void test() throws Exception {
-    Properties p = new Properties();
-    final InterpreterGroup intpGroup = new InterpreterGroup();
-    Map<String, String> env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        this,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", 
"note",
-        intpA.getInterpreterProcess(),
-        10);
-
-    Job job = new Job("jobId", "jobName", null, 200) {
-      Object results;
-      @Override
-      public Object getReturn() {
-        return results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        intpA.interpret("1000", new InterpreterContext(
-            "note",
-            "jobId",
-            null,
-            "title",
-            "text",
-            new AuthenticationInfo(),
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LocalResourcePool("pool1"),
-            new LinkedList<InterpreterContextRunner>(), null));
-        return "1000";
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        return false;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.results = results;
-      }
-    };
-    scheduler.submit(job);
-
-    int cycles = 0;
-    while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-    assertTrue(job.isRunning());
-
-    Thread.sleep(5*TICK_WAIT);
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(1, scheduler.getJobsRunning().size());
-
-    cycles = 0;
-    while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-
-    assertTrue(job.isTerminated());
-    assertEquals(0, scheduler.getJobsWaiting().size());
-    assertEquals(0, scheduler.getJobsRunning().size());
-
-    intpA.close();
-    schedulerSvc.removeScheduler("test");
-  }
-
-  @Test
-  public void testAbortOnPending() throws Exception {
-    Properties p = new Properties();
-    final InterpreterGroup intpGroup = new InterpreterGroup();
-    Map<String, String> env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
-
-    final RemoteInterpreter intpA = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterA.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        this,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intpA);
-    intpA.setInterpreterGroup(intpGroup);
-
-    intpA.open();
-
-    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", 
"note",
-        intpA.getInterpreterProcess(),
-        10);
-
-    Job job1 = new Job("jobId1", "jobName1", null, 200) {
-      Object results;
-      InterpreterContext context = new InterpreterContext(
-          "note",
-          "jobId1",
-          null,
-          "title",
-          "text",
-          new AuthenticationInfo(),
-          new HashMap<String, Object>(),
-          new GUI(),
-          new AngularObjectRegistry(intpGroup.getId(), null),
-          new LocalResourcePool("pool1"),
-          new LinkedList<InterpreterContextRunner>(), null);
-
-      @Override
-      public Object getReturn() {
-        return results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        intpA.interpret("1000", context);
-        return "1000";
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        if (isRunning()) {
-          intpA.cancel(context);
-        }
-        return true;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.results = results;
-      }
-    };
-
-    Job job2 = new Job("jobId2", "jobName2", null, 200) {
-      public Object results;
-      InterpreterContext context = new InterpreterContext(
-          "note",
-          "jobId2",
-          null,
-          "title",
-          "text",
-          new AuthenticationInfo(),
-          new HashMap<String, Object>(),
-          new GUI(),
-          new AngularObjectRegistry(intpGroup.getId(), null),
-          new LocalResourcePool("pool1"),
-          new LinkedList<InterpreterContextRunner>(), null);
-
-      @Override
-      public Object getReturn() {
-        return results;
-      }
-
-      @Override
-      public int progress() {
-        return 0;
-      }
-
-      @Override
-      public Map<String, Object> info() {
-        return null;
-      }
-
-      @Override
-      protected Object jobRun() throws Throwable {
-        intpA.interpret("1000", context);
-        return "1000";
-      }
-
-      @Override
-      protected boolean jobAbort() {
-        if (isRunning()) {
-          intpA.cancel(context);
-        }
-        return true;
-      }
-
-      @Override
-      public void setResult(Object results) {
-        this.results = results;
-      }
-    };
-
-    job2.setResult("result2");
-
-    scheduler.submit(job1);
-    scheduler.submit(job2);
-
-
-    int cycles = 0;
-    while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-    assertTrue(job1.isRunning());
-    assertTrue(job2.getStatus() == Status.PENDING);
-
-    job2.abort();
-
-    cycles = 0;
-    while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
-      Thread.sleep(TICK_WAIT);
-      cycles++;
-    }
-
-    assertNotNull(job1.getDateFinished());
-    assertTrue(job1.isTerminated());
-    assertNull(job2.getDateFinished());
-    assertTrue(job2.isTerminated());
-    assertEquals("result2", job2.getReturn());
-
-    intpA.close();
-    schedulerSvc.removeScheduler("test");
-  }
-
-  @Override
-  public void onOutputAppend(String noteId, String paragraphId, int index, 
String output) {
-
-  }
-
-  @Override
-  public void onOutputUpdated(String noteId, String paragraphId, int index, 
InterpreterResult.Type type, String output) {
-
-  }
-
-  @Override
-  public void onOutputClear(String noteId, String paragraphId) {
-
-  }
-
-  @Override
-  public void onMetaInfosReceived(String settingId, Map<String, String> 
metaInfos) {
-
-  }
-
-  @Override
-  public void onGetParagraphRunners(String noteId, String paragraphId, 
RemoteWorksEventListener callback) {
-    if (callback != null) {
-      callback.onFinished(new LinkedList<>());
-    }
-  }
-
-  @Override
-  public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws 
Exception {
-  }
-
-  @Override
-  public void onParaInfosReceived(String noteId, String paragraphId, 
-      String interpreterSettingId, Map<String, String> metaInfos) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
new file mode 100644
index 0000000..0ac7116
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.List;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Proxy for AngularObjectRegistry that exists in remote interpreter process
+ */
+public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
+  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
+  private InterpreterGroup interpreterGroup;
+
+  public RemoteAngularObjectRegistry(String interpreterId,
+      AngularObjectRegistryListener listener,
+      InterpreterGroup interpreterGroup) {
+    super(interpreterId, listener);
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
+    return interpreterGroup.getRemoteInterpreterProcess();
+  }
+
+  /**
+   * When ZeppelinServer side code want to add angularObject to the registry,
+   * this method should be used instead of add()
+   * @param name
+   * @param o
+   * @param noteId
+   * @return
+   */
+  public AngularObject addAndNotifyRemoteProcess(String name, Object o, String 
noteId, String
+          paragraphId) {
+    Gson gson = new Gson();
+    RemoteInterpreterProcess remoteInterpreterProcess = 
getRemoteInterpreterProcess();
+    if (!remoteInterpreterProcess.isRunning()) {
+      return super.add(name, o, noteId, paragraphId, true);
+    }
+
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = remoteInterpreterProcess.getClient();
+      client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
+      return super.add(name, o, noteId, paragraphId, true);
+    } catch (TException e) {
+      broken = true;
+      logger.error("Error", e);
+    } catch (Exception e) {
+      logger.error("Error", e);
+    } finally {
+      if (client != null) {
+        remoteInterpreterProcess.releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * When ZeppelinServer side code want to remove angularObject from the 
registry,
+   * this method should be used instead of remove()
+   * @param name
+   * @param noteId
+   * @param paragraphId
+   * @return
+   */
+  public AngularObject removeAndNotifyRemoteProcess(String name, String 
noteId, String
+          paragraphId) {
+    RemoteInterpreterProcess remoteInterpreterProcess = 
getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null || 
!remoteInterpreterProcess.isRunning()) {
+      return super.remove(name, noteId, paragraphId);
+    }
+
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = remoteInterpreterProcess.getClient();
+      client.angularObjectRemove(name, noteId, paragraphId);
+      return super.remove(name, noteId, paragraphId);
+    } catch (TException e) {
+      broken = true;
+      logger.error("Error", e);
+    } catch (Exception e) {
+      logger.error("Error", e);
+    } finally {
+      if (client != null) {
+        remoteInterpreterProcess.releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+  
+  public void removeAllAndNotifyRemoteProcess(String noteId, String 
paragraphId) {
+    List<AngularObject> all = getAll(noteId, paragraphId);
+    for (AngularObject ao : all) {
+      removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
+    }
+  }
+
+  @Override
+  protected AngularObject createNewAngularObject(String name, Object o, String 
noteId, String
+          paragraphId) {
+    return new RemoteAngularObject(name, o, noteId, paragraphId, 
interpreterGroup,
+        getAngularObjectListener());
+  }
+}

Reply via email to