This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new fe7d79c84 ATLAS-4903 : When migration restarts it results in 
deletion of edges and vertices
fe7d79c84 is described below

commit fe7d79c841b7d294c9990d676672898102d22bb0
Author: chaitali <chaitali.bor...@cloudera.com>
AuthorDate: Wed Sep 18 23:18:20 2024 +0530

    ATLAS-4903 : When migration restarts it results in deletion of edges and 
vertices
    
    Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com>
---
 .../atlas/repository/store/graph/v2/EntityGraphMapper.java       | 2 +-
 .../repository/store/graph/v2/bulkimport/MigrationImport.java    | 7 +++++--
 .../repository/store/graph/v2/bulkimport/pc/EntityConsumer.java  | 8 +++++---
 .../store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java      | 6 ++++--
 server-api/src/main/java/org/apache/atlas/RequestContext.java    | 9 +++++++++
 5 files changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 6b395dd17..3f7f73e70 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -1458,7 +1458,7 @@ public class EntityGraphMapper {
         }
 
         if (isReference && !isSoftReference) {
-            boolean isAppendOnPartialUpdate = 
getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
+            boolean isAppendOnPartialUpdate = 
RequestContext.get().isMigrationInProgress() || 
getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
 
             if (isAppendOnPartialUpdate) {
                 allArrayElements = unionCurrentAndNewElements(attribute, 
(List) currentElements, (List) newElementsCreated);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index f8c9218c6..d34d9b658 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -106,10 +106,13 @@ public class MigrationImport extends ImportStrategy {
 
         int batchSize = importResult.getRequest().getOptionKeyBatchSize();
         int numWorkers = 
getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
-
+        boolean isMigrationImport = false;
+        if (importResult.getRequest().getOptions().get("migration")!=null) {
+            isMigrationImport = 
Boolean.valueOf(importResult.getRequest().getOptions().get("migration"));
+        }
         EntityConsumerBuilder consumerBuilder =
                 new EntityConsumerBuilder(typeRegistry, this.graph, 
entityStore, entityGraphRetriever, graphBulk,
-                        entityStoreBulk, entityGraphRetrieverBulk, batchSize);
+                        entityStoreBulk, entityGraphRetrieverBulk, batchSize, 
isMigrationImport);
 
         LOG.info("MigrationImport: EntityCreationManager: Created!");
         return new EntityCreationManager(consumerBuilder, batchSize, 
numWorkers, importResult, dataMigrationStatusService);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
index b73988fd7..a22300f1b 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -21,7 +21,6 @@ import org.apache.atlas.GraphTransactionInterceptor;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.pc.WorkItemConsumer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -33,8 +32,8 @@ import 
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.repository.store.graph.v2.EntityStream;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasPerfMetrics;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +56,7 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     private final AtlasEntityStore entityStoreBulk;
     private final AtlasTypeRegistry typeRegistry;
     private final EntityGraphRetriever entityRetrieverBulk;
+    private final boolean isMigrationImport;
 
     private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new 
ArrayList<>();
     private List<String> localResults = new ArrayList<>();
@@ -64,7 +64,7 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     public EntityConsumer(AtlasTypeRegistry typeRegistry,
                           AtlasGraph atlasGraph, AtlasEntityStore entityStore,
                           AtlasGraph atlasGraphBulk, AtlasEntityStore 
entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
-                          BlockingQueue queue, int batchSize) {
+                          BlockingQueue queue, int batchSize , boolean 
isMigrationImport) {
         super(queue);
         this.typeRegistry = typeRegistry;
 
@@ -76,6 +76,7 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
         this.entityRetrieverBulk = entityRetrieverBulk;
 
         this.batchSize = batchSize;
+        this.isMigrationImport = isMigrationImport;
     }
 
     @Override
@@ -98,6 +99,7 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     private void processEntity(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo, long currentCount) {
         RequestContext.get().setImportInProgress(true);
         RequestContext.get().setCreateShellEntityForNonExistingReference(true);
+        RequestContext.get().setMigrationInProgress(this.isMigrationImport);
 
         try {
             LOG.debug("Processing: {}", currentCount);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
index 7eac8df73..b6836ee20 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
@@ -37,10 +37,11 @@ public class EntityConsumerBuilder implements 
WorkItemBuilder<EntityConsumer, At
     private final AtlasTypeRegistry typeRegistry;
     private EntityGraphRetriever entityRetrieverBulk;
     private int batchSize;
+    private final boolean isMigrationImport;
 
     public EntityConsumerBuilder(AtlasTypeRegistry typeRegistry, AtlasGraph 
atlasGraph, AtlasEntityStoreV2 entityStore, EntityGraphRetriever 
entityRetriever,
                                  AtlasGraph atlasGraphBulk, AtlasEntityStoreV2 
entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
-                                 int batchSize) {
+                                 int batchSize, boolean isMigrationImport) {
         this.typeRegistry = typeRegistry;
 
         this.atlasGraph = atlasGraph;
@@ -52,12 +53,13 @@ public class EntityConsumerBuilder implements 
WorkItemBuilder<EntityConsumer, At
         this.entityRetrieverBulk = entityRetrieverBulk;
 
         this.batchSize = batchSize;
+        this.isMigrationImport = isMigrationImport;
     }
 
     @Override
     public EntityConsumer 
build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
         return new EntityConsumer(typeRegistry, atlasGraph, entityStore,
                 atlasGraphBulk, entityStoreBulk, entityRetrieverBulk,
-                queue, this.batchSize);
+                queue, this.batchSize, this.isMigrationImport);
     }
 }
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java 
b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index e144d3650..e1670a924 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -74,6 +74,7 @@ public class RequestContext {
     private int         maxAttempts  = 1;
     private int         attemptCount = 1;
     private boolean     isImportInProgress = false;
+    private boolean     isMigrationInProgress = false;
     private boolean     isInNotificationProcessing = false;
     private boolean     isInTypePatching           = false;
     private boolean     createShellEntityForNonExistingReference = false;
@@ -202,6 +203,14 @@ public class RequestContext {
         return isImportInProgress;
     }
 
+    public boolean isMigrationInProgress() {
+        return isMigrationInProgress;
+    }
+
+    public void setMigrationInProgress(boolean migrationInProgress) {
+        isMigrationInProgress = migrationInProgress;
+    }
+
     public void setImportInProgress(boolean importInProgress) {
         isImportInProgress = importInProgress;
     }

Reply via email to