This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new eddda69 ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios eddda69 is described below commit eddda699def908895f17384735bf90e08f02d6bf Author: Ashutosh Mestry <ames...@cloudera.com> AuthorDate: Mon Apr 26 22:02:02 2021 -0700 ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios Signed-off-by: Sarath Subramanian <sar...@apache.org> --- .../org/apache/atlas/repository/Constants.java | 2 +- .../repository/graph/GraphBackedSearchIndexer.java | 1 + .../repository/store/graph/v1/DeleteHandlerV1.java | 2 +- .../store/graph/v2/AtlasGraphUtilsV2.java | 26 ++++++++++++++ .../store/graph/v2/EntityGraphMapper.java | 42 ++++++++++++++++++++-- .../store/graph/v2/EntityGraphRetriever.java | 6 +++- .../tasks/ClassificationPropagateTaskFactory.java | 4 +-- .../v2/tasks/ClassificationPropagationTasks.java | 3 +- .../store/graph/v2/tasks/ClassificationTask.java | 26 ++++++++------ .../apache/atlas/tasks/TaskFactoryRegistry.java | 39 ++++++++++++++++++++ .../org/apache/atlas/tasks/TaskManagement.java | 6 ++-- .../ClassificationPropagationWithTasksTest.java | 4 +-- .../apache/atlas/web/resources/AdminResource.java | 5 --- 13 files changed, 137 insertions(+), 29 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 4df38a5..ffcec97 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -94,7 +94,7 @@ public final class Constants { public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames"); public static final String CUSTOM_ATTRIBUTES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "customAttributes"); public static final String LABELS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "labels"); - public static final String PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks"); + public static final String EDGE_PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(RELATIONSHIP_PROPERTY_KEY_PREFIX + "__pendingTasks"); /** * Patch vertices property keys. diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 276343e..cc727c6 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -65,6 +65,7 @@ import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isRef import static org.apache.atlas.type.AtlasStructType.UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX; import static org.apache.atlas.type.AtlasTypeUtil.isArrayType; import static org.apache.atlas.type.AtlasTypeUtil.isMapType; +import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY; /** diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 20d5e6f..f118ae6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -1203,7 +1203,7 @@ public abstract class DeleteHandlerV1 { Map<String, Object> taskParams = ClassificationTask.toParameters(relationshipEdgeId, relationship); AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams); - AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, task.getGuid()); + AtlasGraphUtilsV2.addItemToListProperty(relationshipEdge, EDGE_PENDING_TASKS_PROPERTY_KEY, task.getGuid()); RequestContext.get().queueTask(task); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java index 8d4fdf3..0a94708 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -47,6 +47,7 @@ import org.apache.atlas.type.AtlasType; import org.apache.atlas.util.FileUtils; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; @@ -841,4 +842,29 @@ public class AtlasGraphUtilsV2 { return ret; } + public static void addItemToListProperty(AtlasEdge edge, String property, String value) { + List list = getListFromProperty(edge, property); + + list.add(value); + + edge.setListProperty(property, list); + } + + public static void removeItemFromListProperty(AtlasEdge edge, String property, String value) { + List list = getListFromProperty(edge, property); + + list.remove(value); + + if (CollectionUtils.isEmpty(list)) { + edge.removeProperty(property); + } else { + edge.setListProperty(property, list); + } + } + + private static List getListFromProperty(AtlasEdge edge, String property) { + List list = edge.getListProperty(property); + + return CollectionUtils.isEmpty(list) ? new ArrayList() : list; + } } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index d8ef32b..2a71e34 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -25,7 +25,8 @@ import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TimeBoundary; + import org.apache.atlas.exception.EntityNotFoundException; + import org.apache.atlas.model.TimeBoundary; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; @@ -124,6 +125,7 @@ import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPro import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; +import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY; @Component public class EntityGraphMapper { @@ -2039,6 +2041,8 @@ public class EntityGraphMapper { return null; } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid); + AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid); if (entityVertex == null) { @@ -2554,7 +2558,7 @@ public class EntityGraphMapper { } @GraphTransaction - public List<String> deleteClassificationPropagation(String classificationVertexId) throws AtlasBaseException { + public List<String> deleteClassificationPropagation(String entityGuid, String classificationVertexId) throws AtlasBaseException { try { if (StringUtils.isEmpty(classificationVertexId)) { LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId); @@ -2562,6 +2566,8 @@ public class EntityGraphMapper { return null; } + GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid); + AtlasVertex classificationVertex = graph.getVertex(classificationVertexId); if (classificationVertex == null) { @@ -2822,4 +2828,36 @@ public class EntityGraphMapper { private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) { deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null); } + + public void removePendingTaskFromEntity(String entityGuid, String taskGuid) throws EntityNotFoundException { + if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(taskGuid)) { + return; + } + + AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid); + + if (entityVertex == null) { + LOG.warn("Error fetching vertex: {}", entityVertex); + + return; + } + + entityVertex.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, taskGuid); + } + + public void removePendingTaskFromEdge(String edgeId, String taskGuid) throws AtlasBaseException { + if (StringUtils.isEmpty(edgeId) || StringUtils.isEmpty(taskGuid)) { + return; + } + + AtlasEdge edge = graph.getEdge(edgeId); + + if (edge == null) { + LOG.warn("Error fetching edge: {}", edgeId); + + return; + } + + AtlasGraphUtilsV2.removeItemFromListProperty(edge, EDGE_PENDING_TASKS_PROPERTY_KEY, taskGuid); + } } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index b6f1ef7..2e0f39a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -1189,7 +1189,7 @@ public class EntityGraphRetriever { continue; } - if (ignoreInactive && GraphHelper.getStatus((AtlasEdge) element) != AtlasEntity.Status.ACTIVE) { + if (isInactiveEdge(element, ignoreInactive)) { continue; } @@ -1710,4 +1710,8 @@ public class EntityGraphRetriever { return new HashSet<>(ret); } + + private boolean isInactiveEdge(Object element, boolean ignoreInactive) { + return ignoreInactive && element instanceof AtlasEdge && getStatus((AtlasEdge) element) != AtlasEntity.Status.ACTIVE; + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java index 6244b2d..8a81dc9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagateTaskFactory.java @@ -54,13 +54,11 @@ public class ClassificationPropagateTaskFactory implements TaskFactory { private final AtlasRelationshipStore relationshipStore; @Inject - public ClassificationPropagateTaskFactory(TaskManagement taskManagement, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) { + public ClassificationPropagateTaskFactory(AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) { this.graph = graph; this.entityGraphMapper = entityGraphMapper; this.deleteDelegate = deleteDelegate; this.relationshipStore = relationshipStore; - - taskManagement.addFactory(this); } public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java index 4fda34a..f86cbc7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationPropagationTasks.java @@ -67,9 +67,10 @@ public class ClassificationPropagationTasks { @Override protected void run(Map<String, Object> parameters) throws AtlasBaseException { + String entityGuid = (String) parameters.get(PARAM_ENTITY_GUID); String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID); - entityGraphMapper.deleteClassificationPropagation(classificationVertexId); + entityGraphMapper.deleteClassificationPropagation(entityGuid, classificationVertexId); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java index 369db08..00c9caa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java @@ -19,8 +19,10 @@ package org.apache.atlas.repository.store.graph.v2.tasks; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.exception.EntityNotFoundException; import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; @@ -56,7 +58,11 @@ public abstract class ClassificationTask extends AbstractTask { protected final DeleteHandlerDelegate deleteDelegate; protected final AtlasRelationshipStore relationshipStore; - public ClassificationTask(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) { + public ClassificationTask(AtlasTask task, + AtlasGraph graph, + EntityGraphMapper entityGraphMapper, + DeleteHandlerDelegate deleteDelegate, + AtlasRelationshipStore relationshipStore) { super(task); this.graph = graph; @@ -120,17 +126,15 @@ public abstract class ClassificationTask extends AbstractTask { protected void setStatus(AtlasTask.Status status) { super.setStatus(status); - // remove pending task guid from entity vertex or relationship edge - AtlasElement element; - - if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) { - element = graph.getEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID)); - - } else { - element = AtlasGraphUtilsV2.findByGuid((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID)); + try { + if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) { + entityGraphMapper.removePendingTaskFromEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID), getTaskGuid()); + } else { + entityGraphMapper.removePendingTaskFromEntity((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID), getTaskGuid()); + } + } catch (EntityNotFoundException | AtlasBaseException e) { + LOG.error("Error updating associated element for: {}", getTaskGuid(), e); } - - element.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, getTaskGuid()); } protected abstract void run(Map<String, Object> parameters) throws AtlasBaseException; diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java new file mode 100644 index 0000000..38f2cc9 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactoryRegistry.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.tasks; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.Set; + +@Component +public class TaskFactoryRegistry { + private static final Logger LOG = LoggerFactory.getLogger(TaskFactoryRegistry.class); + + @Inject + public TaskFactoryRegistry(TaskManagement taskManagement, Set<TaskFactory> factories) { + for (TaskFactory factory : factories) { + taskManagement.addFactory(factory); + } + + LOG.info("TaskFactoryRegistry: TaskManagement updated with factories: {}", factories.size()); + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java index 264aa8c..2756504 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java @@ -73,7 +73,7 @@ public class TaskManagement implements Service, ActiveStateChangeHandler { if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) { startInternal(); } else { - LOG.info("TaskManagement.start(): deferring patches until instance activation"); + LOG.info("TaskManagement.start(): deferring until instance activation"); } } @@ -183,9 +183,11 @@ public class TaskManagement implements Service, ActiveStateChangeHandler { } LOG.info("TaskManagement: Started!"); + + queuePendingTasks(); } - public void queuePendingTasks() { + private void queuePendingTasks() { if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) { return; } diff --git a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java index e309a76..84aefc9 100644 --- a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java @@ -134,7 +134,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase { ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY); assertNull(ret); - ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY); + ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY, StringUtils.EMPTY); assertNull(ret); AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES); @@ -215,7 +215,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase { assertNotNull(entityVertex); assertNotNull(classificationVertex); - List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(classificationVertex.getId().toString()); + List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString()); assertNotNull(impactedEntities); } diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 714b400..a9fa8ba 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -211,11 +211,6 @@ public class AdminResource { } } - @PostConstruct - public void init() { - taskManagement.queuePendingTasks(); - } - /** * Fetches the thread stack dump for this application. *