This is an automated email from the ASF dual-hosted git repository.

chaitalithombare pushed a commit to branch ATLAS-4984
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 72e787f91eee8efb7c69da2ef0b79f154b7ca85f
Author:     chaitalithombare <chaitalithomb...@apache.org>
AuthorDate: Wed Apr 16 13:02:14 2025 +0530

    ATLAS-4984: Option to ignore spark_process attributes details and sparkPlanDescription
---
 .../notification/NotificationHookConsumer.java     |  26 +++++-
 .../preprocessor/EntityPreprocessor.java           |  16 ++++
 .../preprocessor/SparkPreprocessor.java            |  38 ++++++++
 .../preprocessor/SparkPreprocessorTest.java        | 102 +++++++++++++++++++++
 4 files changed, 180 insertions(+), 2 deletions(-)

diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 49d378f85..a0d39ae90 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -156,6 +156,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
     public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
     public static final String CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX = "atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
+    public static final String CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES = "atlas.notification.consumer.preprocess.spark_process.attributes";
     public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
     public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds";
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -202,6 +203,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final boolean preprocessEnabled;
     private final boolean createShellEntityForNonExistingReference;
     private final boolean authorizeUsingMessageUser;
+    private final boolean sparkProcessAttributes;
+
     private final Map<String, Authentication> authnCache;
     private final NotificationInterface notificationInterface;
     private final Configuration applicationProperties;
@@ -355,8 +358,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
         s3V2DirectoryPruneObjectPrefix = applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, true);
-
-        preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
+        sparkProcessAttributes = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES, false);
+        preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || sparkProcessAttributes;
         entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore);
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
@@ -591,6 +594,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             pruneObjectPrefixForS3V2Directory(context);
         }
 
+        if (sparkProcessAttributes) {
+            preprocessSparkProcessAttributes(context);
+        }
+
         context.moveRegisteredReferredEntities();
 
         if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
@@ -700,6 +707,21 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
     }
 
+    private void preprocessSparkProcessAttributes(PreprocessorContext context) {
+        List<AtlasEntity> entities = context.getEntities();
+
+        if (entities != null) {
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity entity = entities.get(i);
+                EntityPreprocessor preprocessor = EntityPreprocessor.getSparkPreprocessor(entity.getTypeName());
+
+                if (preprocessor != null) {
+                    preprocessor.preprocess(entity, context);
+                }
+            }
+        }
+    }
+
     private void skipHiveColumnLineage(PreprocessorContext context) {
         List<AtlasEntity> entities = context.getEntities();
 
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 9cab8eda7..b801615f4 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -34,6 +34,7 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_HIVE_DB_DDL = "hive_db_ddl";
     public static final String TYPE_HIVE_TABLE_DDL = "hive_table_ddl";
     public static final String TYPE_HIVE_TABLE = "hive_table";
+    public static final String TYPE_SPARK_PROCESS = "spark_process";
     public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance";
     public static final String TYPE_RDBMS_DB = "rdbms_db";
     public static final String TYPE_RDBMS_TABLE = "rdbms_table";
@@ -42,6 +43,8 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_RDBMS_FOREIGN_KEY = "rdbms_foreign_key";
 
     public static final String ATTRIBUTE_COLUMNS = "columns";
+    public static final String ATTRIBUTE_DETAILS = "details";
+    public static final String ATTRIBUTE_SPARKPLANDESCRIPTION = "sparkPlanDescription";
     public static final String ATTRIBUTE_INPUTS = "inputs";
     public static final String ATTRIBUTE_OUTPUTS = "outputs";
     public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
@@ -66,6 +69,7 @@ public abstract class EntityPreprocessor {
     private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>();
     private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>();
     private static final Map<String, EntityPreprocessor> AWS_S3_V2_PREPROCESSOR_MAP = new HashMap<>();
+    private static final Map<String, EntityPreprocessor> SPARK_PREPROCESSOR_MAP = new HashMap<>();
 
     private final String typeName;
 
@@ -85,6 +89,10 @@ public abstract class EntityPreprocessor {
         return typeName != null ? AWS_S3_V2_PREPROCESSOR_MAP.get(typeName) : null;
     }
 
+    public static EntityPreprocessor getSparkPreprocessor(String typeName) {
+        return typeName != null ? SPARK_PREPROCESSOR_MAP.get(typeName) : null;
+    }
+
     public static String getQualifiedName(AtlasEntity entity) {
         Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
 
@@ -175,6 +183,10 @@ public abstract class EntityPreprocessor {
                 new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor()
         };
 
+        EntityPreprocessor[] sparkPreprocessors = new EntityPreprocessor[]{
+                new SparkPreprocessor.SparkProcessPreprocessor()
+        };
+
         for (EntityPreprocessor preprocessor : hivePreprocessors) {
             HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
         }
@@ -186,5 +198,9 @@ public abstract class EntityPreprocessor {
         for (EntityPreprocessor preprocessor : s3V2Preprocessors) {
             AWS_S3_V2_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
         }
+
+        for (EntityPreprocessor preprocessor : sparkPreprocessors) {
+            SPARK_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+        }
     }
 }
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/SparkPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/SparkPreprocessor.java
new file mode 100644
index 000000000..266ce3790
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/SparkPreprocessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkPreprocessor.class);
+    static class SparkProcessPreprocessor extends EntityPreprocessor {
+        public SparkProcessPreprocessor() {
+            super(TYPE_SPARK_PROCESS);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            entity.removeAttribute(ATTRIBUTE_DETAILS);
+            entity.removeAttribute(ATTRIBUTE_SPARKPLANDESCRIPTION);
+        }
+    }
+}
diff --git a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/SparkPreprocessorTest.java b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/SparkPreprocessorTest.java
new file mode 100644
index 000000000..ec8927364
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/SparkPreprocessorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.hook.HookMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+
+import java.util.regex.Pattern;
+
+import static org.testng.Assert.assertEquals;
+
+public class SparkPreprocessorTest {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkPreprocessorTest.class);
+    private final HookMessageDeserializer deserializer = new HookMessageDeserializer();
+    public static final String TYPE_SPARK_PROCESS = "spark_process";
+    public static final String ATTRIBUTE_DETAILS = "details";
+    public static final String ATTRIBUTE_SPARKPLANDESCRIPTION = "sparkPlanDescription";
+    public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+    public static final String ATTRIBUTE_NAME = "name";
+    public static final String ATTRIBUTE_ISINCOMPLETE = "isIncomplete";
+    public static final String ATTRIBUTE_REMOTEUSER = "remoteUser";
+    public static final String ATTRIBUTE_EXECUTIONID = "executionId";
+    public static final String ATTRIBUTE_QUERYTEXT = "queryText";
+    public static final String ATTRIBUTE_CURRUSER = "currUser";
+    public static final String ATTRIBUTE_GUID = "guid";
+    private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
+    private static final List<Pattern> EMPTY_PATTERN_LIST = new ArrayList<>();
+
+    private void getPreprocessorContext(AtlasEntity entity) {
+        EntityPreprocessor preprocessor = EntityPreprocessor.getSparkPreprocessor(entity.getTypeName());
+        HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+
+        AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+
+        PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                false, true, false, null);
+        if (preprocessor != null) {
+            preprocessor.preprocess(entity, context);
+        }
+    }
+
+    public Object[][] provideSparkProcessData() {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(ATTRIBUTE_NAME, "execution-1");
+        attributes.put(ATTRIBUTE_QUALIFIED_NAME, "application_1740993925593_0006-execution-1sparkTab1");
+        attributes.put(ATTRIBUTE_DETAILS, "== Parsed Logical Plan ==\\nCreateHiveTableAsSelectCommand ...");
+        attributes.put(ATTRIBUTE_SPARKPLANDESCRIPTION, "Execute CreateHiveTableAsSelectCommand ...");
+        attributes.put(ATTRIBUTE_GUID, "-32055574130361399");
+        attributes.put(ATTRIBUTE_ISINCOMPLETE, "false");
+        attributes.put(ATTRIBUTE_REMOTEUSER, "spark");
+        attributes.put(ATTRIBUTE_EXECUTIONID, 1);
+        attributes.put(ATTRIBUTE_QUERYTEXT, null);
+        attributes.put(ATTRIBUTE_CURRUSER, "spark");
+
+        return new Object[][] { { attributes } };
+    }
+
+    @Test
+    public void replaceAttributesInSparkProcess() {
+        Object[][] testData = provideSparkProcessData();
+
+        for (Object[] data : testData) {
+            Map<String, Object> attributes = (Map<String, Object>) data[0]; // Extract attributes
+
+            AtlasEntity atlasEntity = new AtlasEntity();
+            atlasEntity.setTypeName(TYPE_SPARK_PROCESS);
+            attributes.forEach(atlasEntity::setAttribute);
+            getPreprocessorContext(atlasEntity);
+
+            assertEquals(atlasEntity.getAttribute(ATTRIBUTE_DETAILS), null);
+            assertEquals(atlasEntity.getAttribute(ATTRIBUTE_SPARKPLANDESCRIPTION), null);
+        }
+    }
+}
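
Note: the new preprocessor is read with a default of false, so it stays off unless enabled explicitly. A minimal sketch of enabling it, assuming the option is set in the usual atlas-application.properties on the Atlas server (the property name comes from the constant added above):

    atlas.notification.consumer.preprocess.spark_process.attributes=true

When set, the notification consumer drops the details and sparkPlanDescription attributes from incoming spark_process entities before the message is processed further.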