This is an automated email from the ASF dual-hosted git repository.

chaitalithombare pushed a commit to branch ATLAS-4984
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 72e787f91eee8efb7c69da2ef0b79f154b7ca85f
Author: chaitalithombare <chaitalithomb...@apache.org>
AuthorDate: Wed Apr 16 13:02:14 2025 +0530

    ATLAS-4984: Option to ignore the spark_process attributes 'details' and 'sparkPlanDescription'
---
 .../notification/NotificationHookConsumer.java     |  26 +++++-
 .../preprocessor/EntityPreprocessor.java           |  16 ++++
 .../preprocessor/SparkPreprocessor.java            |  38 ++++++++
 .../preprocessor/SparkPreprocessorTest.java        | 102 +++++++++++++++++++++
 4 files changed, 180 insertions(+), 2 deletions(-)

diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 49d378f85..a0d39ae90 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -156,6 +156,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String 
CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS          = 
"atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
     public static final String 
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS         = 
"atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
     public static final String 
CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX       = 
"atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
+    public static final String CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES    
              = 
"atlas.notification.consumer.preprocess.spark_process.attributes";
     public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER           
              = "atlas.notification.authorize.using.message.user";
     public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS      
              = "atlas.notification.authorize.authn.cache.ttl.seconds";
     public static final int    SERVER_READY_WAIT_TIME_MS                       
              = 1000;
@@ -202,6 +203,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final boolean                       preprocessEnabled;
     private final boolean                       
createShellEntityForNonExistingReference;
     private final boolean                       authorizeUsingMessageUser;
+    private final boolean                       sparkProcessAttributes;
+
     private final Map<String, Authentication>   authnCache;
     private final NotificationInterface     notificationInterface;
     private final Configuration             applicationProperties;
@@ -355,8 +358,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         hiveTypesRemoveOwnedRefAttrs   = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
         rdbmsTypesRemoveOwnedRefAttrs  = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
         s3V2DirectoryPruneObjectPrefix = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX,
 true);
-
-        preprocessEnabled        = skipHiveColumnLineageHive20633 || 
updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || 
rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || 
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || 
!hiveTablePrefixesToIgnore.isEmpty();
+        sparkProcessAttributes      = 
this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES,
 false);
+        preprocessEnabled        = skipHiveColumnLineageHive20633 || 
updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || 
rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || 
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || 
!hiveTablePrefixesToIgnore.isEmpty() || sparkProcessAttributes;
         entityCorrelationManager = new 
EntityCorrelationManager(entityCorrelationStore);
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
@@ -591,6 +594,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                 pruneObjectPrefixForS3V2Directory(context);
             }
 
+            if (sparkProcessAttributes) {
+                preprocessSparkProcessAttributes(context);
+            }
+
             context.moveRegisteredReferredEntities();
 
             if (context.isHivePreprocessEnabled() && 
CollectionUtils.isNotEmpty(context.getEntities()) && 
context.getEntities().size() > 1) {
@@ -700,6 +707,21 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
     }
 
