This is an automated email from the ASF dual-hosted git repository. chaitalithombare pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 4fb2885c2 ATLAS-4977 : storm-bridge, storm-bridge-shim modules: update for code readability improvement (#404) 4fb2885c2 is described below commit 4fb2885c2dfa1da405a50b23afa996d753669b27 Author: Vinayak Marraiya <72193307+vinayakmarraiya230...@users.noreply.github.com> AuthorDate: Mon Jul 21 12:58:18 2025 +0530 ATLAS-4977 : storm-bridge, storm-bridge-shim modules: update for code readability improvement (#404) Co-authored-by: Vinayak Marraiya <vinayak.marra...@cloudera.com> --- .../apache/atlas/storm/hook/StormAtlasHook.java | 11 ++--- .../apache/atlas/storm/hook/StormAtlasHook.java | 26 ++++++------ .../apache/atlas/storm/hook/StormTopologyUtil.java | 49 +++++++++++++--------- .../apache/atlas/storm/model/StormDataTypes.java | 8 +--- .../apache/atlas/storm/hook/StormAtlasHookIT.java | 14 +++---- .../org/apache/atlas/storm/hook/StormTestUtil.java | 3 +- 6 files changed, 53 insertions(+), 58 deletions(-) diff --git a/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 0ce7633aa..e30a674db 100644 --- a/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -18,7 +18,6 @@ package org.apache.atlas.storm.hook; - import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader; import org.apache.storm.ISubmitterHook; import org.apache.storm.generated.StormTopology; @@ -33,14 +32,11 @@ import java.util.Map; */ public class StormAtlasHook implements ISubmitterHook { private static final Logger LOG = LoggerFactory.getLogger(StormAtlasHook.class); - - private static final String ATLAS_PLUGIN_TYPE = "storm"; private static final String ATLAS_STORM_HOOK_IMPL_CLASSNAME = "org.apache.atlas.storm.hook.StormAtlasHook"; - private AtlasPluginClassLoader atlasPluginClassLoader = null; - private ISubmitterHook stormHook = null; - + private AtlasPluginClassLoader atlasPluginClassLoader; + private ISubmitterHook stormHook; public StormAtlasHook() { this.initialize(); @@ -48,11 +44,10 @@ public class StormAtlasHook implements ISubmitterHook { @Override public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology stormTopology) - throws IllegalAccessException { + throws IllegalAccessException { if (LOG.isDebugEnabled()) { LOG.debug("==> StormAtlasHook.notify({}, {}, {})", topologyInfo, stormConf, stormTopology); } - try { activatePluginClassLoader(); stormHook.notify(topologyInfo, stormConf, stormTopology); diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 69d58d574..6c339e7c1 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -18,40 +18,40 @@ package org.apache.atlas.storm.hook; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; +import org.apache.atlas.storm.model.StormDataTypes; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.utils.HdfsNameServiceResolver; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.storm.ISubmitterHook; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.TopologyInfo; import org.apache.storm.utils.Utils; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasConstants; -import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; -import org.apache.atlas.hook.AtlasHook; -import org.apache.atlas.storm.model.StormDataTypes; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hive.conf.HiveConf; import org.slf4j.Logger; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Date; import static org.apache.atlas.repository.Constants.STORM_SOURCE; @@ -264,7 +264,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { final String dbName = config.get("HiveBolt.options.databaseName"); final String tblName = config.get("HiveBolt.options.tableName"); - if (dbName == null || tblName ==null) { + if (dbName == null || tblName == null) { LOG.error("Hive database or table name not found"); } else { AtlasEntity dbEntity = new AtlasEntity("hive_db"); @@ -413,4 +413,4 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { public String getMessageSource() { return STORM_SOURCE; } -} \ No newline at end of file +} diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java index b903dbc69..efa694b62 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java @@ -18,12 +18,12 @@ package org.apache.atlas.storm.hook; +import com.google.common.base.Joiner; import org.apache.commons.lang.StringUtils; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; -import com.google.common.base.Joiner; import org.slf4j.Logger; import java.lang.reflect.Field; @@ -94,25 +94,26 @@ public final class StormTopologyUtil { public static Set<String> removeSystemComponents(Set<String> components) { Set<String> userComponents = new HashSet<>(); for (String component : components) { - if (!isSystemComponent(component)) + if (!isSystemComponent(component)) { userComponents.add(component); + } } return userComponents; } private static final Set<Class> WRAPPER_TYPES = new HashSet<Class>() {{ - add(Boolean.class); - add(Character.class); - add(Byte.class); - add(Short.class); - add(Integer.class); - add(Long.class); - add(Float.class); - add(Double.class); - add(Void.class); - add(String.class); - }}; + add(Boolean.class); + add(Character.class); + add(Byte.class); + add(Short.class); + add(Integer.class); + add(Long.class); + add(Float.class); + add(Double.class); + add(Void.class); + add(String.class); + }}; public static boolean isWrapperType(Class clazz) { return WRAPPER_TYPES.contains(clazz); @@ -161,7 +162,9 @@ public final class StormTopologyUtil { continue; } else if (fieldVal.getClass().isPrimitive() || isWrapperType(fieldVal.getClass())) { - if (toString(fieldVal, false).isEmpty()) continue; + if (toString(fieldVal, false).isEmpty()) { + continue; + } output.put(key, toString(fieldVal, false)); } else if (isMapType(fieldVal.getClass())) { //TODO: check if it makes more sense to just stick to json @@ -181,7 +184,9 @@ public final class StormTopologyUtil { //TODO check if it makes more sense to just stick to // json like structure instead of a flatten output. Collection collection = (Collection) fieldVal; - if (collection.size() == 0) continue; + if (collection.size() == 0) { + continue; + } String outStr = ""; for (Object o : collection) { outStr += getString(o, false, objectsToSkip) + ","; @@ -203,7 +208,7 @@ public final class StormTopologyUtil { } } } - catch (Exception e){ + catch (Exception e) { LOG.warn("Exception while constructing topology", e); } return output; @@ -237,12 +242,16 @@ public final class StormTopologyUtil { } private static String toString(Object instance, boolean wrapWithQuote) { - if (instance instanceof String) - if (wrapWithQuote) + if (instance instanceof String) { + if (wrapWithQuote) { return "\"" + instance + "\""; - else + } + else { return instance.toString(); - else + } + } + else { return instance.toString(); + } } } diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java index 7eb1e3cb8..fd29146a4 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java @@ -15,15 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.atlas.storm.model; - - /** * Storm Data Types for model and hook. */ public enum StormDataTypes { - // Topology Classes STORM_TOPOLOGY, // represents the topology containing the DAG @@ -34,9 +30,7 @@ public enum StormDataTypes { // Data Sets KAFKA_TOPIC, // kafka data set JMS_TOPIC, // jms data set - HBASE_TABLE, // hbase table data set - ; - + HBASE_TABLE; // hbase table data set public String getName() { return name().toLowerCase(); } diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java index e11e1b8b0..36c19c7a7 100644 --- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java +++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java @@ -19,26 +19,25 @@ package org.apache.atlas.storm.hook; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; -import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.storm.model.StormDataTypes; import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.commons.configuration.Configuration; import org.apache.storm.ILocalCluster; import org.apache.storm.generated.StormTopology; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.testng.Assert.assertNotNull; + @Test public class StormAtlasHookIT { - public static final Logger LOG = LoggerFactory.getLogger(StormAtlasHookIT.class); private static final String ATLAS_URL = "http://localhost:21000/"; @@ -55,13 +54,12 @@ public class StormAtlasHookIT { Configuration configuration = ApplicationProperties.get(); if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { - atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[]{"admin", "admin"}); + atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[] {"admin", "admin"}); } else { atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT)); } } - @AfterClass public void tearDown() throws Exception { LOG.info("Shutting down storm local cluster"); @@ -78,11 +76,11 @@ public class StormAtlasHookIT { // todo: test if topology metadata is registered in atlas String guid = getTopologyGUID(); - Assert.assertNotNull(guid); + assertNotNull(guid); LOG.info("GUID is {}", guid); Referenceable topologyReferenceable = atlasClient.getEntity(guid); - Assert.assertNotNull(topologyReferenceable); + assertNotNull(topologyReferenceable); } private String getTopologyGUID() throws Exception { diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java index d869f18cd..eb0171084 100644 --- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java +++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java @@ -34,13 +34,12 @@ import java.util.HashMap; * An until to create a test topology. */ final class StormTestUtil { - private StormTestUtil() { } public static ILocalCluster createLocalStormCluster() { // start a local storm cluster - HashMap<String,Object> localClusterConf = new HashMap<>(); + HashMap<String, Object> localClusterConf = new HashMap<>(); localClusterConf.put("nimbus-daemon", true); return Testing.getLocalCluster(localClusterConf); }