This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new aa4731e699 [ZEPPELIN-5845] Cluster polish (#4505) aa4731e699 is described below commit aa4731e69916ed20a3aad56a89544d7a08d57d24 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Thu Nov 10 12:57:13 2022 +0100 [ZEPPELIN-5845] Cluster polish (#4505) --- .../apache/zeppelin/cluster/ClusterManager.java | 16 +++---- .../zeppelin/cluster/ClusterManagerServer.java | 11 ++--- .../apache/zeppelin/cluster/ClusterMonitor.java | 4 +- .../apache/zeppelin/cluster/meta/ClusterMeta.java | 49 ++++++++++------------ .../zeppelin/cluster/meta/ClusterMetaEntity.java | 7 ++-- .../zeppelin/cluster/ClusterMultiNodeTest.java | 19 ++++----- .../zeppelin/cluster/ClusterSingleNodeTest.java | 21 +++++----- .../launcher/ClusterInterpreterCheckThread.java | 6 +-- .../launcher/ClusterInterpreterLauncher.java | 12 +++--- .../launcher/ClusterInterpreterLauncherTest.java | 3 +- .../interpreter/launcher/ClusterMockTest.java | 18 ++++---- .../org/apache/zeppelin/rest/ClusterRestApi.java | 17 ++++---- .../ClusterIntpSettingEventListenerTest.java | 2 +- .../cluster/ClusterNoteEventListenerTest.java | 1 - 14 files changed, 92 insertions(+), 94 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java index 38bb832334..6e586afb7f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java @@ -344,7 +344,7 @@ public abstract class ClusterManager { ClusterMetaType metaType = entity.getMetaType(); String metaKey = entity.getKey(); - HashMap<String, Object> newMetaValue = entity.getValues(); + Map<String, Object> newMetaValue = entity.getValues(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("putClusterMeta {} {}", metaType, metaKey); @@ -361,7 +361,7 @@ public abstract class ClusterManager { } // put metadata into cluster metadata - public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) { + public void putClusterMeta(ClusterMetaType type, String key, Map<String, Object> values) { ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values); boolean result = putClusterMeta(metaEntity); @@ -407,9 +407,9 @@ public abstract class ClusterManager { } // get metadata by cluster metadata - public HashMap<String, HashMap<String, Object>> getClusterMeta( + public Map<String, Map<String, Object>> getClusterMeta( ClusterMetaType metaType, String metaKey) { - HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>(); + Map<String, Map<String, Object>> clusterMeta = new HashMap<>(); if (!raftInitialized()) { LOGGER.error("Raft incomplete initialization!"); return clusterMeta; @@ -434,7 +434,7 @@ public abstract class ClusterManager { } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString()); + LOGGER.debug("getClusterMeta >>> {}", clusterMeta); } return clusterMeta; @@ -442,12 +442,12 @@ public abstract class ClusterManager { public InterpreterClient getIntpProcessStatus(String intpName, int timeout, - ClusterCallback<HashMap<String, Object>> callback) { + ClusterCallback<Map<String, Object>> callback) { final int CHECK_META_INTERVAL = 1000; int MAX_RETRY_GET_META = timeout / CHECK_META_INTERVAL; int retryGetMeta = 0; while (retryGetMeta++ < MAX_RETRY_GET_META) { - HashMap<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, intpName).get(intpName); + Map<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, intpName).get(intpName); if (interpreterMetaOnline(intpMeta)) { // connect exist Interpreter Process String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST); @@ -485,7 +485,7 @@ public abstract class ClusterManager { } // Check if the interpreter is online - private boolean interpreterMetaOnline(HashMap<String, Object> intpProcMeta) { + private boolean interpreterMetaOnline(Map<String, Object> intpProcMeta) { if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST) && intpProcMeta.containsKey(INTP_TSERVER_PORT) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java index 11e56029df..b394b25f25 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java @@ -90,6 +90,7 @@ public class ClusterManagerServer extends ClusterManager { } } + @Override public void start() { if (!zConf.isClusterMode()) { return; @@ -262,13 +263,13 @@ public class ClusterManagerServer extends ClusterManager { } // Obtain the server node whose resources are idle in the cluster - public HashMap<String, Object> getIdleNodeMeta() { - HashMap<String, Object> idleNodeMeta = null; - HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(SERVER_META, ""); + public Map<String, Object> getIdleNodeMeta() { + Map<String, Object> idleNodeMeta = null; + Map<String, Map<String, Object>> clusterMeta = getClusterMeta(SERVER_META, ""); long memoryIdle = 0; - for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) { - HashMap<String, Object> meta = entry.getValue(); + for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) { + Map<String, Object> meta = entry.getValue(); // Check if the service or process is offline String status = (String) meta.get(ClusterMeta.STATUS); if (null == status || StringUtils.isEmpty(status) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java index 2f0b60c4c0..133f8b6910 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java @@ -136,14 +136,14 @@ public class ClusterMonitor { LocalDateTime now = LocalDateTime.now(); // check machine mate for (ClusterMetaType metaType : ClusterMetaType.values()) { - Map<String, HashMap<String, Object>> clusterMeta + Map<String, Map<String, Object>> clusterMeta = clusterManager.getClusterMeta(metaType, ""); if (LOGGER.isDebugEnabled()) { LOGGER.debug("clusterMeta : {}", clusterMeta); } - for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) { + for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) { String key = entry.getKey(); Map<String, Object> meta = entry.getValue(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java index 4e8a2767f1..95162bb225 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.cluster.meta; -import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,36 +28,36 @@ import java.util.Map; * Metadata stores metadata information in a KV key-value pair */ public class ClusterMeta implements Serializable { - private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ClusterMeta.class); // The name of each server node in the cluster - public static String NODE_NAME = "NODE_NAME"; + public static final String NODE_NAME = "NODE_NAME"; // zeppelin-server meta - public static String SERVER_HOST = "SERVER_HOST"; - public static String SERVER_PORT = "SERVER_PORT"; - public static String SERVER_START_TIME = "SERVER_START_TIME"; + public static final String SERVER_HOST = "SERVER_HOST"; + public static final String SERVER_PORT = "SERVER_PORT"; + public static final String SERVER_START_TIME = "SERVER_START_TIME"; // interperter-process meta - public static String INTP_PROCESS_NAME = "INTP_PROCESS_NAME"; - public static String INTP_TSERVER_HOST = "INTP_TSERVER_HOST"; - public static String INTP_TSERVER_PORT = "INTP_TSERVER_PORT"; - public static String INTP_START_TIME = "INTP_START_TIME"; + public static final String INTP_PROCESS_NAME = "INTP_PROCESS_NAME"; + public static final String INTP_TSERVER_HOST = "INTP_TSERVER_HOST"; + public static final String INTP_TSERVER_PORT = "INTP_TSERVER_PORT"; + public static final String INTP_START_TIME = "INTP_START_TIME"; // zeppelin-server resource usage - public static String CPU_CAPACITY = "CPU_CAPACITY"; - public static String CPU_USED = "CPU_USED"; - public static String MEMORY_CAPACITY = "MEMORY_CAPACITY"; - public static String MEMORY_USED = "MEMORY_USED"; + public static final String CPU_CAPACITY = "CPU_CAPACITY"; + public static final String CPU_USED = "CPU_USED"; + public static final String MEMORY_CAPACITY = "MEMORY_CAPACITY"; + public static final String MEMORY_USED = "MEMORY_USED"; - public static String LATEST_HEARTBEAT = "LATEST_HEARTBEAT"; + public static final String LATEST_HEARTBEAT = "LATEST_HEARTBEAT"; // zeppelin-server or interperter-process status - public static String STATUS = "STATUS"; - public static String ONLINE_STATUS = "ONLINE"; - public static String OFFLINE_STATUS = "OFFLINE"; - public static String INTP_PROCESS_COUNT = "INTP_PROCESS_COUNT"; - public static String INTP_PROCESS_LIST = "INTP_PROCESS_LIST"; + public static final String STATUS = "STATUS"; + public static final String ONLINE_STATUS = "ONLINE"; + public static final String OFFLINE_STATUS = "OFFLINE"; + public static final String INTP_PROCESS_COUNT = "INTP_PROCESS_COUNT"; + public static final String INTP_PROCESS_LIST = "INTP_PROCESS_LIST"; // cluster_name = host:port // Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...} @@ -67,8 +66,6 @@ public class ClusterMeta implements Serializable { // Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...} private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>(); - public static Gson gson = new Gson(); - public void put(ClusterMetaType type, String key, Object value) { Map<String, Object> mapValue = (Map<String, Object>) value; @@ -104,7 +101,7 @@ public class ClusterMeta implements Serializable { if (mapServerMeta.containsKey(key)) { values = mapServerMeta.get(key); } else { - logger.warn("can not find key : {}", key); + LOGGER.warn("can not find key : {}", key); } break; case INTP_PROCESS_META: @@ -114,7 +111,7 @@ public class ClusterMeta implements Serializable { if (mapInterpreterMeta.containsKey(key)) { values = mapInterpreterMeta.get(key); } else { - logger.warn("can not find key : {}", key); + LOGGER.warn("can not find key : {}", key); } break; } @@ -131,14 +128,14 @@ public class ClusterMeta implements Serializable { if (mapServerMeta.containsKey(key)) { return mapServerMeta.remove(key); } else { - logger.warn("can not find key : {}", key); + LOGGER.warn("can not find key : {}", key); } break; case INTP_PROCESS_META: if (mapInterpreterMeta.containsKey(key)) { return mapInterpreterMeta.remove(key); } else { - logger.warn("can not find key : {}", key); + LOGGER.warn("can not find key : {}", key); } break; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java index 7a5afb013b..fab1333dfe 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.cluster.meta; import java.io.Serializable; import java.util.HashMap; +import java.util.Map; /** * Cluster operations, cluster types, encapsulation objects for keys and values @@ -26,10 +27,10 @@ public class ClusterMetaEntity implements Serializable { private ClusterMetaOperation operation; private ClusterMetaType type; private String key; - private HashMap<String, Object> values = new HashMap<>(); + private Map<String, Object> values = new HashMap<>(); public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type, - String key, HashMap<String, Object> values) { + String key, Map<String, Object> values) { this.operation = operation; this.type = type; this.key = key; @@ -51,7 +52,7 @@ public class ClusterMetaEntity implements Serializable { return key; } - public HashMap<String, Object> getValues() { + public Map<String, Object> getValues() { return values; } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java index c4e88b0d7e..f5c79421a5 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java @@ -21,18 +21,18 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ClusterMultiNodeTest { private static Logger LOGGER = LoggerFactory.getLogger(ClusterMultiNodeTest.class); @@ -68,10 +68,10 @@ public class ClusterMultiNodeTest { String clusterHost = parts[0]; int clusterPort = Integer.valueOf(parts[1]); - Class clazz = ClusterManagerServer.class; - Constructor constructor = clazz.getDeclaredConstructor(); + Class<ClusterManagerServer> clazz = ClusterManagerServer.class; + Constructor<ClusterManagerServer> constructor = clazz.getDeclaredConstructor(); constructor.setAccessible(true); - ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance(); + ClusterManagerServer clusterServer = constructor.newInstance(); clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort); clusterServers.add(clusterServer); @@ -145,17 +145,16 @@ public class ClusterMultiNodeTest { public static void getClusterServerMeta() { LOGGER.info("getClusterServerMeta >>>"); // Get metadata for all services - Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); + Map<String, Map<String, Object>> srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); LOGGER.info(srvMeta.toString()); - Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); + Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); LOGGER.info(intpMeta.toString()); assertNotNull(srvMeta); - assertEquals(true, (srvMeta instanceof HashMap)); - HashMap hashMap = (HashMap) srvMeta; + assertTrue(srvMeta instanceof Map); - assertEquals(hashMap.size(), 3); + assertEquals(3, srvMeta.size()); LOGGER.info("getClusterServerMeta <<<"); } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java index b6bb92102c..26d7bf9eb6 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java @@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ClusterSingleNodeTest { private static Logger LOGGER = LoggerFactory.getLogger(ClusterSingleNodeTest.class); @@ -98,22 +100,19 @@ public class ClusterSingleNodeTest { LOGGER.info("getServerMeta >>>"); // Get metadata for all services - Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); + Map<String, Map<String, Object>> meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); LOGGER.info(meta.toString()); - Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); + Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); LOGGER.info(intpMeta.toString()); assertNotNull(meta); - assertEquals(true, (meta instanceof HashMap)); - HashMap hashMap = (HashMap) meta; + assertTrue(meta instanceof Map); // Get metadata for the current service - Object values = hashMap.get(clusterClient.getClusterNodeName()); - assertEquals(true, (values instanceof HashMap)); - HashMap mapMetaValues = (HashMap) values; - - assertEquals(true, mapMetaValues.size()>0); + Map<String, Object> values = meta.get(clusterClient.getClusterNodeName()); + assertTrue(values instanceof Map); + assertTrue(values.size() > 0); LOGGER.info("getServerMeta <<<"); } @@ -121,7 +120,7 @@ public class ClusterSingleNodeTest { @Test public void putIntpProcessMeta() { // mock IntpProcess Meta - HashMap<String, Object> meta = new HashMap<>(); + Map<String, Object> meta = new HashMap<>(); meta.put(ClusterMeta.SERVER_HOST, zServerHost); meta.put(ClusterMeta.SERVER_PORT, zServerPort); meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST"); @@ -135,7 +134,7 @@ public class ClusterSingleNodeTest { clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta); // get IntpProcess Meta - HashMap<String, HashMap<String, Object>> check + Map<String, Map<String, Object>> check = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey); LOGGER.info(check.toString()); diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java index ef7bc68e8c..f428bc2346 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java @@ -23,7 +23,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; +import java.util.Map; import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST; import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT; @@ -54,9 +54,9 @@ public class ClusterInterpreterCheckThread extends Thread { ZeppelinConfiguration.create()); clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout, - new ClusterCallback<HashMap<String, Object>>() { + new ClusterCallback<Map<String, Object>>() { @Override - public InterpreterClient online(HashMap<String, Object> result) { + public InterpreterClient online(Map<String, Object> result) { String intpTSrvHost = (String) result.get(INTP_TSERVER_HOST); int intpTSrvPort = (int) result.get(INTP_TSERVER_PORT); LOGGER.info("Found cluster interpreter {}:{}", intpTSrvHost, intpTSrvPort); diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index 397f6ab515..b34be06230 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -61,7 +61,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher @Override public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { - LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); + LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); this.context = context; this.properties = context.getProperties(); @@ -70,9 +70,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher // connect exist Interpreter Process InterpreterClient intpClient = clusterServer.getIntpProcessStatus( - intpGroupId, 3000, new ClusterCallback<HashMap<String, Object>>() { + intpGroupId, 3000, new ClusterCallback<Map<String, Object>>() { @Override - public InterpreterClient online(HashMap<String, Object> result) { + public InterpreterClient online(Map<String, Object> result) { String intpTserverHost = (String) result.get(INTP_TSERVER_HOST); int intpTserverPort = (int) result.get(INTP_TSERVER_PORT); @@ -100,7 +100,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher // No process was found for the InterpreterGroup ID String srvHost = null; int srvPort = 0; - HashMap<String, Object> meta = clusterServer.getIdleNodeMeta(); + Map<String, Object> meta = clusterServer.getIdleNodeMeta(); if (null == meta) { LOGGER.error("Don't get idle node meta, launch interpreter on local."); InterpreterClient clusterIntpProcess = createInterpreterProcess(context); @@ -145,9 +145,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher String finalSrvHost = srvHost; int finalSrvPort = srvPort; intpClient = clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout, - new ClusterCallback<HashMap<String, Object>>() { + new ClusterCallback<Map<String, Object>>() { @Override - public InterpreterClient online(HashMap<String, Object> result) { + public InterpreterClient online(Map<String, Object> result) { // connect exist Interpreter Process String intpTserverHost = (String) result.get(INTP_TSERVER_HOST); int intpTserverPort = (int) result.get(INTP_TSERVER_PORT); diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java index 995da7ecd9..743f800ce9 100644 --- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java @@ -33,7 +33,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class ClusterInterpreterLauncherTest extends ClusterMockTest { - private static Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class); @BeforeClass public static void startTest() throws IOException, InterruptedException { diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java index dfacfa0c46..a6be913257 100644 --- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java +++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.LocalDateTime; import java.util.HashMap; +import java.util.Map; import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS; import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS; @@ -112,27 +113,26 @@ public class ClusterMockTest { LOGGER.info("serverMeta >>>"); // Get metadata for all services - Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); + Map<String, Map<String, Object>> meta = + clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); LOGGER.info(meta.toString()); assertNotNull(meta); - assertEquals(true, (meta instanceof HashMap)); - HashMap hashMap = (HashMap) meta; + assertEquals(true, (meta instanceof Map)); // Get metadata for the current service - Object values = hashMap.get(zServerHost + ":" + zServerPort); - assertEquals(true, (values instanceof HashMap)); - HashMap mapMetaValues = (HashMap) values; + Map<String, Object> values = meta.get(zServerHost + ":" + zServerPort); + assertEquals(true, (values instanceof Map)); - assertEquals(true, mapMetaValues.size() > 0); + assertEquals(true, values.size() > 0); LOGGER.info("serverMeta <<<"); } public void mockIntpProcessMeta(String metaKey, boolean online) { // mock IntpProcess Meta - HashMap<String, Object> meta = new HashMap<>(); + Map<String, Object> meta = new HashMap<>(); meta.put(ClusterMeta.SERVER_HOST, "127.0.0.1"); meta.put(ClusterMeta.SERVER_PORT, 6000); meta.put(ClusterMeta.INTP_TSERVER_HOST, "127.0.0.1"); @@ -152,7 +152,7 @@ public class ClusterMockTest { clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta); // get IntpProcess Meta - HashMap<String, HashMap<String, Object>> check + Map<String, Map<String, Object>> check = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey); LOGGER.info(check.toString()); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java index e0c911a17b..31d936e077 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; /** * clusters Rest api. @@ -53,7 +54,7 @@ public class ClusterRestApi { // Do not modify, Use by `zeppelin-web/src/app/cluster/cluster.html` - private static String PROPERTIES = "properties"; + private static final String PROPERTIES = "properties"; @Inject public ClusterRestApi(ZeppelinConfiguration zConf) { @@ -85,13 +86,13 @@ public class ClusterRestApi { public Response getClusterNodes(){ List<Map<String, Object>> nodes = new ArrayList<>(); - Map<String, HashMap<String, Object>> clusterMeta; - Map<String, HashMap<String, Object>> intpMeta; + Map<String, Map<String, Object>> clusterMeta; + Map<String, Map<String, Object>> intpMeta; clusterMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.SERVER_META, ""); intpMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); // Number of calculation processes - for (Map.Entry<String, HashMap<String, Object>> serverMetaEntity : clusterMeta.entrySet()) { + for (Entry<String, Map<String, Object>> serverMetaEntity : clusterMeta.entrySet()) { if (!serverMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)) { continue; } @@ -99,7 +100,7 @@ public class ClusterRestApi { List<String> arrIntpProcess = new ArrayList<>(); int intpProcCount = 0; - for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) { + for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) { if (!intpMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME) && !intpMetaEntity.getValue().containsKey(ClusterMeta.INTP_PROCESS_NAME)) { continue; @@ -116,7 +117,7 @@ public class ClusterRestApi { serverMetaEntity.getValue().put(ClusterMeta.INTP_PROCESS_LIST, arrIntpProcess); } - for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) { + for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) { String nodeName = entry.getKey(); Map<String, Object> properties = entry.getValue(); @@ -197,11 +198,11 @@ public class ClusterRestApi { @PathParam("intpName") String intpName){ List<Map<String, Object>> intpProcesses = new ArrayList<>(); - Map<String, HashMap<String, Object>> intpMeta = clusterManagerServer.getClusterMeta( + Map<String, Map<String, Object>> intpMeta = clusterManagerServer.getClusterMeta( ClusterMetaType.INTP_PROCESS_META, ""); // Number of calculation processes - for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) { + for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) { String intpNodeName = (String) intpMetaEntity.getValue().get(ClusterMeta.NODE_NAME); if (null != intpNodeName && intpNodeName.equals(nodeName)) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java index 734611f272..79e9a6ee86 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.cluster; import org.apache.zeppelin.cluster.event.ClusterEventListener; import org.apache.zeppelin.cluster.event.ClusterMessage; -import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,5 +33,6 @@ public class ClusterIntpSettingEventListenerTest implements ClusterEventListener receiveMsg = msg; LOGGER.info("ClusterIntpSettingEventListenerTest#onClusterEvent : {}", msg); ClusterMessage message = ClusterMessage.deserializeMessage(msg); + assertNotNull(message); } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java index 0685cf9668..a34e2879d6 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.util.Map; import java.util.Set; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; public class ClusterNoteEventListenerTest implements ClusterEventListener {