This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/atlas-2.5 by this push:
     new c423f297e  ATLAS-4971: falcon-bridge, falcon-bridge-shim modules: update for cod… (#296)
c423f297e is described below

commit c423f297e805c96fc421d5ae53f7a156b857b881
Author:     sheetalshah1007 <sheetal.s...@freestoneinfotech.com>
AuthorDate: Thu Feb 20 10:03:47 2025 +0530

    ATLAS-4971: falcon-bridge, falcon-bridge-shim modules: update for cod… (#296)

    (cherry picked from commit 0901f6d816b7a5e2944a5ab52ec6cf61be5b4a46)
---
 addons/falcon-bridge-shim/pom.xml                   |   4 +
 .../apache/atlas/falcon/service/AtlasService.java  |  81 ++----
 addons/falcon-bridge/pom.xml                        |   5 +
 .../apache/atlas/falcon/bridge/FalconBridge.java   | 238 ++++++++--------
 .../org/apache/atlas/falcon/event/FalconEvent.java |  32 +--
 .../org/apache/atlas/falcon/hook/FalconHook.java   | 108 ++++----
 .../apache/atlas/falcon/model/FalconDataTypes.java |   7 +-
 .../falcon/publisher/FalconEventPublisher.java     |  11 +-
 .../apache/atlas/falcon/service/AtlasService.java  |  77 +++---
 .../atlas/falcon/{Util => util}/EventUtil.java     |  19 +-
 .../org/apache/atlas/falcon/hook/FalconHookIT.java | 307 +++++++++++----------
 .../test/resources/atlas-application.properties    |  43 +--
 .../src/test/resources/atlas-logback.xml           | 221 ++++++++-------
 .../falcon-bridge/src/test/resources/cluster.xml   |   2 +-
 .../falcon-bridge/src/test/resources/feed-hdfs.xml |  12 +-
 .../src/test/resources/feed-replication.xml        |  14 +-
 addons/falcon-bridge/src/test/resources/feed.xml   |  10 +-
 .../falcon-bridge/src/test/resources/process.xml   |  14 +-
 .../src/test/resources/startup.properties          |   1 -
 19 files changed, 592 insertions(+), 614 deletions(-)

diff --git a/addons/falcon-bridge-shim/pom.xml b/addons/falcon-bridge-shim/pom.xml
index 1ca461514..df0456f01 100755
--- a/addons/falcon-bridge-shim/pom.xml
+++ b/addons/falcon-bridge-shim/pom.xml
@@ -32,6 +32,10 @@
     <name>Apache Atlas Falcon Bridge Shim</name>
     <description>Apache Atlas Falcon Bridge Shim Module</description>
 
+    <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
+    </properties>
     <dependencies>
         <!-- Logging -->
         <dependency>
diff --git a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
index 2b756de0e..7b464ab33 100755
--- a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
+++ b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.falcon.service;
 
-
 import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -34,12 +33,12 @@ import org.slf4j.LoggerFactory;
 public class AtlasService implements FalconService, ConfigurationChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);
 
-    private static final String ATLAS_PLUGIN_TYPE = "falcon";
+    private static final String ATLAS_PLUGIN_TYPE              = "falcon";
     private static final String ATLAS_FALCON_HOOK_IMPL_CLASSNAME = "org.apache.atlas.falcon.service.AtlasService";
 
-    private AtlasPluginClassLoader atlasPluginClassLoader = null;
-    private FalconService falconServiceImpl = null;
-    private ConfigurationChangeListener configChangeListenerImpl = null;
+    private AtlasPluginClassLoader      atlasPluginClassLoader;
+    private FalconService               falconServiceImpl;
+    private ConfigurationChangeListener configChangeListenerImpl;
 
     public AtlasService() {
         this.initialize();
@@ -47,9 +46,7 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
 
     @Override
     public String getName() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.getName()");
-        }
+        LOG.debug("==> AtlasService.getName()");
 
         String ret = null;
 
@@ -60,18 +57,14 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.getName()");
-        }
+        LOG.debug("<== AtlasService.getName()");
 
         return ret;
     }
 
     @Override
     public void init() throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.init()");
-        }
+        LOG.debug("==> AtlasService.init()");
 
         try {
             activatePluginClassLoader();
@@ -83,16 +76,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.init()");
-        }
+        LOG.debug("<== AtlasService.init()");
     }
 
     @Override
     public void destroy() throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.destroy()");
-        }
+        LOG.debug("==> AtlasService.destroy()");
 
         try {
             activatePluginClassLoader();
@@ -104,16 +93,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.destroy()");
-        }
+        LOG.debug("<== AtlasService.destroy()");
     }
 
     @Override
     public void onAdd(Entity entity) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onAdd({})", entity);
-        }
+        LOG.debug("==> AtlasService.onAdd({})", entity);
 
         try {
             activatePluginClassLoader();
@@ -122,16 +107,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onAdd({})", entity);
-        }
+        LOG.debug("<== AtlasService.onAdd({})", entity);
     }
 
     @Override
     public void onRemove(Entity entity) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onRemove({})", entity);
-        }
+        LOG.debug("==> AtlasService.onRemove({})", entity);
 
         try {
             activatePluginClassLoader();
@@ -140,16 +121,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onRemove({})", entity);
-        }
+        LOG.debug("<== AtlasService.onRemove({})", entity);
     }
 
     @Override
     public void onChange(Entity entity, Entity entity1) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1);
-        }
+        LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1);
 
         try {
             activatePluginClassLoader();
@@ -158,16 +135,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1);
-        }
+        LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1);
     }
 
     @Override
     public void onReload(Entity entity) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onReload({})", entity);
-        }
+        LOG.debug("==> AtlasService.onReload({})", entity);
 
         try {
             activatePluginClassLoader();
@@ -176,15 +149,11 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
        }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onReload({})", entity);
-        }
+        LOG.debug("<== AtlasService.onReload({})", entity);
     }
 
     private void initialize() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.initialize()");
-        }
+        LOG.debug("==> AtlasService.initialize()");
 
         try {
             atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass());
@@ -195,7 +164,7 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
 
             Object atlasService = cls.newInstance();
 
