This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new be0548c  [ZEPPELIN-4208] Cluster synchronize InterpreterSetting
be0548c is described below

commit be0548c74e8fa72dd5d3b741390901224935275d
Author: Xun Liu <[email protected]>
AuthorDate: Mon Jul 8 23:28:04 2019 +0800

    [ZEPPELIN-4208] Cluster synchronize InterpreterSetting
    
    ### What is this PR for?
    In cluster mode, a user can create, modify, or delete an
InterpreterSetting on any of the Zeppelin servers.
    Every such change must be broadcast to all Zeppelin servers in the
cluster so that they synchronize their InterpreterSetting. Otherwise, the
user cannot continue working after switching to another server.
    
    1. Listen for InterpreterSetting update events
      Listen for the CREATE_INTP_SETTING, UPDATE_INTP_SETTING, and
DELETE_INTP_SETTING events of the InterpreterSetting in the
InterpreterSettingManager#onClusterEvent() function.
    
    2. Broadcast InterpreterSetting update events
      The InterpreterSetting is synchronized by broadcasting the event to all
Zeppelin servers in the cluster via messagingService.
    
    ### What type of PR is it?
    [Feature]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4208
    
    ### How should this be tested?
    * [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/554484474)
    
    ### Screenshots (if appropriate)
    