+    private void preprocessSparkProcessAttributes(PreprocessorContext context) 
{
+        List<AtlasEntity> entities = context.getEntities();
+
+        if (entities != null) {
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity entity = entities.get(i);
+                EntityPreprocessor preprocessor = 
EntityPreprocessor.getSparkPreprocessor(entity.getTypeName());
+
+                if (preprocessor != null) {
+                    preprocessor.preprocess(entity, context);
+                }
+            }
+        }
+    }
+
     private void skipHiveColumnLineage(PreprocessorContext context) {
         List<AtlasEntity> entities = context.getEntities();
 
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 9cab8eda7..b801615f4 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -34,6 +34,7 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_HIVE_DB_DDL         = "hive_db_ddl";
     public static final String TYPE_HIVE_TABLE_DDL      = "hive_table_ddl";
     public static final String TYPE_HIVE_TABLE          = "hive_table";
+    public static final String TYPE_SPARK_PROCESS       = "spark_process";
     public static final String TYPE_RDBMS_INSTANCE      = "rdbms_instance";
     public static final String TYPE_RDBMS_DB            = "rdbms_db";
     public static final String TYPE_RDBMS_TABLE         = "rdbms_table";
@@ -42,6 +43,8 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_RDBMS_FOREIGN_KEY   = "rdbms_foreign_key";
 
     public static final String ATTRIBUTE_COLUMNS        = "columns";
+    public static final String ATTRIBUTE_DETAILS        = "details";
+    public static final String ATTRIBUTE_SPARKPLANDESCRIPTION = 
"sparkPlanDescription";
     public static final String ATTRIBUTE_INPUTS         = "inputs";
     public static final String ATTRIBUTE_OUTPUTS        = "outputs";
     public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
@@ -66,6 +69,7 @@ public abstract class EntityPreprocessor {
     private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP 
     = new HashMap<>();
     private static final Map<String, EntityPreprocessor> 
RDBMS_PREPROCESSOR_MAP     = new HashMap<>();
     private static final Map<String, EntityPreprocessor> 
AWS_S3_V2_PREPROCESSOR_MAP = new HashMap<>();
+    private static final Map<String, EntityPreprocessor> 
SPARK_PREPROCESSOR_MAP     = new HashMap<>();
 
     private final String typeName;
 
@@ -85,6 +89,10 @@ public abstract class EntityPreprocessor {
         return typeName != null ? AWS_S3_V2_PREPROCESSOR_MAP.get(typeName) : 
null;
     }
 
+    /**
+     * Looks up the Spark preprocessor registered for a type name.
+     *
+     * @param typeName entity type name; may be null
+     * @return the registered preprocessor, or null when typeName is null or has no Spark preprocessor
+     */
+    public static EntityPreprocessor getSparkPreprocessor(String typeName) {
+        if (typeName == null) {
+            return null;
+        }
+
+        return SPARK_PREPROCESSOR_MAP.get(typeName);
+    }
+
     public static String getQualifiedName(AtlasEntity entity) {
         Object obj = entity != null ? 
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
 
@@ -175,6 +183,10 @@ public abstract class EntityPreprocessor {
                 new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor()
         };
 
+        EntityPreprocessor[] sparkPreprocessors = new EntityPreprocessor[]{
+                new SparkPreprocessor.SparkProcessPreprocessor()
+        };
+
         for (EntityPreprocessor preprocessor : hivePreprocessors) {
             HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), 
preprocessor);
         }
@@ -186,5 +198,9 @@ public abstract class EntityPreprocessor {
         for (EntityPreprocessor preprocessor : s3V2Preprocessors) {
             AWS_S3_V2_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), 
preprocessor);
         }