-            falconServiceImpl = (FalconService) atlasService;
+            falconServiceImpl        = (FalconService) atlasService;
             configChangeListenerImpl = (ConfigurationChangeListener) atlasService;
         } catch (Exception excp) {
             LOG.error("Error instantiating Atlas hook implementation", excp);
@@ -203,9 +172,7 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.initialize()");
-        }
+        LOG.debug("<== AtlasService.initialize()");
     }
 
     private void activatePluginClassLoader() {
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index b29837613..eca232e4d 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -32,6 +32,11 @@
     <name>Apache Atlas Falcon Bridge</name>
     <description>Apache Atlas Falcon Bridge Module</description>
 
+    <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.atlas</groupId>
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
index cbf002f4f..f21dd17c3 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
@@ -20,8 +20,8 @@ package org.apache.atlas.falcon.bridge;
 
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.falcon.Util.EventUtil;
 import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.falcon.util.EventUtil;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.v1.model.instance.Referenceable;
@@ -60,15 +60,18 @@ import java.util.Map;
 public class FalconBridge {
     private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class);
 
-    public static final String COLO = "colo";
-    public static final String TAGS = "tags";
-    public static final String GROUPS = "groups";
-    public static final String PIPELINES = "pipelines";
-    public static final String WFPROPERTIES = "workflow-properties";
-    public static final String RUNSON = "runs-on";
-    public static final String STOREDIN = "stored-in";
-    public static final String FREQUENCY = "frequency";
-    public static final String ATTRIBUTE_DB = "db";
+    public static final String COLO         = "colo";
+    public static final String TAGS         = "tags";
+    public static final String GROUPS       = "groups";
+    public static final String PIPELINES    = "pipelines";
+    public static final String WFPROPERTIES = "workflow-properties";
+    public static final String RUNSON       = "runs-on";
+    public static final String STOREDIN     = "stored-in";
+    public static final String FREQUENCY    = "frequency";
+    public static final String ATTRIBUTE_DB = "db";
+
+    private FalconBridge() {
+    }
 
     /**
      * Creates cluster entity
@@ -92,75 +95,49 @@ public class FalconBridge {
         }
 
         if (StringUtils.isNotEmpty(cluster.getTags())) {
-            clusterRef.set(FalconBridge.TAGS,
-                    EventUtil.convertKeyValueStringToMap(cluster.getTags()));
+            clusterRef.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(cluster.getTags()));
         }
 
         return clusterRef;
     }
 
-    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) {
-        LOG.info("Creating feed dataset: {}", feed.getName());
-
-        Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
-        feedEntity.set(AtlasClient.NAME, feed.getName());
-        feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription());
-        String feedQualifiedName =
-                getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME));
-        feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
-        feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString());
-        feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
-        if (feed.getACL() != null) {
-            feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner());
-        }
-
-        if (StringUtils.isNotEmpty(feed.getTags())) {
-            feedEntity.set(FalconBridge.TAGS,
-                    EventUtil.convertKeyValueStringToMap(feed.getTags()));
-        }
-
-        if (feed.getGroups() != null) {
-            feedEntity.set(FalconBridge.GROUPS, feed.getGroups());
-        }
-
-        return feedEntity;
-    }
-
     public static List<Referenceable> createFeedCreationEntity(Feed feed, ConfigurationStore falconStore) throws FalconException, URISyntaxException {
         LOG.info("Creating feed : {}", feed.getName());
 
         List<Referenceable> entities = new ArrayList<>();
 
         if (feed.getClusters() != null) {
-            List<Referenceable> replicationInputs = new ArrayList<>();
+            List<Referenceable> replicationInputs  = new ArrayList<>();
             List<Referenceable> replicationOutputs = new ArrayList<>();
 
             for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-                org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER,
-                        feedCluster.getName());
+                org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, feedCluster.getName());
 
                 // set cluster
                 Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+
                 entities.add(clusterReferenceable);
 
                 // input as hive_table or hdfs_path, output as falcon_feed dataset
-                List<Referenceable> inputs = new ArrayList<>();
+                List<Referenceable> inputs              = new ArrayList<>();
                 List<Referenceable> inputReferenceables = getInputEntities(cluster, feed);
+
                 if (inputReferenceables != null) {
                     entities.addAll(inputReferenceables);
                     inputs.add(inputReferenceables.get(inputReferenceables.size() - 1));
                 }
 
-                List<Referenceable> outputs = new ArrayList<>();
-                Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable);
+                List<Referenceable> outputs    = new ArrayList<>();
+                Referenceable       feedEntity = createFeedEntity(feed, clusterReferenceable);
+
                 if (feedEntity != null) {
                     entities.add(feedEntity);
                     outputs.add(feedEntity);
                 }
 
                 if (!inputs.isEmpty() || !outputs.isEmpty()) {
-                    Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
-                    String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName());
+                    Referenceable feedCreateEntity  = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
+                    String        feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName());
 
                     feedCreateEntity.set(AtlasClient.NAME, feed.getName());
                     feedCreateEntity.set(AtlasClient.DESCRIPTION, "Feed creation - " + feed.getName());
@@ -169,6 +146,7 @@ public class FalconBridge {
                     if (!inputs.isEmpty()) {
                         feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs);
                     }
+
                     if (!outputs.isEmpty()) {
                         feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs);
                     }
@@ -185,32 +163,29 @@ public class FalconBridge {
             }
 
             if (!replicationInputs.isEmpty() && !replicationInputs.isEmpty()) {
-                Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes
-                        .FALCON_FEED_REPLICATION.getName());
+                Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes.FALCON_FEED_REPLICATION.getName());
 
                 feedReplicationEntity.set(AtlasClient.NAME, feed.getName());
                 feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName());
-
                 feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, replicationInputs);
                 feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, replicationOutputs);
+
                 entities.add(feedReplicationEntity);
             }
-
         }
+
         return entities;
     }
 
     /**
      * Creates process entity
-     *
+     *
      * @param process process entity
      * @param falconStore config store
      * @return process instance reference
-     *
      * @throws FalconException if retrieving from the configuration store fail
      */
-    public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
-                                                          ConfigurationStore falconStore) throws FalconException {
+    public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process, ConfigurationStore falconStore) throws FalconException {
         LOG.info("Creating process Entity : {}", process.getName());
 
         // The requirement is for each cluster, create a process entity with name
@@ -218,44 +193,47 @@ public class FalconBridge {
         List<Referenceable> entities = new ArrayList<>();
 
         if (process.getClusters() != null) {
-
             for (Cluster processCluster : process.getClusters().getClusters()) {
-                org.apache.falcon.entity.v0.cluster.Cluster cluster =
-                        falconStore.get(EntityType.CLUSTER, processCluster.getName());
-                Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+                org.apache.falcon.entity.v0.cluster.Cluster cluster              = falconStore.get(EntityType.CLUSTER, processCluster.getName());
+                Referenceable                               clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+
                 entities.add(clusterReferenceable);
 
                 List<Referenceable> inputs = new ArrayList<>();
+
                 if (process.getInputs() != null) {
                     for (Input input : process.getInputs().getInputs()) {
-                        Feed feed = falconStore.get(EntityType.FEED, input.getFeed());
+                        Feed          feed               = falconStore.get(EntityType.FEED, input.getFeed());
                         Referenceable inputReferenceable = getFeedDataSetReference(feed, clusterReferenceable);
+
                         entities.add(inputReferenceable);
                         inputs.add(inputReferenceable);
                     }
                 }
 
                 List<Referenceable> outputs = new ArrayList<>();
+
                 if (process.getOutputs() != null) {
                     for (Output output : process.getOutputs().getOutputs()) {
-                        Feed feed = falconStore.get(EntityType.FEED, output.getFeed());
+                        Feed          feed                = falconStore.get(EntityType.FEED, output.getFeed());
                         Referenceable outputReferenceable = getFeedDataSetReference(feed, clusterReferenceable);
+
                         entities.add(outputReferenceable);
                         outputs.add(outputReferenceable);
                     }
                 }
 
                 if (!inputs.isEmpty() || !outputs.isEmpty()) {
-
                     Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName());
+
                     processEntity.set(AtlasClient.NAME, process.getName());
-                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                            getProcessQualifiedName(process.getName(), cluster.getName()));
+                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(process.getName(), cluster.getName()));
                     processEntity.set(FalconBridge.FREQUENCY, process.getFrequency().toString());
 
                     if (!inputs.isEmpty()) {
                         processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs);
                     }
+
                     if (!outputs.isEmpty()) {
                         processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs);
                     }
