This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/atlas-2.5 by this push:
     new c423f297e ATLAS-4971: falcon-bridge, falcon-bridge-shim modules: update for cod… (#296)
c423f297e is described below

commit c423f297e805c96fc421d5ae53f7a156b857b881
Author: sheetalshah1007 <sheetal.s...@freestoneinfotech.com>
AuthorDate: Thu Feb 20 10:03:47 2025 +0530

    ATLAS-4971: falcon-bridge, falcon-bridge-shim modules: update for cod… (#296)
    
    (cherry picked from commit 0901f6d816b7a5e2944a5ab52ec6cf61be5b4a46)
---
 addons/falcon-bridge-shim/pom.xml                  |   4 +
 .../apache/atlas/falcon/service/AtlasService.java  |  81 ++----
 addons/falcon-bridge/pom.xml                       |   5 +
 .../apache/atlas/falcon/bridge/FalconBridge.java   | 238 ++++++++--------
 .../org/apache/atlas/falcon/event/FalconEvent.java |  32 +--
 .../org/apache/atlas/falcon/hook/FalconHook.java   | 108 ++++----
 .../apache/atlas/falcon/model/FalconDataTypes.java |   7 +-
 .../falcon/publisher/FalconEventPublisher.java     |  11 +-
 .../apache/atlas/falcon/service/AtlasService.java  |  77 +++---
 .../atlas/falcon/{Util => util}/EventUtil.java     |  19 +-
 .../org/apache/atlas/falcon/hook/FalconHookIT.java | 307 +++++++++++----------
 .../test/resources/atlas-application.properties    |  43 +--
 .../src/test/resources/atlas-logback.xml           | 221 ++++++++-------
 .../falcon-bridge/src/test/resources/cluster.xml   |   2 +-
 .../falcon-bridge/src/test/resources/feed-hdfs.xml |  12 +-
 .../src/test/resources/feed-replication.xml        |  14 +-
 addons/falcon-bridge/src/test/resources/feed.xml   |  10 +-
 .../falcon-bridge/src/test/resources/process.xml   |  14 +-
 .../src/test/resources/startup.properties          |   1 -
 19 files changed, 592 insertions(+), 614 deletions(-)

diff --git a/addons/falcon-bridge-shim/pom.xml b/addons/falcon-bridge-shim/pom.xml
index 1ca461514..df0456f01 100755
--- a/addons/falcon-bridge-shim/pom.xml
+++ b/addons/falcon-bridge-shim/pom.xml
@@ -32,6 +32,10 @@
     <name>Apache Atlas Falcon Bridge Shim</name>
     <description>Apache Atlas Falcon Bridge Shim Module</description>
 
+    <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
+    </properties>
     <dependencies>
         <!-- Logging -->
         <dependency>
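
For context: checkstyle.skip and checkstyle.failOnViolation are standard maven-checkstyle-plugin user properties, so the two property additions in these poms re-enable checkstyle for the module and make violations fail the build; either can still be overridden per invocation, e.g. with -Dcheckstyle.skip=true.
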
diff --git a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
index 2b756de0e..7b464ab33 100755
--- a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
+++ b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.falcon.service;
 
-
 import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -34,12 +33,12 @@ import org.slf4j.LoggerFactory;
 public class AtlasService implements FalconService, ConfigurationChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);
 
-    private static final String ATLAS_PLUGIN_TYPE = "falcon";
+    private static final String ATLAS_PLUGIN_TYPE                = "falcon";
     private static final String ATLAS_FALCON_HOOK_IMPL_CLASSNAME = "org.apache.atlas.falcon.service.AtlasService";
 
-    private AtlasPluginClassLoader atlasPluginClassLoader = null;
-    private FalconService falconServiceImpl = null;
-    private ConfigurationChangeListener configChangeListenerImpl = null;
+    private AtlasPluginClassLoader      atlasPluginClassLoader;
+    private FalconService               falconServiceImpl;
+    private ConfigurationChangeListener configChangeListenerImpl;
 
     public AtlasService() {
         this.initialize();
@@ -47,9 +46,7 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
 
     @Override
     public String getName() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.getName()");
-        }
+        LOG.debug("==> AtlasService.getName()");
 
         String ret = null;
 
@@ -60,18 +57,14 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.getName()");
-        }
+        LOG.debug("<== AtlasService.getName()");
 
         return ret;
     }
 
     @Override
     public void init() throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.init()");
-        }
+        LOG.debug("==> AtlasService.init()");
 
         try {
             activatePluginClassLoader();
@@ -83,16 +76,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.init()");
-        }
+        LOG.debug("<== AtlasService.init()");
     }
 
     @Override
     public void destroy() throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.destroy()");
-        }
+        LOG.debug("==> AtlasService.destroy()");
 
         try {
             activatePluginClassLoader();
@@ -104,16 +93,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.destroy()");
-        }
+        LOG.debug("<== AtlasService.destroy()");
     }
 
     @Override
     public void onAdd(Entity entity) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onAdd({})", entity);
-        }
+        LOG.debug("==> AtlasService.onAdd({})", entity);
 
         try {
             activatePluginClassLoader();
@@ -122,16 +107,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onAdd({})", entity);
-        }
+        LOG.debug("<== AtlasService.onAdd({})", entity);
     }
 
     @Override
     public void onRemove(Entity entity) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onRemove({})", entity);
-        }
+        LOG.debug("==> AtlasService.onRemove({})", entity);
 
         try {
             activatePluginClassLoader();
@@ -140,16 +121,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onRemove({})", entity);
-        }
+        LOG.debug("<== AtlasService.onRemove({})", entity);
     }
 
     @Override
     public void onChange(Entity entity, Entity entity1) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1);
-        }
+        LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1);
 
         try {
             activatePluginClassLoader();
@@ -158,16 +135,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1);
-        }
+        LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1);
     }
 
     @Override
     public void onReload(Entity entity) throws FalconException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.onReload({})", entity);
-        }
+        LOG.debug("==> AtlasService.onReload({})", entity);
 
         try {
             activatePluginClassLoader();
@@ -176,15 +149,11 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.onReload({})", entity);
-        }
+        LOG.debug("<== AtlasService.onReload({})", entity);
     }
 
     private void initialize() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasService.initialize()");
-        }
+        LOG.debug("==> AtlasService.initialize()");
 
         try {
             atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass());
@@ -195,7 +164,7 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
 
             Object atlasService = cls.newInstance();
 
-            falconServiceImpl = (FalconService) atlasService;
+            falconServiceImpl        = (FalconService) atlasService;
+            configChangeListenerImpl = (ConfigurationChangeListener) atlasService;
         } catch (Exception excp) {
             LOG.error("Error instantiating Atlas hook implementation", excp);
@@ -203,9 +172,7 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
             deactivatePluginClassLoader();
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasService.initialize()");
-        }
+        LOG.debug("<== AtlasService.initialize()");
     }
 
     private void activatePluginClassLoader() {
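
For context on the isDebugEnabled() guards removed throughout this file: SLF4J's parameterized "{}" logging defers message formatting until after the logger's own level check, so a guard only pays off when computing an argument is itself expensive. A minimal sketch of the distinction, with illustrative names (not from this patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GuardDemo {
        private static final Logger LOG = LoggerFactory.getLogger(GuardDemo.class);

        void onAdd(Object entity) {
            // No guard needed: the argument is only formatted if DEBUG is enabled.
            LOG.debug("==> AtlasService.onAdd({})", entity);

            // A guard still pays off when *computing* the argument is expensive.
            if (LOG.isDebugEnabled()) {
                LOG.debug("state: {}", expensiveDump());
            }
        }

        private String expensiveDump() {
            return "...";  // stand-in for a costly computation
        }
    }
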
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index b29837613..eca232e4d 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -32,6 +32,11 @@
     <name>Apache Atlas Falcon Bridge</name>
     <description>Apache Atlas Falcon Bridge Module</description>
 
+    <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.atlas</groupId>
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
index cbf002f4f..f21dd17c3 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
@@ -20,8 +20,8 @@ package org.apache.atlas.falcon.bridge;
 
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
-import org.apache.atlas.falcon.Util.EventUtil;
 import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.falcon.util.EventUtil;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.v1.model.instance.Referenceable;
@@ -60,15 +60,18 @@ import java.util.Map;
 public class FalconBridge {
     private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class);
 
-    public static final String COLO = "colo";
-    public static final String TAGS = "tags";
-    public static final String GROUPS = "groups";
-    public static final String PIPELINES = "pipelines";
-    public static final String WFPROPERTIES = "workflow-properties";
-    public static final String RUNSON = "runs-on";
-    public static final String STOREDIN = "stored-in";
-    public static final String FREQUENCY = "frequency";
-    public static final String ATTRIBUTE_DB = "db";
+    public static final  String COLO         = "colo";
+    public static final  String TAGS         = "tags";
+    public static final  String GROUPS       = "groups";
+    public static final  String PIPELINES    = "pipelines";
+    public static final  String WFPROPERTIES = "workflow-properties";
+    public static final  String RUNSON       = "runs-on";
+    public static final  String STOREDIN     = "stored-in";
+    public static final  String FREQUENCY    = "frequency";
+    public static final  String ATTRIBUTE_DB = "db";
+
+    private FalconBridge() {
+    }
 
     /**
      * Creates cluster entity
@@ -92,75 +95,49 @@ public class FalconBridge {
         }
 
         if (StringUtils.isNotEmpty(cluster.getTags())) {
-            clusterRef.set(FalconBridge.TAGS,
-                    EventUtil.convertKeyValueStringToMap(cluster.getTags()));
+            clusterRef.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(cluster.getTags()));
         }
 
         return clusterRef;
     }
 
-    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) {
-        LOG.info("Creating feed dataset: {}", feed.getName());
-
-        Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
-        feedEntity.set(AtlasClient.NAME, feed.getName());
-        feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription());
-        String feedQualifiedName =
-                getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME));
-        feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
-        feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString());
-        feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
-        if (feed.getACL() != null) {
-            feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner());
-        }
-
-        if (StringUtils.isNotEmpty(feed.getTags())) {
-            feedEntity.set(FalconBridge.TAGS,
-                    EventUtil.convertKeyValueStringToMap(feed.getTags()));
-        }
-
-        if (feed.getGroups() != null) {
-            feedEntity.set(FalconBridge.GROUPS, feed.getGroups());
-        }
-
-        return feedEntity;
-    }
-
     public static List<Referenceable> createFeedCreationEntity(Feed feed, ConfigurationStore falconStore) throws FalconException, URISyntaxException {
         LOG.info("Creating feed : {}", feed.getName());
 
         List<Referenceable> entities = new ArrayList<>();
 
         if (feed.getClusters() != null) {
-            List<Referenceable> replicationInputs = new ArrayList<>();
+            List<Referenceable> replicationInputs  = new ArrayList<>();
             List<Referenceable> replicationOutputs = new ArrayList<>();
 
             for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-                org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER,
-                        feedCluster.getName());
+                org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, feedCluster.getName());
 
                 // set cluster
                 Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+
                 entities.add(clusterReferenceable);
 
                 // input as hive_table or hdfs_path, output as falcon_feed dataset
-                List<Referenceable> inputs = new ArrayList<>();
+                List<Referenceable> inputs              = new ArrayList<>();
                 List<Referenceable> inputReferenceables = getInputEntities(cluster, feed);
+
                 if (inputReferenceables != null) {
                     entities.addAll(inputReferenceables);
                     inputs.add(inputReferenceables.get(inputReferenceables.size() - 1));
                 }
 
-                List<Referenceable> outputs = new ArrayList<>();
-                Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable);
+                List<Referenceable> outputs    = new ArrayList<>();
+                Referenceable       feedEntity = createFeedEntity(feed, clusterReferenceable);
+
                 if (feedEntity != null) {
                     entities.add(feedEntity);
                     outputs.add(feedEntity);
                 }
 
                 if (!inputs.isEmpty() || !outputs.isEmpty()) {
-                    Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
-                    String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName());
+                    Referenceable feedCreateEntity  = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
+                    String        feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName());
 
                     feedCreateEntity.set(AtlasClient.NAME, feed.getName());
                     feedCreateEntity.set(AtlasClient.DESCRIPTION, "Feed creation - " + feed.getName());
@@ -169,6 +146,7 @@ public class FalconBridge {
                     if (!inputs.isEmpty()) {
                         feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs);
                     }
+
                     if (!outputs.isEmpty()) {
                         feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs);
                     }
@@ -185,32 +163,29 @@ public class FalconBridge {
             }
 
             if (!replicationInputs.isEmpty() && !replicationInputs.isEmpty()) {
-                Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes
-                        .FALCON_FEED_REPLICATION.getName());
+                Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes.FALCON_FEED_REPLICATION.getName());
 
                 feedReplicationEntity.set(AtlasClient.NAME, feed.getName());
                 feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName());
-
                 feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, replicationInputs);
                 feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, replicationOutputs);
+
                 entities.add(feedReplicationEntity);
             }
-
         }
+
         return entities;
     }
 
     /**
      * Creates process entity
-     * 
+     *
      * @param process process entity
      * @param falconStore config store
      * @return process instance reference
-     *
      * @throws FalconException if retrieving from the configuration store fail
      */
-    public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
-                                                          ConfigurationStore falconStore) throws FalconException {
+    public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process, ConfigurationStore falconStore) throws FalconException {
         LOG.info("Creating process Entity : {}", process.getName());
 
         // The requirement is for each cluster, create a process entity with name
@@ -218,44 +193,47 @@ public class FalconBridge {
         List<Referenceable> entities = new ArrayList<>();
 
         if (process.getClusters() != null) {
-
             for (Cluster processCluster : process.getClusters().getClusters()) {
-                org.apache.falcon.entity.v0.cluster.Cluster cluster =
-                        falconStore.get(EntityType.CLUSTER, processCluster.getName());
-                Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+                org.apache.falcon.entity.v0.cluster.Cluster cluster              = falconStore.get(EntityType.CLUSTER, processCluster.getName());
+                Referenceable                               clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
+
                 entities.add(clusterReferenceable);
 
                 List<Referenceable> inputs = new ArrayList<>();
+
                 if (process.getInputs() != null) {
                     for (Input input : process.getInputs().getInputs()) {
-                        Feed feed = falconStore.get(EntityType.FEED, input.getFeed());
+                        Feed          feed               = falconStore.get(EntityType.FEED, input.getFeed());
                         Referenceable inputReferenceable = getFeedDataSetReference(feed, clusterReferenceable);
+
                         entities.add(inputReferenceable);
                         inputs.add(inputReferenceable);
                     }
                 }
 
                 List<Referenceable> outputs = new ArrayList<>();
+
                 if (process.getOutputs() != null) {
                     for (Output output : process.getOutputs().getOutputs()) {
-                        Feed feed = falconStore.get(EntityType.FEED, output.getFeed());
+                        Feed          feed                = falconStore.get(EntityType.FEED, output.getFeed());
                         Referenceable outputReferenceable = getFeedDataSetReference(feed, clusterReferenceable);
+
                         entities.add(outputReferenceable);
                         outputs.add(outputReferenceable);
                     }
                 }
 
                 if (!inputs.isEmpty() || !outputs.isEmpty()) {
-
                     Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName());
+
                     processEntity.set(AtlasClient.NAME, process.getName());
-                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                            getProcessQualifiedName(process.getName(), cluster.getName()));
+                    processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(process.getName(), cluster.getName()));
                     processEntity.set(FalconBridge.FREQUENCY, process.getFrequency().toString());
 
                     if (!inputs.isEmpty()) {
                         processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs);
                     }
+
                     if (!outputs.isEmpty()) {
                         processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs);
                     }
@@ -269,43 +247,89 @@ public class FalconBridge {
                     }
 
                     if (StringUtils.isNotEmpty(process.getTags())) {
-                        processEntity.set(FalconBridge.TAGS,
-                                EventUtil.convertKeyValueStringToMap(process.getTags()));
+                        processEntity.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(process.getTags()));
                     }
 
                     if (process.getPipelines() != null) {
                         processEntity.set(FalconBridge.PIPELINES, process.getPipelines());
                     }
 
-                    processEntity.set(FalconBridge.WFPROPERTIES,
-                            getProcessEntityWFProperties(process.getWorkflow(),
-                                    process.getName()));
+                    processEntity.set(FalconBridge.WFPROPERTIES, getProcessEntityWFProperties(process.getWorkflow(), process.getName()));
 
                     entities.add(processEntity);
                 }
-
             }
         }
+
         return entities;
     }
 
-    private static List<Referenceable> getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster,
-                                                        Feed feed) throws URISyntaxException {
+    public static String getFeedQualifiedName(final String feedName, final String clusterName) {
+        return String.format("%s@%s", feedName, clusterName);
+    }
+
+    public static String getProcessQualifiedName(final String processName, final String clusterName) {
+        return String.format("%s@%s", processName, clusterName);
+    }
+
+    public static String normalize(final String str) {
+        if (StringUtils.isBlank(str)) {
+            return null;
+        }
+
+        return str.toLowerCase().trim();
+    }
+
+    private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) {
+        LOG.info("Creating feed dataset: {}", feed.getName());
+
+        Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+
+        feedEntity.set(AtlasClient.NAME, feed.getName());
+        feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription());
+
+        String feedQualifiedName = getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME));
+
+        feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
+        feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString());
+        feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable);
+
+        if (feed.getACL() != null) {
+            feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner());
+        }
+
+        if (StringUtils.isNotEmpty(feed.getTags())) {
+        feedEntity.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(feed.getTags()));
+        }
+
+        if (feed.getGroups() != null) {
+            feedEntity.set(FalconBridge.GROUPS, feed.getGroups());
+        }
+
+        return feedEntity;
+    }
+
+    private static List<Referenceable> getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster, Feed feed) throws URISyntaxException {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
 
-        if(feedCluster != null) {
+        if (feedCluster != null) {
             final CatalogTable table = getTable(feedCluster, feed);
+
             if (table != null) {
                 CatalogStorage storage = new CatalogStorage(cluster, table);
-                return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
-                        storage.getTable().toLowerCase());
+
+                return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(), storage.getTable().toLowerCase());
             } else {
                 List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
+
                 if (CollectionUtils.isNotEmpty(locations)) {
                     Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+
                     if (dataLocation != null) {
                         final String pathUri = normalize(dataLocation.getPath());
+
                         LOG.info("Registering DFS Path {} ", pathUri);
+
                         return fillHDFSDataSet(pathUri, cluster.getName());
                     }
                 }
@@ -326,91 +350,83 @@ public class FalconBridge {
 
     private static List<Referenceable> fillHDFSDataSet(final String pathUri, final String clusterName) {
         List<Referenceable> entities = new ArrayList<>();
-        Referenceable ref = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
+        Referenceable       ref      = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
+
         ref.set("path", pathUri);
+
         //        Path path = new Path(pathUri);
         //        ref.set("name", path.getName());
         //TODO - Fix after ATLAS-542 to shorter Name
         Path path = new Path(pathUri);
+
         ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
         ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
         ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+
         entities.add(ref);
+
         return entities;
     }
 
     private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName) {
         Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+
         dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
         dbRef.set(AtlasClient.NAME, dbName);
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+
         return dbRef;
     }
 
-    private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName,
-                                                               String tableName) {
+    private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) {
         List<Referenceable> entities = new ArrayList<>();
-        Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
+        Referenceable       dbRef    = createHiveDatabaseInstance(clusterName, dbName);
+
         entities.add(dbRef);
 
         Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
+
+        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
         tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
         tableRef.set(ATTRIBUTE_DB, dbRef);
+
         entities.add(tableRef);
 
         return entities;
     }
 
-    private static Referenceable getClusterEntityReference(final String clusterName,
-                                                           final String colo) {
+    private static Referenceable getClusterEntityReference(final String clusterName, final String colo) {
         LOG.info("Getting reference for entity {}", clusterName);
+
         Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
+
         clusterRef.set(AtlasClient.NAME, String.format("%s", clusterName));
         clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
         clusterRef.set(FalconBridge.COLO, colo);
+
         return clusterRef;
     }
 
-
     private static Referenceable getFeedDataSetReference(Feed feed, Referenceable clusterReference) {
         LOG.info("Getting reference for entity {}", feed.getName());
+
         Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
+
         feedDatasetRef.set(AtlasClient.NAME, feed.getName());
-        feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(),
-                (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)));
+        feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(), (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)));
         feedDatasetRef.set(FalconBridge.STOREDIN, clusterReference);
         feedDatasetRef.set(FalconBridge.FREQUENCY, feed.getFrequency());
+
         return feedDatasetRef;
     }
 
-    private static Map<String, String> getProcessEntityWFProperties(final Workflow workflow,
-                                                                    final String processName) {
+    private static Map<String, String> getProcessEntityWFProperties(final Workflow workflow, final String processName) {
         Map<String, String> wfProperties = new HashMap<>();
-        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
-                ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
-        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
-                workflow.getVersion());
-        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
-                workflow.getEngine().value());
 
-        return wfProperties;
-    }
-
-    public static String getFeedQualifiedName(final String feedName, final String clusterName) {
-        return String.format("%s@%s", feedName, clusterName);
-    }
-
-    public static String getProcessQualifiedName(final String processName, final String clusterName) {
-        return String.format("%s@%s", processName, clusterName);
-    }
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), workflow.getVersion());
+        wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), workflow.getEngine().value());
 
-    public static String normalize(final String str) {
-        if (StringUtils.isBlank(str)) {
-            return null;
-        }
-        return str.toLowerCase().trim();
+        return wfProperties;
     }
 }
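
The getFeedQualifiedName and getProcessQualifiedName helpers relocated in this file both format "<name>@<cluster>", the qualifiedName Atlas uses to identify a Falcon entity per cluster. A tiny sketch of the expected output, with made-up entity and cluster names:

    public class QualifiedNameDemo {
        public static void main(String[] args) {
            // Mirrors FalconBridge.getFeedQualifiedName(feedName, clusterName):
            // both helpers emit "<name>@<cluster>".
            System.out.println(String.format("%s@%s", "raw-clicks-feed", "primary-colo"));
            // prints: raw-clicks-feed@primary-colo
        }
    }
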
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
index 51db894ab..37dda6ed0 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,23 +24,14 @@ import org.apache.falcon.entity.v0.Entity;
  * Falcon event to interface with Atlas Service.
  */
 public class FalconEvent {
-    protected String user;
+    protected String    user;
     protected OPERATION operation;
-    protected Entity entity;
+    protected Entity    entity;
 
     public FalconEvent(String doAsUser, OPERATION falconOperation, Entity entity) {
-        this.user = doAsUser;
+        this.user      = doAsUser;
         this.operation = falconOperation;
-        this.entity = entity;
-    }
-
-    public enum OPERATION {
-        ADD_CLUSTER,
-        UPDATE_CLUSTER,
-        ADD_FEED,
-        UPDATE_FEED,
-        ADD_PROCESS,
-        UPDATE_PROCESS,
+        this.entity    = entity;
     }
 
     public String getUser() {
@@ -54,4 +45,13 @@ public class FalconEvent {
     public Entity getEntity() {
         return entity;
     }
+
+    public enum OPERATION {
+        ADD_CLUSTER,
+        UPDATE_CLUSTER,
+        ADD_FEED,
+        UPDATE_FEED,
+        ADD_PROCESS,
+        UPDATE_PROCESS,
+    }
 }
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index b8a73cbe6..3a0f35d8e 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+
 import static org.apache.atlas.repository.Constants.FALCON_SOURCE;
 
 /**
@@ -43,31 +44,17 @@ import static org.apache.atlas.repository.Constants.FALCON_SOURCE;
 public class FalconHook extends AtlasHook implements FalconEventPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);
 
-    private static ConfigurationStore STORE;
+    private static ConfigurationStore store;
 
     @Override
     public String getMessageSource() {
         return FALCON_SOURCE;
     }
 
-    private enum Operation {
-        ADD,
-        UPDATE
-    }
-
-    static {
-        try {
-            STORE = ConfigurationStore.get();
-        } catch (Exception e) {
-            LOG.error("Caught exception initializing the falcon hook.", e);
-        }
-
-        LOG.info("Created Atlas Hook for Falcon");
-    }
-
     @Override
     public void publish(final Data data) {
         final FalconEvent event = data.getEvent();
+
         try {
             fireAndForget(event);
         } catch (Throwable t) {
@@ -77,17 +64,19 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private void fireAndForget(FalconEvent event) throws FalconException, URISyntaxException {
         LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
+
         List<HookNotification> messages = new ArrayList<>();
+        Operation              op       = getOperation(event.getOperation());
+        String                 user     = getUser(event.getUser());
 
-        Operation op = getOperation(event.getOperation());
-        String user = getUser(event.getUser());
         LOG.info("fireAndForget user:{}", user);
-        switch (op) {
-        case ADD:
-            messages.add(new EntityCreateRequest(user, createEntities(event, user)));
-            break;
 
+        switch (op) {
+            case ADD:
+                messages.add(new EntityCreateRequest(user, createEntities(event, user)));
+                break;
         }
+
         notifyEntities(messages, null);
     }
 
@@ -95,24 +84,23 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
         List<Referenceable> entities = new ArrayList<>();
 
         switch (event.getOperation()) {
-        case ADD_CLUSTER:
-            entities.add(FalconBridge
-                    .createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity()));
-            break;
-
-        case ADD_PROCESS:
-            entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE));
-            break;
-
-        case ADD_FEED:
-            entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE));
-            break;
-
-        case UPDATE_CLUSTER:
-        case UPDATE_FEED:
-        case UPDATE_PROCESS:
-        default:
-            LOG.info("Falcon operation {} is not valid or supported", 
event.getOperation());
+            case ADD_CLUSTER:
+                
entities.add(FalconBridge.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster)
 event.getEntity()));
+                break;
+
+            case ADD_PROCESS:
+                entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), store));
+                break;
+
+            case ADD_FEED:
+                entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), store));
+                break;
+
+            case UPDATE_CLUSTER:
+            case UPDATE_FEED:
+            case UPDATE_PROCESS:
+            default:
+                LOG.info("Falcon operation {} is not valid or supported", 
event.getOperation());
         }
 
         return entities;
@@ -120,19 +108,33 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
 
     private static Operation getOperation(final FalconEvent.OPERATION op) throws FalconException {
         switch (op) {
-        case ADD_CLUSTER:
-        case ADD_FEED:
-        case ADD_PROCESS:
-            return Operation.ADD;
-
-        case UPDATE_CLUSTER:
-        case UPDATE_FEED:
-        case UPDATE_PROCESS:
-            return Operation.UPDATE;
-
-        default:
-            throw new FalconException("Falcon operation " + op + " is not 
valid or supported");
+            case ADD_CLUSTER:
+            case ADD_FEED:
+            case ADD_PROCESS:
+                return Operation.ADD;
+
+            case UPDATE_CLUSTER:
+            case UPDATE_FEED:
+            case UPDATE_PROCESS:
+                return Operation.UPDATE;
+
+            default:
+                throw new FalconException("Falcon operation " + op + " is not 
valid or supported");
         }
     }
-}
 
+    private enum Operation {
+        ADD,
+        UPDATE
+    }
+
+    static {
+        try {
+            store = ConfigurationStore.get();
+        } catch (Exception e) {
+            LOG.error("Caught exception initializing the falcon hook.", e);
+        }
+
+        LOG.info("Created Atlas Hook for Falcon");
+    }
+}
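
A subtlety in the reordering above: the static initializer that assigns store now appears after the methods. This is safe in Java because static blocks and static field initializers run in textual order relative to each other at class-initialization time, and method declarations take no part in that ordering. A minimal, self-contained illustration (all names invented for the demo):

    public class InitOrderDemo {
        private static String store;  // no initializer, like FalconHook's 'store' field

        static String get() {         // methods don't affect initialization order
            return store;
        }

        public static void main(String[] args) {
            System.out.println(get());  // prints: set-by-static-block
        }

        // Runs once at class initialization, even though it appears last.
        static {
            store = "set-by-static-block";
        }
    }
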
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
index e36ff23af..ca1032ddc 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,5 +32,4 @@ public enum FalconDataTypes {
     public String getName() {
         return name().toLowerCase();
     }
-
 }
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
index a01ec14be..a21244304 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,14 @@
 
 package org.apache.atlas.falcon.publisher;
 
-
 import org.apache.atlas.falcon.event.FalconEvent;
 
 /**
  * Falcon publisher for Atlas
  */
 public interface FalconEventPublisher {
+    void publish(Data data);
+
     class Data {
         private FalconEvent event;
 
@@ -36,6 +37,4 @@ public interface FalconEventPublisher {
             return event;
         }
     }
-
-    void publish(final Data data);
 }
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
index 7482ba7b8..e3014a408 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java
@@ -18,10 +18,10 @@
 
 package org.apache.atlas.falcon.service;
 
-import org.apache.atlas.falcon.Util.EventUtil;
 import org.apache.atlas.falcon.event.FalconEvent;
 import org.apache.atlas.falcon.hook.FalconHook;
 import org.apache.atlas.falcon.publisher.FalconEventPublisher;
+import org.apache.atlas.falcon.util.EventUtil;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -30,20 +30,19 @@ import org.apache.falcon.service.FalconService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Atlas service to publish Falcon events
  */
 public class AtlasService implements FalconService, ConfigurationChangeListener {
-
     private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);
-    private FalconEventPublisher publisher;
 
     /**
      * Constant for the service name.
      */
     public static final String SERVICE_NAME = AtlasService.class.getSimpleName();
 
+    private FalconEventPublisher publisher;
+
     @Override
     public String getName() {
         return SERVICE_NAME;
@@ -63,22 +62,22 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
         try {
             EntityType entityType = entity.getEntityType();
             switch (entityType) {
-            case CLUSTER:
-                addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
-                break;
+                case CLUSTER:
+                    addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
+                    break;
 
-            case PROCESS:
-                addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
-                break;
+                case PROCESS:
+                    addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
+                    break;
 
-            case FEED:
-                addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
-                break;
+                case FEED:
+                    addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
+                    break;
 
-            default:
-                LOG.debug("Entity type not processed {}", entityType);
+                default:
+                    LOG.debug("Entity type not processed {}", entityType);
             }
-        } catch(Throwable t) {
+        } catch (Throwable t) {
             LOG.warn("Error handling entity {}", entity, t);
         }
     }
@@ -91,26 +90,26 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
     public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
         /**
          * Skipping update for now - update uses full update currently and this might result in all attributes wiped for hive entities
-        EntityType entityType = newEntity.getEntityType();
-        switch (entityType) {
-        case CLUSTER:
-            addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
-            break;
-
-        case PROCESS:
-            addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
-            break;
-
-        case FEED:
-            FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ?
-                    FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
-                    FalconEvent.OPERATION.UPDATE_FEED;
-            addEntity(newEntity, operation);
-            break;
-
-        default:
-            LOG.debug("Entity type not processed {}", entityType);
-        }
+         EntityType entityType = newEntity.getEntityType();
+         switch (entityType) {
+         case CLUSTER:
+         addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
+         break;
+
+         case PROCESS:
+         addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
+         break;
+
+         case FEED:
+         FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ?
+         FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
+         FalconEvent.OPERATION.UPDATE_FEED;
+         addEntity(newEntity, operation);
+         break;
+
+         default:
+         LOG.debug("Entity type not processed {}", entityType);
+         }
          **/
     }
 
@@ -124,9 +123,9 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
         LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName());
 
         try {
-            FalconEvent event =
-                    new FalconEvent(EventUtil.getUser(), operation, entity);
-            FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
+            FalconEvent               event = new FalconEvent(EventUtil.getUser(), operation, entity);
+            FalconEventPublisher.Data data  = new FalconEventPublisher.Data(event);
+
             publisher.publish(data);
         } catch (Exception ex) {
             throw new FalconException("Unable to publish data to publisher " + 
ex.getMessage(), ex);
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
similarity index 86%
rename from addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
rename to addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
index ef5634009..bcf838ca2 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.atlas.falcon.Util;
+package org.apache.atlas.falcon.util;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
@@ -29,24 +29,24 @@ import java.util.Map;
  * Falcon event util
  */
 public final class EventUtil {
-
     private EventUtil() {}
 
-
     public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) {
         if (StringUtils.isBlank(keyValueString)) {
             return null;
         }
 
         Map<String, String> keyValueMap = new HashMap<>();
+        String[]            tags        = keyValueString.split(",");
 
-        String[] tags = keyValueString.split(",");
         for (String tag : tags) {
-            int index = tag.indexOf("=");
-            String tagKey = tag.substring(0, index).trim();
+            int    index    = tag.indexOf("=");
+            String tagKey   = tag.substring(0, index).trim();
             String tagValue = tag.substring(index + 1, tag.length()).trim();
+
             keyValueMap.put(tagKey, tagValue);
         }
+
         return keyValueMap;
     }
 
@@ -56,6 +56,7 @@ public final class EventUtil {
         } catch (Exception ioe) {
             //Ignore is failed to get user, uses login user
         }
+
         return null;
     }
 }
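
For reference, EventUtil.convertKeyValueStringToMap parses a comma-separated key=value string into a map, trimming keys and values, and returns null for blank input. An illustrative call, assuming the renamed org.apache.atlas.falcon.util package is on the classpath:

    import java.util.Map;

    import org.apache.atlas.falcon.util.EventUtil;

    public class TagParseDemo {
        public static void main(String[] args) {
            // "owner=etl, dept=analytics" -> {owner=etl, dept=analytics}
            Map<String, String> tags = EventUtil.convertKeyValueStringToMap("owner=etl, dept=analytics");

            System.out.println(tags);  // iteration order of the HashMap may vary
        }
    }
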
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index e77f4c96d..c96479dad 100644
--- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,17 +23,17 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.falcon.bridge.FalconBridge;
 import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.falcon.service.AtlasService;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.type.AtlasTypeUtil;
-import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.atlas.utils.ParamChecker;
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.atlas.falcon.service.AtlasService;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.FileSystemStorage;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -50,6 +50,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import javax.xml.bind.JAXBException;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -61,33 +62,145 @@ import static org.testng.Assert.fail;
 public class FalconHookIT {
     public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
 
-    public static final String CLUSTER_RESOURCE = "/cluster.xml";
-    public static final String FEED_RESOURCE = "/feed.xml";
-    public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
+    public static final String CLUSTER_RESOURCE          = "/cluster.xml";
+    public static final String FEED_RESOURCE             = "/feed.xml";
+    public static final String FEED_HDFS_RESOURCE        = "/feed-hdfs.xml";
     public static final String FEED_REPLICATION_RESOURCE = "/feed-replication.xml";
-    public static final String PROCESS_RESOURCE = "/process.xml";
-
-    private AtlasClientV2 atlasClient;
+    public static final String PROCESS_RESOURCE          = "/process.xml";
 
     private static final ConfigurationStore STORE = ConfigurationStore.get();
 
+    private              AtlasClientV2      atlasClient;
+
     @BeforeClass
     public void setUp() throws Exception {
         Configuration atlasProperties = ApplicationProperties.get();
+
         if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
-            atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[]{"admin", "admin"});
+            atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[] {"admin", "admin"});
         } else {
             atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
         }
 
         AtlasService service = new AtlasService();
+
         service.init();
         STORE.registerListener(service);
         CurrentUser.authenticate(System.getProperty("user.name"));
     }
 
+    @Test
+    public void testCreateProcess() throws Exception {
+        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, cluster);
+
+        assertClusterIsRegistered(cluster);
+
+        Feed    infeed    = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
+        String  infeedId  = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName()))).getGuid();
+        Feed    outfeed   = getTableFeed(FEED_RESOURCE, cluster.getName());
+        String  outFeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
+        Process process   = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+
+        process.getClusters().getClusters().get(0).setName(cluster.getName());
+        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+
+        STORE.publish(EntityType.PROCESS, process);
+
+        String      pid           = assertProcessIsRegistered(process, 
cluster.getName());
+        AtlasEntity processEntity = 
atlasClient.getEntityByGuid(pid).getEntity();
+
+        assertNotNull(processEntity);
+        assertEquals(processEntity.getAttribute(AtlasClient.NAME), 
process.getName());
+        assertEquals(getGuidFromObjectId(((List<?>) 
processEntity.getAttribute("inputs")).get(0)), infeedId);
+        assertEquals(getGuidFromObjectId(((List<?>) 
processEntity.getAttribute("outputs")).get(0)), outFeedId);
+    }
+
+    @Test
+    public void testReplicationFeed() throws Exception {
+        Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, srcCluster);
+
+        assertClusterIsRegistered(srcCluster);
+
+        Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, targetCluster);
+
+        assertClusterIsRegistered(targetCluster);
+
+        Feed        feed      = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName());
+        String      inId      = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName()))).getGuid();
+        String      outId     = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName()))).getGuid();
+        String      processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName());
+        AtlasEntity process   = atlasClient.getEntityByGuid(processId).getEntity();
+
+        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("inputs")).get(0)), inId);
+        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("outputs")).get(0)), outId);
+    }
+
+    @Test
+    public void testCreateProcessWithHDFSFeed() throws Exception {
+        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+
+        STORE.publish(EntityType.CLUSTER, cluster);
+
+        TypesUtil.Pair<String, Feed> result   = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
+        Feed                         infeed   = result.right;
+        String                       infeedId = result.left;
+
+        Feed    outfeed   = getTableFeed(FEED_RESOURCE, cluster.getName());
+        String  outfeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
+        Process process   = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+
+        process.getClusters().getClusters().get(0).setName(cluster.getName());
+        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+
+        STORE.publish(EntityType.PROCESS, process);
+
+        String      pid           = assertProcessIsRegistered(process, cluster.getName());
+        AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();
+
+        assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
+        assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
+        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
+        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outfeedId);
+    }
+
+    /**
+     * Wait for a condition, expressed via a {@link Predicate}, to become true.
+     *
+     * @param timeout   maximum time in milliseconds to wait for the predicate to become true
+     * @param predicate the predicate to wait on
+     */
+    protected void waitFor(int timeout, Predicate predicate) throws Exception {
+        ParamChecker.notNull(predicate, "predicate");
+
+        long mustEnd = System.currentTimeMillis() + timeout;
+
+        while (true) {
+            try {
+                predicate.evaluate();
+                return;
+            } catch (Error | Exception e) {
+                if (System.currentTimeMillis() >= mustEnd) {
+                    fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
+                }
+
+                LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
+
+                Thread.sleep(400);
+            }
+        }
+    }
+
     private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
         Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
+
         switch (entity.getEntityType()) {
             case CLUSTER:
                 ((Cluster) entity).setName(name);
@@ -101,7 +214,8 @@ public class FalconHookIT {
                 ((Process) entity).setName(name);
                 break;
         }
-        return (T)entity;
+
+        return (T) entity;
     }
 
     private String random() {
@@ -112,67 +226,39 @@ public class FalconHookIT {
         return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName);
     }
 
-    @Test
-    public void testCreateProcess() throws Exception {
-        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, cluster);
-        assertClusterIsRegistered(cluster);
-
-        Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
-        String infeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName()))).getGuid();
-
-        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
-        String outFeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
-
-        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
-        process.getClusters().getClusters().get(0).setName(cluster.getName());
-        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
-        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
-        STORE.publish(EntityType.PROCESS, process);
-
-        String                         pid           = assertProcessIsRegistered(process, cluster.getName());
-        AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();
-        assertNotNull(processEntity);
-        assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
-        assertEquals(getGuidFromObjectId(((List<?>)processEntity.getAttribute("inputs")).get(0)), infeedId);
-        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outFeedId);
-    }
-
     private String assertProcessIsRegistered(Process process, String clusterName) throws Exception {
-        return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
+        return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
     }
 
     private String assertClusterIsRegistered(Cluster cluster) throws Exception {
-        return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName());
+        return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName());
     }
 
     private TypesUtil.Pair<String, Feed> getHDFSFeed(String feedResource, String clusterName) throws Exception {
-        Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
+        Feed                                     feed        = loadEntity(EntityType.FEED, feedResource, "feed" + random());
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+
         feedCluster.setName(clusterName);
         STORE.publish(EntityType.FEED, feed);
+
         String feedId = assertFeedIsRegistered(feed, clusterName);
+
         assertFeedAttributes(feedId);
 
-        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
+        String      processId     = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
         AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity();
+
         assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), feedId);
 
-        String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
+        String      inputId    = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
         AtlasEntity pathEntity = atlasClient.getEntityByGuid(inputId).getEntity();
+
         assertEquals(pathEntity.getTypeName(), HiveMetaStoreBridge.HDFS_PATH);
 
-        List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
-        Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
-        assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
-                FalconBridge.normalize(dataLocation.getPath()));
+        List<Location> locations    = FeedHelper.getLocations(feedCluster, feed);
+        Location       dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
+
+        assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.normalize(dataLocation.getPath()));
 
         return TypesUtil.Pair.of(feedId, feed);
     }
@@ -182,114 +268,67 @@ public class FalconHookIT {
     }
 
     private Feed getTableFeed(String feedResource, String clusterName, String secondClusterName) throws Exception {
-        Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
+        Feed                                     feed        = loadEntity(EntityType.FEED, feedResource, "feed" + random());
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+
         feedCluster.setName(clusterName);
-        String dbName = "db" + random();
+
+        String dbName    = "db" + random();
         String tableName = "table" + random();
+
         feedCluster.getTable().setUri(getTableUri(dbName, tableName));
 
-        String dbName2 = "db" + random();
+        String dbName2    = "db" + random();
         String tableName2 = "table" + random();
 
         if (secondClusterName != null) {
             org.apache.falcon.entity.v0.feed.Cluster feedCluster2 = feed.getClusters().getClusters().get(1);
+
             feedCluster2.setName(secondClusterName);
             feedCluster2.getTable().setUri(getTableUri(dbName2, tableName2));
         }
 
         STORE.publish(EntityType.FEED, feed);
+
         String feedId = assertFeedIsRegistered(feed, clusterName);
+
         assertFeedAttributes(feedId);
         verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName);
 
         if (secondClusterName != null) {
             String feedId2 = assertFeedIsRegistered(feed, secondClusterName);
+
             assertFeedAttributes(feedId2);
             verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2);
         }
+
         return feed;
     }
 
     private void assertFeedAttributes(String feedId) throws Exception {
         AtlasEntity feedEntity = atlasClient.getEntityByGuid(feedId).getEntity();
+
         assertEquals(feedEntity.getAttribute(AtlasClient.OWNER), "testuser");
         assertEquals(feedEntity.getAttribute(FalconBridge.FREQUENCY), "hours(1)");
         assertEquals(feedEntity.getAttribute(AtlasClient.DESCRIPTION), "test input");
     }
 
-    private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName)
-            throws Exception{
+    private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName) throws Exception {
         //verify that lineage from hive table to falcon feed is created
-        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feedName, clusterName));
+        String      processId     = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feedName, clusterName));
         AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity();
+
         assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), feedId);
 
-        String inputId = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
+        String      inputId     = getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0));
         AtlasEntity tableEntity = atlasClient.getEntityByGuid(inputId).getEntity();
-        assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName());
-        assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
-                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
 
+        assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName());
+        assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
     }
 
     private String assertFeedIsRegistered(Feed feed, String clusterName) throws Exception {
-        return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
-    }
-
-    @Test
-    public void testReplicationFeed() throws Exception {
-        Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, srcCluster);
-        assertClusterIsRegistered(srcCluster);
-
-        Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, targetCluster);
-        assertClusterIsRegistered(targetCluster);
-
-        Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName());
-        String inId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                                                                                                    FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName()))).getGuid();
-        String outId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName()))).getGuid();
-
-
-        String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(),
-                AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName());
-        AtlasEntity process = atlasClient.getEntityByGuid(processId).getEntity();
-        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("inputs")).get(0)), inId);
-        assertEquals(getGuidFromObjectId(((List<?>) process.getAttribute("outputs")).get(0)), outId);
-    }
-
-    @Test
-    public void testCreateProcessWithHDFSFeed() throws Exception {
-        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
-        STORE.publish(EntityType.CLUSTER, cluster);
-
-        TypesUtil.Pair<String, Feed> result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
-        Feed infeed = result.right;
-        String infeedId = result.left;
-
-        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
-        String outfeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME,
-                FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid();
-
-        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
-        process.getClusters().getClusters().get(0).setName(cluster.getName());
-        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
-        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
-        STORE.publish(EntityType.PROCESS, process);
-
-        String pid = assertProcessIsRegistered(process, cluster.getName());
-        AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity();
-        assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName());
-        assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME),
-                FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
-        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("inputs")).get(0)), infeedId);
-        assertEquals(getGuidFromObjectId(((List<?>) processEntity.getAttribute("outputs")).get(0)), outfeedId);
+        return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
     }
 
     private String assertEntityIsRegistered(final String typeName, final String property, final String value) throws Exception {
@@ -297,10 +336,12 @@ public class FalconHookIT {
             @Override
             public void evaluate() throws Exception {
                 AtlasEntity.AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value));
+
                 assertNotNull(entity);
                 assertNotNull(entity.getEntity());
             }
         });
+
         return atlasClient.getEntityHeaderByAttribute(typeName, Collections.singletonMap(property, value)).getGuid();
     }
 
@@ -323,28 +364,4 @@ public class FalconHookIT {
          */
         void evaluate() throws Exception;
     }
-
-    /**
-     * Wait for a condition, expressed via a {@link Predicate} to become true.
-     *
-     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
-     * @param predicate predicate waiting on.
-     */
-    protected void waitFor(int timeout, Predicate predicate) throws Exception {
-        ParamChecker.notNull(predicate, "predicate");
-        long mustEnd = System.currentTimeMillis() + timeout;
-
-        while (true) {
-            try {
-                predicate.evaluate();
-                return;
-            } catch(Error | Exception e) {
-                if (System.currentTimeMillis() >= mustEnd) {
-                    fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
-                }
-                LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
-                Thread.sleep(400);
-            }
-        }
-    }
 }
diff --git a/addons/falcon-bridge/src/test/resources/atlas-application.properties b/addons/falcon-bridge/src/test/resources/atlas-application.properties
index 0ce0f46c9..94a75aab4 100644
--- a/addons/falcon-bridge/src/test/resources/atlas-application.properties
+++ b/addons/falcon-bridge/src/test/resources/atlas-application.properties
@@ -15,54 +15,39 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 #########  Atlas Server Configs #########
 atlas.rest.address=http://localhost:31000
-
 #########  Graph Database Configs  #########
-
-
 # Graph database implementation.  Value inserted by maven.
 atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase
 atlas.graph.index.search.solr.wait-searcher=true
-
 # Graph Storage
 atlas.graph.storage.backend=berkeleyje
-
 # Entity repository implementation
 atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository
-
 # Graph Search Index Backend
 atlas.graph.index.search.backend=solr
-
 #Berkeley storage directory
 atlas.graph.storage.directory=${sys:atlas.data}/berkley
 atlas.graph.storage.transactions=true
-
 #hbase
 #For standalone mode , specify localhost
 #for distributed mode, specify zookeeper quorum here
-
 atlas.graph.storage.hostname=${graph.storage.hostname}
 atlas.graph.storage.hbase.regions-per-server=1
 atlas.graph.storage.lock.wait-time=10000
-
 #ElasticSearch
 atlas.graph.index.search.directory=${sys:atlas.data}/es
 atlas.graph.index.search.elasticsearch.client-only=false
 atlas.graph.index.search.elasticsearch.local-mode=true
 atlas.graph.index.search.elasticsearch.create.sleep=2000
-
 # Solr cloud mode properties
 atlas.graph.index.search.solr.mode=cloud
 atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address}
 atlas.graph.index.search.solr.embedded=true
 atlas.graph.index.search.max-result-set-size=150
-
-
 #########  Notification Configs  #########
 atlas.notification.embedded=true
-
 atlas.kafka.zookeeper.connect=localhost:19026
 atlas.kafka.bootstrap.servers=localhost:19027
 atlas.kafka.data=${sys:atlas.data}/kafka
@@ -73,52 +58,38 @@ atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
 #atlas.kafka.auto.commit.enable=false
-
 atlas.kafka.enable.auto.commit=false
 atlas.kafka.auto.offset.reset=earliest
 atlas.kafka.session.timeout.ms=30000
 atlas.kafka.offsets.topic.replication.factor=1
-
-
-
 #########  Entity Audit Configs  #########
 atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS
 atlas.audit.zookeeper.session.timeout.ms=1000
 atlas.audit.hbase.zookeeper.quorum=localhost
 atlas.audit.hbase.zookeeper.property.clientPort=19026
-
 #########  Security Properties  #########
-
 # SSL config
 atlas.enableTLS=false
 atlas.server.https.port=31443
-
 #########  Security Properties  #########
-
 hbase.security.authentication=simple
-
 atlas.hook.falcon.synchronous=true
-
 #########  JAAS Configuration ########
-
-atlas.jaas.KafkaClient.loginModuleName = com.sun.security.auth.module.Krb5LoginModule
-atlas.jaas.KafkaClient.loginModuleControlFlag = required
-atlas.jaas.KafkaClient.option.useKeyTab = true
-atlas.jaas.KafkaClient.option.storeKey = true
-atlas.jaas.KafkaClient.option.serviceName = kafka
-atlas.jaas.KafkaClient.option.keyTab = /etc/security/keytabs/atlas.service.keytab
-atlas.jaas.KafkaClient.option.principal = atlas/_h...@example.com
-
+atlas.jaas.KafkaClient.loginModuleName=com.sun.security.auth.module.Krb5LoginModule
+atlas.jaas.KafkaClient.loginModuleControlFlag=required
+atlas.jaas.KafkaClient.option.useKeyTab=true
+atlas.jaas.KafkaClient.option.storeKey=true
+atlas.jaas.KafkaClient.option.serviceName=kafka
+atlas.jaas.KafkaClient.option.keyTab=/etc/security/keytabs/atlas.service.keytab
+atlas.jaas.KafkaClient.option.principal=atlas/_h...@example.com
 #########  High Availability Configuration ########
 atlas.server.ha.enabled=false
 #atlas.server.ids=id1
 #atlas.server.address.id1=localhost:21000
-
 ######### Atlas Authorization #########
 atlas.authorizer.impl=none
 # atlas.authorizer.impl=simple
 # atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json
-
 ######### Atlas Authentication #########
 atlas.authentication.method.file=true
 atlas.authentication.method.ldap.type=none
diff --git a/addons/falcon-bridge/src/test/resources/atlas-logback.xml b/addons/falcon-bridge/src/test/resources/atlas-logback.xml
index 78fd420dc..991cb621d 100755
--- a/addons/falcon-bridge/src/test/resources/atlas-logback.xml
+++ b/addons/falcon-bridge/src/test/resources/atlas-logback.xml
@@ -18,115 +18,114 @@
   -->
 
 <configuration>
-  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
-    <param name="Target" value="System.out"/>
-    <encoder>
-      <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-    </encoder>
-    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-      <level>INFO</level>
-    </filter>
-  </appender>
-
-  <appender name="FILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-    <file>${atlas.log.dir}/${atlas.log.file}</file>
-    <append>true</append>
-    <encoder>
-      <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-    </encoder>
-    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-      <fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
-      <maxHistory>20</maxHistory>
-      <cleanHistoryOnStart>true</cleanHistoryOnStart>
-    </rollingPolicy>
-  </appender>
-
-  <appender name="AUDIT" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-    <file>${atlas.log.dir}/audit.log</file>
-    <append>true</append>
-    <encoder>
-      <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-    </encoder>
-    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-      <fileNamePattern>${atlas.log.dir}/audit-%d.log</fileNamePattern>
-      <maxHistory>20</maxHistory>
-      <cleanHistoryOnStart>false</cleanHistoryOnStart>
-    </rollingPolicy>
-  </appender>
-
-  <appender name="METRICS" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-    <file>${atlas.log.dir}/metrics.log</file>
-    <append>true</append>
-    <encoder>
-      <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-    </encoder>
-    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-      <fileNamePattern>${atlas.log.dir}/metrics-%d.log</fileNamePattern>
-      <maxHistory>20</maxHistory>
-      <cleanHistoryOnStart>false</cleanHistoryOnStart>
-    </rollingPolicy>
-  </appender>
-
-  <appender name="FAILED" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-    <file>${atlas.log.dir}/failed.log</file>
-    <append>true</append>
-    <encoder>
-      <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
-    </encoder>
-    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-      <fileNamePattern>${atlas.log.dir}/failed-%d.log</fileNamePattern>
-      <maxHistory>20</maxHistory>
-      <cleanHistoryOnStart>false</cleanHistoryOnStart>
-    </rollingPolicy>
-  </appender>
-
-  <logger name="org.apache.atlas" additivity="false" level="info">
-    <appender-ref ref="FILE"/>
-  </logger>
-
-  <logger name="org.apache.atlas.impala.ImpalaLineageTool" additivity="false" 
level="debug">
-    <appender-ref ref="FILE"/>
-  </logger>
-
-  <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook" 
additivity="false" level="debug">
-    <appender-ref ref="FILE"/>
-  </logger>
-
-  <logger name="org.janusgraph" additivity="false" level="warn">
-    <appender-ref ref="FILE"/>
-  </logger>
-
-  <logger name="org.springframework" additivity="false" level="warn">
-    <appender-ref ref="console"/>
-  </logger>
-
-  <logger name="org.eclipse" additivity="false" level="warn">
-    <appender-ref ref="console"/>
-  </logger>
-
-  <logger name="com.sun.jersey" additivity="false" level="warn">
-    <appender-ref ref="console"/>
-  </logger>
-
-
-  <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
-  <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false" level="error">
-    <appender-ref ref="FILE"/>
-  </logger>
-
-  <logger name="METRICS" additivity="false" level="debug">
-    <appender-ref ref="METRICS"/>
-  </logger>
-
-  <logger name="FAILED" additivity="false" level="info">
-    <appender-ref ref="FAILED"/>
-  </logger>
-
-  <logger name="AUDIT" additivity="false" level="info">
-    <appender-ref ref="AUDIT"/>
-  </logger>
-
-   <root level="warn">
-    <appender-ref ref="FILE"/>
-  </root>
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <param name="Target" value="System.out" />
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <appender name="FILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/${atlas.log.file}</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <appender name="AUDIT" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/audit.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/audit-%d.log</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>false</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <appender name="METRICS" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/metrics.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/metrics-%d.log</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>false</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <appender name="FAILED" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/failed.log</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${atlas.log.dir}/failed-%d.log</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>false</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="org.apache.atlas" additivity="false" level="info">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.apache.atlas.impala.ImpalaLineageTool" 
additivity="false" level="debug">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.apache.atlas.impala.hook.ImpalaLineageHook" 
additivity="false" level="debug">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.janusgraph" additivity="false" level="warn">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="org.springframework" additivity="false" level="warn">
+        <appender-ref ref="console" />
+    </logger>
+
+    <logger name="org.eclipse" additivity="false" level="warn">
+        <appender-ref ref="console" />
+    </logger>
+
+    <logger name="com.sun.jersey" additivity="false" level="warn">
+        <appender-ref ref="console" />
+    </logger>
+
+    <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
+    <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false" level="error">
+        <appender-ref ref="FILE" />
+    </logger>
+
+    <logger name="METRICS" additivity="false" level="debug">
+        <appender-ref ref="METRICS" />
+    </logger>
+
+    <logger name="FAILED" additivity="false" level="info">
+        <appender-ref ref="FAILED" />
+    </logger>
+
+    <logger name="AUDIT" additivity="false" level="info">
+        <appender-ref ref="AUDIT" />
+    </logger>
+
+    <root level="warn">
+        <appender-ref ref="FILE" />
+    </root>
 </configuration>
diff --git a/addons/falcon-bridge/src/test/resources/cluster.xml b/addons/falcon-bridge/src/test/resources/cluster.xml
index b183847db..b7bbda74c 100644
--- a/addons/falcon-bridge/src/test/resources/cluster.xml
+++ b/addons/falcon-bridge/src/test/resources/cluster.xml
@@ -20,7 +20,7 @@
     Primary cluster configuration for demo vm
   -->
 <cluster colo="west-coast" description="Primary Cluster" name="testcluster"
-         xmlns="uri:falcon:cluster:0.1" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+         xmlns="uri:falcon:cluster:0.1">
 
     <interfaces>
         <interface type="readonly" endpoint="hftp://localhost:10070"; 
version="1.1.1" />
diff --git a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
index 435db0745..582998a3c 100644
--- a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
+++ b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
@@ -21,19 +21,19 @@
 
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
-    <late-arrival cut-off="hours(3)"/>
+    <late-arrival cut-off="hours(3)" />
 
     <clusters>
         <cluster name="testcluster" type="source">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
         </cluster>
     </clusters>
 
     <locations>
-        <location type="data" 
path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
+        <location type="data" 
path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}" />
     </locations>
 
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="hcat" provider="hcat"/>
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="hcat" provider="hcat" />
 </feed>
diff --git a/addons/falcon-bridge/src/test/resources/feed-replication.xml b/addons/falcon-bridge/src/test/resources/feed-replication.xml
index dcd427b18..42c4f863f 100644
--- a/addons/falcon-bridge/src/test/resources/feed-replication.xml
+++ b/addons/falcon-bridge/src/test/resources/feed-replication.xml
@@ -21,23 +21,23 @@
 
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
-    <late-arrival cut-off="hours(3)"/>
+    <late-arrival cut-off="hours(3)" />
 
     <clusters>
         <cluster name="testcluster" type="source">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
             <table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
         <cluster name="testcluster" type="target">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
             <table uri="catalog:outdb:outtable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
     </clusters>
 
     <table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
 
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="hcat" provider="hcat"/>
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="hcat" provider="hcat" />
 </feed>
diff --git a/addons/falcon-bridge/src/test/resources/feed.xml b/addons/falcon-bridge/src/test/resources/feed.xml
index 473c745ce..f58316b77 100644
--- a/addons/falcon-bridge/src/test/resources/feed.xml
+++ b/addons/falcon-bridge/src/test/resources/feed.xml
@@ -21,18 +21,18 @@
 
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
-    <late-arrival cut-off="hours(3)"/>
+    <late-arrival cut-off="hours(3)" />
 
     <clusters>
         <cluster name="testcluster" type="source">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
-            <retention limit="hours(24)" action="delete"/>
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z" />
+            <retention limit="hours(24)" action="delete" />
             <table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
     </clusters>
 
     <table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
 
-    <ACL owner="testuser" group="group" permission="0x755"/>
-    <schema location="hcat" provider="hcat"/>
+    <ACL owner="testuser" group="group" permission="0x755" />
+    <schema location="hcat" provider="hcat" />
 </feed>
diff --git a/addons/falcon-bridge/src/test/resources/process.xml b/addons/falcon-bridge/src/test/resources/process.xml
index b94d0a847..62e7542f1 100644
--- a/addons/falcon-bridge/src/test/resources/process.xml
+++ b/addons/falcon-bridge/src/test/resources/process.xml
@@ -22,7 +22,7 @@
 
     <clusters>
         <cluster name="testcluster">
-            <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/>
+            <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z" />
         </cluster>
     </clusters>
 
@@ -32,22 +32,22 @@
     <timezone>UTC</timezone>
 
     <inputs>
-        <input end="today(0,0)" start="today(0,0)" feed="testinput" 
name="input"/>
+        <input end="today(0,0)" start="today(0,0)" feed="testinput" 
name="input" />
     </inputs>
 
     <outputs>
-        <output instance="now(0,0)" feed="testoutput" name="output"/>
+        <output instance="now(0,0)" feed="testoutput" name="output" />
     </outputs>
 
     <properties>
-        <property name="blah" value="blah"/>
+        <property name="blah" value="blah" />
     </properties>
 
-    <workflow engine="hive" path="/falcon/test/apps/hive/script.hql"/>
+    <workflow engine="hive" path="/falcon/test/apps/hive/script.hql" />
 
-    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+    <retry policy="periodic" delay="minutes(10)" attempts="3" />
 
     <late-process policy="exp-backoff" delay="hours(2)">
-        <late-input input="input" workflow-path="/falcon/test/workflow"/>
+        <late-input input="input" workflow-path="/falcon/test/workflow" />
     </late-process>
 </process>
diff --git a/addons/falcon-bridge/src/test/resources/startup.properties b/addons/falcon-bridge/src/test/resources/startup.properties
index 962347039..661a2be89 100644
--- a/addons/falcon-bridge/src/test/resources/startup.properties
+++ b/addons/falcon-bridge/src/test/resources/startup.properties
@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 *.domain=debug
 *.config.store.persist=false
 *.config.store.uri=target/config_store
\ No newline at end of file

