This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 8fb29d3 ATLAS-4348: Atlas-Kafka Hook : When a producer publishes messages to multiple topics, the latest relationship is marked ACTIVE , rest are marked DELETED 8fb29d3 is described below commit 8fb29d30baf9328687916cb6cdf4981a7915d805 Author: Radhika Kundam <rkun...@cloudera.com> AuthorDate: Wed Jul 14 22:12:31 2021 -0700 ATLAS-4348: Atlas-Kafka Hook : When a producer publishes messages to multiple topics, the latest relationship is marked ACTIVE , rest are marked DELETED Signed-off-by: Sarath Subramanian <sar...@apache.org> (cherry picked from commit 8725f31af9821907ced99a829b9e0dbbf3f6de2b) --- .../019-kafka_producer_lineage_add_options.json | 15 +++++++ .../org/apache/atlas/type/AtlasEntityType.java | 28 ++++++++++--- .../org/apache/atlas/TestRelationshipUtilsV2.java | 2 + .../store/graph/v2/EntityGraphMapper.java | 49 +++++++++++++++++++--- .../store/graph/v2/AtlasEntityTestBase.java | 2 +- .../graph/v2/AtlasRelationshipStoreV2Test.java | 33 +++++++++++++++ 6 files changed, 117 insertions(+), 12 deletions(-) diff --git a/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json b/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json new file mode 100644 index 0000000..bff4bd5 --- /dev/null +++ b/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json @@ -0,0 +1,15 @@ +{ + "patches": [ + { + "id": "TYPEDEF_PATCH_1000_019_001", + "description": "Add 'appendRelationshipsOnPartialUpdate' typeDefOptions to kafka_producer_lineage", + "action": "UPDATE_TYPEDEF_OPTIONS", + "typeName": "kafka_producer_lineage", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "typeDefOptions": { + "appendRelationshipsOnPartialUpdate": "[\"outputs\"]" + } + } + ] +} \ No newline at end of file diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java index ded6d63..6d83599 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java @@ -65,11 +65,12 @@ public class AtlasEntityType extends AtlasStructType { private static final String OPTION_SCHEMA_ATTRIBUTES = "schemaAttributes"; private static final String INTERNAL_TYPENAME = "__internal"; - private static final char NS_ATTRIBUTE_NAME_SEPARATOR = '.'; + public static final String OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE = "appendRelationshipsOnPartialUpdate"; - private static final char DYN_ATTRIBUTE_NAME_SEPARATOR = '.'; - private static final char DYN_ATTRIBUTE_OPEN_DELIM = '{'; - private static final char DYN_ATTRIBUTE_CLOSE_DELIM = '}'; + private static final char NS_ATTRIBUTE_NAME_SEPARATOR = '.'; + private static final char DYN_ATTRIBUTE_NAME_SEPARATOR = '.'; + private static final char DYN_ATTRIBUTE_OPEN_DELIM = '{'; + private static final char DYN_ATTRIBUTE_CLOSE_DELIM = '}'; private static final String[] ENTITY_HEADER_ATTRIBUTES = new String[]{NAME, DESCRIPTION, OWNER, CREATE_TIME}; private static final String ENTITY_ROOT_NAME = "__ENTITY_ROOT"; @@ -326,7 +327,11 @@ public class AtlasEntityType extends AtlasStructType { String relationshipType = relationsEntry.getKey(); AtlasAttribute relationshipAttr = relationsEntry.getValue(); - relationshipAttrDefs.add(new AtlasRelationshipAttributeDef(relationshipType, relationshipAttr.isLegacyAttribute(), relationshipAttr.getAttributeDef())); + AtlasRelationshipAttributeDef relationshipAttributeDef = new AtlasRelationshipAttributeDef(relationshipType, relationshipAttr.isLegacyAttribute(), relationshipAttr.getAttributeDef()); + + updateRelationshipAttrDefForPartialUpdate(relationshipAttributeDef, entityDef); + + relationshipAttrDefs.add(relationshipAttributeDef); } } @@ -369,6 +374,19 @@ public class AtlasEntityType extends AtlasStructType { } } + private void updateRelationshipAttrDefForPartialUpdate(AtlasRelationshipAttributeDef relationshipAttributeDef, AtlasEntityDef entityDef) { + String appendRelationshipsOnPartialUpdate = entityDef.getOption(OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE); + String relationshipAttributeName = relationshipAttributeDef.getName(); + + if (StringUtils.isNotEmpty(appendRelationshipsOnPartialUpdate)) { + Set<String> relationshipTypesToAppend = AtlasType.fromJson(appendRelationshipsOnPartialUpdate, Set.class); + + if (CollectionUtils.isNotEmpty(relationshipTypesToAppend) && relationshipTypesToAppend.contains(relationshipAttributeName)) { + relationshipAttributeDef.setOption(AtlasAttributeDef.ATTRDEF_OPTION_APPEND_ON_PARTIAL_UPDATE, Boolean.toString(true)); + } + } + } + @Override public AtlasAttribute getSystemAttribute(String attributeName) { return AtlasEntityType.ENTITY_ROOT.allAttributes.get(attributeName); diff --git a/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java index e276a42..65fa62f 100755 --- a/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java +++ b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java @@ -32,6 +32,7 @@ import org.apache.atlas.model.typedef.AtlasRelationshipDef; import org.apache.atlas.model.typedef.AtlasRelationshipEndDef; import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasType; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; @@ -97,6 +98,7 @@ public final class TestRelationshipUtilsV2 { createOptionalAttrDef("orgLevel", ORG_LEVEL_TYPE), createOptionalAttrDef("shares", "long"), createOptionalAttrDef("salary", "double")); + employeeType.setOption(AtlasEntityType.OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE, "[\"friends\"]"); /******* Department Type *******/ AtlasEntityDef departmentType = createClassTypeDef(DEPARTMENT_TYPE, description(DEPARTMENT_TYPE), superType(null), createUniqueRequiredAttrDef("name", "string")); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index b51cbfc..6d8305a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -37,6 +37,8 @@ import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutations.EntityOperation; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality; import org.apache.atlas.repository.Constants; @@ -1392,6 +1394,7 @@ public class EntityGraphMapper { AtlasAttribute inverseRefAttribute = attribute.getInverseRefAttribute(); Cardinality cardinality = attribute.getAttributeDef().getCardinality(); List<Object> newElementsCreated = new ArrayList<>(); + List<Object> allArrayElements = null; List<Object> currentElements; if (isReference && !isSoftReference) { @@ -1442,13 +1445,22 @@ public class EntityGraphMapper { } if (isReference && !isSoftReference) { - List<AtlasEdge> additionalEdges = removeUnusedArrayEntries(attribute, (List) currentElements, (List) newElementsCreated, ctx.getReferringVertex()); - newElementsCreated.addAll(additionalEdges); + boolean isAppendOnPartialUpdate = getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName()); + + if (isAppendOnPartialUpdate) { + allArrayElements = unionCurrentAndNewElements(attribute, (List) currentElements, (List) newElementsCreated); + } else { + List<AtlasEdge> activeCurrentElements = removeUnusedArrayEntries(attribute, (List) currentElements, (List) newElementsCreated, ctx.getReferringVertex()); + + allArrayElements = unionCurrentAndNewElements(attribute, activeCurrentElements, (List) newElementsCreated); + } + } else { + allArrayElements = newElementsCreated; } // add index to attributes of array type - for (int index = 0; index < newElementsCreated.size(); index++) { - Object element = newElementsCreated.get(index); + for (int index = 0; allArrayElements != null && index < allArrayElements.size(); index++) { + Object element = allArrayElements.get(index); if (element instanceof AtlasEdge) { AtlasGraphUtilsV2.setEncodedProperty((AtlasEdge) element, ATTRIBUTE_INDEX_PROPERTY_KEY, index); @@ -1458,14 +1470,28 @@ public class EntityGraphMapper { if (isNewElementsNull) { setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), null); } else { - setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), newElementsCreated); + setArrayElementsProperty(elementType, isSoftReference, ctx.getReferringVertex(), ctx.getVertexProperty(), allArrayElements); } if (LOG.isDebugEnabled()) { LOG.debug("<== mapArrayValue({})", ctx); } - return newElementsCreated; + return allArrayElements; + } + + private boolean getAppendOptionForRelationship(AtlasVertex entityVertex, String relationshipAttributeName) { + boolean ret = false; + String entityTypeName = AtlasGraphUtilsV2.getTypeName(entityVertex); + AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(entityTypeName); + List<AtlasRelationshipAttributeDef> relationshipAttributeDefs = entityDef.getRelationshipAttributeDefs(); + + if (CollectionUtils.isNotEmpty(relationshipAttributeDefs)) { + ret = relationshipAttributeDefs.stream().anyMatch(relationshipAttrDef -> relationshipAttrDef.getName().equals(relationshipAttributeName) + && relationshipAttrDef.isAppendOnPartialUpdate()); + } + + return ret; } private AtlasEdge createVertex(AtlasStruct struct, AtlasVertex referringVertex, String edgeLabel, EntityMutationContext context) throws AtlasBaseException { @@ -1848,6 +1874,17 @@ public class EntityGraphMapper { return ret; } + private List<AtlasEdge> unionCurrentAndNewElements(AtlasAttribute attribute, List<AtlasEdge> currentElements, List<AtlasEdge> newElements) { + Collection<AtlasEdge> ret = null; + AtlasType arrayElementType = ((AtlasArrayType) attribute.getAttributeType()).getElementType(); + + if (arrayElementType != null && isReference(arrayElementType)) { + ret = CollectionUtils.union(currentElements, newElements); + } + + return CollectionUtils.isNotEmpty(ret) ? new ArrayList<>(ret) : Collections.emptyList(); + } + //Removes unused edges from the old collection, compared to the new collection private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute, List<AtlasEdge> currentEntries, List<AtlasEdge> newEntries, AtlasVertex entityVertex) throws AtlasBaseException { diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java index 397e2ab..0bc91a7 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java @@ -210,7 +210,7 @@ public class AtlasEntityTestBase extends AtlasTestBase { Assert.assertTrue(actualList.size() >= expectedList.size()); for (int i = 0; i < expectedList.size(); i++) { - validateAttribute(entityExtInfo, actualList.get(i), expectedList.get(i), elemType, attrName); + Assert.assertTrue(actualList.contains(expectedList.get(i))); } } break; diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java index 9099924..8bb7d09 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java @@ -51,6 +51,7 @@ import org.testng.annotations.Test; import javax.inject.Inject; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -560,6 +561,38 @@ public abstract class AtlasRelationshipStoreV2Test extends AtlasTestBase { verifyRelationshipAttributeList(a2Entity, "manyB", ImmutableList.of(getAtlasObjectId(b1))); } + @Test + public void testRelationshipAttributeOnPartialUpdate() throws Exception { + AtlasObjectId maxId = employeeNameIdMap.get("Max"); + AtlasObjectId janeId = employeeNameIdMap.get("Jane"); + AtlasObjectId mikeId = employeeNameIdMap.get("Mike"); + AtlasObjectId johnId = employeeNameIdMap.get("John"); + + // Partial Update Max's Employee.friends reference with Jane and apply the change as a partial update. + // This should also update friends list of Max and Jane. + AtlasEntity maxEntityForUpdate = new AtlasEntity(EMPLOYEE_TYPE); + maxEntityForUpdate.setRelationshipAttribute("friends", Arrays.asList(janeId)); + + AtlasEntityType employeeType = typeRegistry.getEntityTypeByName(EMPLOYEE_TYPE); + Map<String, Object> uniqAttributes = Collections.<String, Object>singletonMap("name", "Max"); + + init(); + EntityMutationResponse updateResponse = entityStore.updateByUniqueAttributes(employeeType, uniqAttributes, new AtlasEntityWithExtInfo(maxEntityForUpdate)); + + List<AtlasEntityHeader> partialUpdatedEntities = updateResponse.getPartialUpdatedEntities(); + assertEquals(partialUpdatedEntities.size(), 2); + // 2 entities should have been updated: + // * Max to add Employee.friends reference + // * Jane to add Max from Employee.friends + AtlasEntitiesWithExtInfo updatedEntities = entityStore.getByIds(ImmutableList.of(maxId.getGuid(), janeId.getGuid())); + + AtlasEntity maxEntity = updatedEntities.getEntity(maxId.getGuid()); + verifyRelationshipAttributeList(maxEntity, "friends", ImmutableList.of(mikeId, johnId, janeId)); + + AtlasEntity janeEntity = updatedEntities.getEntity(janeId.getGuid()); + verifyRelationshipAttributeList(janeEntity, "friends", ImmutableList.of(maxId)); + } + protected abstract void verifyRelationshipAttributeUpdate_NonComposite_OneToOne(AtlasEntity a1, AtlasEntity b); protected abstract void verifyRelationshipAttributeUpdate_NonComposite_OneToMany(AtlasEntity entity) throws Exception;