@@ -269,43 +247,89 @@ public class FalconBridge {
                     }
 
                     if (StringUtils.isNotEmpty(process.getTags())) {
-                        processEntity.set(FalconBridge.TAGS,
-                                EventUtil.convertKeyValueStringToMap(process.getTags()));
+                        processEntity.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(process.getTags()));
                     }
 
                     if (process.getPipelines() != null) {
                         processEntity.set(FalconBridge.PIPELINES, process.getPipelines());
                     }
 
-                    processEntity.set(FalconBridge.WFPROPERTIES,
-                            getProcessEntityWFProperties(process.getWorkflow(),
-                                    process.getName()));
+                    processEntity.set(FalconBridge.WFPROPERTIES, getProcessEntityWFProperties(process.getWorkflow(), process.getName()));
 
                     entities.add(processEntity);
                 }
-
             }
         }
+
         return entities;
     }
 
-    private static List<Referenceable> getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster,
-                                                        Feed feed) throws URISyntaxException {
+    public static String getFeedQualifiedName(final String feedName, final String clusterName) {
+        return String.format("%s@%s", feedName, clusterName);
+    }
+
+    public static String getProcessQualifiedName(final String processName, final String clusterName) {
+        return String.format("%s@%s", processName, clusterName);
+    }
+
+    public static String normalize(final String str) {
+        if (StringUtils.isBlank(str)) {
+            return null;
+        }
+
+        return str.toLowerCase().trim();
+    }
+
+    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) {
+        LOG.info("Creating feed dataset: {}", feed.getName());
+
+        Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+
+        feedEntity.set(AtlasClient.NAME, feed.getName());
+        feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription());
+
+        String feedQualifiedName = getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME));
+
+        feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
+        feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString());
+        feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
+
+        if (feed.getACL() != null) {
+            feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner());
+        }
+
+        if (StringUtils.isNotEmpty(feed.getTags())) {
+            feedEntity.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(feed.getTags()));
+        }
+
+        if (feed.getGroups() != null) {
+            feedEntity.set(FalconBridge.GROUPS, feed.getGroups());
+        }
+
+        return feedEntity;
+    }
+
+    private static List<Referenceable> getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster, Feed feed) throws URISyntaxException {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
 
-        if(feedCluster != null) {
+        if (feedCluster != null) {
             final CatalogTable table = getTable(feedCluster, feed);
+
             if (table != null) {
                 CatalogStorage storage = new CatalogStorage(cluster, table);
-                return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
-                        storage.getTable().toLowerCase());
+
+                return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(), storage.getTable().toLowerCase());
             } else {
                 List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
+
                 if (CollectionUtils.isNotEmpty(locations)) {
                     Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+
                     if (dataLocation != null) {
                         final String pathUri = normalize(dataLocation.getPath());
+
                         LOG.info("Registering DFS Path {} ", pathUri);
+
                         return fillHDFSDataSet(pathUri, cluster.getName());
                     }
                 }
@@ -326,91 +350,83 @@ public class FalconBridge {
 
     private static List<Referenceable> fillHDFSDataSet(final String pathUri, final String clusterName) {
         List<Referenceable> entities = new ArrayList<>();
-        Referenceable ref = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
+        Referenceable       ref      = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
+
         ref.set("path", pathUri);
+
         // Path path = new Path(pathUri);
         // ref.set("name", path.getName());
         //TODO - Fix after ATLAS-542 to shorter Name
         Path path = new Path(pathUri);
+
         ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
         ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
         ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+
         entities.add(ref);
+
         return entities;
     }
 
     private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName) {
         Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+
         dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
         dbRef.set(AtlasClient.NAME, dbName);
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+
         return dbRef;
     }
 
-    private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName,
-                                                               String tableName) {
+    private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) {
         List<Referenceable> entities = new ArrayList<>();
-        Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
+        Referenceable       dbRef    = createHiveDatabaseInstance(clusterName, dbName);
+
         entities.add(dbRef);
 
         Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
+
+        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
         tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
         tableRef.set(ATTRIBUTE_DB, dbRef);
+
         entities.add(tableRef);
 
         return entities;
     }
 
-    private static Referenceable getClusterEntityReference(final String clusterName,
-                                                           final String colo) {
+    private static Referenceable getClusterEntityReference(final String clusterName, final String colo) {
         LOG.info("Getting reference for entity {}", clusterName);
+
         Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
+
         clusterRef.set(AtlasClient.NAME, String.format("%s", clusterName));
         clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
         clusterRef.set(FalconBridge.COLO, colo);
+
         return clusterRef;
     }
 
-
     private static Referenceable getFeedDataSetReference(Feed feed, Referenceable clusterReference) {
         LOG.info("Getting reference for entity {}", feed.getName());
+
         Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+
         feedDatasetRef.set(AtlasClient.NAME, feed.getName());
-        feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(),
-                (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)));
+        feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(), (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)));
         feedDatasetRef.set(FalconBridge.STOREDIN, clusterReference);
         feedDatasetRef.set(FalconBridge.FREQUENCY, feed.getFrequency());
+
         return feedDatasetRef;
     }
 
-    private static Map<String, String> getProcessEntityWFProperties(final Workflow workflow,
-                                                                    final String processName) {
+    private static Map<String, String> getProcessEntityWFProperties(final Workflow workflow, final String processName) {
         Map<String, String> wfProperties = new HashMap<>();
 
-        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
-                ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
-        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
-                workflow.getVersion());
-        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
-                workflow.getEngine().value());
-        return wfProperties;
-    }
-
-    public static String getFeedQualifiedName(final String feedName, final String clusterName) {
-        return String.format("%s@%s", feedName, clusterName);
-    }
-
-    public static String getProcessQualifiedName(final String processName, final String clusterName) {
-        return String.format("%s@%s", processName, clusterName);
-    }
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), workflow.getVersion());
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), workflow.getEngine().value());
 
