This is an automated email from the ASF dual-hosted git repository.

radhikakundam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ef15afc ATLAS-4830: Make ignore patterns generic for all the hooks
f3ef15afc is described below

commit f3ef15afc9aa336c27aeeb427e5f2078b233d364
Author: Amruth S <amrut...@cloudera.com>
AuthorDate: Fri Mar 15 17:40:59 2024 +0530

    ATLAS-4830: Make ignore patterns generic for all the hooks
    
    Signed-off-by: Radhika Kundam <radhika.kun...@gmail.com>
---
 .../java/org/apache/atlas/hive/hook/HiveHook.java  |  15 --
 .../main/java/org/apache/atlas/hook/AtlasHook.java |  85 ++++++++++-
 .../notification/NotificationHookConsumer.java     |  66 +++++++++
 .../preprocessor/GenericEntityPreprocessor.java    |  61 ++++++++
 .../GenericEntityPreprocessorTest.java             | 161 +++++++++++++++++++++
 5 files changed, 371 insertions(+), 17 deletions(-)

diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 4d74d0c48..96b77e187 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -336,21 +336,6 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
 
         return ret;
     }
-
-    private boolean isMatch(String name, List<Pattern> patterns) {
-        boolean ret = false;
-
-        for (Pattern p : patterns) {
-            if (p.matcher(name).matches()) {
-                ret = true;
-
-                break;
-            }
-        }
-
-        return ret;
-    }
-
     public static HiveHookObjectNamesCache getKnownObjects() {
         if (knownObjects != null && knownObjects.isCacheExpired()) {
             LOG.info("HiveHook.run(): purging cached databaseNames ({}) and 
tableNames ({})", knownObjects.getCachedDbCount(), 
knownObjects.getCachedTableCount());
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 4c70aedb9..980d4feec 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -24,10 +24,12 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.utils.AtlasConfigurationUtil;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.atlas.model.notification.MessageSource;
 import org.apache.commons.lang.StringUtils;
@@ -38,11 +40,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 
 /**
@@ -65,6 +71,8 @@ public abstract class AtlasHook {
     public static final String CLUSTER_NAME_KEY                                
   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME                            
   = "primary";
     public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED           
   = "atlas.hook.messages.sort.enabled";
+    public static final String ATLAS_HOOK_ENTITY_IGNORE_PATTERN                
   = "atlas.hook.entity.ignore.pattern";
+    public static final String ATTRIBUTE_QUALIFIED_NAME                        
   = "qualifiedName";
 
     protected static Configuration         atlasProperties;
     protected static NotificationInterface notificationInterface;
@@ -79,6 +87,8 @@ public abstract class AtlasHook {
     private static       ExecutorService      executor = null;
     public  static final boolean              isRESTNotificationEnabled;
     public  static final boolean              isHookMsgsSortEnabled;
+    private static final List<Pattern> entitiesToIgnore  = new ArrayList<>();
+    private static boolean shouldPreprocess = false;
 
 
     static {
@@ -106,6 +116,23 @@ public abstract class AtlasHook {
         notificationRetryInterval = 
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
         notificationInterface     = NotificationProvider.get();
 
+        String[] patternsToIgnoreEntities = 
atlasProperties.getStringArray(ATLAS_HOOK_ENTITY_IGNORE_PATTERN);
+
+        if (patternsToIgnoreEntities != null) {
+            for (String pattern: patternsToIgnoreEntities) {
+                try {
+                    entitiesToIgnore.add(Pattern.compile(pattern));
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: 
{}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN, pattern);
+                }
+            }
+            LOG.info("{}={}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN, 
entitiesToIgnore);
+        }
+
+
+        shouldPreprocess = CollectionUtils.isNotEmpty(entitiesToIgnore);
+
         String currentUser = "";
 
         try {
@@ -163,6 +190,60 @@ public abstract class AtlasHook {
 
     public abstract String getMessageSource();
 
+    /**
+     * Tells whether the given text is fully matched by at least one of the supplied patterns.
+     *
+     * @param qualifiedName text to test against each pattern
+     * @param patterns      compiled patterns, tried in list order
+     * @return {@code true} as soon as one pattern matches the whole text, {@code false} when none does
+     */
+    protected static boolean isMatch(String qualifiedName, List<Pattern> patterns) {
+        for (Pattern candidate : patterns) {
+            if (candidate.matcher(qualifiedName).matches()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Extracts the entity payload carried by a hook notification.
+     * Only ENTITY_CREATE_V2 and ENTITY_FULL_UPDATE_V2 notifications are handled.
+     *
+     * @param hookNotification notification to inspect
+     * @return the entities of a create / full-update request, or {@code null} for any other notification type
+     */
+    private static AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntitiesWithExtInfo(HookNotification hookNotification) {
+        switch (hookNotification.getType()) {
+            case ENTITY_CREATE_V2:
+                return ((HookNotification.EntityCreateRequestV2) hookNotification).getEntities();
+            case ENTITY_FULL_UPDATE_V2:
+                return ((HookNotification.EntityUpdateRequestV2) hookNotification).getEntities();
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Filters the given notifications in place against the configured ignore patterns.
+     *
+     * For every create / full-update notification, entities and referred entities whose
+     * qualifiedName matches one of {@code entitiesToIgnore} are removed. A notification that is
+     * left with no entities and no referred entities is removed from the list and logged.
+     * Notifications of any other type are left untouched.
+     *
+     * @param hookNotifications mutable list of notifications about to be sent; modified in place
+     */
+    private static void preprocessEntities(List<HookNotification> hookNotifications) {
+        for (int i = 0; i < hookNotifications.size(); i++) {
+            HookNotification                     hookNotification    = hookNotifications.get(i);
+            AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo = getAtlasEntitiesWithExtInfo(hookNotification);
+
+            if (entitiesWithExtInfo == null) {
+                // Not a create / full-update message: skip it, but keep filtering the messages that follow.
+                continue;
+            }
+
+            List<AtlasEntity> entities = entitiesWithExtInfo.getEntities();
+
+            if (entities != null) {
+                entities.removeIf(AtlasHook::matchesIgnorePattern);
+            }
+
+            Map<String, AtlasEntity> referredEntities = entitiesWithExtInfo.getReferredEntities();
+
+            if (referredEntities != null) {
+                referredEntities.values().removeIf(AtlasHook::matchesIgnorePattern);
+            }
+
+            boolean noReferredEntities = (referredEntities == null) || referredEntities.isEmpty();
+
+            if (CollectionUtils.isEmpty(entities) && noReferredEntities) {
+                // remove(i--) keeps the index on the element that shifts into this slot
+                hookNotifications.remove(i--);
+
+                LOG.info("ignored message: {}", hookNotification);
+            }
+        }
+    }
+
+    /**
+     * Tells whether an entity's qualifiedName matches one of the configured ignore patterns.
+     * A null entity, or an entity without a qualifiedName attribute, never matches.
+     */
+    private static boolean matchesIgnorePattern(AtlasEntity entity) {
+        Object qualifiedName = (entity != null) ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
+
+        return qualifiedName != null && isMatch(qualifiedName.toString(), entitiesToIgnore);
+    }
+    /**
+     * Applies ignore-pattern filtering to the messages (only when at least one pattern is
+     * configured) and then sends whatever messages remain.
+     *
+     * @param messages   notifications to send; may be filtered in place
+     * @param ugi        user on whose behalf the messages are sent
+     * @param maxRetries retry limit handed to the sender
+     * @param source     source of the messages
+     */
+    private static void notifyEntitiesPostPreprocess(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) {
+        if (shouldPreprocess) {
+            preprocessEntities(messages);
+        }
+
+        if (CollectionUtils.isEmpty(messages)) {
+            return; // nothing left to send
+        }
+
+        notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
+    }
+
     /**
      * Notify atlas of the entity through message. The entity can be a
      * complex entity with reference to other entities.
@@ -174,12 +255,12 @@ public abstract class AtlasHook {
      */
     public static void notifyEntities(List<HookNotification> messages, 
UserGroupInformation ugi, int maxRetries, MessageSource source) {
         if (executor == null) { // send synchronously
-            notifyEntitiesInternal(messages, maxRetries, ugi, 
notificationInterface, logFailedMessages, failedMessagesLogger, source);
+            notifyEntitiesPostPreprocess(messages, ugi, maxRetries, source);
         } else {
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
-                    notifyEntitiesInternal(messages, maxRetries, ugi, 
notificationInterface, logFailedMessages, failedMessagesLogger, source);
+                    notifyEntitiesPostPreprocess(messages, ugi, maxRetries, 
source);
                 }
             });
         }
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 936423aa2..7b02ac449 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -39,6 +39,7 @@ import 
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV
 import 
org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
 import org.apache.atlas.notification.NotificationInterface.NotificationType;
 import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
+import org.apache.atlas.notification.preprocessor.GenericEntityPreprocessor;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext;
 import 
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
 import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
@@ -149,6 +150,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
     public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633    
              = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
     public static final String 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
+    public static final String CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN  
              = 
"atlas.notification.consumer.preprocess.entity.type.ignore.pattern";
+    public static final String CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN       
              = "atlas.notification.consumer.preprocess.entity.ignore.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN   
              = 
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN    
              = 
"atlas.notification.consumer.preprocess.hive_table.prune.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE       
              = "atlas.notification.consumer.preprocess.hive_table.cache.size";
@@ -182,6 +185,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final boolean                       
updateHiveProcessNameWithQualifiedName;
     private final int                           
largeMessageProcessingTimeThresholdMs;
     private final boolean                       consumerDisabled;
+    private final List<Pattern>                 entityTypesToIgnore = new 
ArrayList<>();
+    private final List<Pattern>                 entitiesToIgnore = new 
ArrayList<>();
     private final List<Pattern>                 hiveTablesToIgnore = new 
ArrayList<>();
     private final List<Pattern>                 hiveTablesToPrune  = new 
ArrayList<>();
     private final List<String>                  hiveDummyDatabasesToIgnore;
@@ -246,9 +251,36 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? 
new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null;
 
+        String[] patternEntityTypesToIgnore = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN);
+        String[] patternEntitiesToIgnore = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN);
+
         String[] patternHiveTablesToIgnore = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
         String[] patternHiveTablesToPrune  = 
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
 
+        if (patternEntityTypesToIgnore != null) {
+            for (String pattern: patternEntityTypesToIgnore) {
+                try {
+                    this.entityTypesToIgnore.add(Pattern.compile(pattern));
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: 
{}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, pattern);
+                }
+            }
+            LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, 
entityTypesToIgnore);
+        }
+
+        if (patternEntitiesToIgnore != null) {
+            for (String pattern: patternEntitiesToIgnore) {
+                try {
+                    this.entitiesToIgnore.add(Pattern.compile(pattern));
+                } catch (Throwable t) {
+                    LOG.warn("failed to compile pattern {}", pattern, t);
+                    LOG.warn("Ignoring invalid pattern in configuration {}: 
{}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, pattern);
+                }
+            }
+            LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, 
entitiesToIgnore);
+        }
+
         if (patternHiveTablesToIgnore != null) {
             for (String pattern : patternHiveTablesToIgnore) {
                 try {
@@ -1073,6 +1105,36 @@ public class NotificationHookConsumer implements 
Service, ActiveStateChangeHandl
         }
     }
 
+    /**
+     * Runs the generic ignore-pattern preprocessor over the entities and referred entities of
+     * the given context, removing every entity that the context reports as ignored afterwards.
+     *
+     * @param context preprocessing context of the message being handled; its collections are modified in place
+     */
+    private void preprocessEntities(PreprocessorContext context) {
+        GenericEntityPreprocessor preprocessor = new GenericEntityPreprocessor(this.entityTypesToIgnore, this.entitiesToIgnore);
+        List<AtlasEntity>         entities     = context.getEntities();
+
+        if (entities != null) {
+            int idx = 0;
+
+            while (idx < entities.size()) {
+                AtlasEntity entity = entities.get(idx);
+
+                preprocessor.preprocess(entity, context);
+
+                if (context.isIgnoredEntity(entity.getGuid())) {
+                    entities.remove(idx); // the next element shifts into idx, so do not advance
+                } else {
+                    idx++;
+                }
+            }
+        }
+
+        Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
+
+        if (referredEntities != null) {
+            Iterator<AtlasEntity> referredIter = referredEntities.values().iterator();
+
+            while (referredIter.hasNext()) {
+                AtlasEntity entity = referredIter.next();
+
+                preprocessor.preprocess(entity, context);
+
+                if (context.isIgnoredEntity(entity.getGuid())) {
+                    referredIter.remove();
+                }
+            }
+        }
+    }
+
     private PreprocessorContext 
preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
         PreprocessorContext context = null;
 
@@ -1081,6 +1143,10 @@ public class NotificationHookConsumer implements 
Service, ActiveStateChangeHandl
                     hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, 
hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
                     rdbmsTypesRemoveOwnedRefAttrs, 
s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName, 
entityCorrelationManager);
 