![Sync-InterpreterSetting](https://user-images.githubusercontent.com/3677382/60696346-ef568600-9f17-11e9-9a6c-de322042a977.gif)
    
    ### Questions:
    * Do the license files need an update?
    * Are there breaking changes for older versions?
    * Does this need documentation?
    
    Author: Xun Liu <[email protected]>
    
    Closes #3397 from liuxunorg/ZEPPELIN-4208 and squashes the following 
commits:
    
    cfc27a445 [Xun Liu] Add test case testInterpreterJsonSerializable().
    fbbaadfc5 [Xun Liu] InterpreterSetting support JsonSerializable.
    5eaded1d0 [Xun Liu] [ZEPPELIN-4208] Cluster synchronize InterpreterSetting
---
 .../zeppelin/cluster/ClusterManagerServer.java     |  23 ++-
 .../zeppelin/cluster/event/ClusterEvent.java       |   7 +-
 .../zeppelin/cluster/event/ClusterMessage.java     |   5 +
 .../zeppelin/interpreter/InterpreterOption.java    |   2 +
 .../org/apache/zeppelin/server/ZeppelinServer.java |   7 +-
 .../cluster/ClusterAuthEventListenerTest.java      |   6 +-
 .../apache/zeppelin/cluster/ClusterEventTest.java  | 175 ++++++++++++++++-----
 ...va => ClusterIntpSettingEventListenerTest.java} |  18 +--
 .../cluster/ClusterNoteAuthEventListenerTest.java  |   2 +-
 .../cluster/ClusterNoteEventListenerTest.java      |   2 +-
 .../zeppelin/cluster/ZeppelinServerMock.java       |  34 +++-
 .../zeppelin/interpreter/InterpreterSetting.java   | 112 +++++++++++++
 .../interpreter/InterpreterSettingManager.java     | 139 ++++++++++++++--
 .../zeppelin/notebook/AuthorizationService.java    |   1 -
 .../interpreter/InterpreterSettingTest.java        |  33 ++++
 15 files changed, 492 insertions(+), 74 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 0305ce6..530dff5 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -68,12 +68,14 @@ public class ClusterManagerServer extends ClusterManager {
   private List<ClusterEventListener> clusterIntpEventListeners = new 
ArrayList<>();
   private List<ClusterEventListener> clusterNoteEventListeners = new 
ArrayList<>();
   private List<ClusterEventListener> clusterAuthEventListeners = new 
ArrayList<>();
+  private List<ClusterEventListener> clusterIntpSettingEventListeners = new 
ArrayList<>();
 
   // zeppelin cluster event
   public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC";
   public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC";
   public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC";
   public static String CLUSTER_NB_AUTH_EVENT_TOPIC = 
"CLUSTER_NB_AUTH_EVENT_TOPIC";
+  public static String CLUSTER_INTP_SETTING_EVENT_TOPIC = 
"CLUSTER_INTP_SETTING_EVENT_TOPIC";
 
   private ClusterManagerServer() {
     super();
@@ -208,6 +210,8 @@ public class ClusterManagerServer extends ClusterManager {
             subscribeClusterNoteEvent, MoreExecutors.directExecutor());
         messagingService.registerHandler(CLUSTER_AUTH_EVENT_TOPIC,
             subscribeClusterAuthEvent, MoreExecutors.directExecutor());
+        messagingService.registerHandler(CLUSTER_INTP_SETTING_EVENT_TOPIC,
+            subscribeIntpSettingEvent, MoreExecutors.directExecutor());
 
         HashMap<String, Object> meta = new HashMap<String, Object>();
         String nodeName = getClusterNodeName();
@@ -297,8 +301,9 @@ public class ClusterManagerServer extends ClusterManager {
   }
 
   public void broadcastClusterEvent(String topic, String msg) {
-    LOGGER.info("send broadcastClusterEvent message {}", msg);
-
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("send broadcastClusterEvent message {}", msg);
+    }
     for (Node node : clusterNodes) {
       if (StringUtils.equals(node.address().host(), zeplServerHost)
           && node.address().port() == raftServerPort) {
@@ -354,6 +359,18 @@ public class ClusterManagerServer extends ClusterManager {
     return null;
   };
 
+  private BiFunction<Address, byte[], byte[]> subscribeIntpSettingEvent = 
(address, data) -> {
+    String message = new String(data);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("subscribeIntpSettingEvent() {}", message);
+    }
+    for (ClusterEventListener eventListener : 
clusterIntpSettingEventListeners) {
+      eventListener.onClusterEvent(message);
+    }
+
+    return null;
+  };
+
   public void addClusterEventListeners(String topic, ClusterEventListener 
listener) {
     if (StringUtils.equals(topic, CLUSTER_INTP_EVENT_TOPIC)) {
       clusterIntpEventListeners.add(listener);
@@ -361,6 +378,8 @@ public class ClusterManagerServer extends ClusterManager {
       clusterNoteEventListeners.add(listener);
     } else if (StringUtils.equals(topic, CLUSTER_AUTH_EVENT_TOPIC)) {
       clusterAuthEventListeners.add(listener);
+    } else if (StringUtils.equals(topic, CLUSTER_INTP_SETTING_EVENT_TOPIC)) {
+      clusterIntpSettingEventListeners.add(listener);
     } else {
       LOGGER.error("Unknow cluster event topic : {}", topic);
     }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
index 4fb61da..9772b54 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
@@ -36,5 +36,10 @@ public enum ClusterEvent {
   SET_WRITERS_PERMISSIONS,
   SET_OWNERS_PERMISSIONS,
   CLEAR_PERMISSION,
-  SET_NEW_NOTE_PERMISSIONS
+  // CLUSTER_NBAUTH_EVENT_TOPIC
+  SET_NEW_NOTE_PERMISSIONS,
+  // CLUSTER_INTP_SETTING_EVENT_TOPIC
+  CREATE_INTP_SETTING,
+  UPDATE_INTP_SETTING,
+  DELETE_INTP_SETTING,
 }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
index 1fa6938..cd999c4 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
@@ -41,6 +41,11 @@ public class ClusterMessage {
     return this;
   }
 
+  public ClusterMessage put(Map<String, String> params) {
+    data.putAll(params);
+    return this;
+  }
+
   public String get(String k) {
     return data.get(k);
   }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
index 1b1b29a..82cf872 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
@@ -19,6 +19,8 @@ package org.apache.zeppelin.interpreter;
 
 import java.util.ArrayList;
 import java.util.List;
+
+import com.google.gson.Gson;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 
 /**
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 04b2b71..29fbd07 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -357,11 +357,16 @@ public class ZeppelinServer extends ResourceConfig {
   private static void setupClusterManagerServer(ServiceLocator serviceLocator) 
{
     if (conf.isClusterMode()) {
       ClusterManagerServer clusterManagerServer = 
ClusterManagerServer.getInstance();
+
       NotebookServer notebookServer = 
serviceLocator.getService(NotebookServer.class);
-      AuthorizationService authorizationService = 
serviceLocator.getService(AuthorizationService.class);
       
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC,
 notebookServer);
+
+      AuthorizationService authorizationService = 
serviceLocator.getService(AuthorizationService.class);
       
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC,
 authorizationService);
 
+      InterpreterSettingManager interpreterSettingManager = 
serviceLocator.getService(InterpreterSettingManager.class);
+      
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_SETTING_EVENT_TOPIC,
 interpreterSettingManager);
+
       // Since the ClusterInterpreterLauncher is lazy, dynamically generated, 
So in cluster mode,
       // when the zeppelin service starts, Create a ClusterInterpreterLauncher 
object,
       // This allows the ClusterInterpreterLauncher to listen for cluster 
events.
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java
index 15bc23e..868c7a7 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java
@@ -40,7 +40,7 @@ public class ClusterAuthEventListenerTest implements 
ClusterEventListener {
   @Override
   public void onClusterEvent(String msg) {
     receiveMsg = msg;
-    LOGGER.info("onClusterEvent : {}", msg);
+    LOGGER.info("ClusterAuthEventListenerTest#onClusterEvent : {}", msg);
     ClusterMessage message = ClusterMessage.deserializeMessage(msg);
 
     String noteId = message.get("noteId");
@@ -49,12 +49,14 @@ public class ClusterAuthEventListenerTest implements 
ClusterEventListener {
     Gson gson = new Gson();
     Set<String> set  = gson.fromJson(jsonSet, new TypeToken<Set<String>>() 
{}.getType());
 
-    assertNotNull(set);
     switch (message.clusterEvent) {
       case SET_READERS_PERMISSIONS:
       case SET_WRITERS_PERMISSIONS:
       case SET_OWNERS_PERMISSIONS:
       case SET_RUNNERS_PERMISSIONS:
+        assertNotNull(set);
+        assertNotNull(noteId);
+        break;
       case CLEAR_PERMISSION:
         assertNotNull(noteId);
         break;
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
index 169460a..7257245 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
@@ -18,14 +18,20 @@ package org.apache.zeppelin.cluster;
 
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
+import org.apache.commons.httpclient.methods.DeleteMethod;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.PutMethod;
 import org.apache.thrift.TException;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
 import org.apache.zeppelin.cluster.meta.ClusterMetaType;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
 import org.apache.zeppelin.interpreter.thrift.ServiceException;
@@ -41,6 +47,7 @@ import org.apache.zeppelin.service.NotebookService;
 import org.apache.zeppelin.socket.NotebookServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.utils.TestUtils;
+import org.hamcrest.MatcherAssert;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -73,10 +80,11 @@ public class ClusterEventTest extends ZeppelinServerMock {
   private static List<ClusterAuthEventListenerTest> 
clusterAuthEventListenerTests = new ArrayList<>();
   private static List<ClusterNoteEventListenerTest> 
clusterNoteEventListenerTests = new ArrayList<>();
   private static List<ClusterNoteAuthEventListenerTest> 
clusterNoteAuthEventListenerTests = new ArrayList<>();
+  private static List<ClusterIntpSettingEventListenerTest> 
clusterIntpSettingEventListenerTests = new ArrayList<>();
 
   private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
   private static ClusterManagerClient clusterClient = null;
-  static final String metaKey = "ClusterMultiNodeTestKey";
+  static final String metaKey = "ClusterEventTestKey";
 
   private static Notebook notebook;
   private static NotebookServer notebookServer;
@@ -94,18 +102,41 @@ public class ClusterEventTest extends ZeppelinServerMock {
 
     ZeppelinServerMock.startUp(ClusterEventTest.class.getSimpleName(), zconf);
     notebook = TestUtils.getInstance(Notebook.class);
-    authorizationService = new AuthorizationService(notebook, 
notebook.getConf());
-    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-    schedulerService = new QuartzSchedulerService(conf, notebook);
+    authorizationService = new AuthorizationService(notebook, zconf);
+    schedulerService = new QuartzSchedulerService(zconf, notebook);
     notebookServer = spy(NotebookServer.getInstance());
-    notebookService =
-        new NotebookService(notebook, authorizationService, conf, 
schedulerService);
+    notebookService = new NotebookService(notebook, authorizationService, 
zconf, schedulerService);
 
     ConfigurationService configurationService = new 
ConfigurationService(notebook.getConf());
     when(notebookServer.getNotebookService()).thenReturn(notebookService);
     
when(notebookServer.getConfigurationService()).thenReturn(configurationService);
 
     startOtherZeppelinClusterNode(zconf);
+
+    // wait zeppelin cluster startup
+    Thread.sleep(10000);
+    // mock cluster manager client
+    clusterClient = ClusterManagerClient.getInstance();
+    clusterClient.start(metaKey);
+
+    // Waiting for cluster startup
+    int wait = 0;
+    while(wait++ < 100) {
+      if (clusterIsStartup() && clusterClient.raftInitialized()) {
+        LOGGER.info("wait {}(ms) found cluster leader", wait*500);
+        break;
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        LOGGER.error(e.getMessage(), e);
+      }
+    }
+
+    Thread.sleep(3000);
+    assertEquals(true, clusterIsStartup());
+
+    getClusterServerMeta();
   }
 
   @AfterClass
@@ -174,11 +205,9 @@ public class ClusterEventTest extends ZeppelinServerMock {
         String clusterHost = parts[0];
         int clusterPort = Integer.valueOf(parts[1]);
 
-        // ClusterSingleNodeMock clusterSingleNodeMock = new 
ClusterSingleNodeMock();
         ClusterManagerServer clusterServer
             = startClusterSingleNode(clusterAddrList, clusterHost, 
clusterPort);
         clusterServers.add(clusterServer);
-        // clusterSingleNodeMockList.add(clusterSingleNodeMock);
       }
     } catch (Exception e) {
       LOGGER.error(e.getMessage(), e);
@@ -197,31 +226,13 @@ public class ClusterEventTest extends ZeppelinServerMock {
       clusterNoteAuthEventListenerTests.add(clusterNoteAuthEventListenerTest);
       
clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NB_AUTH_EVENT_TOPIC,
 clusterNoteAuthEventListenerTest);
 
-      clusterServer.start();
-    }
-
-    // mock cluster manager client
-    clusterClient = ClusterManagerClient.getInstance();
-    clusterClient.start(metaKey);
+      ClusterIntpSettingEventListenerTest clusterIntpSettingEventListenerTest 
= new ClusterIntpSettingEventListenerTest();
+      
clusterIntpSettingEventListenerTests.add(clusterIntpSettingEventListenerTest);
+      
clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_SETTING_EVENT_TOPIC,
 clusterIntpSettingEventListenerTest);
 
-    // Waiting for cluster startup
-    int wait = 0;
-    while(wait++ < 100) {
-      if (clusterIsStartup() && clusterClient.raftInitialized()) {
-        LOGGER.info("wait {}(ms) found cluster leader", wait*3000);
-        break;
-      }
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        LOGGER.error(e.getMessage(), e);
-      }
+      clusterServer.start();
     }
 
-    Thread.sleep(3000);
-    assertEquals(true, clusterIsStartup());
-
-    getClusterServerMeta();
     LOGGER.info("startCluster <<<");
   }
 
@@ -237,9 +248,9 @@ public class ClusterEventTest extends ZeppelinServerMock {
     }
   }
 
-  private void checkClusterNoteAuthEventListener() {
-    for (ClusterNoteAuthEventListenerTest clusterNoteAuthEventListenerTest : 
clusterNoteAuthEventListenerTests) {
-      assertNotNull(clusterNoteAuthEventListenerTest.receiveMsg);
+  private void checkClusterIntpSettingEventListener() {
+    for (ClusterIntpSettingEventListenerTest 
clusterIntpSettingEventListenerTest : clusterIntpSettingEventListenerTests) {
+      assertNotNull(clusterIntpSettingEventListenerTest.receiveMsg);
     }
   }
 
@@ -267,9 +278,9 @@ public class ClusterEventTest extends ZeppelinServerMock {
     assertNotNull(srvMeta);
     assertEquals(true, (srvMeta instanceof HashMap));
     HashMap hashMap = (HashMap) srvMeta;
-
     assertEquals(hashMap.size(), 3);
-    LOGGER.info("getClusterServerMeta <<<");
+
+    LOGGER.info("getClusterServerMeta <<< ");
   }
 
   @Test
@@ -438,4 +449,98 @@ public class ClusterEventTest extends ZeppelinServerMock {
       }
     }
   }
+
+  @Test
+  public void testInterpreterEvent() throws IOException, InterruptedException {
+    // when: Create 1 interpreter settings `sh1`
+    String md1Name = "sh1";
+
+    String md1Dep = "org.apache.drill.exec:drill-jdbc:jar:1.7.0";
+
+    String reqBody1 = "{\"name\":\"" + md1Name + "\",\"group\":\"sh\"," +
+        "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": 
\"propname\", " +
+        "\"type\": \"textarea\"}}," +
+        
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.shell.ShellInterpreter\","
 +
+        "\"name\":\"md\"}]," +
+        "\"dependencies\":[ {\n" +
+        "      \"groupArtifactVersion\": \"" + md1Dep + "\",\n" +
+        "      \"exclusions\":[]\n" +
+        "    }]," +
+        "\"option\": { \"remote\": true, \"session\": false }}";
+    PostMethod post = httpPost("/interpreter/setting", reqBody1);
+    String postResponse = post.getResponseBodyAsString();
+    LOG.info("testCreatedInterpreterDependencies create response\n" + 
post.getResponseBodyAsString());
+    InterpreterSetting created = 
convertResponseToInterpreterSetting(postResponse);
+    MatcherAssert.assertThat("test create method:", post, isAllowed());
+    post.releaseConnection();
+
+    // 1. Call settings API
+    GetMethod get = httpGet("/interpreter/setting");
+    String rawResponse = get.getResponseBodyAsString();
+    get.releaseConnection();
+
+    // 2. Parsing to List<InterpreterSettings>
+    JsonObject responseJson = gson.fromJson(rawResponse, 
JsonElement.class).getAsJsonObject();
+    JsonArray bodyArr = responseJson.getAsJsonArray("body");
+    List<InterpreterSetting> settings = new Gson().fromJson(bodyArr,
+        new TypeToken<ArrayList<InterpreterSetting>>() {
+        }.getType());
+
+    // 3. Filter interpreters out we have just created
+    InterpreterSetting md1 = null;
+    for (InterpreterSetting setting : settings) {
+      if (md1Name.equals(setting.getName())) {
+        md1 = setting;
+      }
+    }
+
+    // then: should get created interpreters which have different dependencies
+
+    // 4. Validate each md interpreter has its own dependencies
+    assertEquals(1, md1.getDependencies().size());
+    assertEquals(md1Dep, 
md1.getDependencies().get(0).getGroupArtifactVersion());
+    Thread.sleep(1000);
+    checkClusterIntpSettingEventListener();
+
+    // 2. test update Interpreter
+    String rawRequest = "{\"name\":\"sh1\",\"group\":\"sh\"," +
+        "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": 
\"propname\", " +
+        "\"type\": \"textarea\"}}," +
+        
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\"," +
+        "\"name\":\"md\"}],\"dependencies\":[]," +
+        "\"option\": { \"remote\": true, \"session\": false }}";
+    JsonObject jsonRequest = gson.fromJson(rawRequest, 
JsonElement.class).getAsJsonObject();
+
+    // when: call update setting API
+    JsonObject jsonObject = new JsonObject();
+    jsonObject.addProperty("name", "propname2");
+    jsonObject.addProperty("value", "this is new prop");
+    jsonObject.addProperty("type", "textarea");
+    jsonRequest.getAsJsonObject("properties").add("propname2", jsonObject);
+    PutMethod put = httpPut("/interpreter/setting/" + created.getId(), 
jsonRequest.toString());
+    LOG.info("testSettingCRUD update response\n" + 
put.getResponseBodyAsString());
+    // then: call update setting API
+    MatcherAssert.assertThat("test update method:", put, isAllowed());
+    put.releaseConnection();
+    Thread.sleep(1000);
+    checkClusterIntpSettingEventListener();
+
+    // 3: call delete setting API
+    DeleteMethod delete = httpDelete("/interpreter/setting/" + 
created.getId());
+    LOG.info("testSettingCRUD delete response\n" + 
delete.getResponseBodyAsString());
+    // then: call delete setting API
+    MatcherAssert.assertThat("Test delete method:", delete, isAllowed());
+    delete.releaseConnection();
+    Thread.sleep(1000);
+    checkClusterIntpSettingEventListener();
+  }
+
+  private JsonObject getBodyFieldFromResponse(String rawResponse) {
+    JsonObject response = gson.fromJson(rawResponse, 
JsonElement.class).getAsJsonObject();
+    return response.getAsJsonObject("body");
+  }
+
+  private InterpreterSetting convertResponseToInterpreterSetting(String 
rawResponse) {
+    return gson.fromJson(getBodyFieldFromResponse(rawResponse), 
InterpreterSetting.class);
+  }
 }
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
similarity index 72%
copy from 
zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
copy to 
zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
index f2ac6b2..734611f 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
@@ -16,35 +16,23 @@
  */
 package org.apache.zeppelin.cluster;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import org.apache.zeppelin.cluster.event.ClusterEventListener;
 import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
-
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
 
-public class ClusterNoteAuthEventListenerTest implements ClusterEventListener {
-  private static Logger LOGGER = 
LoggerFactory.getLogger(ClusterNoteAuthEventListenerTest.class);
+public class ClusterIntpSettingEventListenerTest implements 
ClusterEventListener {
+  private static Logger LOGGER = 
LoggerFactory.getLogger(ClusterIntpSettingEventListenerTest.class);
 
   public String receiveMsg = null;
 
   @Override
   public void onClusterEvent(String msg) {
     receiveMsg = msg;
-    LOGGER.info("onClusterEvent : {}", msg);
+    LOGGER.info("ClusterIntpSettingEventListenerTest#onClusterEvent : {}", 
msg);
     ClusterMessage message = ClusterMessage.deserializeMessage(msg);
-    String noteId  = message.get("noteId");
-    String json  = message.get("subject");
-    AuthenticationInfo subject = AuthenticationInfo.fromJson(json);
-
-    assertNotNull(noteId);
-    assertNotNull(json);
-    assertNotNull(subject);
   }
 }
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
index f2ac6b2..56996bd 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
@@ -37,7 +37,7 @@ public class ClusterNoteAuthEventListenerTest implements 
ClusterEventListener {
   @Override
   public void onClusterEvent(String msg) {
     receiveMsg = msg;
-    LOGGER.info("onClusterEvent : {}", msg);
+    LOGGER.info("ClusterNoteAuthEventListenerTest#onClusterEvent : {}", msg);
     ClusterMessage message = ClusterMessage.deserializeMessage(msg);
     String noteId  = message.get("noteId");
     String json  = message.get("subject");
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
index a8d9444..a6c1708 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
@@ -40,7 +40,7 @@ public class ClusterNoteEventListenerTest implements 
ClusterEventListener {
   @Override
   public void onClusterEvent(String msg) {
     receiveMsg = msg;
-    LOGGER.info("onClusterEvent : {}", msg);
+    LOGGER.info("ClusterNoteEventListenerTest#onClusterEvent : {}", msg);
     ClusterMessage message = ClusterMessage.deserializeMessage(msg);
 
     Note note = null;
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java
index a78a582..6a45934 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java
@@ -21,6 +21,7 @@ import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpMethodBase;
 import org.apache.commons.httpclient.cookie.CookiePolicy;
 import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.DeleteMethod;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.PutMethod;
@@ -124,13 +125,17 @@ public class ZeppelinServerMock {
           ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(),
           notebookDir.getPath()
       );
+      LOG.info("zconf.getClusterAddress() = {}", zconf.getClusterAddress());
+      System.setProperty(
+          ZeppelinConfiguration.ConfVars.ZEPPELIN_CLUSTER_ADDR.getVarName(),
+          zconf.getClusterAddress()
+      );
 
       // some test profile does not build zeppelin-web.
       // to prevent zeppelin starting up fail, create zeppelin-web/dist 
directory
       new File("../zeppelin-web/dist").mkdirs();
 
-      LOG.info("Staring test Zeppelin up...");
-      ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+      LOG.info("Staring ZeppelinServerMock Zeppelin up...");
 
       executor = Executors.newSingleThreadExecutor();
       executor.submit(SERVER);
@@ -147,7 +152,7 @@ public class ZeppelinServerMock {
         throw new RuntimeException("Can not start Zeppelin server");
       }
       
//ZeppelinServer.notebook.setParagraphJobListener(NotebookServer.getInstance());
-      LOG.info("Test Zeppelin stared.");
+      LOG.info("ZeppelinServerMock stared.");
     }
   }
 
@@ -169,7 +174,7 @@ public class ZeppelinServerMock {
           
TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().restart(setting.getId());
         }
       }
-      LOG.info("Terminating test Zeppelin...");
+      LOG.info("ZeppelinServerMock Zeppelin...");
       ZeppelinServer.jettyWebServer.stop();
       executor.shutdown();
       PluginManager.reset();
@@ -187,7 +192,7 @@ public class ZeppelinServerMock {
         throw new RuntimeException("Can not stop Zeppelin server");
       }
 
-      LOG.info("Test Zeppelin terminated.");
+      LOG.info("ZeppelinServerMock terminated.");
 
       if (deleteConfDir && 
!TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled()) {
         // don't delete interpreter.json when recovery is enabled. otherwise 
the interpreter setting
@@ -196,7 +201,6 @@ public class ZeppelinServerMock {
         FileUtils.deleteDirectory(confDir);
       }
     }
-
   }
 
   protected static boolean checkIfServerIsRunning() {
@@ -266,6 +270,24 @@ public class ZeppelinServerMock {
     return httpPost(path, body, StringUtils.EMPTY, StringUtils.EMPTY);
   }
 
+  protected static DeleteMethod httpDelete(String path) throws IOException {
+    return httpDelete(path, StringUtils.EMPTY, StringUtils.EMPTY);
+  }
+
+  protected static DeleteMethod httpDelete(String path, String user, String 
pwd)
+      throws IOException {
+    LOG.info("Connecting to {}", URL + path);
+    HttpClient httpClient = new HttpClient();
+    DeleteMethod deleteMethod = new DeleteMethod(URL + path);
+    deleteMethod.addRequestHeader("Origin", URL);
+    if (userAndPasswordAreNotBlank(user, pwd)) {
+      deleteMethod.setRequestHeader("Cookie", "JSESSIONID=" + getCookie(user, 
pwd));
+    }
+    httpClient.executeMethod(deleteMethod);
+    LOG.info("{} - {}", deleteMethod.getStatusCode(), 
deleteMethod.getStatusText());
+    return deleteMethod;
+  }
+
   protected static PostMethod httpPost(String path, String request, String 
user, String pwd)
       throws IOException {
     LOG.info("Connecting to {}", URL + path);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 0afd7b0..82ff9c2 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -20,11 +20,16 @@ package org.apache.zeppelin.interpreter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.annotations.SerializedName;
 import com.google.gson.internal.StringMap;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -49,6 +54,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -385,6 +392,10 @@ public class InterpreterSetting {
     return id;
   }
 
+  /**
+   * Replaces the id of this interpreter setting.
+   *
+   * @param id the new id
+   */
+  public void setId(String id) {
+    this.id = id;
+  }
+
   public String getName() {
     return name;
   }
@@ -985,4 +996,105 @@ public class InterpreterSetting {
   public void waitForReady() throws InterpreterException {
     waitForReady(Long.MAX_VALUE);
   }
+
+  /**
+   * Serializes the id, name, group, dependencies, properties, option and
+   * interpreter infos of an interpreter setting into one flat JSON object.
+   * <p>
+   * The id, name and group are written directly. The four structured
+   * members are first rendered to (pretty-printed) JSON text by Gson and
+   * that text is then stored as a string value under the member name.
+   * An IOException raised while writing is logged, and whatever has been
+   * written so far is returned.
+   *
+   * @param intpSetting the setting to serialize
+   * @return the JSON text held by the underlying StringWriter
+   */
+  public static String toJson(InterpreterSetting intpSetting) {
+    Gson prettyGson = new GsonBuilder().setPrettyPrinting().create();
+    StringWriter buffer = new StringWriter();
+    JsonWriter out = new JsonWriter(buffer);
+
+    try {
+      out.beginObject();
+
+      // scalar members
+      out.name("id").value(intpSetting.getId());
+      out.name("name").value(intpSetting.getName());
+      out.name("group").value(intpSetting.getGroup());
+
+      // structured members, each embedded as a JSON-encoded string
+      writeJsonStringMember(out, "dependencies", prettyGson,
+          intpSetting.getDependencies(),
+          new TypeToken<List<Dependency>>() {}.getType());
+      writeJsonStringMember(out, "properties", prettyGson,
+          intpSetting.getProperties(),
+          new TypeToken<Map<String, InterpreterProperty>>() {}.getType());
+      writeJsonStringMember(out, "interpreterOption", prettyGson,
+          intpSetting.getOption(),
+          new TypeToken<InterpreterOption>() {}.getType());
+      writeJsonStringMember(out, "interpreterGroup", prettyGson,
+          intpSetting.getInterpreterInfos(),
+          new TypeToken<List<InterpreterInfo>>() {}.getType());
+
+      out.endObject();
+      out.flush();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+
+    return buffer.toString();
+  }
+
+  /** Writes {@code payload}, rendered to JSON text by Gson, as a string. */
+  private static void writeJsonStringMember(JsonWriter out, String member,
+      Gson gson, Object payload, java.lang.reflect.Type type)
+      throws IOException {
+    out.name(member);
+    out.value(gson.toJson(payload, type));
+  }
+
+  /**
+   * Rebuilds an interpreter setting from the JSON text produced by
+   * {@link #toJson(InterpreterSetting)}.
+   * <p>
+   * The id, name and group members are read as plain strings. The
+   * dependencies, properties, interpreterOption and interpreterGroup
+   * members are JSON-encoded strings that are parsed a second time with
+   * Gson. A member whose value is JSON null leaves the corresponding field
+   * of the new setting untouched; unknown members are logged and skipped.
+   * An IOException raised while reading is logged and the partially
+   * populated setting is returned.
+   *
+   * @param json JSON text in the format written by toJson
+   * @return a new InterpreterSetting carrying the parsed values
+   */
+  public static InterpreterSetting fromJson(String json) {
+    Gson gson = new Gson();
+    InterpreterSetting intpSetting = new InterpreterSetting();
+
+    // try-with-resources closes the reader even when parsing fails
+    try (JsonReader jsonReader = new JsonReader(new StringReader(json))) {
+      jsonReader.beginObject();
+      while (jsonReader.hasNext()) {
+        String tag = jsonReader.nextName();
+        if (jsonReader.peek() == com.google.gson.stream.JsonToken.NULL) {
+          // JsonWriter.value((String) null) emits a JSON null, which
+          // nextString() rejects; consume it and keep the field as it is.
+          jsonReader.nextNull();
+          continue;
+        }
+        switch (tag) {
+          case "id":
+            intpSetting.setId(jsonReader.nextString());
+            break;
+          case "name":
+            intpSetting.setName(jsonReader.nextString());
+            break;
+          case "group":
+            intpSetting.setGroup(jsonReader.nextString());
+            break;
+          case "dependencies":
+            List<Dependency> dependencies = gson.fromJson(
+                jsonReader.nextString(),
+                new TypeToken<List<Dependency>>() {}.getType());
+            intpSetting.setDependencies(dependencies);
+            break;
+          case "properties":
+            Map<String, InterpreterProperty> properties = gson.fromJson(
+                jsonReader.nextString(),
+                new TypeToken<Map<String, InterpreterProperty>>() {}
+                    .getType());
+            intpSetting.setProperties(properties);
+            break;
+          case "interpreterOption":
+            InterpreterOption intpOption = gson.fromJson(
+                jsonReader.nextString(),
+                new TypeToken<InterpreterOption>() {}.getType());
+            intpSetting.setOption(intpOption);
+            break;
+          case "interpreterGroup":
+            List<InterpreterInfo> intpInfos = gson.fromJson(
+                jsonReader.nextString(),
+                new TypeToken<List<InterpreterInfo>>() {}.getType());
+            intpSetting.setInterpreterInfos(intpInfos);
+            break;
+          default:
+            LOGGER.error("Error data type! Unknown field: {}", tag);
+            // The value must be consumed, otherwise the next nextName()
+            // call fails with an IllegalStateException.
+            jsonReader.skipValue();
+            break;
+        }
+      }
+      jsonReader.endObject();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+
+    return intpSetting;
+  }
 }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 44d99be..7340ab0 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -25,11 +25,16 @@ import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
+
 import java.util.Set;
 import javax.inject.Inject;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.cluster.event.ClusterEvent;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.dep.Dependency;
@@ -83,6 +88,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.zeppelin.cluster.ClusterManagerServer.CLUSTER_INTP_SETTING_EVENT_TOPIC;
+
 
 /**
  * InterpreterSettingManager is the component which manage all the interpreter 
settings.
@@ -90,7 +97,7 @@ import java.util.stream.Collectors;
  * TODO(zjffdu) We could move it into another separated component.
  */
 @ManagedObject("interpreterSettingManager")
-public class InterpreterSettingManager implements NoteEventListener {
+public class InterpreterSettingManager implements NoteEventListener, 
ClusterEventListener {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InterpreterSettingManager.class);
   private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
@@ -677,9 +684,29 @@ public class InterpreterSettingManager implements 
NoteEventListener {
   }
 
   public InterpreterSetting createNewSetting(String name, String group,
-      List<Dependency> dependencies, InterpreterOption option, Map<String, 
InterpreterProperty> p)
+                                             List<Dependency> dependencies,
+                                             InterpreterOption option,
+                                             Map<String, InterpreterProperty> 
properties)
       throws IOException {
 
+    // Create the setting on this node first, then announce it to the other
+    // nodes; broadcastClusterEvent returns immediately outside cluster mode.
+    try {
+      InterpreterSetting created = inlineCreateNewSetting(
+          name, group, dependencies, option, properties);
+      broadcastClusterEvent(ClusterEvent.CREATE_INTP_SETTING, created);
+      return created;
+    } catch (IOException e) {
+      // log here as well, then let the caller handle the failure
+      LOGGER.error(e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  private InterpreterSetting inlineCreateNewSetting(String name, String group,
+                                                    List<Dependency> 
dependencies,
+                                                    InterpreterOption option,
+                                                    Map<String, 
InterpreterProperty> properties)
+      throws IOException {
     if (name.indexOf(".") >= 0) {
       throw new IOException("'.' is invalid for InterpreterSetting name.");
     }
@@ -695,15 +722,14 @@ public class InterpreterSettingManager implements 
NoteEventListener {
     //TODO(zjffdu) Should use setDependencies
     setting.appendDependencies(dependencies);
     setting.setInterpreterOption(option);
-    setting.setProperties(p);
+    setting.setProperties(properties);
     initInterpreterSetting(setting);
     interpreterSettings.put(setting.getId(), setting);
     saveToFile();
+
     return setting;
   }
 
-
-
   @VisibleForTesting
   public void closeNote(String user, String noteId) {
     // close interpreters in this note session
@@ -753,11 +779,12 @@ public class InterpreterSettingManager implements 
NoteEventListener {
   }
 
   /** Change interpreter properties and restart */
-  public void setPropertyAndRestart(
+  private InterpreterSetting inlineSetPropertyAndRestart(
       String id,
       InterpreterOption option,
       Map<String, InterpreterProperty> properties,
-      List<Dependency> dependencies)
+      List<Dependency> dependencies,
+      boolean initiator)
       throws InterpreterException, IOException {
     InterpreterSetting intpSetting = interpreterSettings.get(id);
     if (intpSetting != null) {
@@ -767,7 +794,9 @@ public class InterpreterSettingManager implements 
NoteEventListener {
         intpSetting.setProperties(properties);
         intpSetting.setDependencies(dependencies);
         intpSetting.postProcessing();
-        saveToFile();
+        if (initiator) {
+          saveToFile();
+        }
       } catch (Exception e) {
         loadFromFile();
         throw new IOException(e);
@@ -775,6 +804,23 @@ public class InterpreterSettingManager implements 
NoteEventListener {
     } else {
       throw new InterpreterException("Interpreter setting id " + id + " not 
found");
     }
+    return intpSetting;
+  }
+
+  /**
+   * Changes the option, properties and dependencies of the interpreter
+   * setting with the given id and restarts it, then broadcasts an
+   * UPDATE_INTP_SETTING cluster event carrying the updated setting.
+   * <p>
+   * The change itself is done by inlineSetPropertyAndRestart with
+   * {@code initiator == true}, so this node is the one that saves the
+   * settings file. Exceptions are propagated unchanged.
+   *
+   * @param id           id of the interpreter setting to change
+   * @param option       new interpreter option
+   * @param properties   new interpreter properties
+   * @param dependencies new dependency list
+   * @throws InterpreterException propagated from the update
+   * @throws IOException          propagated from the update
+   */
+  public void setPropertyAndRestart(
+      String id,
+      InterpreterOption option,
+      Map<String, InterpreterProperty> properties,
+      List<Dependency> dependencies)
+      throws InterpreterException, IOException {
+    InterpreterSetting intpSetting = inlineSetPropertyAndRestart(
+        id, option, properties, dependencies, true);
+    // broadcast cluster event
+    broadcastClusterEvent(ClusterEvent.UPDATE_INTP_SETTING, intpSetting);
   }
 
   // restart in note page
@@ -813,6 +859,17 @@ public class InterpreterSettingManager implements 
NoteEventListener {
   }
 
   public void remove(String id) throws IOException {
+    // Remove locally as the initiating node; nothing to announce when no
+    // setting with this id was actually removed.
+    if (!inlineRemove(id, true)) {
+      return;
+    }
+
+    // broadcast cluster event: the message only needs to carry the id
+    InterpreterSetting removedSetting = new InterpreterSetting();
+    removedSetting.setId(id);
+    broadcastClusterEvent(ClusterEvent.DELETE_INTP_SETTING, removedSetting);
+  }
+
+  private boolean inlineRemove(String id, boolean initiator) throws 
IOException {
+    boolean removed = false;
     // 1. close interpreter groups of this interpreter setting
     // 2. remove this interpreter setting
     // 3. remove this interpreter setting from note binding
@@ -822,11 +879,18 @@ public class InterpreterSettingManager implements 
NoteEventListener {
       InterpreterSetting intp = interpreterSettings.get(id);
       intp.close();
       interpreterSettings.remove(id);
-      saveToFile();
+      if (initiator) {
+        // Event initiator saves the file
+        // Cluster event accepting nodes do not need to save files repeatedly
+        saveToFile();
+      }
+      removed = true;
     }
 
     File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + 
id);
     FileUtils.deleteDirectory(localRepoDir);
+
+    return removed;
   }
 
   /**
@@ -978,4 +1042,61 @@ public class InterpreterSettingManager implements 
NoteEventListener {
   public void onParagraphStatusChange(Paragraph p, Job.Status status) throws 
IOException {
 
   }
+
+  /**
+   * Handles a received cluster event describing an interpreter-setting
+   * change.
+   * <p>
+   * The message is deserialized, the embedded "intpSetting" JSON is turned
+   * back into an InterpreterSetting, and the matching inline operation is
+   * run: create, update (with {@code initiator == false}) or remove (with
+   * {@code initiator == false}). IOException and InterpreterException are
+   * logged and not rethrown.
+   *
+   * @param msg serialized ClusterMessage
+   */
+  @Override
+  public void onClusterEvent(String msg) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("onClusterEvent : {}", msg);
+    }
+
+    try {
+      ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+      String jsonIntpSetting = message.get("intpSetting");
+      InterpreterSetting intpSetting =
+          InterpreterSetting.fromJson(jsonIntpSetting);
+      String id = intpSetting.getId();
+      String name = intpSetting.getName();
+      String group = intpSetting.getGroup();
+      InterpreterOption option = intpSetting.getOption();
+      // Only the Map contract is needed below, so do not assume that the
+      // converter hands back a HashMap.
+      Map<String, InterpreterProperty> properties
+          = (Map<String, InterpreterProperty>) InterpreterSetting
+          .convertInterpreterProperties(intpSetting.getProperties());
+      List<Dependency> dependencies = intpSetting.getDependencies();
+
+      switch (message.clusterEvent) {
+        case CREATE_INTP_SETTING:
+          inlineCreateNewSetting(name, group, dependencies, option,
+              properties);
+          break;
+        case UPDATE_INTP_SETTING:
+          inlineSetPropertyAndRestart(id, option, properties, dependencies,
+              false);
+          break;
+        case DELETE_INTP_SETTING:
+          inlineRemove(id, false);
+          break;
+        default:
+          LOGGER.error("Unknown clusterEvent:{}, msg:{} ",
+              message.clusterEvent, msg);
+          break;
+      }
+    } catch (IOException | InterpreterException e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Broadcasts an interpreter-setting event on
+   * CLUSTER_INTP_SETTING_EVENT_TOPIC. Does nothing unless the configuration
+   * reports cluster mode.
+   *
+   * @param event       the kind of change being announced
+   * @param intpSetting the setting serialized into the message under the
+   *                    "intpSetting" key
+   */
+  private void broadcastClusterEvent(ClusterEvent event,
+                                     InterpreterSetting intpSetting) {
+    if (conf.isClusterMode()) {
+      String payload = InterpreterSetting.toJson(intpSetting);
+
+      ClusterMessage envelope = new ClusterMessage(event);
+      envelope.put("intpSetting", payload);
+      String wireText = ClusterMessage.serializeMessage(envelope);
+
+      ClusterManagerServer.getInstance().broadcastClusterEvent(
+          CLUSTER_INTP_SETTING_EVENT_TOPIC, wireText);
+    }
+  }
+
 }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java
index 5f0ea36..91face1 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java
@@ -326,7 +326,6 @@ public class AuthorizationService implements 
ClusterEventListener {
   // broadcast cluster event
   private void broadcastClusterEvent(ClusterEvent event, String noteId,
                                      String user, Set<String> set) {
-    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
     if (!conf.isClusterMode()) {
       return;
     }
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
index b465a89..eb02609 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
@@ -24,7 +24,9 @@ import java.util.HashMap;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class InterpreterSettingTest {
 
@@ -432,4 +434,35 @@ public class InterpreterSettingTest {
     interpreterSetting.closeInterpreters("user2", "note2");
     assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
   }
+
+  /**
+   * Round-trips an InterpreterSetting through toJson/fromJson and checks
+   * that id, name, group, option and interpreter infos survive.
+   */
+  @Test
+  public void testInterpreterJsonSerializable() {
+    InterpreterOption interpreterOption = new InterpreterOption();
+    interpreterOption.setPerUser(InterpreterOption.SHARED);
+    InterpreterInfo interpreterInfo1 = new InterpreterInfo(
+        EchoInterpreter.class.getName(), "echo", true,
+        new HashMap<String, Object>(), new HashMap<String, Object>());
+    InterpreterInfo interpreterInfo2 = new InterpreterInfo(
+        DoubleEchoInterpreter.class.getName(), "double_echo", false,
+        new HashMap<String, Object>(), new HashMap<String, Object>());
+    List<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(interpreterInfo1);
+    interpreterInfos.add(interpreterInfo2);
+    InterpreterSetting interpreterSetting = new InterpreterSetting.Builder()
+        .setId("id")
+        .setName("id")
+        .setGroup("group")
+        .setInterpreterInfos(interpreterInfos)
+        .setOption(interpreterOption)
+        .create();
+
+    String json = InterpreterSetting.toJson(interpreterSetting);
+
+    InterpreterSetting checkIntpSetting = InterpreterSetting.fromJson(json);
+    // JUnit expects (expected, actual); the order matters for the message
+    // printed when an assertion fails.
+    assertEquals("id", checkIntpSetting.getId());
+    assertEquals("id", checkIntpSetting.getName());
+    assertEquals("group", checkIntpSetting.getGroup());
+    assertTrue(checkIntpSetting.getOption().perUserShared());
+    assertNotNull(checkIntpSetting.getInterpreterInfo("echo"));
+    assertNotNull(checkIntpSetting.getInterpreterInfo("double_echo"));
+  }
 }

Reply via email to