-    public static String normalize(final String str) {
-        if (StringUtils.isBlank(str)) {
-            return null;
-        }
-        return str.toLowerCase().trim();
+        return wfProperties;
     }
 }
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
index 51db894ab..37dda6ed0 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
@@ -6,9 +6,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,23 +24,14 @@ import org.apache.falcon.entity.v0.Entity;
  * Falcon event to interface with Atlas Service.
  */
 public class FalconEvent {
-    protected String user;
+    protected String    user;
     protected OPERATION operation;
-    protected Entity entity;
+    protected Entity    entity;
 
     public FalconEvent(String doAsUser, OPERATION falconOperation, Entity entity) {
-        this.user = doAsUser;
+        this.user      = doAsUser;
         this.operation = falconOperation;
-        this.entity = entity;
-    }
-
-    public enum OPERATION {
-        ADD_CLUSTER,
-        UPDATE_CLUSTER,
-        ADD_FEED,
-        UPDATE_FEED,
-        ADD_PROCESS,
-        UPDATE_PROCESS,
+        this.entity    = entity;
     }
 
     public String getUser() {
@@ -54,4 +45,13 @@ public class FalconEvent {
     public Entity getEntity() {
         return entity;
     }
+
+    public enum OPERATION {
+        ADD_CLUSTER,
+        UPDATE_CLUSTER,
+        ADD_FEED,
+        UPDATE_FEED,
+        ADD_PROCESS,
+        UPDATE_PROCESS,
+    }
 }
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index b8a73cbe6..3a0f35d8e 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+
 import static org.apache.atlas.repository.Constants.FALCON_SOURCE;
 
 /**
@@ -43,31 +44,17 @@ import static org.apache.atlas.repository.Constants.FALCON_SOURCE;
 public class FalconHook extends AtlasHook implements FalconEventPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);
 
-    private static ConfigurationStore STORE;
+    private static ConfigurationStore store;
 
     @Override
     public String getMessageSource() {
         return FALCON_SOURCE;
     }
 
-    private enum Operation {
-        ADD,
-        UPDATE
-    }
-
-    static {
-        try {
-            STORE = ConfigurationStore.get();
-        } catch (Exception e) {
-            LOG.error("Caught exception initializing the falcon hook.", e);
-        }
-
-        LOG.info("Created Atlas Hook for Falcon");
-    }
-
     @Override
     public void publish(final Data data) {
         final FalconEvent event = data.getEvent();
+
         try {
             fireAndForget(event);
         } catch (Throwable t) {
@@ -77,17 +64,19 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private void fireAndForget(FalconEvent event) throws FalconException, URISyntaxException {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
+
         List<HookNotification> messages = new ArrayList<>();
+        Operation              op       = getOperation(event.getOperation());
+        String                 user     = getUser(event.getUser());
 
-        Operation op = getOperation(event.getOperation());
-        String user = getUser(event.getUser());
         LOG.info("fireAndForget user:{}", user);
-        switch (op) {
-        case ADD:
-            messages.add(new EntityCreateRequest(user, createEntities(event, user)));
-            break;
+
+        switch (op) {
+            case ADD:
+                messages.add(new EntityCreateRequest(user, createEntities(event, user)));
+                break;
         }
+
         notifyEntities(messages, null);
     }
 
@@ -95,24 +84,23 @@
         List<Referenceable> entities = new ArrayList<>();
 
         switch (event.getOperation()) {
-        case ADD_CLUSTER:
-            entities.add(FalconBridge
-                    .createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity()));
-            break;
-
-        case ADD_PROCESS:
-            entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE));
-            break;
-
-        case ADD_FEED:
-            entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE));
-            break;
-
-        case UPDATE_CLUSTER:
-        case UPDATE_FEED:
-        case UPDATE_PROCESS:
-        default:
-            LOG.info("Falcon operation {} is not valid or supported", event.getOperation());
+            case ADD_CLUSTER:
+                entities.add(FalconBridge.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity()));
+                break;
+
+            case ADD_PROCESS:
+                entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), store));
+                break;
+
+            case ADD_FEED:
+                entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), store));
+                break;
+
+            case UPDATE_CLUSTER:
+            case UPDATE_FEED:
+            case UPDATE_PROCESS:
+            default:
+                LOG.info("Falcon operation {} is not valid or supported", event.getOperation());
         }
 
         return entities;
@@ -120,19 +108,33 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private static Operation getOperation(final FalconEvent.OPERATION op) throws FalconException {
         switch (op) {
-        case ADD_CLUSTER:
-        case ADD_FEED:
-        case ADD_PROCESS:
-            return Operation.ADD;
-
-        case UPDATE_CLUSTER:
-        case UPDATE_FEED:
-        case UPDATE_PROCESS:
-            return Operation.UPDATE;
-
-        default:
-            throw new FalconException("Falcon operation " + op + " is not valid or supported");
+            case ADD_CLUSTER:
+            case ADD_FEED:
+            case ADD_PROCESS:
+                return Operation.ADD;
+
+            case UPDATE_CLUSTER:
+            case UPDATE_FEED:
+            case UPDATE_PROCESS:
+                return Operation.UPDATE;
+
+            default:
+                throw new FalconException("Falcon operation " + op + " is not valid or supported");
         }
     }
-}
 
+    private enum Operation {
+        ADD,
+        UPDATE
+    }
+
+    static {
+        try {
+            store = ConfigurationStore.get();
+        } catch (Exception e) {
+            LOG.error("Caught exception initializing the falcon hook.", e);
+        }
+
+        LOG.info("Created Atlas Hook for Falcon");
+    }
+}
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
index e36ff23af..ca1032ddc 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
@@ -6,9 +6,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,5 +32,4 @@ public enum FalconDataTypes {
     public String getName() {
         return name().toLowerCase();
     }
-
 }
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
index a01ec14be..a21244304 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
@@ -6,9 +6,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,14 @@
 
 package org.apache.atlas.falcon.publisher;
 
-
 import org.apache.atlas.falcon.event.FalconEvent;
 
 /**
  * Falcon publisher for Atlas
  */
 public interface FalconEventPublisher {
+    void publish(Data data);
+
     class Data {
         private FalconEvent event;
 
@@ -36,6 +37,4 @@ public interface FalconEventPublisher {
             return event;
         }
     }
-
-    void publish(final Data data);
 }
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
index 7482ba7b8..e3014a408 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
@@ -18,10 +18,10 @@
 
 package org.apache.atlas.falcon.service;
 
-import org.apache.atlas.falcon.Util.EventUtil;
 import org.apache.atlas.falcon.event.FalconEvent;
 import org.apache.atlas.falcon.hook.FalconHook;
 import org.apache.atlas.falcon.publisher.FalconEventPublisher;
+import org.apache.atlas.falcon.util.EventUtil;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -30,20 +30,19 @@ import org.apache.falcon.service.FalconService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Atlas service to publish Falcon events
  */
 public class AtlasService implements FalconService, ConfigurationChangeListener {
-
     private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);
 
-    private FalconEventPublisher publisher;
-
     /**
      * Constant for the service name.
      */
     public static final String SERVICE_NAME = AtlasService.class.getSimpleName();
 
+    private FalconEventPublisher publisher;
+
     @Override
     public String getName() {
         return SERVICE_NAME;
@@ -63,22 +62,22 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
         try {
             EntityType entityType = entity.getEntityType();
 
             switch (entityType) {
-            case CLUSTER:
-                addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
-                break;
+                case CLUSTER:
+                    addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
+                    break;
 
-            case PROCESS:
-                addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
-                break;
+                case PROCESS:
+                    addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
+                    break;
 
-            case FEED:
-                addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
-                break;
+                case FEED:
+                    addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
+                    break;
 
-            default:
-                LOG.debug("Entity type not processed {}", entityType);
+                default:
+                    LOG.debug("Entity type not processed {}", entityType);
             }
-        } catch(Throwable t) {
+        } catch (Throwable t) {
             LOG.warn("Error handling entity {}", entity, t);
         }
     }
@@ -91,26 +90,26 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
     public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
         /**
          * Skipping update for now - update uses full update currently and this might result in all attributes wiped for hive entities
-        EntityType entityType = newEntity.getEntityType();
-        switch (entityType) {
-        case CLUSTER:
-            addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
-            break;
-
-        case PROCESS:
-            addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
-            break;
-
-        case FEED:
-            FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ?
-                    FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
-                    FalconEvent.OPERATION.UPDATE_FEED;
-            addEntity(newEntity, operation);
-            break;
-
-        default:
-            LOG.debug("Entity type not processed {}", entityType);
-        }
+        EntityType entityType = newEntity.getEntityType();
+        switch (entityType) {
+            case CLUSTER:
+                addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
+                break;
+
+            case PROCESS:
+                addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
+                break;
+
+            case FEED:
+                FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ?
+                        FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
+                        FalconEvent.OPERATION.UPDATE_FEED;
+                addEntity(newEntity, operation);
+                break;
+
+            default:
+                LOG.debug("Entity type not processed {}", entityType);
+        }
         **/
     }
 
@@ -124,9 +123,9 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
         LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName());
 
         try {
-            FalconEvent event =
-                    new FalconEvent(EventUtil.getUser(), operation, entity);
-            FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
+            FalconEvent               event = new FalconEvent(EventUtil.getUser(), operation, entity);
+            FalconEventPublisher.Data data  = new FalconEventPublisher.Data(event);
+
             publisher.publish(data);
         } catch (Exception ex) {
             throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex);
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
similarity index 86%
rename from addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
rename to addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
index ef5634009..bcf838ca2 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
@@ -6,9 +6,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.atlas.falcon.Util;
+package org.apache.atlas.falcon.util;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
@@ -29,24 +29,24 @@ import java.util.Map;
 * Falcon event util
 */
 public final class EventUtil {
-    private EventUtil() {}
-
     public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) {
         if (StringUtils.isBlank(keyValueString)) {
             return null;
         }
 
         Map<String, String> keyValueMap = new HashMap<>();
+        String[]            tags        = keyValueString.split(",");
 
-        String[] tags = keyValueString.split(",");
         for (String tag : tags) {
-            int index = tag.indexOf("=");
-            String tagKey = tag.substring(0, index).trim();
+            int    index    = tag.indexOf("=");
+            String tagKey   = tag.substring(0, index).trim();
             String tagValue = tag.substring(index + 1, tag.length()).trim();
+
             keyValueMap.put(tagKey, tagValue);
         }
+
         return keyValueMap;
     }
 
