This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 8725f31  ATLAS-4348: Atlas-Kafka Hook : When a producer publishes messages to multiple topics, the latest relationship is marked ACTIVE , rest are marked DELETED
8725f31 is described below

commit 8725f31af9821907ced99a829b9e0dbbf3f6de2b
Author: Radhika Kundam <rkun...@cloudera.com>
AuthorDate: Wed Jul 14 22:12:31 2021 -0700

    ATLAS-4348: Atlas-Kafka Hook : When a producer publishes messages to multiple topics, the latest relationship is marked ACTIVE , rest are marked DELETED
    
    Signed-off-by: Sarath Subramanian <sar...@apache.org>
---
 .../019-kafka_producer_lineage_add_options.json    | 15 +++++++
 .../org/apache/atlas/type/AtlasEntityType.java     | 28 ++++++++++---
 .../org/apache/atlas/TestRelationshipUtilsV2.java  |  2 +
 .../store/graph/v2/EntityGraphMapper.java          | 49 +++++++++++++++++++---
 .../store/graph/v2/AtlasEntityTestBase.java        |  2 +-
 .../graph/v2/AtlasRelationshipStoreV2Test.java     | 33 +++++++++++++++
 6 files changed, 117 insertions(+), 12 deletions(-)

diff --git 
a/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json 
b/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json
new file mode 100644
index 0000000..bff4bd5
--- /dev/null
+++ 
b/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json
@@ -0,0 +1,15 @@
+{
+  "patches": [
+    {
+      "id": "TYPEDEF_PATCH_1000_019_001",
+      "description": "Add 'appendRelationshipsOnPartialUpdate' typeDefOptions 
to kafka_producer_lineage",
+      "action": "UPDATE_TYPEDEF_OPTIONS",
+      "typeName": "kafka_producer_lineage",
+      "applyToVersion": "1.0",
+      "updateToVersion": "1.1",
+      "typeDefOptions": {
+        "appendRelationshipsOnPartialUpdate": "[\"outputs\"]"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index ded6d63..6d83599 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -65,11 +65,12 @@ public class AtlasEntityType extends AtlasStructType {
     private static final String OPTION_SCHEMA_ATTRIBUTES     = 
"schemaAttributes";
     private static final String INTERNAL_TYPENAME            = "__internal";
 
-    private static final char   NS_ATTRIBUTE_NAME_SEPARATOR  = '.';
+    public static final String OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE = 
"appendRelationshipsOnPartialUpdate";
 
-    private static final char DYN_ATTRIBUTE_NAME_SEPARATOR   = '.';
-    private static final char DYN_ATTRIBUTE_OPEN_DELIM       = '{';
-    private static final char DYN_ATTRIBUTE_CLOSE_DELIM      = '}';
+    private static final char NS_ATTRIBUTE_NAME_SEPARATOR  = '.';
+    private static final char DYN_ATTRIBUTE_NAME_SEPARATOR = '.';
+    private static final char DYN_ATTRIBUTE_OPEN_DELIM     = '{';
+    private static final char DYN_ATTRIBUTE_CLOSE_DELIM    = '}';
 
     private static final String[] ENTITY_HEADER_ATTRIBUTES = new 
String[]{NAME, DESCRIPTION, OWNER, CREATE_TIME};
     private static final String   ENTITY_ROOT_NAME         = "__ENTITY_ROOT";
@@ -326,7 +327,11 @@ public class AtlasEntityType extends AtlasStructType {
                 String         relationshipType = relationsEntry.getKey();
                 AtlasAttribute relationshipAttr = relationsEntry.getValue();
 
-                relationshipAttrDefs.add(new 
AtlasRelationshipAttributeDef(relationshipType, 
relationshipAttr.isLegacyAttribute(), relationshipAttr.getAttributeDef()));
+                AtlasRelationshipAttributeDef relationshipAttributeDef = new 
AtlasRelationshipAttributeDef(relationshipType, 
relationshipAttr.isLegacyAttribute(), relationshipAttr.getAttributeDef());
+
+                
updateRelationshipAttrDefForPartialUpdate(relationshipAttributeDef, entityDef);
+
+                relationshipAttrDefs.add(relationshipAttributeDef);
             }
         }
 
@@ -369,6 +374,19 @@ public class AtlasEntityType extends AtlasStructType {
         }
     }
 
+    private void 
updateRelationshipAttrDefForPartialUpdate(AtlasRelationshipAttributeDef 
relationshipAttributeDef, AtlasEntityDef entityDef) {
+        String appendRelationshipsOnPartialUpdate = 
entityDef.getOption(OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE);
+        String relationshipAttributeName          = 
relationshipAttributeDef.getName();
+
+        if (StringUtils.isNotEmpty(appendRelationshipsOnPartialUpdate)) {
+            Set<String> relationshipTypesToAppend = 
AtlasType.fromJson(appendRelationshipsOnPartialUpdate, Set.class);
+
+            if (CollectionUtils.isNotEmpty(relationshipTypesToAppend) && 
relationshipTypesToAppend.contains(relationshipAttributeName)) {
+                
relationshipAttributeDef.setOption(AtlasAttributeDef.ATTRDEF_OPTION_APPEND_ON_PARTIAL_UPDATE,
 Boolean.toString(true));
+            }
+        }
+    }
+
     @Override
     public AtlasAttribute getSystemAttribute(String attributeName) {
         return AtlasEntityType.ENTITY_ROOT.allAttributes.get(attributeName);
diff --git a/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java 
b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
index e276a42..65fa62f 100755
--- a/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
+++ b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
@@ -32,6 +32,7 @@ import org.apache.atlas.model.typedef.AtlasRelationshipDef;
 import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -97,6 +98,7 @@ public final class TestRelationshipUtilsV2 {
                                                         
createOptionalAttrDef("orgLevel", ORG_LEVEL_TYPE),
                                                         
createOptionalAttrDef("shares", "long"),
                                                         
createOptionalAttrDef("salary", "double"));
+        
employeeType.setOption(AtlasEntityType.OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE,
 "[\"friends\"]");
         /******* Department Type *******/
         AtlasEntityDef departmentType = createClassTypeDef(DEPARTMENT_TYPE, 
description(DEPARTMENT_TYPE), superType(null),
                                                         
createUniqueRequiredAttrDef("name", "string"));
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index b51cbfc..6d8305a 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -37,6 +37,8 @@ import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import 
org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
 import org.apache.atlas.repository.Constants;
@@ -1392,6 +1394,7 @@ public class EntityGraphMapper {
         AtlasAttribute inverseRefAttribute = 
attribute.getInverseRefAttribute();
         Cardinality    cardinality         = 
attribute.getAttributeDef().getCardinality();
         List<Object>   newElementsCreated  = new ArrayList<>();
+        List<Object>   allArrayElements    = null;
         List<Object>   currentElements;
 
         if (isReference && !isSoftReference) {
@@ -1442,13 +1445,22 @@ public class EntityGraphMapper {
         }
 
         if (isReference && !isSoftReference) {
-            List<AtlasEdge> additionalEdges = 
removeUnusedArrayEntries(attribute, (List) currentElements, (List) 
newElementsCreated, ctx.getReferringVertex());
-            newElementsCreated.addAll(additionalEdges);
+            boolean isAppendOnPartialUpdate = 
getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
+
+            if (isAppendOnPartialUpdate) {
+                allArrayElements = unionCurrentAndNewElements(attribute, 
(List) currentElements, (List) newElementsCreated);
+            } else {
+                List<AtlasEdge> activeCurrentElements = 
removeUnusedArrayEntries(attribute, (List) currentElements, (List) 
newElementsCreated, ctx.getReferringVertex());
+
+                allArrayElements = unionCurrentAndNewElements(attribute, 
activeCurrentElements, (List) newElementsCreated);
+            }
+        } else {
+            allArrayElements = newElementsCreated;
         }
 
         // add index to attributes of array type
-       for (int index = 0; index < newElementsCreated.size(); index++) {
-           Object element = newElementsCreated.get(index);
+       for (int index = 0; allArrayElements != null && index < 
allArrayElements.size(); index++) {
+           Object element = allArrayElements.get(index);
 
            if (element instanceof AtlasEdge) {
                AtlasGraphUtilsV2.setEncodedProperty((AtlasEdge) element, 
ATTRIBUTE_INDEX_PROPERTY_KEY, index);
@@ -1458,14 +1470,28 @@ public class EntityGraphMapper {
         if (isNewElementsNull) {
             setArrayElementsProperty(elementType, isSoftReference, 
ctx.getReferringVertex(), ctx.getVertexProperty(), null);
         } else {
-            setArrayElementsProperty(elementType, isSoftReference, 
ctx.getReferringVertex(), ctx.getVertexProperty(), newElementsCreated);
+            setArrayElementsProperty(elementType, isSoftReference, 
ctx.getReferringVertex(), ctx.getVertexProperty(), allArrayElements);
         }
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== mapArrayValue({})", ctx);
         }
 
-        return newElementsCreated;
+        return allArrayElements;
+    }
+
+    private boolean getAppendOptionForRelationship(AtlasVertex entityVertex, 
String relationshipAttributeName) {
+        boolean                             ret                       = false;
+        String                              entityTypeName            = 
AtlasGraphUtilsV2.getTypeName(entityVertex);
+        AtlasEntityDef                      entityDef                 = 
typeRegistry.getEntityDefByName(entityTypeName);
+        List<AtlasRelationshipAttributeDef> relationshipAttributeDefs = 
entityDef.getRelationshipAttributeDefs();
+
+        if (CollectionUtils.isNotEmpty(relationshipAttributeDefs)) {
+            ret = 
relationshipAttributeDefs.stream().anyMatch(relationshipAttrDef -> 
relationshipAttrDef.getName().equals(relationshipAttributeName)
+                    && relationshipAttrDef.isAppendOnPartialUpdate());
+        }
+
+        return ret;
     }
 
     private AtlasEdge createVertex(AtlasStruct struct, AtlasVertex 
referringVertex, String edgeLabel, EntityMutationContext context) throws 
AtlasBaseException {
@@ -1848,6 +1874,17 @@ public class EntityGraphMapper {
         return ret;
     }
 
+    private List<AtlasEdge> unionCurrentAndNewElements(AtlasAttribute 
attribute, List<AtlasEdge> currentElements, List<AtlasEdge> newElements) {
+        Collection<AtlasEdge> ret              = null;
+        AtlasType             arrayElementType = ((AtlasArrayType) 
attribute.getAttributeType()).getElementType();
+
+        if (arrayElementType != null && isReference(arrayElementType)) {
+            ret = CollectionUtils.union(currentElements, newElements);
+        }
+
+        return CollectionUtils.isNotEmpty(ret) ? new ArrayList<>(ret) : 
Collections.emptyList();
+    }
+
     //Removes unused edges from the old collection, compared to the new 
collection
 
     private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute, 
List<AtlasEdge> currentEntries, List<AtlasEdge> newEntries, AtlasVertex 
entityVertex) throws AtlasBaseException {
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
index 397e2ab..0bc91a7 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
@@ -210,7 +210,7 @@ public class AtlasEntityTestBase extends AtlasTestBase {
                     Assert.assertTrue(actualList.size() >= 
expectedList.size());
 
                     for (int i = 0; i < expectedList.size(); i++) {
-                        validateAttribute(entityExtInfo, actualList.get(i), 
expectedList.get(i), elemType, attrName);
+                        
Assert.assertTrue(actualList.contains(expectedList.get(i)));
                     }
                 }
                 break;
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
index 9099924..8bb7d09 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
@@ -51,6 +51,7 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -560,6 +561,38 @@ public abstract class AtlasRelationshipStoreV2Test extends 
AtlasTestBase {
         verifyRelationshipAttributeList(a2Entity, "manyB", 
ImmutableList.of(getAtlasObjectId(b1)));
     }
 
+    @Test
+    public void testRelationshipAttributeOnPartialUpdate() throws Exception {
+        AtlasObjectId maxId = employeeNameIdMap.get("Max");
+        AtlasObjectId janeId = employeeNameIdMap.get("Jane");
+        AtlasObjectId mikeId = employeeNameIdMap.get("Mike");
+        AtlasObjectId johnId = employeeNameIdMap.get("John");
+
+        // Partially update Max's Employee.friends with only Jane. Because "friends" is listed in the
+        // employee type's appendRelationshipsOnPartialUpdate option, Jane is appended to Max's existing friends, and Jane's friends list gains Max.
+        AtlasEntity maxEntityForUpdate = new AtlasEntity(EMPLOYEE_TYPE);
+        maxEntityForUpdate.setRelationshipAttribute("friends", 
Arrays.asList(janeId));
+
+        AtlasEntityType employeeType = 
typeRegistry.getEntityTypeByName(EMPLOYEE_TYPE);
+        Map<String, Object> uniqAttributes = Collections.<String, 
Object>singletonMap("name", "Max");
+
+        init();
+        EntityMutationResponse updateResponse = 
entityStore.updateByUniqueAttributes(employeeType, uniqAttributes, new 
AtlasEntityWithExtInfo(maxEntityForUpdate));
+
+        List<AtlasEntityHeader> partialUpdatedEntities = 
updateResponse.getPartialUpdatedEntities();
+        assertEquals(partialUpdatedEntities.size(), 2);
+        // 2 entities should have been updated:
+        // * Max, to add Jane to Employee.friends
+        // * Jane, to add Max to Employee.friends
+        AtlasEntitiesWithExtInfo updatedEntities = 
entityStore.getByIds(ImmutableList.of(maxId.getGuid(), janeId.getGuid()));
+
+        AtlasEntity maxEntity = updatedEntities.getEntity(maxId.getGuid());
+        verifyRelationshipAttributeList(maxEntity, "friends", 
ImmutableList.of(mikeId, johnId, janeId));
+
+        AtlasEntity janeEntity = updatedEntities.getEntity(janeId.getGuid());
+        verifyRelationshipAttributeList(janeEntity, "friends", 
ImmutableList.of(maxId));
+    }
+
     protected abstract void 
verifyRelationshipAttributeUpdate_NonComposite_OneToOne(AtlasEntity a1, 
AtlasEntity b);
 
     protected abstract void 
verifyRelationshipAttributeUpdate_NonComposite_OneToMany(AtlasEntity entity) 
throws Exception;

Reply via email to