This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 88f829f6c ATLAS-4861: Export/Import: add flag to skip updating 
replicated attributes
88f829f6c is described below

commit 88f829f6ce99768ead72a2a1fa74c3c1b9dd330c
Author: Pinal Shah <pinal.s...@freestoneinfotech.com>
AuthorDate: Fri May 10 17:29:19 2024 +0700

    ATLAS-4861: Export/Import: add flag to skip updating replicated attributes
    
    Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com>
---
 .../apache/atlas/model/impexp/AtlasExportRequest.java  | 18 ++++++++++++++++++
 .../apache/atlas/model/impexp/AtlasImportRequest.java  | 18 ++++++++++++++++++
 .../apache/atlas/repository/impexp/AuditsWriter.java   | 13 +++++++++----
 3 files changed, 45 insertions(+), 4 deletions(-)

diff --git 
a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java 
b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
index 878b1d8bc..fc4d11d54 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
@@ -53,6 +53,7 @@ public class AtlasExportRequest implements Serializable {
     public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
     public static final String OPTION_SKIP_LINEAGE = "skipLineage";
     public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
+    public static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR = 
"skipUpdateReplicationAttr";
     public static final String FETCH_TYPE_FULL = "full";
     public static final String FETCH_TYPE_CONNECTED = "connected";
     public static final String FETCH_TYPE_INCREMENTAL = "incremental";
@@ -141,6 +142,23 @@ public class AtlasExportRequest implements Serializable {
         return MapUtils.isNotEmpty(options) && 
options.containsKey(OPTION_KEY_REPLICATED_TO);
     }
 
+    @JsonIgnore
+    public boolean skipUpdateReplicationAttr() {
+        if (MapUtils.isNotEmpty(getOptions()) && 
getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) {
+
+            Object o = 
getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR);
+            if (o instanceof String) {
+                return Boolean.parseBoolean((String) o);
+            }
+
+            if (o instanceof Boolean) {
+                return (Boolean) o;
+            }
+
+        }
+        return false;
+    }
+
     @JsonIgnore
     public String getOptionKeyReplicatedTo() {
         String replicateToServerName = isReplicationOptionSet() ? (String) 
options.get(OPTION_KEY_REPLICATED_TO) : StringUtils.EMPTY;
diff --git 
a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java 
b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index cbc1aa938..12945ca2a 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -43,6 +43,7 @@ public class AtlasImportRequest implements Serializable {
     public  static final String TRANSFORMS_KEY             = "transforms";
     public  static final String TRANSFORMERS_KEY           = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
+    public  static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR = 
"skipUpdateReplicationAttr";
     public  static final String OPTION_KEY_MIGRATION_FILE_NAME = 
"migrationFileName";
     public  static final String OPTION_KEY_MIGRATION       = "migration";
     public  static final String OPTION_KEY_NUM_WORKERS     = "numWorkers";
@@ -122,6 +123,23 @@ public class AtlasImportRequest implements Serializable {
         return MapUtils.isNotEmpty(options) && 
options.containsKey(OPTION_KEY_REPLICATED_FROM);
     }
 
+    @JsonIgnore
+    public boolean skipUpdateReplicationAttr() {
+        if (MapUtils.isNotEmpty(getOptions()) && 
getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) {
+
+            Object o = 
getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR);
+            if (o instanceof String) {
+                return Boolean.parseBoolean((String) o);
+            }
+
+            if (o instanceof Boolean) {
+                return (Boolean) o;
+            }
+
+        }
+        return false;
+    }
+
     @JsonIgnore
     public String getOptionKeyReplicatedFrom() {
         return isReplicationOptionSet() ? 
options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java 
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index c4de0ed27..130092b44 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -97,6 +97,7 @@ public class AuditsWriter {
                                             String attrNameReplicated,
                                             long lastModifiedTimestamp) throws 
AtlasBaseException {
         if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) {
+            LOG.warn("Skipping Updating Replication Attributes");
             return;
         }
 
@@ -200,13 +201,15 @@ public class AuditsWriter {
         private AtlasExportRequest request;
         private String targetServerName;
         private boolean replicationOptionState;
+        private boolean skipUpdateReplicationAttr;
         private String targetServerFullName;
 
         public void add(String userName, AtlasExportResult result,
                         long startTime, long endTime,
                         List<String> entityGuids) throws AtlasBaseException {
             request = result.getRequest();
-            replicationOptionState = request.isReplicationOptionSet();
+            replicationOptionState    = request.isReplicationOptionSet();
+            skipUpdateReplicationAttr = request.skipUpdateReplicationAttr();
 
             saveCurrentServer();
 
@@ -220,7 +223,7 @@ public class AuditsWriter {
                 return;
             }
 
-            updateReplicationAttribute(replicationOptionState, 
targetServerName, targetServerFullName,
+            updateReplicationAttribute((replicationOptionState && 
!skipUpdateReplicationAttr), targetServerName, targetServerFullName,
                     entityGuids, Constants.ATTR_NAME_REPLICATED_TO, 
result.getChangeMarker());
         }
     }
@@ -228,6 +231,7 @@ public class AuditsWriter {
     private class ImportAudits {
         private AtlasImportRequest request;
         private boolean replicationOptionState;
+        private boolean skipUpdateReplicationAttr;
         private String sourceServerName;
         private String sourceServerFullName;
 
@@ -235,7 +239,8 @@ public class AuditsWriter {
                         long startTime, long endTime,
                         List<String> entityGuids) throws AtlasBaseException {
             request = result.getRequest();
-            replicationOptionState = request.isReplicationOptionSet();
+            replicationOptionState    = request.isReplicationOptionSet();
+            skipUpdateReplicationAttr = request.skipUpdateReplicationAttr();
 
             saveCurrentServer();
 
@@ -250,7 +255,7 @@ public class AuditsWriter {
                 return;
             }
 
-            updateReplicationAttribute(replicationOptionState, 
sourceServerName, sourceServerFullName, entityGuids,
+            updateReplicationAttribute((replicationOptionState && 
!skipUpdateReplicationAttr), sourceServerName, sourceServerFullName, 
entityGuids,
                     Constants.ATTR_NAME_REPLICATED_FROM, 
result.getExportResult().getChangeMarker());
         }
 

Reply via email to