@@ -56,6 +56,7 @@ public final class EventUtil {
         } catch (Exception ioe) {
             //Ignore is failed to get user, uses login user
         }
+
         return null;
     }
 }
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index e77f4c96d..c96479dad 100644
--- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -6,9 +6,9 @@
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,17 +23,17 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.falcon.bridge.FalconBridge;
 import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.falcon.service.AtlasService;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.type.AtlasTypeUtil;
-import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.atlas.utils.ParamChecker;
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.atlas.falcon.service.AtlasService;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.FileSystemStorage;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -50,6 +50,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import javax.xml.bind.JAXBException;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -61,33 +62,145 @@ import static org.testng.Assert.fail;
 public class FalconHookIT {
     public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
 
-    public static final String CLUSTER_RESOURCE = "/cluster.xml";
-    public static final String FEED_RESOURCE = "/feed.xml";
-    public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
+    public static final String CLUSTER_RESOURCE          = "/cluster.xml";
+    public static final String FEED_RESOURCE             = "/feed.xml";
+    public static final String FEED_HDFS_RESOURCE        = "/feed-hdfs.xml";
     public static final String FEED_REPLICATION_RESOURCE = "/feed-replication.xml";
-    public static final String PROCESS_RESOURCE = "/process.xml";
-
-    private AtlasClientV2 atlasClient;
+    public static final String PROCESS_RESOURCE          = "/process.xml";
 
     private static final ConfigurationStore STORE = ConfigurationStore.get();
 
+    private AtlasClientV2 atlasClient;
+
     @BeforeClass
     public void setUp() throws Exception {
         Configuration atlasProperties = ApplicationProperties.get();
+
         if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
-            atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[]{"admin", "admin"});
+            atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[] {"admin", "admin"});
         } else {
             atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
         }
 
         AtlasService service = new AtlasService();
+
         service.init();
         STORE.registerListener(service);
         CurrentUser.authenticate(System.getProperty("user.name"));
     }
 
+    @Test
+    public void testCreateProcess() throws Exception {
+        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, cluster);
+
+        assertClusterIsRegistered(cluster);
+
+        Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
+        String infeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName()))).getGuid();
+        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+        String outFeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
+        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+
+        process.getClusters().getClusters().get(0).setName(cluster.getName());
+        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+
+        STORE.publish(EntityType.PROCESS, process);
+
+        String pid = assertProcessIsRegistered(process, cluster.getName());
+        AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();
+
+        assertNotNull(processEntity);
+        assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
+        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
+        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outFeedId);
+    }
+
+    @Test
+    public void testReplicationFeed() throws Exception {
+        Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, srcCluster);
+
+        assertClusterIsRegistered(srcCluster);
+
+        Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, targetCluster);
+
+        assertClusterIsRegistered(targetCluster);
+
+        Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName());
+        String inId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName()))).getGuid();
+        String outId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName()))).getGuid();
+        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName());
+        AtlasEntity process = atlasClient.getEntityByGuid(processId).getEntity();
+
+        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("inputs")).get(0)), inId);
+        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("outputs")).get(0)), outId);
+    }
+
+    @Test
+    public void testCreateProcessWithHDFSFeed() throws Exception {
+        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, cluster);
+
+        TypesUtil.Pair<String, Feed> result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
+        Feed infeed = result.right;
+        String infeedId = result.left;
+
+        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+        String outfeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
+        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+
+        process.getClusters().getClusters().get(0).setName(cluster.getName());
+        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+
+        STORE.publish(EntityType.PROCESS, process);
+
+        String pid = assertProcessIsRegistered(process, cluster.getName());
+        AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();
+
+        assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
+        assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
+        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
+        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outfeedId);
+    }
+
+    /**
+     * Wait for a condition, expressed via a {@link Predicate} to become true.
+     *
+     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
+     * @param predicate predicate waiting on.
+     */
+    protected void waitFor(int timeout, Predicate predicate) throws Exception {
+        ParamChecker.notNull(predicate, "predicate");
+
+        long mustEnd = System.currentTimeMillis() + timeout;
+
+        while (true) {
+            try {
+                predicate.evaluate();
+                return;
+            } catch (Error | Exception e) {
+                if (System.currentTimeMillis() >= mustEnd) {
+                    fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
+                }
+
+                LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
+
+                Thread.sleep(400);
+            }
+        }
+    }
+
     private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
         Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
