This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 3016ba6 ATLAS-4285: Multiple propagations with intersecting lineage. 3016ba6 is described below commit 3016ba6fb16f2f4b598eefc48e97ac2c50f16a84 Author: Ashutosh Mestry <ames...@cloudera.com> AuthorDate: Thu May 13 17:28:26 2021 -0700 ATLAS-4285: Multiple propagations with intersecting lineage. Signed-off-by: Sarath Subramanian <sar...@apache.org> (cherry picked from commit aba97b35393f1732eb30858e69fd5f489634afdc) --- .../store/graph/v2/EntityGraphMapper.java | 79 +++------------------- .../tasks/ClassificationPropagateTaskFactory.java | 5 -- .../v2/tasks/ClassificationPropagationTasks.java | 15 ---- .../apache/atlas/tasks/TaskFactoryRegistry.java | 23 ++++++- .../org/apache/atlas/tasks/TaskManagement.java | 11 +++ .../ClassificationPropagationWithTasksTest.java | 6 -- 6 files changed, 43 insertions(+), 96 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 7984a34..5baff33 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -2041,10 +2041,7 @@ public class EntityGraphMapper { return null; } - GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid); - AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid); - if (entityVertex == null) { LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId); @@ -2052,7 +2049,6 @@ public class EntityGraphMapper { } AtlasVertex classificationVertex = graph.getVertex(classificationVertexId); - if (classificationVertex == null) { LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId); @@ -2060,13 +2056,15 @@ public class EntityGraphMapper { } List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId); - if (CollectionUtils.isEmpty(impactedVertices)) { LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId); return null; } + List<String> impactedVerticesGuidsToLock = impactedVertices.stream().map(x -> GraphHelper.getGuid(x)).collect(Collectors.toList()); + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedVerticesGuidsToLock); + AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, impactedVertices); @@ -2453,63 +2451,6 @@ public class EntityGraphMapper { AtlasPerfTracer.log(perf); } - @GraphTransaction - public List<String> updateClassificationsPropagation(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException { - try { - if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) { - LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId); - return null; - } - - AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid); - - if (entityVertex == null) { - LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId); - return null; - } - - AtlasVertex classificationVertex = graph.getVertex(classificationVertexId); - - if (classificationVertex == null) { - LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId); - return null; - } - - List<AtlasVertex> entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertex.getIdForDisplay()); - - if (CollectionUtils.isEmpty(entitiesToPropagateTo)) { - LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no impacted vertices found!", entityGuid, classificationVertexId); - return null; - } - - List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo); - - if (CollectionUtils.isEmpty(entitiesPropagatedTo)) { - LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no propagations added!", entityGuid, classificationVertexId); - return null; - } - - AtlasClassification updatedClassification = entityRetriever.toAtlasClassification(classificationVertex); - List<String> ret = new ArrayList<>(); - - for (AtlasVertex vertex : entitiesToPropagateTo) { - AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); - - ret.add(entity.getGuid()); - - if (isActive(entity)) { - vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity)); - - entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(updatedClassification)); - } - } - - return ret; - } catch (Exception ex) { - throw new AtlasBaseException(ex); - } - } - private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification, AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex) throws AtlasBaseException { @@ -2566,11 +2507,8 @@ public class EntityGraphMapper { return null; } - GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid); - AtlasVertex classificationVertex = graph.getVertex(classificationVertexId); AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); - if (classificationVertex == null) { LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex not found", classificationVertexId); @@ -2578,14 +2516,14 @@ public class EntityGraphMapper { } List<AtlasVertex> entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex); - deleteDelegate.getHandler().deleteClassificationVertex(classificationVertex, true); - if (CollectionUtils.isEmpty(entityVertices)) { - return null; } + List<String> impactedGuids = entityVertices.stream().map(x -> GraphHelper.getGuid(x)).collect(Collectors.toList()); + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(impactedGuids); + List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entityVertices); entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification)); @@ -2787,8 +2725,11 @@ public class EntityGraphMapper { AtlasEntity entity = instanceConverter.getAndCacheEntity(graphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES); if (isActive(entity)) { - vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity)); + String classificationTextForEntity = fullTextMapperV2.getClassificationTextForEntity(entity); + vertex.setProperty(CLASSIFICATION_TEXT_KEY, classificationTextForEntity); propagatedEntities.add(entity); + + LOG.info("updateClassificationText: {}: {}", classification.getTypeName(), classificationTextForEntity); } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java index 8a81dc9..b3320c6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java @@ -37,13 +37,11 @@ public class ClassificationPropagateTaskFactory implements TaskFactory { private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class); public static final String CLASSIFICATION_PROPAGATION_ADD = "CLASSIFICATION_PROPAGATION_ADD"; - public static final String CLASSIFICATION_PROPAGATION_UPDATE = "CLASSIFICATION_PROPAGATION_UPDATE"; public static final String CLASSIFICATION_PROPAGATION_DELETE = "CLASSIFICATION_PROPAGATION_DELETE"; public static final String CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE = "CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE"; private static final List<String> supportedTypes = new ArrayList<String>() {{ add(CLASSIFICATION_PROPAGATION_ADD); - add(CLASSIFICATION_PROPAGATION_UPDATE); add(CLASSIFICATION_PROPAGATION_DELETE); add(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE); }}; @@ -69,9 +67,6 @@ public class ClassificationPropagateTaskFactory implements TaskFactory { case CLASSIFICATION_PROPAGATION_ADD: return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore); - case CLASSIFICATION_PROPAGATION_UPDATE: - return new ClassificationPropagationTasks.Update(task, graph, entityGraphMapper, deleteDelegate, relationshipStore); - case CLASSIFICATION_PROPAGATION_DELETE: return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java index f86cbc7..aec5b0c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java @@ -45,21 +45,6 @@ public class ClassificationPropagationTasks { } } - public static class Update extends ClassificationTask { - public Update(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) { - super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore); - } - - @Override - protected void run(Map<String, Object> parameters) throws AtlasBaseException { - String entityGuid = (String) parameters.get(PARAM_ENTITY_GUID); - String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID); - String relationshipGuid = (String) parameters.get(PARAM_RELATIONSHIP_GUID); - - entityGraphMapper.updateClassificationsPropagation(entityGuid, classificationVertexId, relationshipGuid); - } - } - public static class Delete extends ClassificationTask { public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) { super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java index 38f2cc9..4c93f0e 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java @@ -17,10 +17,12 @@ */ package org.apache.atlas.tasks; +import org.apache.atlas.AtlasException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import javax.inject.Inject; import java.util.Set; @@ -28,12 +30,31 @@ import java.util.Set; public class TaskFactoryRegistry { private static final Logger LOG = LoggerFactory.getLogger(TaskFactoryRegistry.class); + private final TaskManagement taskManagement; + @Inject public TaskFactoryRegistry(TaskManagement taskManagement, Set<TaskFactory> factories) { + this.taskManagement = taskManagement; for (TaskFactory factory : factories) { taskManagement.addFactory(factory); } LOG.info("TaskFactoryRegistry: TaskManagement updated with factories: {}", factories.size()); } -} \ No newline at end of file + + @PostConstruct + public void startTaskManagement() throws AtlasException { + try { + if (!taskManagement.hasStarted()) { + LOG.info("TaskFactoryRegistry: TaskManagement start skipped! Someone else will start it."); + return; + } + + LOG.info("TaskFactoryRegistry: Starting TaskManagement..."); + taskManagement.start(); + } catch (AtlasException e) { + LOG.error("Error starting TaskManagement!", e); + throw e; + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java index 2756504..9a519ba 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java @@ -49,6 +49,7 @@ public class TaskManagement implements Service, ActiveStateChangeHandler { private final TaskRegistry registry; private final Statistics statistics; private final Map<String, TaskFactory> taskTypeFactoryMap; + private boolean hasStarted; @Inject public TaskManagement(Configuration configuration, TaskRegistry taskRegistry) { @@ -75,6 +76,12 @@ public class TaskManagement implements Service, ActiveStateChangeHandler { } else { LOG.info("TaskManagement.start(): deferring until instance activation"); } + + this.hasStarted = true; + } + + public boolean hasStarted() { + return this.hasStarted; } @Override @@ -183,6 +190,10 @@ public class TaskManagement implements Service, ActiveStateChangeHandler { } LOG.info("TaskManagement: Started!"); + if (this.taskTypeFactoryMap.size() == 0) { + LOG.warn("Not factories registered! Pending tasks will be queued once factories are registered!"); + return; + } queuePendingTasks(); } diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java index 84aefc9..d440f2f 100644 --- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java @@ -131,9 +131,6 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase { List<String> ret = entityGraphMapper.propagateClassification(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY); assertNull(ret); - ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY); - assertNull(ret); - ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY, StringUtils.EMPTY); assertNull(ret); @@ -191,9 +188,6 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase { assertNotNull(entityVertex); assertNotNull(classificationVertex); - - List<String> impactedEntities = entityGraphMapper.updateClassificationsPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY); - assertNotNull(impactedEntities); } @Test(dependsOnMethods = "update")