+            if (CollectionUtils.isNotEmpty(this.entityTypesToIgnore) || 
CollectionUtils.isNotEmpty(this.entitiesToIgnore)) {
+                preprocessEntities(context);
+            }
+
             if (context.isHivePreprocessEnabled()) {
                 preprocessHiveTypes(context);
             }
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
new file mode 100644
index 000000000..195cc2945
--- /dev/null
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Preprocessor that marks entities as ignored based on regular expressions applied to the
+ * entity type name and/or the entity qualified name.
+ *
+ * <ul>
+ *   <li>only qualified-name patterns given: an entity is ignored when its qualified name matches;</li>
+ *   <li>only type patterns given: an entity is ignored when its type name matches;</li>
+ *   <li>both given: an entity is ignored only when both its type name and qualified name match.</li>
+ * </ul>
+ */
+public class GenericEntityPreprocessor extends EntityPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityPreprocessor.class);
+
+    private final List<Pattern> entitiesToIgnore;
+    private final List<Pattern> entityTypesToIgnore;
+
+    /**
+     * @param entityTypesToIgnore patterns matched against the entity type name; may be empty
+     * @param entitiesToIgnore    patterns matched against the entity qualified name; may be empty
+     */
+    public GenericEntityPreprocessor(List<Pattern> entityTypesToIgnore, List<Pattern> entitiesToIgnore) {
+        super("Generic");
+
+        this.entityTypesToIgnore = entityTypesToIgnore;
+        this.entitiesToIgnore    = entitiesToIgnore;
+    }
+
+    /**
+     * Tells whether the property is fully matched by at least one pattern.
+     * A null property or a null pattern list never matches; Pattern.matcher(null) would throw otherwise.
+     */
+    private boolean isMatch(String property, List<Pattern> patterns) {
+        if (property == null || patterns == null) {
+            return false;
+        }
+
+        return patterns.stream().anyMatch((Pattern pattern) -> pattern.matcher(property).matches());
+    }
+
+    /**
+     * Applies the rule described on the class to decide whether the entity must be ignored.
+     */
+    private boolean isToBeIgnored(AtlasEntity entity) {
+        String  qualifiedName   = getQualifiedName(entity);
+        boolean noTypePatterns  = CollectionUtils.isEmpty(this.entityTypesToIgnore);
+        boolean noQNamePatterns = CollectionUtils.isEmpty(this.entitiesToIgnore);
+
+        if (noTypePatterns) { // ignore every entity whose qualified name matches an ignore pattern
+            return isMatch(qualifiedName, this.entitiesToIgnore);
+        }
+
+        if (noQNamePatterns) { // ignore every entity whose type matches an ignore pattern
+            return isMatch(entity.getTypeName(), this.entityTypesToIgnore);
+        }
+
+        // both kinds of pattern configured: type and qualified name must both match
+        return isMatch(entity.getTypeName(), this.entityTypesToIgnore) && isMatch(qualifiedName, this.entitiesToIgnore);
+    }
+
+    /**
+     * Registers the entity with the context as ignored when it matches the configured patterns.
+     *
+     * @param entity  entity to examine; a null entity is skipped
+     * @param context context that collects the ignored entities
+     */
+    @Override
+    public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+        if (entity != null && isToBeIgnored(entity)) {
+            context.addToIgnoredEntities(entity);
+        }
+    }
+}
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
new file mode 100644
index 000000000..f777d27d7
--- /dev/null
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.hook.HookMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import static 
org.apache.atlas.notification.preprocessor.EntityPreprocessor.getQualifiedName;
+
+
+
+/**
+ * Checks that {@link GenericEntityPreprocessor} ignores exactly the entities that an
+ * independent evaluation of the same patterns selects.
+ *
+ * NOTE(review): the public test* methods take arguments and carry no @Test annotation or data
+ * provider in this file; confirm how they are meant to be invoked.
+ */
+public class GenericEntityPreprocessorTest {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityPreprocessorTest.class);
+
+    private final HookMessageDeserializer deserializer = new HookMessageDeserializer();
+
+    /** Deserializes the JSON message and wraps it in a preprocessor context. */
+    private PreprocessorContext getPreprocessorContext(String msgJson) {
+        HookNotification                    notification = deserializer.deserialize(msgJson);
+        AtlasKafkaMessage<HookNotification> kafkaMsg     = new AtlasKafkaMessage<>(notification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+
+        return new PreprocessorContext(kafkaMsg, null, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList(),
+                Collections.emptyList(), Collections.emptyList(), false, false, true,
+                false, null);
+    }
+
+    /** Returns true when at least one pattern fully matches the property. */
+    private boolean isMatch(List<Pattern> patterns, String property) {
+        return patterns.stream().anyMatch(p -> p.matcher(property).matches());
+    }
+
+    /** Collects the GUIDs of the entities accepted by the predicate; a null list yields an empty set. */
+    private Set<String> filterEntity(List<AtlasEntity> entities, Predicate<AtlasEntity> predicate) {
+        Set<String> guids = new HashSet<>();
+
+        if (entities == null) {
+            return guids;
+        }
+
+        for (AtlasEntity entity : entities) {
+            if (!predicate.test(entity)) {
+                continue;
+            }
+
+            guids.add(entity.getGuid());
+        }
+
+        return guids;
+    }
+
+    /** Computes the GUIDs expected to be ignored, over both the entities and the referred entities of the context. */
+    private Set<String> expectedIgnoredGuids(PreprocessorContext context, Predicate<AtlasEntity> shouldIgnore) {
+        Set<String> expected = filterEntity(context.getEntities(), shouldIgnore);
+
+        if (context.getReferredEntities() != null) {
+            expected.addAll(filterEntity(new ArrayList<>(context.getReferredEntities().values()), shouldIgnore));
+        }
+
+        return expected;
+    }
+
+    /** Feeds every entity and referred entity of the context through the preprocessor, removing the ignored ones. */
+    private void preprocessEntities(GenericEntityPreprocessor genericEntityPreprocessor, PreprocessorContext context) {
+        List<AtlasEntity> entities = context.getEntities();
+
+        if (entities != null) {
+            int idx = 0;
+
+            while (idx < entities.size()) {
+                AtlasEntity entity = entities.get(idx);
+
+                genericEntityPreprocessor.preprocess(entity, context);
+
+                if (context.isIgnoredEntity(entity.getGuid())) {
+                    entities.remove(idx);
+                } else {
+                    idx++;
+                }
+            }
+        }
+
+        Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
+
+        if (referredEntities != null) {
+            Iterator<Map.Entry<String, AtlasEntity>> iterator = referredEntities.entrySet().iterator();
+
+            while (iterator.hasNext()) {
+                AtlasEntity entity = iterator.next().getValue();
+
+                genericEntityPreprocessor.preprocess(entity, context);
+
+                if (context.isIgnoredEntity(entity.getGuid())) {
+                    iterator.remove();
+                }
+            }
+        }
+    }
+
+    /** Only type patterns configured: entities whose type name matches must be ignored. */
+    public void testEntityTypesToIgnore(String msgJson, List<Pattern> entityTypesToIgnore) {
+        PreprocessorContext context  = getPreprocessorContext(msgJson);
+        Set<String>         expected = expectedIgnoredGuids(context, (AtlasEntity entity) -> isMatch(entityTypesToIgnore, entity.getTypeName()));
+
+        preprocessEntities(new GenericEntityPreprocessor(entityTypesToIgnore, Collections.emptyList()), context);
+
+        Assert.assertEquals(expected, context.getIgnoredEntities());
+    }
+
+    /** Only qualified-name patterns configured: entities whose qualified name matches must be ignored. */
+    public void testEntitiesToIgnoreByQName(String msgJson, List<Pattern> entitiesToIgnore) {
+        PreprocessorContext context  = getPreprocessorContext(msgJson);
+        Set<String>         expected = expectedIgnoredGuids(context, (AtlasEntity entity) -> isMatch(entitiesToIgnore, getQualifiedName(entity)));
+
+        preprocessEntities(new GenericEntityPreprocessor(Collections.emptyList(), entitiesToIgnore), context);
+
+        Assert.assertEquals(expected, context.getIgnoredEntities());
+    }
+
+    /** Both kinds of pattern configured: only entities matching on type name and qualified name must be ignored. */
+    public void testEntitiesToIgnoreByAndTypeQName(String msgJson, List<Pattern> entityTypesToIgnore, List<Pattern> entitiesToIgnore) {
+        PreprocessorContext context  = getPreprocessorContext(msgJson);
+        Set<String>         expected = expectedIgnoredGuids(context, (AtlasEntity entity) ->
+                isMatch(entityTypesToIgnore, entity.getTypeName()) && isMatch(entitiesToIgnore, getQualifiedName(entity)));
+
+        preprocessEntities(new GenericEntityPreprocessor(entityTypesToIgnore, entitiesToIgnore), context);
+
+        Assert.assertEquals(expected, context.getIgnoredEntities());
+    }
+}

Reply via email to