+
         switch (entity.getEntityType()) {
             case CLUSTER:
                 ((Cluster) entity).setName(name);
@@ -101,7 +214,8 @@ public class FalconHookIT {
                 ((Process) entity).setName(name);
                 break;
         }
-        return (T)entity;
+
+        return (T) entity;
     }
 
     private String random() {
@@ -112,67 +226,39 @@ public class FalconHookIT {
         return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName);
     }
 
-    @Test
-    public void testCreateProcess() throws Exception {
-        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, cluster);
-        assertClusterIsRegistered(cluster);
-
-        Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
-        String infeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName()))).getGuid();
-
-        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
-        String outFeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
-
-        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
-        process.getClusters().getClusters().get(0).setName(cluster.getName());
-        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
-        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
-        STORE.publish(EntityType.PROCESS, process);
-
-        String pid = assertProcessIsRegistered(process, cluster.getName());
-        AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();
-        assertNotNull(processEntity);
-        assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
-        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
-        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outFeedId);
-    }
-
     private String assertProcessIsRegistered(Process process, String clusterName) throws Exception {
-        return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
+        return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
     }
 
     private String assertClusterIsRegistered(Cluster cluster) throws Exception {
-        return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName());
+        return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName());
     }
 
     private TypesUtil.Pair<String, Feed> getHDFSFeed(String feedResource, String clusterName) throws Exception {
-        Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
+        Feed                                     feed        = loadEntity(EntityType.FEED, feedResource, "feed" + random());
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
 
         feedCluster.setName(clusterName);
         STORE.publish(EntityType.FEED, feed);
 
         String feedId = assertFeedIsRegistered(feed, clusterName);
+
         assertFeedAttributes(feedId);
 
-        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
+        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
         AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity();
+
         assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), feedId);
 
-        String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
+        String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
         AtlasEntity pathEntity = atlasClient.getEntityByGuid(inputId).getEntity();
+
         assertEquals(pathEntity.getTypeName(), HiveMetaStoreBridge.HDFS_PATH);
 
-        List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
-        Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
-        assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
-                FalconBridge.normalize(dataLocation.getPath()));
+        List<Location> locations    = FeedHelper.getLocations(feedCluster, feed);
+        Location       dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+
+        assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.normalize(dataLocation.getPath()));
 
         return TypesUtil.Pair.of(feedId, feed);
     }
@@ -182,114 +268,67 @@ public class FalconHookIT {
     }
 
     private Feed getTableFeed(String feedResource, String clusterName, String secondClusterName) throws Exception {
-        Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
+        Feed                                     feed        = loadEntity(EntityType.FEED, feedResource, "feed" + random());
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
 
         feedCluster.setName(clusterName);
-        String dbName = "db" + random();
+
+        String dbName = "db" + random();
         String tableName = "table" + random();
+
         feedCluster.getTable().setUri(getTableUri(dbName, tableName));
 
-        String dbName2 = "db" + random();
+        String dbName2 = "db" + random();
         String tableName2 = "table" + random();
 
         if (secondClusterName != null) {
             org.apache.falcon.entity.v0.feed.Cluster feedCluster2 = feed.getClusters().getClusters().get(1);
+
             feedCluster2.setName(secondClusterName);
             feedCluster2.getTable().setUri(getTableUri(dbName2, tableName2));
         }
 
         STORE.publish(EntityType.FEED, feed);
+
         String feedId = assertFeedIsRegistered(feed, clusterName);
+
         assertFeedAttributes(feedId);
         verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName);
 
         if (secondClusterName != null) {
             String feedId2 = assertFeedIsRegistered(feed, secondClusterName);
+
             assertFeedAttributes(feedId2);
             verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2);
         }
+
         return feed;
     }
 
     private void assertFeedAttributes(String feedId) throws Exception {
         AtlasEntity feedEntity = atlasClient.getEntityByGuid(feedId).getEntity();
+
         assertEquals(feedEntity.getAttribute(AtlasClient.OWNER), "testuser");
         assertEquals(feedEntity.getAttribute(FalconBridge.FREQUENCY), "hours(1)");
         assertEquals(feedEntity.getAttribute(AtlasClient.DESCRIPTION), "test input");
     }
 
-    private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName)
-            throws Exception{
+    private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName) throws Exception {
         //verify that lineage from hive table to falcon feed is created
-        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feedName, clusterName));
+        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feedName, clusterName));
         AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity();
+
         assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), feedId);
 
-        String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
+        String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
         AtlasEntity tableEntity = atlasClient.getEntityByGuid(inputId).getEntity();
 
-        assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName());
-        assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
-                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
+        assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName());
+        assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
     }
 
     private String assertFeedIsRegistered(Feed feed, String clusterName) throws Exception {
-        return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
-    }
-
-    @Test
-    public void testReplicationFeed() throws Exception {
-        Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, srcCluster);
-        assertClusterIsRegistered(srcCluster);
-
-        Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, targetCluster);
-        assertClusterIsRegistered(targetCluster);
-
-        Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName());
-        String inId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName()))).getGuid();
-        String outId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName()))).getGuid();
-
-
-        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName());
-        AtlasEntity process = atlasClient.getEntityByGuid(processId).getEntity();
-        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("inputs")).get(0)), inId);
-        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("outputs")).get(0)), outId);
-    }
-
-    @Test
-    public void testCreateProcessWithHDFSFeed() throws Exception {
-        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, cluster);
-
-        TypesUtil.Pair<String, Feed> result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
-        Feed infeed = result.right;
-        String infeedId = result.left;
-
-        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
-        String outfeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
-
-        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
-        process.getClusters().getClusters().get(0).setName(cluster.getName());
-        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
-        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
-        STORE.publish(EntityType.PROCESS, process);
-
-        String pid = assertProcessIsRegistered(process, cluster.getName());
-        AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();
-        assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
-        assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
-                FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
-        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
-        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outfeedId);
+        return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
     }
 
     private String assertEntityIsRegistered(final String typeName, final String property, final String value) throws Exception {
@@ -297,10 +336,12 @@ public class FalconHookIT {
             @Override
             public void evaluate() throws Exception {
                 AtlasEntity.AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
+
                 assertNotNull(entity);
                 assertNotNull(entity.getEntity());
             }
         });
+
         return atlasClient.getEntityHeaderByAttribute(typeName,
                Collections.singletonMap(property, value)).getGuid();
     }
 
@@ -323,28 +364,4 @@ public class FalconHookIT {
          */
         void evaluate() throws Exception;
     }
-
-    /**
-     * Wait for a condition, expressed via a {@link Predicate} to become true.
-     *
-     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
-     * @param predicate predicate waiting on.
-     */
-    protected void waitFor(int timeout, Predicate predicate) throws Exception {
-        ParamChecker.notNull(predicate, "predicate");
-        long mustEnd = System.currentTimeMillis() + timeout;
-
-        while (true) {
-            try {
-                predicate.evaluate();
-                return;
-            } catch(Error | Exception e) {
-                if (System.currentTimeMillis() >= mustEnd) {
-                    fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
-                }
-                LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
-                Thread.sleep(400);
-            }
-        }
-    }
 }
