This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new e6d9d7041 ATLAS-4861: Export/Import: add flag to skip updating replicated attributes e6d9d7041 is described below commit e6d9d70411b239f1b59104c9ce55663764aa478a Author: Pinal Shah <pinal.s...@freestoneinfotech.com> AuthorDate: Fri May 10 17:29:19 2024 +0700 ATLAS-4861: Export/Import: add flag to skip updating replicated attributes Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../apache/atlas/model/impexp/AtlasExportRequest.java | 18 ++++++++++++++++++ .../apache/atlas/model/impexp/AtlasImportRequest.java | 18 ++++++++++++++++++ .../apache/atlas/repository/impexp/AuditsWriter.java | 13 +++++++++---- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java index 878b1d8bc..fc4d11d54 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java @@ -53,6 +53,7 @@ public class AtlasExportRequest implements Serializable { public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; public static final String OPTION_SKIP_LINEAGE = "skipLineage"; public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo"; + public static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR = "skipUpdateReplicationAttr"; public static final String FETCH_TYPE_FULL = "full"; public static final String FETCH_TYPE_CONNECTED = "connected"; public static final String FETCH_TYPE_INCREMENTAL = "incremental"; @@ -141,6 +142,23 @@ public class AtlasExportRequest implements Serializable { return MapUtils.isNotEmpty(options) && options.containsKey(OPTION_KEY_REPLICATED_TO); } + @JsonIgnore + public boolean skipUpdateReplicationAttr() { + if (MapUtils.isNotEmpty(getOptions()) && getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) { + + Object o = getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR); + if (o instanceof String) { + return Boolean.parseBoolean((String) o); + } + + if (o instanceof Boolean) { + return (Boolean) o; + } + + } + return false; + } + @JsonIgnore public String getOptionKeyReplicatedTo() { String replicateToServerName = isReplicationOptionSet() ? (String) options.get(OPTION_KEY_REPLICATED_TO) : StringUtils.EMPTY; diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index cbc1aa938..12945ca2a 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -43,6 +43,7 @@ public class AtlasImportRequest implements Serializable { public static final String TRANSFORMS_KEY = "transforms"; public static final String TRANSFORMERS_KEY = "transformers"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; + public static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR = "skipUpdateReplicationAttr"; public static final String OPTION_KEY_MIGRATION_FILE_NAME = "migrationFileName"; public static final String OPTION_KEY_MIGRATION = "migration"; public static final String OPTION_KEY_NUM_WORKERS = "numWorkers"; @@ -122,6 +123,23 @@ public class AtlasImportRequest implements Serializable { return MapUtils.isNotEmpty(options) && options.containsKey(OPTION_KEY_REPLICATED_FROM); } + @JsonIgnore + public boolean skipUpdateReplicationAttr() { + if (MapUtils.isNotEmpty(getOptions()) && getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) { + + Object o = getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR); + if (o instanceof String) { + return Boolean.parseBoolean((String) o); + } + + if (o instanceof Boolean) { + return (Boolean) o; + } + + } + return false; + } + @JsonIgnore public String getOptionKeyReplicatedFrom() { return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY; diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index c4de0ed27..130092b44 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -97,6 +97,7 @@ public class AuditsWriter { String attrNameReplicated, long lastModifiedTimestamp) throws AtlasBaseException { if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) { + LOG.warn("Skipping Updating Replication Attributes"); return; } @@ -200,13 +201,15 @@ public class AuditsWriter { private AtlasExportRequest request; private String targetServerName; private boolean replicationOptionState; + private boolean skipUpdateReplicationAttr; private String targetServerFullName; public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entityGuids) throws AtlasBaseException { request = result.getRequest(); - replicationOptionState = request.isReplicationOptionSet(); + replicationOptionState = request.isReplicationOptionSet(); + skipUpdateReplicationAttr = request.skipUpdateReplicationAttr(); saveCurrentServer(); @@ -220,7 +223,7 @@ public class AuditsWriter { return; } - updateReplicationAttribute(replicationOptionState, targetServerName, targetServerFullName, + updateReplicationAttribute((replicationOptionState && !skipUpdateReplicationAttr), targetServerName, targetServerFullName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker()); } } @@ -228,6 +231,7 @@ public class AuditsWriter { private class ImportAudits { private AtlasImportRequest request; private boolean replicationOptionState; + private boolean skipUpdateReplicationAttr; private String sourceServerName; private String sourceServerFullName; @@ -235,7 +239,8 @@ public class AuditsWriter { long startTime, long endTime, List<String> entityGuids) throws AtlasBaseException { request = result.getRequest(); - replicationOptionState = request.isReplicationOptionSet(); + replicationOptionState = request.isReplicationOptionSet(); + skipUpdateReplicationAttr = request.skipUpdateReplicationAttr(); saveCurrentServer(); @@ -250,7 +255,7 @@ public class AuditsWriter { return; } - updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids, + updateReplicationAttribute((replicationOptionState && !skipUpdateReplicationAttr), sourceServerName, sourceServerFullName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker()); }