This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new fe7d79c84 ATLAS-4903 : When migration restarts it results into deletion of edges and vertices fe7d79c84 is described below commit fe7d79c841b7d294c9990d676672898102d22bb0 Author: chaitali <chaitali.bor...@cloudera.com> AuthorDate: Wed Sep 18 23:18:20 2024 +0530 ATLAS-4903 : When migration restarts it results into deletion of edges and vertices Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../atlas/repository/store/graph/v2/EntityGraphMapper.java | 2 +- .../repository/store/graph/v2/bulkimport/MigrationImport.java | 7 +++++-- .../repository/store/graph/v2/bulkimport/pc/EntityConsumer.java | 8 +++++--- .../store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java | 6 ++++-- server-api/src/main/java/org/apache/atlas/RequestContext.java | 9 +++++++++ 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 6b395dd17..3f7f73e70 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -1458,7 +1458,7 @@ public class EntityGraphMapper { } if (isReference && !isSoftReference) { - boolean isAppendOnPartialUpdate = getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName()); + boolean isAppendOnPartialUpdate = RequestContext.get().isMigrationInProgress() || getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName()); if (isAppendOnPartialUpdate) { allArrayElements = unionCurrentAndNewElements(attribute, (List) currentElements, (List) newElementsCreated); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index f8c9218c6..d34d9b658 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -106,10 +106,13 @@ public class MigrationImport extends ImportStrategy { int batchSize = importResult.getRequest().getOptionKeyBatchSize(); int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); - + boolean isMigrationImport = false; + if (importResult.getRequest().getOptions().get("migration")!=null) { + isMigrationImport = Boolean.valueOf(importResult.getRequest().getOptions().get("migration")); + } EntityConsumerBuilder consumerBuilder = new EntityConsumerBuilder(typeRegistry, this.graph, entityStore, entityGraphRetriever, graphBulk, - entityStoreBulk, entityGraphRetrieverBulk, batchSize); + entityStoreBulk, entityGraphRetrieverBulk, batchSize, isMigrationImport); LOG.info("MigrationImport: EntityCreationManager: Created!"); return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java index b73988fd7..a22300f1b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java @@ -21,7 +21,6 @@ import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.pc.WorkItemConsumer; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -33,8 +32,8 @@ import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityStream; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +56,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith private final AtlasEntityStore entityStoreBulk; private final AtlasTypeRegistry typeRegistry; private final EntityGraphRetriever entityRetrieverBulk; + private final boolean isMigrationImport; private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>(); private List<String> localResults = new ArrayList<>(); @@ -64,7 +64,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith public EntityConsumer(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AtlasEntityStore entityStore, AtlasGraph atlasGraphBulk, AtlasEntityStore entityStoreBulk, EntityGraphRetriever entityRetrieverBulk, - BlockingQueue queue, int batchSize) { + BlockingQueue queue, int batchSize , boolean isMigrationImport) { super(queue); this.typeRegistry = typeRegistry; @@ -76,6 +76,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith this.entityRetrieverBulk = entityRetrieverBulk; this.batchSize = batchSize; + this.isMigrationImport = isMigrationImport; } @Override @@ -98,6 +99,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) { RequestContext.get().setImportInProgress(true); RequestContext.get().setCreateShellEntityForNonExistingReference(true); + RequestContext.get().setMigrationInProgress(this.isMigrationImport); try { LOG.debug("Processing: {}", currentCount); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java index 7eac8df73..b6836ee20 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java @@ -37,10 +37,11 @@ public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, At private final AtlasTypeRegistry typeRegistry; private EntityGraphRetriever entityRetrieverBulk; private int batchSize; + private final boolean isMigrationImport; public EntityConsumerBuilder(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AtlasEntityStoreV2 entityStore, EntityGraphRetriever entityRetriever, AtlasGraph atlasGraphBulk, AtlasEntityStoreV2 entityStoreBulk, EntityGraphRetriever entityRetrieverBulk, - int batchSize) { + int batchSize, boolean isMigrationImport) { this.typeRegistry = typeRegistry; this.atlasGraph = atlasGraph; @@ -52,12 +53,13 @@ public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, At this.entityRetrieverBulk = entityRetrieverBulk; this.batchSize = batchSize; + this.isMigrationImport = isMigrationImport; } @Override public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) { return new EntityConsumer(typeRegistry, atlasGraph, entityStore, atlasGraphBulk, entityStoreBulk, entityRetrieverBulk, - queue, this.batchSize); + queue, this.batchSize, this.isMigrationImport); } } diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index e144d3650..e1670a924 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -74,6 +74,7 @@ public class RequestContext { private int maxAttempts = 1; private int attemptCount = 1; private boolean isImportInProgress = false; + private boolean isMigrationInProgress = false; private boolean isInNotificationProcessing = false; private boolean isInTypePatching = false; private boolean createShellEntityForNonExistingReference = false; @@ -202,6 +203,14 @@ public class RequestContext { return isImportInProgress; } + public boolean isMigrationInProgress() { + return isMigrationInProgress; + } + + public void setMigrationInProgress(boolean migrationInProgress) { + isMigrationInProgress = migrationInProgress; + } + public void setImportInProgress(boolean importInProgress) { isImportInProgress = importInProgress; }