diff --git a/addons/falcon-bridge/src/test/resources/atlas-application.properties b/addons/falcon-bridge/src/test/resources/atlas-application.properties
index 0ce0f46c9..94a75aab4 100644
--- a/addons/falcon-bridge/src/test/resources/atlas-application.properties
+++ b/addons/falcon-bridge/src/test/resources/atlas-application.properties
@@ -15,54 +15,39 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 ######### Atlas Server Configs #########
 atlas.rest.address=http://localhost:31000
-
 ######### Graph Database Configs #########
-
-
 # Graph database implementation. Value inserted by maven.
 atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
 atlas.graph.index.search.solr.wait-searcher=true
-
 # Graph Storage
 atlas.graph.storage.backend=berkeleyje
-
 # Entity repository implementation
 atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository
-
 # Graph Search Index Backend
 atlas.graph.index.search.backend=solr
-
 #Berkeley storage directory
 atlas.graph.storage.directory=${sys:atlas.data}/berkley
 atlas.graph.storage.transactions=true
-
 #hbase
 #For standalone mode , specify localhost
 #for distributed mode, specify zookeeper quorum here
-
 atlas.graph.storage.hostname=${graph.storage.hostname}
 atlas.graph.storage.hbase.regions-per-server=1
 atlas.graph.storage.lock.wait-time=10000
-
 #ElasticSearch
 atlas.graph.index.search.directory=${sys:atlas.data}/es
 atlas.graph.index.search.elasticsearch.client-only=false
 atlas.graph.index.search.elasticsearch.local-mode=true
 atlas.graph.index.search.elasticsearch.create.sleep=2000
-
 # Solr cloud mode properties
 atlas.graph.index.search.solr.mode=cloud
 atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
 atlas.graph.index.search.solr.embedded=true
 atlas.graph.index.search.max-result-set-size=150
-
-
 ######### Notification Configs #########
 atlas.notification.embedded=true
-
 atlas.kafka.zookeeper.connect=localhost:19026
 atlas.kafka.bootstrap.servers=localhost:19027
 atlas.kafka.data=${sys:atlas.data}/kafka
@@ -73,52 +58,38 @@ atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
 #atlas.kafka.auto.commit.enable=false
-
 atlas.kafka.enable.auto.commit=false
 atlas.kafka.auto.offset.reset=earliest
 atlas.kafka.session.timeout.ms=30000
 atlas.kafka.offsets.topic.replication.factor=1
-
-
-
 ######### Entity Audit Configs #########
 atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
 atlas.audit.zookeeper.session.timeout.ms=1000
 atlas.audit.hbase.zookeeper.quorum=localhost
 atlas.audit.hbase.zookeeper.property.clientPort=19026
-
 ######### Security Properties #########
-
 # SSL config
 atlas.enableTLS=false
 atlas.server.https.port=31443
-
 ######### Security Properties #########
-
 hbase.security.authentication=simple
-
 atlas.hook.falcon.synchronous=true
-
 ######### JAAS Configuration ########
-
-atlas.jaas.KafkaClient.loginModuleName = com.sun.security.auth.module.Krb5LoginModule
-atlas.jaas.KafkaClient.loginModuleControlFlag = required
-atlas.jaas.KafkaClient.option.useKeyTab = true
-atlas.jaas.KafkaClient.option.storeKey = true
-atlas.jaas.KafkaClient.option.serviceName = kafka
-atlas.jaas.KafkaClient.option.keyTab = /etc/security/keytabs/atlas.service.keytab
-atlas.jaas.KafkaClient.option.principal = atlas/_h...@example.com
-
+atlas.jaas.KafkaClient.loginModuleName=com.sun.security.auth.module.Krb5LoginModule
+atlas.jaas.KafkaClient.loginModuleControlFlag=required
+atlas.jaas.KafkaClient.option.useKeyTab=true
+atlas.jaas.KafkaClient.option.storeKey=true
+atlas.jaas.KafkaClient.option.serviceName=kafka
+atlas.jaas.KafkaClient.option.keyTab=/etc/security/keytabs/atlas.service.keytab
+atlas.jaas.KafkaClient.option.principal=atlas/_h...@example.com
 ######### High Availability Configuration ########
 atlas.server.ha.enabled=false
 #atlas.server.ids=id1
 #atlas.server.address.id1=localhost:21000
-
 ######### Atlas Authorization #########
 atlas.authorizer.impl=none
 # atlas.authorizer.impl=simple
 # atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json
-
 ######### Atlas Authentication #########
 atlas.authentication.method.file=true
 atlas.authentication.method.ldap.type=none
diff --git a/addons/falcon-bridge/src/test/resources/atlas-logback.xml b/addons/falcon-bridge/src/test/resources/atlas-logback.xml
index 78fd420dc..991cb621d 100755
--- a/addons/falcon-bridge/src/test/resources/atlas-logback.xml
+++ b/addons/falcon-bridge/src/test/resources/atlas-logback.xml
@@ -18,115 +18,114 @@
 -->
 <configuration>