+
+        for (EntityPreprocessor preprocessor : sparkPreprocessors) {
+            SPARK_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), 
preprocessor);
+        }
     }
 }
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/SparkPreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/SparkPreprocessor.java
new file mode 100644
index 000000000..266ce3790
--- /dev/null
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/SparkPreprocessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entity preprocessors for Spark types.
+ *
+ * SparkProcessPreprocessor is registered in EntityPreprocessor's Spark preprocessor map and is
+ * invoked by NotificationHookConsumer only when
+ * atlas.notification.consumer.preprocess.spark_process.attributes is true.
+ */
+public class SparkPreprocessor {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkPreprocessor.class);
+
+    /**
+     * Removes the {@code details} and {@code sparkPlanDescription} attributes from spark_process entities.
+     */
+    static class SparkProcessPreprocessor extends EntityPreprocessor {
+        public SparkProcessPreprocessor() {
+            super(TYPE_SPARK_PROCESS);
+        }
+
+        /**
+         * Drops the configured Spark attributes from the entity in place.
+         *
+         * @param entity  spark_process entity to modify; a null entity is ignored
+         * @param context preprocessing context (not used by this preprocessor)
+         */
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+            if (entity == null) {
+                return;
+            }
+
+            LOG.debug("SparkProcessPreprocessor: removing attributes {} and {} from entity {}",
+                    ATTRIBUTE_DETAILS, ATTRIBUTE_SPARKPLANDESCRIPTION, getQualifiedName(entity));
+
+            entity.removeAttribute(ATTRIBUTE_DETAILS);
+            entity.removeAttribute(ATTRIBUTE_SPARKPLANDESCRIPTION);
+        }
+    }
+}
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/SparkPreprocessorTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/SparkPreprocessorTest.java
new file mode 100644
index 000000000..ec8927364
--- /dev/null
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/SparkPreprocessorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.hook.HookMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Tests for SparkPreprocessor.SparkProcessPreprocessor: the details and sparkPlanDescription
+ * attributes are removed from spark_process entities while all other attributes are kept.
+ */
+public class SparkPreprocessorTest {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkPreprocessorTest.class);
+    private final HookMessageDeserializer deserializer = new HookMessageDeserializer();
+    public static final String TYPE_SPARK_PROCESS = "spark_process";
+    public static final String ATTRIBUTE_DETAILS = "details";
+    public static final String ATTRIBUTE_SPARKPLANDESCRIPTION = "sparkPlanDescription";
+    public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+    public static final String ATTRIBUTE_NAME = "name";
+    public static final String ATTRIBUTE_ISINCOMPLETE = "isIncomplete";
+    public static final String ATTRIBUTE_REMOTEUSER = "remoteUser";
+    public static final String ATTRIBUTE_EXECUTIONID = "executionId";
+    public static final String ATTRIBUTE_QUERYTEXT = "queryText";
+    public static final String ATTRIBUTE_CURRUSER = "currUser";
+    public static final String ATTRIBUTE_GUID = "guid";
+    private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
+    private static final List<Pattern> EMPTY_PATTERN_LIST = new ArrayList<>();
+
+    /**
+     * Wraps the entity in a minimal hook notification and preprocessor context, then runs the
+     * Spark preprocessor registered for the entity's type on it.
+     * Fails the test when no Spark preprocessor is registered for that type, instead of
+     * silently skipping the preprocessing step.
+     */
+    private void runSparkPreprocessor(AtlasEntity entity) {
+        EntityPreprocessor preprocessor = EntityPreprocessor.getSparkPreprocessor(entity.getTypeName());
+
+        assertNotNull(preprocessor, "no Spark preprocessor registered for type " + entity.getTypeName());
+
+        HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
+
+        AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage<>(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
+
+        PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
+                EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
+                false, true, false, null);
+
+        preprocessor.preprocess(entity, context);
+    }
+
+    public Object[][] provideSparkProcessData() {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(ATTRIBUTE_NAME, "execution-1");
+        attributes.put(ATTRIBUTE_QUALIFIED_NAME, "application_1740993925593_0006-execution-1sparkTab1");
+        attributes.put(ATTRIBUTE_DETAILS, "== Parsed Logical Plan ==\\nCreateHiveTableAsSelectCommand ...");
+        attributes.put(ATTRIBUTE_SPARKPLANDESCRIPTION, "Execute CreateHiveTableAsSelectCommand ...");
+        attributes.put(ATTRIBUTE_GUID, "-32055574130361399");
+        attributes.put(ATTRIBUTE_ISINCOMPLETE, "false");
+        attributes.put(ATTRIBUTE_REMOTEUSER, "spark");
+        attributes.put(ATTRIBUTE_EXECUTIONID, 1);
+        attributes.put(ATTRIBUTE_QUERYTEXT, null);
+        attributes.put(ATTRIBUTE_CURRUSER, "spark");
+
+        return new Object[][] { { attributes } };
+    }
+
+    @Test
+    public void replaceAttributesInSparkProcess() {
+        for (Object[] data : provideSparkProcessData()) {
+            @SuppressWarnings("unchecked") // provideSparkProcessData() only ever stores Map<String, Object>
+            Map<String, Object> attributes = (Map<String, Object>) data[0];
+
+            AtlasEntity atlasEntity = new AtlasEntity();
+            atlasEntity.setTypeName(TYPE_SPARK_PROCESS);
+            attributes.forEach(atlasEntity::setAttribute);
+
+            runSparkPreprocessor(atlasEntity);
+
+            // the attributes must be removed from the map, not merely set to null
+            assertNull(atlasEntity.getAttribute(ATTRIBUTE_DETAILS));
+            assertNull(atlasEntity.getAttribute(ATTRIBUTE_SPARKPLANDESCRIPTION));
+            assertFalse(atlasEntity.getAttributes().containsKey(ATTRIBUTE_DETAILS));
+            assertFalse(atlasEntity.getAttributes().containsKey(ATTRIBUTE_SPARKPLANDESCRIPTION));
+
+            // every other attribute must be left untouched
+            for (Map.Entry<String, Object> attribute : attributes.entrySet()) {
+                String attrName = attribute.getKey();
+
+                if (ATTRIBUTE_DETAILS.equals(attrName) || ATTRIBUTE_SPARKPLANDESCRIPTION.equals(attrName)) {
+                    continue;
+                }
+
+                assertEquals(atlasEntity.getAttribute(attrName), attribute.getValue(), "unexpected value for attribute " + attrName);
+            }
+        }
+    }
+
+    @Test
+    public void sparkPreprocessorNotRegisteredForOtherTypes() {
+        assertNull(EntityPreprocessor.getSparkPreprocessor(null));
+        assertNull(EntityPreprocessor.getSparkPreprocessor("hive_process"));
+    }
+}

Reply via email to