This is an automated email from the ASF dual-hosted git repository. radhikakundam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new f3ef15afc ATLAS-4830: Make ignore patterns generic for all the hooks f3ef15afc is described below commit f3ef15afc9aa336c27aeeb427e5f2078b233d364 Author: Amruth S <amrut...@cloudera.com> AuthorDate: Fri Mar 15 17:40:59 2024 +0530 ATLAS-4830: Make ignore patterns generic for all the hooks Signed-off-by: Radhika Kundam <radhika.kun...@gmail.com> --- .../java/org/apache/atlas/hive/hook/HiveHook.java | 15 -- .../main/java/org/apache/atlas/hook/AtlasHook.java | 85 ++++++++++- .../notification/NotificationHookConsumer.java | 66 +++++++++ .../preprocessor/GenericEntityPreprocessor.java | 61 ++++++++ .../GenericEntityPreprocessorTest.java | 161 +++++++++++++++++++++ 5 files changed, 371 insertions(+), 17 deletions(-) diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 4d74d0c48..96b77e187 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -336,21 +336,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return ret; } - - private boolean isMatch(String name, List<Pattern> patterns) { - boolean ret = false; - - for (Pattern p : patterns) { - if (p.matcher(name).matches()) { - ret = true; - - break; - } - } - - return ret; - } - public static HiveHookObjectNamesCache getKnownObjects() { if (knownObjects != null && knownObjects.isCacheExpired()) { LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount()); diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 4c70aedb9..980d4feec 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -24,10 +24,12 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasConstants; import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.utils.AtlasConfigurationUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.atlas.model.notification.MessageSource; import org.apache.commons.lang.StringUtils; @@ -38,11 +40,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; /** @@ -65,6 +71,8 @@ public abstract class AtlasHook { public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED = "atlas.hook.messages.sort.enabled"; + public static final String ATLAS_HOOK_ENTITY_IGNORE_PATTERN = "atlas.hook.entity.ignore.pattern"; + public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; protected static Configuration atlasProperties; protected static NotificationInterface notificationInterface; @@ -79,6 +87,8 @@ public abstract class AtlasHook { private static ExecutorService executor = null; public static final boolean isRESTNotificationEnabled; public static final boolean isHookMsgsSortEnabled; + private static final List<Pattern> entitiesToIgnore = new ArrayList<>(); + private static boolean shouldPreprocess = false; static { @@ -106,6 +116,23 @@ public abstract class AtlasHook { notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000); notificationInterface = NotificationProvider.get(); + String[] patternsToIgnoreEntities = atlasProperties.getStringArray(ATLAS_HOOK_ENTITY_IGNORE_PATTERN); + + if (patternsToIgnoreEntities != null) { + for (String pattern: patternsToIgnoreEntities) { + try { + entitiesToIgnore.add(Pattern.compile(pattern)); + } catch (Throwable t) { + LOG.warn("failed to compile pattern {}", pattern, t); + LOG.warn("Ignoring invalid pattern in configuration {}: {}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN, pattern); + } + } + LOG.info("{}={}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN, entitiesToIgnore); + } + + + shouldPreprocess = CollectionUtils.isNotEmpty(entitiesToIgnore); + String currentUser = ""; try { @@ -163,6 +190,60 @@ public abstract class AtlasHook { public abstract String getMessageSource(); + protected static boolean isMatch(String qualifiedName, List<Pattern> patterns) { + return patterns.stream().anyMatch((Pattern pattern) -> pattern.matcher(qualifiedName).matches()); + } + + private static AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntitiesWithExtInfo(HookNotification hookNotification) { + AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo = null; + switch (hookNotification.getType()) { + case ENTITY_CREATE_V2: + entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) hookNotification).getEntities(); + break; + case ENTITY_FULL_UPDATE_V2: + entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) hookNotification).getEntities(); + break; + } + return entitiesWithExtInfo; + + } + + private static void preprocessEntities(List<HookNotification> hookNotifications) { + for (int i = 0; i < hookNotifications.size(); i++) { + HookNotification hookNotification = hookNotifications.get(i); + + AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo = getAtlasEntitiesWithExtInfo(hookNotification); + + if (entitiesWithExtInfo == null) { + return; + } + + List<AtlasEntity> entities = entitiesWithExtInfo.getEntities(); + entities = ((entities != null) ? entities : Collections.emptyList()); + entities.removeIf((AtlasEntity entity) -> isMatch(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(), entitiesToIgnore)); + + + Map<String, AtlasEntity> referredEntitiesMap = entitiesWithExtInfo.getReferredEntities(); + referredEntitiesMap = ((referredEntitiesMap != null) ? referredEntitiesMap: Collections.emptyMap()); + referredEntitiesMap.entrySet().removeIf((Map.Entry<String, AtlasEntity> entry) -> isMatch(entry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(), entitiesToIgnore)); + + + if (CollectionUtils.isEmpty(entities) && CollectionUtils.isEmpty(referredEntitiesMap.values())) { + hookNotifications.remove(i--); + + LOG.info("ignored message: {}", hookNotification); + } + } + } + private static void notifyEntitiesPostPreprocess(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) { + if (shouldPreprocess) { + preprocessEntities(messages); + } + if (CollectionUtils.isNotEmpty(messages)) { + notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source); + } + } + /** * Notify atlas of the entity through message. The entity can be a * complex entity with reference to other entities. @@ -174,12 +255,12 @@ public abstract class AtlasHook { */ public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) { if (executor == null) { // send synchronously - notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source); + notifyEntitiesPostPreprocess(messages, ugi, maxRetries, source); } else { executor.submit(new Runnable() { @Override public void run() { - notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source); + notifyEntitiesPostPreprocess(messages, ugi, maxRetries, source); } }); } diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 936423aa2..7b02ac449 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -39,6 +39,7 @@ import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2; import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.notification.preprocessor.EntityPreprocessor; +import org.apache.atlas.notification.preprocessor.GenericEntityPreprocessor; import org.apache.atlas.notification.preprocessor.PreprocessorContext; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.atlas.repository.store.graph.EntityCorrelationStore; @@ -149,6 +150,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633"; public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold"; + public static final String CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.entity.type.ignore.pattern"; + public static final String CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.entity.ignore.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern"; public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size"; @@ -182,6 +185,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final boolean updateHiveProcessNameWithQualifiedName; private final int largeMessageProcessingTimeThresholdMs; private final boolean consumerDisabled; + private final List<Pattern> entityTypesToIgnore = new ArrayList<>(); + private final List<Pattern> entitiesToIgnore = new ArrayList<>(); private final List<Pattern> hiveTablesToIgnore = new ArrayList<>(); private final List<Pattern> hiveTablesToPrune = new ArrayList<>(); private final List<String> hiveDummyDatabasesToIgnore; @@ -246,9 +251,36 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null; + String[] patternEntityTypesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN); + String[] patternEntitiesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN); + String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN); String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN); + if (patternEntityTypesToIgnore != null) { + for (String pattern: patternEntityTypesToIgnore) { + try { + this.entityTypesToIgnore.add(Pattern.compile(pattern)); + } catch (Throwable t) { + LOG.warn("failed to compile pattern {}", pattern, t); + LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, pattern); + } + } + LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, entityTypesToIgnore); + } + + if (patternEntitiesToIgnore != null) { + for (String pattern: patternEntitiesToIgnore) { + try { + this.entitiesToIgnore.add(Pattern.compile(pattern)); + } catch (Throwable t) { + LOG.warn("failed to compile pattern {}", pattern, t); + LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, pattern); + } + } + LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, entitiesToIgnore); + } + if (patternHiveTablesToIgnore != null) { for (String pattern : patternHiveTablesToIgnore) { try { @@ -1073,6 +1105,36 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } + private void preprocessEntities(PreprocessorContext context) { + GenericEntityPreprocessor genericEntityPreprocessor = new GenericEntityPreprocessor(this.entityTypesToIgnore, this.entitiesToIgnore); + + List<AtlasEntity> entities = context.getEntities(); + + if (entities != null) { + for (int i = 0; i < entities.size(); i++) { + AtlasEntity entity = entities.get(i); + genericEntityPreprocessor.preprocess(entity, context); + + if (context.isIgnoredEntity(entity.getGuid())) { + entities.remove(i--); + } + } + } + + Map<String, AtlasEntity> referredEntities = context.getReferredEntities(); + + if (referredEntities != null) { + for (Iterator<Map.Entry<String, AtlasEntity>> iterator = referredEntities.entrySet().iterator(); iterator.hasNext(); ) { + AtlasEntity entity = iterator.next().getValue(); + genericEntityPreprocessor.preprocess(entity, context); + + if (context.isIgnoredEntity(entity.getGuid())) { + iterator.remove(); + } + } + } + } + private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) { PreprocessorContext context = null; @@ -1081,6 +1143,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs, s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName, entityCorrelationManager); + if (CollectionUtils.isNotEmpty(this.entityTypesToIgnore) || CollectionUtils.isNotEmpty(this.entitiesToIgnore)) { + preprocessEntities(context); + } + if (context.isHivePreprocessEnabled()) { preprocessHiveTypes(context); } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java new file mode 100644 index 000000000..195cc2945 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.regex.Pattern; + +public class GenericEntityPreprocessor extends EntityPreprocessor { + private static final Logger LOG = LoggerFactory.getLogger(GenericEntityPreprocessor.class); + private final List<Pattern> entitiesToIgnore; + private final List<Pattern> entityTypesToIgnore; + public GenericEntityPreprocessor(List<Pattern> entityTypesToIgnore, List<Pattern> entitiesToIgnore) { + super("Generic"); + this.entityTypesToIgnore = entityTypesToIgnore; + this.entitiesToIgnore = entitiesToIgnore; + } + + private boolean isMatch(String property, List<Pattern> patterns) { + return patterns.stream().anyMatch((Pattern pattern) -> pattern.matcher(property).matches()); + } + + private boolean isToBeIgnored(AtlasEntity entity) { + String qualifiedName = getQualifiedName(entity); + boolean decision = false; + + if (CollectionUtils.isEmpty(this.entityTypesToIgnore)) { // Will Ignore all entities whose qualified name matches the ignore pattern. + decision = isMatch(qualifiedName, this.entitiesToIgnore); + } else if (CollectionUtils.isEmpty(this.entitiesToIgnore)) { // Will Ignore all entities whose type matches the regex given. + decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore); + } else { // Combination of above 2 cases. + decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore) && isMatch(qualifiedName, this.entitiesToIgnore); + } + + return decision; + } + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (entity != null && isToBeIgnored(entity)) { + context.addToIgnoredEntities(entity); + } + } +} diff --git a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java new file mode 100644 index 000000000..f777d27d7 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.atlas.kafka.KafkaNotification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.notification.hook.HookMessageDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.getQualifiedName; + + + +public class GenericEntityPreprocessorTest { + private static final Logger LOG = LoggerFactory.getLogger(GenericEntityPreprocessorTest.class); + private final HookMessageDeserializer deserializer = new HookMessageDeserializer(); + + private PreprocessorContext getPreprocessorContext(String msgJson) { + HookNotification hookNotification = deserializer.deserialize(msgJson); + + AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage<>(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1); + + PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), false, false, true, + false, null); + + return context; + } + + private boolean isMatch(List<Pattern> patterns, String property) { + for (Pattern p : patterns) { + if (p.matcher(property).matches()) { + return true; + } + } + return false; + } + + public void testEntityTypesToIgnore(String msgJson, List<Pattern> entityTypesToIgnore) { + PreprocessorContext context = getPreprocessorContext(msgJson); + List<AtlasEntity> entities = context.getEntities(); + + Set<String> filteredEntitiesActual = filterEntity(entities, (AtlasEntity entity) -> isMatch(entityTypesToIgnore, entity.getTypeName())); + + if (context.getReferredEntities() != null) { + filteredEntitiesActual.addAll(filterEntity(new ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) -> isMatch(entityTypesToIgnore, entity.getTypeName()))); + } + + GenericEntityPreprocessor entityPreprocessor = new GenericEntityPreprocessor(entityTypesToIgnore, Collections.emptyList()); + preprocessEntities(entityPreprocessor, context); + + Assert.assertEquals(filteredEntitiesActual, context.getIgnoredEntities()); + } + + private void preprocessEntities(GenericEntityPreprocessor genericEntityPreprocessor, PreprocessorContext context) { + List<AtlasEntity> entities = context.getEntities(); + + if (entities != null) { + for (int i = 0; i < entities.size(); i++) { + AtlasEntity entity = entities.get(i); + genericEntityPreprocessor.preprocess(entity, context); + + if (context.isIgnoredEntity(entity.getGuid())) { + entities.remove(i--); + } + } + } + + Map<String, AtlasEntity> referredEntities = context.getReferredEntities(); + + if (referredEntities != null) { + for (Iterator<Map.Entry<String, AtlasEntity>> iterator = referredEntities.entrySet().iterator(); iterator.hasNext(); ) { + AtlasEntity entity = iterator.next().getValue(); + genericEntityPreprocessor.preprocess(entity, context); + + if (context.isIgnoredEntity(entity.getGuid())) { + iterator.remove(); + } + } + } + } + + public void testEntitiesToIgnoreByQName(String msgJson, List<Pattern> entitiesToIgnore) { + PreprocessorContext context = getPreprocessorContext(msgJson); + List<AtlasEntity> entities = context.getEntities(); + + Set<String> filteredEntitiesActual = filterEntity(entities, (AtlasEntity entity) -> isMatch(entitiesToIgnore, getQualifiedName(entity))); + if (context.getReferredEntities() != null) { + filteredEntitiesActual.addAll(filterEntity(new ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) -> isMatch(entitiesToIgnore, getQualifiedName(entity)))); + } + + GenericEntityPreprocessor entityPreprocessor = new GenericEntityPreprocessor(Collections.emptyList(), entitiesToIgnore); + preprocessEntities(entityPreprocessor, context); + + + Assert.assertEquals(filteredEntitiesActual, context.getIgnoredEntities()); + } + + private Set<String> filterEntity(List<AtlasEntity> entities, Predicate<AtlasEntity> predicate) { + Set<String> filteredEntitiesActual = new HashSet<>(); + + if (entities != null) { + for (AtlasEntity entity: entities) { + if (predicate.test(entity)) { + filteredEntitiesActual.add(entity.getGuid()); + } + } + } + + return filteredEntitiesActual; + } + + public void testEntitiesToIgnoreByAndTypeQName(String msgJson, List<Pattern> entityTypesToIgnore, List<Pattern> entitiesToIgnore) { + PreprocessorContext context = getPreprocessorContext(msgJson); + List<AtlasEntity> entities = context.getEntities(); + + Set<String> filteredEntitiesActual = filterEntity(entities, (AtlasEntity entity) -> + isMatch(entityTypesToIgnore, entity.getTypeName()) && isMatch(entitiesToIgnore, getQualifiedName(entity))); + + if (context.getReferredEntities() != null) { + filteredEntitiesActual.addAll(filterEntity(new ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) -> + isMatch(entityTypesToIgnore, entity.getTypeName()) && isMatch(entitiesToIgnore, getQualifiedName(entity)))); + } + + + GenericEntityPreprocessor entityPreprocessor = new GenericEntityPreprocessor(entityTypesToIgnore, entitiesToIgnore); + preprocessEntities(entityPreprocessor, context); + + + Assert.assertEquals(filteredEntitiesActual, context.getIgnoredEntities()); + } +}