-    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
-        <param name="Target" value="System.out"/>
-        <encoder>
-            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-            <level>INFO</level>
-        </filter>
-    </appender>
-
-    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <file>${atlas.log.dir}/${atlas.log.file}</file>
-        <append>true</append>
-        <encoder>
-            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-        </encoder>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
-            <maxHistory>20</maxHistory>
-            <cleanHistoryOnStart>true</cleanHistoryOnStart>
-        </rollingPolicy>
-    </appender>
-
-    <appender name="AUDIT" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <file>${atlas.log.dir}/audit.log</file>
-        <append>true</append>
-        <encoder>
-            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-        </encoder>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${atlas.log.dir}/audit-%d.log</fileNamePattern>
-            <maxHistory>20</maxHistory>
-            <cleanHistoryOnStart>false</cleanHistoryOnStart>
-        </rollingPolicy>
-    </appender>
-
-    <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <file>${atlas.log.dir}/metrics.log</file>
-        <append>true</append>
-        <encoder>
-            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-        </encoder>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${atlas.log.dir}/metrics-%d.log</fileNamePattern>
-            <maxHistory>20</maxHistory>
-            <cleanHistoryOnStart>false</cleanHistoryOnStart>
-        </rollingPolicy>
-    </appender>
-
-    <appender name="FAILED" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <file>${atlas.log.dir}/failed.log</file>
-        <append>true</append>
-        <encoder>
-            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-        </encoder>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${atlas.log.dir}/failed-%d.log</fileNamePattern>
-            <maxHistory>20</maxHistory>
-            <cleanHistoryOnStart>false</cleanHistoryOnStart>
-        </rollingPolicy>
-    </appender>
-
-    <logger name="org.apache.atlas" additivity="false" level="info">
-        <appender-ref ref="FILE"/>
-    </logger>
-
-    <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false" level="debug">
-        <appender-ref ref="FILE"/>
-    </logger>
-
-    <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook" additivity="false" level="debug">
-        <appender-ref ref="FILE"/>
-    </logger>
-
-    <logger name="org.janusgraph" additivity="false" level="warn">
-        <appender-ref ref="FILE"/>
-    </logger>
-
-    <logger name="org.springframework" additivity="false" level="warn">
-        <appender-ref ref="console"/>
-    </logger>
-
-    <logger name="org.eclipse" additivity="false" level="warn">
-        <appender-ref ref="console"/>
-    </logger>
-
-    <logger name="com.sun.jersey" additivity="false" level="warn">
-        <appender-ref ref="console"/>
-    </logger>
-
-
-    <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
-    <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false" level="error">
-        <appender-ref ref="FILE"/>
-    </logger>
-
-    <logger name="METRICS" additivity="false" level="debug">
-        <appender-ref ref="METRICS"/>
-    </logger>
-
-    <logger name="FAILED" additivity="false" level="info">
-        <appender-ref ref="FAILED"/>
-    </logger>
-
-    <logger name="AUDIT" additivity="false" level="info">
-        <appender-ref ref="AUDIT"/>
-    </logger>
-
-    <root level="warn">
-        <appender-ref ref="FILE"/>
-    </root>
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <param name="Target" value="System.out" />
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/${atlas.log.file}</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <appender name="AUDIT" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/audit.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/audit-%d.log</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>false</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/metrics.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/metrics-%d.log</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>false</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <appender name="FAILED" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/failed.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/failed-%d.log</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>false</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="org.apache.atlas" additivity="false" level="info">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false" level="debug">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook" additivity="false" level="debug">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.janusgraph" additivity="false" level="warn">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.springframework" additivity="false" level="warn">
+        <appender-ref ref="console" />
+    </logger>
+
+    <logger name="org.eclipse" additivity="false" level="warn">
+        <appender-ref ref="console" />
+    </logger>
+
+    <logger name="com.sun.jersey" additivity="false" level="warn">
+        <appender-ref ref="console" />
+    </logger>
+
+    <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
+    <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false" level="error">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="METRICS" additivity="false" level="debug">
+        <appender-ref ref="METRICS" />
+    </logger>
+
+    <logger name="FAILED" additivity="false" level="info">
+        <appender-ref ref="FAILED" />
+    </logger>
+
+    <logger name="AUDIT" additivity="false" level="info">
+        <appender-ref ref="AUDIT" />
+    </logger>
+
+    <root level="warn">
+        <appender-ref ref="FILE" />
+    </root>
 </configuration>
diff --git a/addons/falcon-bridge/src/test/resources/cluster.xml b/addons/falcon-bridge/src/test/resources/cluster.xml
index b183847db..b7bbda74c 100644
--- a/addons/falcon-bridge/src/test/resources/cluster.xml
+++ b/addons/falcon-bridge/src/test/resources/cluster.xml
@@ -20,7 +20,7 @@ Primary cluster configuration for demo vm
 -->
 <cluster colo="west-coast" description="Primary Cluster" name="testcluster"
-         xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+         xmlns="uri:falcon:cluster:0.1">
 
     <interfaces>
         <interface type="readonly" endpoint="hftp://localhost:10070" version="1.1.1" />
diff --git a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
index 435db0745..582998a3c 100644
--- a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
+++ b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
@@ -21,19 +21,19 @@
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
 
-    <late-arrival cut-off="hours(3)"/>
+    <late-arrival cut-off="hours(3)" />
 
     <clusters>
         <cluster name="testcluster" type="source">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
         </cluster>
     </clusters>
 
     <locations>
-        <location type="data" path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
+        <location type="data" path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}" />
     </locations>
 
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="hcat" provider="hcat"/>
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="hcat" provider="hcat" />
 </feed>
diff --git a/addons/falcon-bridge/src/test/resources/feed-replication.xml b/addons/falcon-bridge/src/test/resources/feed-replication.xml
index dcd427b18..42c4f863f 100644
--- a/addons/falcon-bridge/src/test/resources/feed-replication.xml
+++ b/addons/falcon-bridge/src/test/resources/feed-replication.xml
@@ -21,23 +21,23 @@
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
 
-    <late-arrival cut-off="hours(3)"/>
+    <late-arrival cut-off="hours(3)" />
 
     <clusters>
         <cluster name="testcluster" type="source">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
             <table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
         <cluster name="testcluster" type="target">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
             <table uri="catalog:outdb:outtable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
     </clusters>
 
     <table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
 
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="hcat" provider="hcat"/>
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="hcat" provider="hcat" />
 </feed>
diff --git a/addons/falcon-bridge/src/test/resources/feed.xml b/addons/falcon-bridge/src/test/resources/feed.xml
index 473c745ce..f58316b77 100644
--- a/addons/falcon-bridge/src/test/resources/feed.xml
+++ b/addons/falcon-bridge/src/test/resources/feed.xml
@@ -21,18 +21,18 @@
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
 
-    <late-arrival cut-off="hours(3)"/>
+    <late-arrival cut-off="hours(3)" />
 
     <clusters>
         <cluster name="testcluster" type="source">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
             <table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
     </clusters>
 
     <table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
 
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="hcat" provider="hcat"/>
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="hcat" provider="hcat" />
 </feed>
diff --git a/addons/falcon-bridge/src/test/resources/process.xml b/addons/falcon-bridge/src/test/resources/process.xml
index b94d0a847..62e7542f1 100644
--- a/addons/falcon-bridge/src/test/resources/process.xml
+++ b/addons/falcon-bridge/src/test/resources/process.xml
@@ -22,7 +22,7 @@
 
     <clusters>
         <cluster name="testcluster">
-            <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/>
+            <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z" />
         </cluster>
     </clusters>
 
@@ -32,22 +32,22 @@
     <timezone>UTC</timezone>
 
     <inputs>
-        <input end="today(0,0)" start="today(0,0)" feed="testinput" name="input"/>
+        <input end="today(0,0)" start="today(0,0)" feed="testinput" name="input" />
     </inputs>
 
     <outputs>
-        <output instance="now(0,0)" feed="testoutput" name="output"/>
+        <output instance="now(0,0)" feed="testoutput" name="output" />
     </outputs>
 
     <properties>
-        <property name="blah" value="blah"/>
+        <property name="blah" value="blah" />
     </properties>
 
-    <workflow engine="hive" path="/falcon/test/apps/hive/script.hql"/>
+    <workflow engine="hive" path="/falcon/test/apps/hive/script.hql" />
 
-    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+    <retry policy="periodic" delay="minutes(10)" attempts="3" />
 
     <late-process policy="exp-backoff" delay="hours(2)">
-        <late-input input="input" workflow-path="/falcon/test/workflow"/>
+        <late-input input="input" workflow-path="/falcon/test/workflow" />
     </late-process>
 </process>
diff --git a/addons/falcon-bridge/src/test/resources/startup.properties b/addons/falcon-bridge/src/test/resources/startup.properties
index 962347039..661a2be89 100644
--- a/addons/falcon-bridge/src/test/resources/startup.properties
+++ b/addons/falcon-bridge/src/test/resources/startup.properties
@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 *.domain=debug
 *.config.store.persist=false
 *.config.store.uri=target/config_store
\ No newline at end of file