ATLAS-2814: Cluster stores replication details.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/03f2754d Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/03f2754d Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/03f2754d Branch: refs/heads/master Commit: 03f2754d158e445faafe253bf232057a0c591c82 Parents: 561cdc9 Author: Ashutosh Mestry <[email protected]> Authored: Thu Aug 16 12:11:39 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Wed Oct 10 22:16:32 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasBaseClient.java | 8 +- .../atlas/model/clusterinfo/AtlasCluster.java | 115 ---------- .../apache/atlas/model/impexp/AtlasCluster.java | 154 ++++++++++++++ .../atlas/model/impexp/AtlasExportResult.java | 16 ++ .../atlas/model/impexp/AtlasImportResult.java | 9 + .../atlas/repository/impexp/AuditsWriter.java | 82 ++++---- .../atlas/repository/impexp/ClusterService.java | 77 ++++--- .../impexp/ExportImportAuditService.java | 109 +++++++--- .../atlas/repository/impexp/ExportService.java | 46 ++-- .../atlas/repository/impexp/ImportService.java | 12 ++ .../repository/impexp/ImportTransformer.java | 209 ++++++++++++++++++- .../atlas/repository/ogm/AtlasClusterDTO.java | 3 +- .../apache/atlas/repository/ogm/DataAccess.java | 20 ++ .../ogm/ExportImportAuditEntryDTO.java | 41 ++-- .../store/graph/v2/EntityGraphMapper.java | 8 + .../store/graph/v2/EntityGraphRetriever.java | 6 +- .../repository/impexp/ClusterServiceTest.java | 59 +++--- .../impexp/ExportImportAuditServiceTest.java | 30 +-- .../repository/impexp/ExportImportTestBase.java | 24 ++- .../impexp/ExportIncrementalTest.java | 3 - .../repository/impexp/ImportServiceTest.java | 2 +- .../repository/impexp/ImportTransformsTest.java | 128 ++++++++++-- .../impexp/ReplicationEntityAttributeTest.java | 54 ++--- .../stocksDB-Entities/replicationAttrs.json | 5 +- .../atlas/web/resources/AdminResource.java | 24 +-- .../web/resources/AdminExportImportTestIT.java | 47 ++++- .../test/resources/json/export-incremental.json | 4 +- webapp/src/test/resources/stocks-base.zip | Bin 13166 -> 17706 bytes 28 files changed, 920 insertions(+), 375 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java ---------------------------------------------------------------------- diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java index df021ad..c247902 100644 --- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java +++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java @@ -37,6 +37,7 @@ import com.sun.jersey.multipart.MultiPart; import com.sun.jersey.multipart.file.FileDataBodyPart; import com.sun.jersey.multipart.file.StreamDataBodyPart; import com.sun.jersey.multipart.impl.MultiPartWriter; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; @@ -79,7 +80,7 @@ public abstract class AtlasBaseClient { public static final String ADMIN_METRICS = "admin/metrics"; public static final String ADMIN_IMPORT = "admin/import"; public static final String ADMIN_EXPORT = "admin/export"; - public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled"; + public static final String ADMIN_CLUSTER_TEMPLATE = "%sadmin/cluster/%s"; public static final String QUERY = "query"; public static final String LIMIT = "limit"; @@ -519,6 +520,11 @@ public abstract class AtlasBaseClient { return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE); } + public AtlasCluster getCluster(String clusterName) throws AtlasServiceException { + API api = new API(String.format(ADMIN_CLUSTER_TEMPLATE, BASE_URI, clusterName), HttpMethod.GET, Response.Status.OK); + return callAPI(api, AtlasCluster.class, null); + } + boolean isRetryableException(ClientHandlerException che) { return che.getCause().getClass().equals(IOException.class) || che.getCause().getClass().equals(ConnectException.class); http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java b/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java deleted file mode 100644 index efea55a..0000000 --- a/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.model.clusterinfo; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.atlas.model.AtlasBaseModelObject; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; -import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; - -@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class AtlasCluster extends AtlasBaseModelObject implements Serializable { - private static final long serialVersionUID = 1L; - - public static final String SYNC_INFO_KEY = "syncInfo"; - public static final String OPERATION = "operation"; - public static final String NEXT_MODIFIED_TIMESTAMP = "nextModifiedTimestamp"; - - private String name; - private String qualifiedName; - private Map<String, String> additionalInfo; - private List<String> urls; - - public AtlasCluster() { - urls = new ArrayList<>(); - } - - public AtlasCluster(String name, String qualifiedName) { - this.name = name; - this.qualifiedName = qualifiedName; - } - - public void setName(String name) { - this.name = name; - } - - public String getName() { - return this.name; - } - - public void setAdditionalInfo(Map<String, String> additionalInfo) { - if(this.additionalInfo == null) { - this.additionalInfo = new HashMap<>(); - } - - this.additionalInfo = additionalInfo; - } - - public void setAdditionalInfo(String key, String value) { - if(this.additionalInfo == null) { - this.additionalInfo = new HashMap<>(); - } - - additionalInfo.put(key, value); - } - - public Map<String, String> getAdditionalInfo() { - return this.additionalInfo; - } - - public String getAdditionalInfo(String key) { - return additionalInfo.get(key); - } - - public String getQualifiedName() { - return qualifiedName; - } - - public void setQualifiedName(String qualifiedName) { - this.qualifiedName = qualifiedName; - } - - public void setUrls(List<String> urls) { - this.urls = urls; - } - - public List<String> getUrls() { - return this.urls; - } - - @Override - public StringBuilder toString(StringBuilder sb) { - sb.append(", name=").append(name); - sb.append(", qualifiedName=").append(getQualifiedName()); - sb.append(", urls=").append(urls); - sb.append(", additionalInfo=").append(additionalInfo); - sb.append("}"); - return sb; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java new file mode 100644 index 0000000..f70a219 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.impexp; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.model.AtlasBaseModelObject; +import org.apache.atlas.type.AtlasType; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AtlasCluster extends AtlasBaseModelObject implements Serializable { + private static final long serialVersionUID = 1L; + + public static final String KEY_REPLICATION_DETAILS = "REPL_DETAILS"; + + private String name; + private String qualifiedName; + private Map<String, String> additionalInfo; + private List<String> urls; + + public AtlasCluster() { + urls = new ArrayList<>(); + additionalInfo = new HashMap<>(); + } + + public AtlasCluster(String name, String qualifiedName) { + this(); + this.name = name; + this.qualifiedName = qualifiedName; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return this.name; + } + + public void setAdditionalInfo(Map<String, String> additionalInfo) { + this.additionalInfo = additionalInfo; + } + + public void setAdditionalInfo(String key, String value) { + if(additionalInfo == null) { + additionalInfo = new HashMap<>(); + } + + additionalInfo.put(key, value); + } + + public void setAdditionalInfoRepl(String guid, long modifiedTimestamp) { + Map<String, Object> replicationDetailsMap = null; + + if(additionalInfo != null && additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { + replicationDetailsMap = AtlasType.fromJson(getAdditionalInfo().get(KEY_REPLICATION_DETAILS), Map.class); + } + + if(replicationDetailsMap == null) { + replicationDetailsMap = new HashMap<>(); + } + + if(modifiedTimestamp == 0) { + replicationDetailsMap.remove(guid); + } else { + replicationDetailsMap.put(guid, modifiedTimestamp); + } + + updateReplicationMap(replicationDetailsMap); + } + + private void updateReplicationMap(Map<String, Object> replicationDetailsMap) { + String json = AtlasType.toJson(replicationDetailsMap); + setAdditionalInfo(KEY_REPLICATION_DETAILS, json); + } + + + public Object getAdditionalInfoRepl(String guid) { + if(additionalInfo == null || !additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) { + return null; + } + + String key = guid; + String mapJson = additionalInfo.get(KEY_REPLICATION_DETAILS); + + Map<String, String> replicationDetailsMap = AtlasType.fromJson(mapJson, Map.class); + if(!replicationDetailsMap.containsKey(key)) { + return null; + } + + return replicationDetailsMap.get(key); + } + + public Map<String, String> getAdditionalInfo() { + return this.additionalInfo; + } + + public String getAdditionalInfo(String key) { + return additionalInfo.get(key); + } + + public String getQualifiedName() { + return qualifiedName; + } + + public void setQualifiedName(String qualifiedName) { + this.qualifiedName = qualifiedName; + } + + public void setUrls(List<String> urls) { + this.urls = urls; + } + + public List<String> getUrls() { + return this.urls; + } + + @Override + public StringBuilder toString(StringBuilder sb) { + sb.append(", name=").append(name); + sb.append(", qualifiedName=").append(getQualifiedName()); + sb.append(", urls=").append(urls); + sb.append(", additionalInfo=").append(additionalInfo); + sb.append("}"); + return sb; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java index 14a1f65..a5203c9 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java @@ -173,6 +173,22 @@ public class AtlasExportResult implements Serializable { metrics.put(key, currentValue + incrementBy); } + public AtlasExportResult shallowCopy() { + AtlasExportResult result = new AtlasExportResult(); + + result.setRequest(getRequest()); + result.setUserName(getUserName()); + result.setClientIpAddress(getClientIpAddress()); + result.setHostName(getHostName()); + result.setTimeStamp(getTimeStamp()); + result.setMetrics(getMetrics()); + result.setOperationStatus(getOperationStatus()); + result.setSourceClusterName(getSourceClusterName()); + result.setLastModifiedTimestamp(getLastModifiedTimestamp()); + + return result; + } + public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java index b97cbb3..30e93d5 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java @@ -56,6 +56,7 @@ public class AtlasImportResult { private Map<String, Integer> metrics; private List<String> processedEntities; private OperationStatus operationStatus; + private AtlasExportResult exportResultWithoutData; public AtlasImportResult() { this(null, null, null, null, System.currentTimeMillis()); @@ -143,6 +144,14 @@ public class AtlasImportResult { public List<String> getProcessedEntities() { return this.processedEntities; } + public AtlasExportResult getExportResult() { + return exportResultWithoutData; + } + + public void setExportResult(AtlasExportResult exportResult) { + this.exportResultWithoutData = exportResult.shallowCopy(); + } + public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 6a3fbec..467d383 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -22,19 +22,24 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.repository.Constants; import org.apache.atlas.type.AtlasType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import javax.inject.Inject; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,7 +60,9 @@ public class AuditsWriter { this.auditService = auditService; } - public void write(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entityCreationOrder) throws AtlasBaseException { + public void write(String userName, AtlasExportResult result, + long startTime, long endTime, + List<String> entityCreationOrder) throws AtlasBaseException { auditForExport.add(userName, result, startTime, endTime, entityCreationOrder); } @@ -67,15 +74,17 @@ public class AuditsWriter { return options.containsKey(replicatedKey); } - private void updateReplicationAttribute(boolean isReplicationSet, String clusterName, + private void updateReplicationAttribute(boolean isReplicationSet, + String clusterName, List<String> exportedGuids, - String attrNameReplicated) throws AtlasBaseException { - if (!isReplicationSet) { + String attrNameReplicated, + long lastModifiedTimestamp) throws AtlasBaseException { + if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) { return; } - AtlasCluster cluster = saveCluster(clusterName); - clusterService.updateEntityWithCluster(cluster, exportedGuids, attrNameReplicated); + AtlasCluster cluster = saveCluster(clusterName, exportedGuids.get(0), lastModifiedTimestamp); + clusterService.updateEntitiesWithCluster(cluster, exportedGuids, attrNameReplicated); } private String getClusterNameFromOptions(Map options, String key) { @@ -84,27 +93,14 @@ public class AuditsWriter { : ""; } - private void addAuditEntry(String userName, String sourceCluster, String targetCluster, String operation, - String result, long startTime, long endTime, boolean hasData) throws AtlasBaseException { - if(!hasData) return; - - ExportImportAuditEntry entry = new ExportImportAuditEntry(); - - entry.setUserName(userName); - entry.setSourceClusterName(sourceCluster); - entry.setTargetClusterName(targetCluster); - entry.setOperation(operation); - entry.setResultSummary(result); - entry.setStartTime(startTime); - entry.setEndTime(endTime); - - auditService.save(entry); - LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: {}", entry.getUserName(), - entry.getSourceClusterName(), entry.getTargetClusterName(), entry.getOperation()); + private AtlasCluster saveCluster(String clusterName) throws AtlasBaseException { + AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); + return clusterService.save(cluster); } - private AtlasCluster saveCluster(String clusterName) throws AtlasBaseException { + private AtlasCluster saveCluster(String clusterName, String entityGuid, long lastModifiedTimestamp) throws AtlasBaseException { AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); + cluster.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp); return clusterService.save(cluster); } @@ -128,22 +124,25 @@ public class AuditsWriter { public void add(String userName, AtlasExportResult result, long startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException { optionKeyReplicatedTo = AtlasExportRequest.OPTION_KEY_REPLICATED_TO; request = result.getRequest(); - cluster = saveCluster(getCurrentClusterName()); replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo); targetClusterName = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo); - addAuditEntry(userName, - cluster.getName(), targetClusterName, + cluster = saveCluster(getCurrentClusterName()); + + auditService.add(userName, getCurrentClusterName(), targetClusterName, ExportImportAuditEntry.OPERATION_EXPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); - updateReplicationAttributeForExport(entitityGuids, request); + updateReplicationAttributeForExport(request, entitityGuids); } - private void updateReplicationAttributeForExport(List<String> entityGuids, AtlasExportRequest request) throws AtlasBaseException { - if(!replicationOptionState) return; + private void updateReplicationAttributeForExport(AtlasExportRequest request, List<String> entityGuids) throws AtlasBaseException { + if(!replicationOptionState) { + return; + } - updateReplicationAttribute(replicationOptionState, targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER); + updateReplicationAttribute(replicationOptionState, targetClusterName, + entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 0L); } } @@ -159,12 +158,13 @@ public class AuditsWriter { request = result.getRequest(); optionKeyReplicatedFrom = AtlasImportRequest.OPTION_KEY_REPLICATED_FROM; replicationOptionState = isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom); - cluster = saveCluster(getClusterNameFromOptionsState()); - String sourceCluster = getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom); - addAuditEntry(userName, - sourceCluster, cluster.getName(), - ExportImportAuditEntry.OPERATION_EXPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); + String sourceCluster = getClusterNameFromOptionsState(); + cluster = saveCluster(sourceCluster); + + auditService.add(userName, + sourceCluster, getCurrentClusterName(), + ExportImportAuditEntry.OPERATION_IMPORT, AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty()); updateReplicationAttributeForImport(entitityGuids); } @@ -173,13 +173,17 @@ public class AuditsWriter { if(!replicationOptionState) return; String targetClusterName = cluster.getName(); - updateReplicationAttribute(replicationOptionState, targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER); + + updateReplicationAttribute(replicationOptionState, targetClusterName, + entityGuids, + Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER, + result.getExportResult().getLastModifiedTimestamp()); } private String getClusterNameFromOptionsState() { return replicationOptionState ? getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom) - : getCurrentClusterName(); + : ""; } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java index ab3333d..5da4b75 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java @@ -21,12 +21,18 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,21 +47,24 @@ public class ClusterService { private final DataAccess dataAccess; private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityGraphRetriever; @Inject - public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore) { + public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever) { this.dataAccess = dataAccess; this.entityStore = entityStore; + this.typeRegistry = typeRegistry; + this.entityGraphRetriever = entityGraphRetriever; } - public AtlasCluster get(AtlasCluster cluster) { + public AtlasCluster get(AtlasCluster cluster) throws AtlasBaseException { try { return dataAccess.load(cluster); } catch (AtlasBaseException e) { LOG.error("dataAccess", e); + throw e; } - - return null; } @GraphTransaction @@ -68,14 +77,15 @@ public class ClusterService { } @GraphTransaction - public void updateEntityWithCluster(AtlasCluster cluster, List<String> guids, String attributeName) throws AtlasBaseException { - if(cluster != null && StringUtils.isEmpty(cluster.getGuid())) return; + public void updateEntitiesWithCluster(AtlasCluster cluster, List<String> entityGuids, String attributeName) throws AtlasBaseException { + if (cluster != null && StringUtils.isEmpty(cluster.getGuid())) { + return; + } AtlasObjectId objectId = getObjectId(cluster); - for (String guid : guids) { + for (String guid : entityGuids) { AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(guid); updateAttribute(entityWithExtInfo, attributeName, objectId); - entityStore.createOrUpdate(new AtlasEntityStream(entityWithExtInfo), true); } } @@ -88,33 +98,46 @@ public class ClusterService { * Attribute passed by name is updated with the value passed. * @param entityWithExtInfo Entity to be updated * @param propertyName attribute name - * @param value Value to be set for attribute + * @param objectId Value to be set for attribute */ - private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, String propertyName, Object value) { + private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, + String propertyName, + AtlasObjectId objectId) { + String value = EntityGraphMapper.getSoftRefFormattedValue(objectId); updateAttribute(entityWithExtInfo.getEntity(), propertyName, value); for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { updateAttribute(e, propertyName, value); } } - private void updateAttribute(AtlasEntity e, String propertyName, Object value) { - if(e.hasAttribute(propertyName) == false) return; - - Object oVal = e.getAttribute(propertyName); - if (oVal != null && !(oVal instanceof List)) return; - - List list; + private void updateAttribute(AtlasEntity entity, String attributeName, Object value) { + if(entity.hasAttribute(attributeName) == false) return; - if (oVal == null) { - list = new ArrayList(); - } else { - list = (List) oVal; + try { + AtlasVertex vertex = entityGraphRetriever.getEntityVertex(entity.getGuid()); + if(vertex == null) { + return; + } + + String qualifiedFieldName = getVertexPropertyName(entity, attributeName); + List list = vertex.getListProperty(qualifiedFieldName); + if (CollectionUtils.isEmpty(list)) { + list = new ArrayList(); + } + + if (!list.contains(value)) { + list.add(value); + vertex.setListProperty(qualifiedFieldName, list); + } } - - if (!list.contains(value)) { - list.add(value); + catch (AtlasBaseException ex) { + LOG.error("error retrieving vertex from guid: {}", entity.getGuid(), ex); } + } - e.setAttribute(propertyName, list); + private String getVertexPropertyName(AtlasEntity entity, String attributeName) throws AtlasBaseException { + AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); + AtlasStructType.AtlasAttribute attribute = type.getAttribute(attributeName); + return attribute.getVertexPropertyName(); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java index f7e32dc..89b1110 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java @@ -25,14 +25,19 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.repository.ogm.DataAccess; import org.apache.atlas.repository.ogm.ExportImportAuditEntryDTO; +import org.apache.cassandra.cql3.statements.Restriction; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; import javax.inject.Inject; import java.util.ArrayList; +import java.util.List; +import java.util.Set; @AtlasService public class ExportImportAuditService { @@ -59,17 +64,43 @@ public class ExportImportAuditService { return dataAccess.load(entry); } - public AtlasSearchResult get(String userName, String operation, String sourceCluster, String targetCluster, - String startTime, String endTime, - int limit, int offset) throws AtlasBaseException { + public List<ExportImportAuditEntry> get(String userName, String operation, String cluster, + String startTime, String endTime, + int limit, int offset) throws AtlasBaseException { SearchParameters.FilterCriteria criteria = new SearchParameters.FilterCriteria(); - criteria.setCriterion(new ArrayList<SearchParameters.FilterCriteria>()); + criteria.setCondition(SearchParameters.FilterCriteria.Condition.AND); + criteria.setCriterion(new ArrayList<>()); - addSearchParameters(criteria, userName, operation, sourceCluster, targetCluster, startTime, endTime); + addSearchParameters(criteria, userName, operation, cluster, startTime, endTime); SearchParameters searchParameters = getSearchParameters(limit, offset, criteria); + searchParameters.setAttributes(getAuditEntityAttributes()); - return discoveryService.searchWithParameters(searchParameters); + AtlasSearchResult result = discoveryService.searchWithParameters(searchParameters); + return toExportImportAuditEntry(result); + } + + private Set<String> getAuditEntityAttributes() { + return ExportImportAuditEntryDTO.getAttributes(); + } + + private List<ExportImportAuditEntry> toExportImportAuditEntry(AtlasSearchResult result) { + List<ExportImportAuditEntry> ret = new ArrayList<>(); + if(CollectionUtils.isEmpty(result.getEntities())) { + return ret; + } + + for (AtlasEntityHeader entityHeader : result.getEntities()) { + ExportImportAuditEntry entry = ExportImportAuditEntryDTO.from(entityHeader.getGuid(), + entityHeader.getAttributes()); + if(entry == null) { + continue; + } + + ret.add(entry); + } + + return ret; } private SearchParameters getSearchParameters(int limit, int offset, SearchParameters.FilterCriteria criteria) { @@ -78,46 +109,64 @@ public class ExportImportAuditService { searchParameters.setEntityFilters(criteria); searchParameters.setLimit(limit); searchParameters.setOffset(offset); + return searchParameters; } - private void addSearchParameters(SearchParameters.FilterCriteria criteria, - String userName, String operation, String sourceCluster, String targetCluster, - String startTime, String endTime) { - + private void addSearchParameters(SearchParameters.FilterCriteria criteria, String userName, String operation, + String cluster, String startTime, String endTime) { addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_USER_NAME, userName); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_OPERATION, operation); - addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, sourceCluster); - addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, targetCluster); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_START_TIME, startTime); addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_END_TIME, endTime); + + addClusterFilterCriteria(criteria, cluster); } - private void addParameterIfValueNotEmpty(SearchParameters.FilterCriteria criteria, - String attributeName, String value) { - if(StringUtils.isEmpty(value)) return; + private void addClusterFilterCriteria(SearchParameters.FilterCriteria parentCriteria, String cluster) { + if (StringUtils.isEmpty(cluster)) { + return; + } - boolean isFirstCriteria = criteria.getAttributeName() == null; - SearchParameters.FilterCriteria cx = isFirstCriteria - ? criteria - : new SearchParameters.FilterCriteria(); + SearchParameters.FilterCriteria criteria = new SearchParameters.FilterCriteria(); + criteria.setCondition(SearchParameters.FilterCriteria.Condition.OR); + criteria.setCriterion(new ArrayList<>()); - setCriteria(cx, attributeName, value); + addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, cluster); + addParameterIfValueNotEmpty(criteria, ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, cluster); - if(isFirstCriteria) { - cx.setCondition(SearchParameters.FilterCriteria.Condition.AND); - } + parentCriteria.getCriterion().add(criteria); + } - if(!isFirstCriteria) { - criteria.getCriterion().add(cx); + private void addParameterIfValueNotEmpty(SearchParameters.FilterCriteria criteria, String attributeName, String value) { + if(StringUtils.isEmpty(value)) { + return; } + + SearchParameters.FilterCriteria filterCriteria = new SearchParameters.FilterCriteria(); + filterCriteria.setAttributeName(attributeName); + filterCriteria.setAttributeValue(value); + filterCriteria.setOperator(SearchParameters.Operator.EQ); + + criteria.getCriterion().add(filterCriteria); } - private SearchParameters.FilterCriteria setCriteria(SearchParameters.FilterCriteria criteria, String attributeName, String value) { - criteria.setAttributeName(attributeName); - criteria.setAttributeValue(value); - criteria.setOperator(SearchParameters.Operator.EQ); + public void add(String userName, String sourceCluster, String targetCluster, String operation, + String result, long startTime, long endTime, boolean hasData) throws AtlasBaseException { + if(!hasData) return; + + ExportImportAuditEntry entry = new ExportImportAuditEntry(); + + entry.setUserName(userName); + entry.setSourceClusterName(sourceCluster); + entry.setTargetClusterName(targetCluster); + entry.setOperation(operation); + entry.setResultSummary(result); + entry.setStartTime(startTime); + entry.setEndTime(endTime); - return criteria; + save(entry); + LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: {}", entry.getUserName(), + entry.getSourceClusterName(), entry.getTargetClusterName(), entry.getOperation()); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 30dd8c1..97c2123 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -125,11 +125,11 @@ public class ExportService { context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setTypesDef(context.result.getData().getTypesDef()); - auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder()); - clearContextData(context); + context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp); context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.incrementMeticsCounter("duration", duration); - context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp); + auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder()); + clearContextData(context); context.sink.setResult(context.result); } @@ -194,9 +194,7 @@ public class ExportService { } private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> processObjectId({})", item); - } + debugLog("==> processObjectId({})", item); try { List<String> entityGuids = getStartingEntity(item, context); @@ -225,11 +223,16 @@ public class ExportService { return AtlasExportResult.OperationStatus.FAIL; } - if (LOG.isDebugEnabled()) { - LOG.debug("<== processObjectId({})", item); + debugLog("<== processObjectId({})", item); + return AtlasExportResult.OperationStatus.SUCCESS; + } + + private void debugLog(String s, Object... params) { + if (!LOG.isDebugEnabled()) { + return; } - return AtlasExportResult.OperationStatus.SUCCESS; + LOG.debug(s, params); } private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { @@ -330,9 +333,7 @@ public class ExportService { } private void processEntity(String guid, ExportContext context) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> processEntity({})", guid); - } + debugLog("==> processEntity({})", guid); if (!context.guidsProcessed.contains(guid)) { TraversalDirection direction = context.guidDirection.get(guid); @@ -358,9 +359,7 @@ public class ExportService { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== processEntity({})", guid); - } + debugLog("<== processEntity({})", guid); } private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { @@ -403,8 +402,8 @@ public class ExportService { for (TraversalDirection direction : directions) { String query = getQueryForTraversalDirection(direction); - if (LOG.isDebugEnabled()) { - LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); + if(LOG.isDebugEnabled()) { + debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); } context.bindings.clear(); @@ -433,8 +432,8 @@ public class ExportService { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + if(LOG.isDebugEnabled()) { + debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); } } } @@ -451,8 +450,8 @@ public class ExportService { } private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); + if(LOG.isDebugEnabled()) { + debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); } String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); @@ -477,8 +476,9 @@ public class ExportService { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + if(LOG.isDebugEnabled()) { + debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", + entity.getGuid(), result.size(), context.guidsToProcess.size()); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 98ef389..8a184fa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -26,6 +26,7 @@ import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; @@ -111,6 +112,16 @@ public class ImportService { updateTransformsWithSubTypes(importTransform); source.setImportTransform(importTransform); + + if(LOG.isDebugEnabled()) { + debugLog(" => transforms: {}", AtlasType.toJson(importTransform)); + } + } + + private void debugLog(String s, Object... params) { + if(!LOG.isDebugEnabled()) return; + + LOG.debug(s, params); } private void updateTransformsWithSubTypes(ImportTransforms importTransforms) throws AtlasBaseException { @@ -189,6 +200,7 @@ public class ImportService { endTimestamp = System.currentTimeMillis(); result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp)); + result.setExportResult(importSource.getExportResult()); auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java index 1b9305c..d68938b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java @@ -19,12 +19,25 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.commons.lang.StringUtils; +import java.util.ArrayList; +import java.util.List; + public abstract class ImportTransformer { private static final String TRANSFORMER_PARAMETER_SEPARATOR = "\\:"; + private static final String TRANSFORMER_NAME_ADD = "add"; + private static final String TRANSFORMER_NAME_CLEAR_ATTR = "clearAttrValue"; + private static final String TRANSFORMER_NAME_LOWERCASE = "lowercase"; + private static final String TRANSFORMER_NAME_UPPERCASE = "uppercase"; + private static final String TRANSFORMER_NAME_REMOVE_CLASSIFICATION = "removeClassification"; + private static final String TRANSFORMER_NAME_REPLACE = "replace"; + private static final String TRANSFORMER_SET_DELETED = "setDeleted"; + private final String transformType; @@ -36,15 +49,26 @@ public abstract class ImportTransformer { if (StringUtils.isEmpty(key)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Invalid transformer-specification: {}.", transformerSpec); - } else if (key.equals("replace")) { + } else if (key.equals(TRANSFORMER_NAME_REPLACE)) { String toFindStr = (params == null || params.length < 2) ? "" : params[1]; String replaceStr = (params == null || params.length < 3) ? "" : params[2]; ret = new Replace(toFindStr, replaceStr); - } else if (key.equals("lowercase")) { + } else if (key.equals(TRANSFORMER_NAME_LOWERCASE)) { ret = new Lowercase(); - } else if (key.equals("uppercase")) { + } else if (key.equals(TRANSFORMER_NAME_UPPERCASE)) { ret = new Uppercase(); + } else if (key.equals(TRANSFORMER_NAME_REMOVE_CLASSIFICATION)) { + String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length); + ret = new RemoveClassification(name); + } else if (key.equals(TRANSFORMER_NAME_ADD)) { + String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length); + ret = new AddValueToAttribute(name); + } else if (key.equals(TRANSFORMER_NAME_CLEAR_ATTR)) { + String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length); + ret = new ClearAttributes(name); + } else if (key.equals(TRANSFORMER_SET_DELETED)) { + ret = new SetDeleted(); } else { throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec); } @@ -66,7 +90,7 @@ public abstract class ImportTransformer { private final String replaceStr; public Replace(String toFindStr, String replaceStr) { - super("replace"); + super(TRANSFORMER_NAME_REPLACE); this.toFindStr = toFindStr; this.replaceStr = replaceStr; @@ -77,7 +101,7 @@ public abstract class ImportTransformer { public String getReplaceStr() { return replaceStr; } @Override - public Object apply(Object o) throws AtlasBaseException { + public Object apply(Object o) { Object ret = o; if(o instanceof String) { @@ -90,7 +114,7 @@ public abstract class ImportTransformer { static class Lowercase extends ImportTransformer { public Lowercase() { - super("lowercase"); + super(TRANSFORMER_NAME_LOWERCASE); } @Override @@ -107,7 +131,7 @@ public abstract class ImportTransformer { static class Uppercase extends ImportTransformer { public Uppercase() { - super("uppercase"); + super(TRANSFORMER_NAME_UPPERCASE); } @Override @@ -121,4 +145,175 @@ public abstract class ImportTransformer { return ret; } } + + static class RemoveClassification extends ImportTransformer { + private final String classificationToBeRemoved; + + public RemoveClassification(String name) { + super(TRANSFORMER_NAME_REMOVE_CLASSIFICATION); + + this.classificationToBeRemoved = name; + } + + @Override + public Object apply(Object o) { + if (!(o instanceof AtlasEntity)) { + return o; + } + + AtlasEntity entity = (AtlasEntity) o; + if(entity.getClassifications().size() == 0) { + return o; + } + + List<AtlasClassification> toRemove = null; + for (AtlasClassification classification : entity.getClassifications()) { + if (classification.getTypeName().equals(classificationToBeRemoved)) { + if (toRemove == null) { + toRemove = new ArrayList<AtlasClassification>(); + } + + + toRemove.add(classification); + + } + } + + if (toRemove != null) { + entity.getClassifications().removeAll(toRemove); + } + + return entity; + } + + @Override + public String toString() { + return String.format("%s=%s", "RemoveClassification", classificationToBeRemoved); + } + } + + static class AddValueToAttribute extends ImportTransformer { + private final String nameValuePair; + private String attrName; + private String attrValueRaw; + private Object attrValue; + + protected AddValueToAttribute(String nameValuePair) { + super(TRANSFORMER_NAME_ADD); + + this.nameValuePair = nameValuePair; + setAttrNameValue(this.nameValuePair); + } + + private void setAttrNameValue(String nameValuePair) { + String SEPARATOR_EQUALS = "="; + if(!nameValuePair.contains(SEPARATOR_EQUALS)) return; + + String splits[] = StringUtils.split(nameValuePair, SEPARATOR_EQUALS); + if(splits.length == 0) { + return; + } + + if(splits.length >= 1) { + attrName = splits[0]; + } + + if(splits.length >= 1) { + attrValueRaw = splits[1]; + } + + setAttrValue(attrValueRaw); + } + + private void setAttrValue(String attrValueRaw) { + final String type_prefix = "list:"; + + if(attrValueRaw.startsWith(type_prefix)) { + final String item = StringUtils.remove(attrValueRaw, type_prefix); + attrValue = new ArrayList<String>() {{ + add(item); + }}; + } else { + attrValue = attrValueRaw; + } + } + + @Override + public Object apply(Object o) { + if(o == null) { + return o; + } + + if(!(o instanceof AtlasEntity)) { + return o; + } + + AtlasEntity entity = (AtlasEntity) o; + Object attrExistingValue = entity.getAttribute(attrName); + if(attrExistingValue == null) { + entity.setAttribute(attrName, attrValue); + } else if(attrExistingValue instanceof List) { + List list = (List) attrExistingValue; + + if(attrValue instanceof List) { + list.addAll((List) attrValue); + } else { + list.add(attrValue); + } + } else { + entity.setAttribute(attrName, attrValueRaw); + } + + return entity; + } + } + + static class ClearAttributes extends ImportTransformer { + private String[] attrNames; + + protected ClearAttributes(String attrNames) { + super(TRANSFORMER_NAME_CLEAR_ATTR); + + this.attrNames = StringUtils.split(attrNames, ","); + } + + @Override + public Object apply(Object o) { + if (o == null) { + return o; + } + + if (!(o instanceof AtlasEntity)) { + return o; + } + + AtlasEntity entity = (AtlasEntity) o; + for (String attrName : attrNames) { + entity.setAttribute(attrName, null); + } + + return entity; + } + } + + static class SetDeleted extends ImportTransformer { + protected SetDeleted() { + super(TRANSFORMER_SET_DELETED); + } + + @Override + public Object apply(Object o) { + if (o == null) { + return o; + } + + if (!(o instanceof AtlasEntity)) { + return o; + } + + AtlasEntity entity = (AtlasEntity) o; + entity.setStatus(AtlasEntity.Status.DELETED); + return entity; + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java index 8a89884..3427bd6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java @@ -18,7 +18,7 @@ package org.apache.atlas.repository.ogm; -import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasCluster; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.type.AtlasTypeRegistry; import org.springframework.stereotype.Component; @@ -63,6 +63,7 @@ public class AtlasClusterDTO extends AbstractDataTransferObject<AtlasCluster> { entity.setAttribute(PROPERTY_CLUSTER_NAME, obj.getName()); entity.setAttribute(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName()); entity.setAttribute(PROPERTY_ADDITIONAL_INFO, obj.getAdditionalInfo()); + entity.setAttribute(PROPERTY_URLS, obj.getUrls()); return entity; } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java b/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java index bef7d05..f2df179 100644 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java @@ -189,6 +189,26 @@ public class DataAccess { } + public <T extends AtlasBaseModelObject> T load(String guid, Class<? extends AtlasBaseModelObject> clazz) throws AtlasBaseException { + DataTransferObject<T> dto = (DataTransferObject<T>)dtoRegistry.get(clazz); + + AtlasEntityWithExtInfo entityWithExtInfo = null; + + if (StringUtils.isNotEmpty(guid)) { + entityWithExtInfo = entityStore.getById(guid); + } + + if(entityWithExtInfo == null) { + return null; + } + + return dto.from(entityWithExtInfo); + } + + public void deleteUsingGuid(String guid) throws AtlasBaseException { + entityStore.deleteById(guid); + } + public void delete(String guid) throws AtlasBaseException { Objects.requireNonNull(guid, "guid"); AtlasPerfTracer perf = null; http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java b/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java index c22d41f..fd19c80 100644 --- a/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java +++ b/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java @@ -19,14 +19,17 @@ package org.apache.atlas.repository.ogm; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.repository.Constants; import org.apache.atlas.type.AtlasTypeRegistry; import org.springframework.stereotype.Component; +import java.util.Arrays; +import java.util.HashSet; import javax.inject.Inject; import java.util.Map; +import java.util.Set; @Component public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<ExportImportAuditEntry> { @@ -40,30 +43,44 @@ public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<Export public static final String PROPERTY_SOURCE_CLUSTER_NAME = "sourceClusterName"; public static final String PROPERTY_TARGET_CLUSTER_NAME = "targetClusterName"; + private static final Set<String> ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList(PROPERTY_USER_NAME, + PROPERTY_OPERATION, PROPERTY_OPERATION_PARAMS, + PROPERTY_START_TIME, PROPERTY_END_TIME, + PROPERTY_RESULT_SUMMARY, + PROPERTY_SOURCE_CLUSTER_NAME, PROPERTY_TARGET_CLUSTER_NAME)); + @Inject public ExportImportAuditEntryDTO(AtlasTypeRegistry typeRegistry) { super(typeRegistry, ExportImportAuditEntry.class, Constants.INTERNAL_PROPERTY_KEY_PREFIX + ExportImportAuditEntry.class.getSimpleName()); } - @Override - public ExportImportAuditEntry from(AtlasEntity entity) { + public static Set<String> getAttributes() { + return ATTRIBUTE_NAMES; + } + + public static ExportImportAuditEntry from(String guid, Map<String,Object> attributes) { ExportImportAuditEntry entry = new ExportImportAuditEntry(); - setGuid(entry, entity); - entry.setUserName((String) entity.getAttribute(PROPERTY_USER_NAME)); - entry.setOperation((String) entity.getAttribute(PROPERTY_OPERATION)); - entry.setOperationParams((String) entity.getAttribute(PROPERTY_OPERATION_PARAMS)); - entry.setStartTime((long) entity.getAttribute(PROPERTY_START_TIME)); - entry.setEndTime((long) entity.getAttribute(PROPERTY_END_TIME)); - entry.setSourceClusterName((String) entity.getAttribute(PROPERTY_SOURCE_CLUSTER_NAME)); - entry.setTargetClusterName((String) entity.getAttribute(PROPERTY_TARGET_CLUSTER_NAME)); - entry.setResultSummary((String) entity.getAttribute(PROPERTY_RESULT_SUMMARY)); + entry.setGuid(guid); + entry.setUserName((String) attributes.get(PROPERTY_USER_NAME)); + entry.setOperation((String) attributes.get(PROPERTY_OPERATION)); + entry.setOperationParams((String) attributes.get(PROPERTY_OPERATION_PARAMS)); + entry.setStartTime((long) attributes.get(PROPERTY_START_TIME)); + entry.setEndTime((long) attributes.get(PROPERTY_END_TIME)); + entry.setSourceClusterName((String) attributes.get(PROPERTY_SOURCE_CLUSTER_NAME)); + entry.setTargetClusterName((String) attributes.get(PROPERTY_TARGET_CLUSTER_NAME)); + entry.setResultSummary((String) attributes.get(PROPERTY_RESULT_SUMMARY)); return entry; } @Override + public ExportImportAuditEntry from(AtlasEntity entity) { + return from(entity.getGuid(), entity.getAttributes()); + } + + @Override public ExportImportAuditEntry from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { return from(entityWithExtInfo.getEntity()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index d58d18f..c08f4c4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -1909,4 +1909,12 @@ public class EntityGraphMapper { type.getNormalizedValueForUpdate(classification); } + + public static String getSoftRefFormattedValue(AtlasObjectId objectId) { + return getSoftRefFormattedString(objectId.getTypeName(), objectId.getGuid()); + } + + private static String getSoftRefFormattedString(String typeName, String resolvedGuid) { + return String.format(SOFT_REF_FORMAT, typeName, resolvedGuid); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 2a385e3..d9be2f7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.store.graph.v2; import com.fasterxml.jackson.core.type.TypeReference; +import jnr.ffi.annotations.In; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TimeBoundary; @@ -58,7 +59,9 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import javax.inject.Inject; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; @@ -92,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; - +@Component public final class EntityGraphRetriever { private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class); @@ -116,6 +119,7 @@ public final class EntityGraphRetriever { private final boolean ignoreRelationshipAttr; + @Inject public EntityGraphRetriever(AtlasTypeRegistry typeRegistry) { this(typeRegistry, false); } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java index c931e74..2e4481e 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java @@ -20,11 +20,10 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.TestModules; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.clusterinfo.AtlasCluster; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.repository.impexp.ClusterService; +import org.apache.atlas.model.impexp.AtlasCluster; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.repository.Constants; import org.apache.atlas.store.AtlasTypeDefStore; -import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; @@ -35,18 +34,20 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static org.apache.atlas.repository.Constants.ATTR_NAME_REFERENCEABLE; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; @Guice(modules = TestModules.TestOnlyModule.class) public class ClusterServiceTest { private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1"; private final String CLUSTER_NAME = "testCl1"; private final String TARGET_CLUSTER_NAME = "testCl2"; + private final String QUALIFIED_NAME_STOCKS = "stocks@cl1"; + private final String TYPE_HIVE_DB = "hive_db"; + private final String topLevelEntityGuid = "AAA-BBB-CCC"; @Inject private AtlasTypeDefStore typeDefStore; @@ -64,8 +65,8 @@ public class ClusterServiceTest { @Test public void saveAndRetrieveClusterInfo() throws AtlasBaseException { - AtlasCluster expected = getCluster(CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME); - AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME); + AtlasCluster expected = getCluster(CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME); + AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME); AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME); AtlasCluster actual = clusterService.save(expected); @@ -83,36 +84,38 @@ public class ClusterServiceTest { assertEquals(actual.getName(), expected.getName()); assertEquals(actual.getQualifiedName(), expected.getQualifiedName()); - assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION), - getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION)); - - assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP), - getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP)); } - private AtlasCluster getCluster(String name, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) { - AtlasCluster cluster = new AtlasCluster(name, name); + private AtlasCluster getCluster(String clusterName, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) { + AtlasCluster cluster = new AtlasCluster(clusterName, clusterName); + + Map<String, String> syncMap = new HashMap<>(); - Map<String, Object> syncMap = new HashMap<>(); + syncMap.put("topLevelEntity", topLevelEntity); syncMap.put("operation", operation); - syncMap.put("nextModifiedTimestamp", nextModifiedTimestamp); + syncMap.put("nextModifiedTimestamp", Long.toString(nextModifiedTimestamp)); syncMap.put("targetCluster", targetClusterName); - String syncMapJson = AtlasType.toJson(syncMap); - String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntity); - cluster.setAdditionalInfo(topLevelEntitySpecificKey, syncMapJson); + cluster.setAdditionalInfo(syncMap); + return cluster; } - private Map<String, Object> getAdditionalInfo(AtlasCluster cluster, String topLevelEntityName) { - String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntityName); - assertTrue(cluster.getAdditionalInfo().containsKey(topLevelEntitySpecificKey)); + @Test + public void verifyAdditionalInfo() throws AtlasBaseException { + final long expectedLastModifiedTimestamp = 200L; + + AtlasCluster expectedCluster = new AtlasCluster(CLUSTER_NAME, CLUSTER_NAME); - String json = cluster.getAdditionalInfo(topLevelEntitySpecificKey); - return AtlasType.fromJson(json, Map.class); - } + String qualifiedNameAttr = Constants.QUALIFIED_NAME.replace(ATTR_NAME_REFERENCEABLE, ""); + AtlasObjectId objectId = new AtlasObjectId(TYPE_HIVE_DB, qualifiedNameAttr, QUALIFIED_NAME_STOCKS); + expectedCluster.setAdditionalInfoRepl(topLevelEntityGuid, expectedLastModifiedTimestamp); + + AtlasCluster actualCluster = clusterService.save(expectedCluster); + assertEquals(actualCluster.getName(), expectedCluster.getName()); + + int actualModifiedTimestamp = (int) actualCluster.getAdditionalInfoRepl(topLevelEntityGuid); - private String getTopLevelEntitySpecificKey(String topLevelEntity) { - return String.format("%s:%s", AtlasCluster.SYNC_INFO_KEY, topLevelEntity); + assertEquals(actualModifiedTimestamp, expectedLastModifiedTimestamp); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java index e019feb..13277a3 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java @@ -20,7 +20,6 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.TestModules; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasType; @@ -31,14 +30,16 @@ import org.testng.annotations.Test; import javax.inject.Inject; import java.io.IOException; +import java.util.List; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; @Guice(modules = TestModules.TestOnlyModule.class) -public class ExportImportAuditServiceTest { +public class ExportImportAuditServiceTest extends ExportImportTestBase { @Inject AtlasTypeRegistry typeRegistry; @@ -69,7 +70,7 @@ public class ExportImportAuditServiceTest { String target2 = "clx1"; ExportImportAuditEntry entry2 = saveAndGet(source2, ExportImportAuditEntry.OPERATION_EXPORT, target2); - Thread.sleep(1000); + pauseForIndexCreation(); ExportImportAuditEntry actualEntry = retrieveEntry(entry); ExportImportAuditEntry actualEntry2 = retrieveEntry(entry2); @@ -80,7 +81,7 @@ public class ExportImportAuditServiceTest { assertEquals(actualEntry.getOperation(), entry.getOperation()); } - @Test(enabled = false) + @Test public void numberOfSavedEntries_Retrieved() throws AtlasBaseException, InterruptedException { final String source1 = "cluster1"; final String target1 = "cly"; @@ -90,19 +91,20 @@ public class ExportImportAuditServiceTest { saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1); } - Thread.sleep(5000); - AtlasSearchResult results = auditService.get(source1, ExportImportAuditEntry.OPERATION_EXPORT, "", "", "", "", 10, 0); - assertEquals(results.getEntities().size(), MAX_ENTRIES); + pauseForIndexCreation(); + List<ExportImportAuditEntry> results = auditService.get("", + ExportImportAuditEntry.OPERATION_EXPORT, + "", "", "", 10, 0); + assertTrue(results.size() > 0); } - - private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException, InterruptedException { - Thread.sleep(5000); - AtlasSearchResult result = auditService.get(entry.getUserName(), entry.getOperation(), entry.getSourceClusterName(), - entry.getTargetClusterName(), Long.toString(entry.getStartTime()), "", 10, 0); + private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException { + List<ExportImportAuditEntry> result = auditService.get(entry.getUserName(), entry.getOperation(), + entry.getSourceClusterName(), + Long.toString(entry.getStartTime()), "", 10, 0); assertNotNull(result); - assertEquals(result.getEntities().size(), 1); - entry.setGuid(result.getEntities().get(0).getGuid()); + assertEquals(result.size(), 1); + entry.setGuid(result.get(0).getGuid()); return auditService.get(entry); } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java index 79fd308..4b253ff 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java @@ -22,7 +22,7 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.impexp.ExportImportAuditEntry; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; @@ -33,6 +33,7 @@ import org.testng.SkipException; import java.io.IOException; import java.util.Arrays; +import java.util.List; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel; @@ -76,21 +77,28 @@ public class ExportImportTestBase { } } - protected void assertAuditEntry(ExportImportAuditService auditService) { - AtlasSearchResult result = null; + protected void assertAuditEntry(ExportImportAuditService auditService) throws InterruptedException { + pauseForIndexCreation(); + List<ExportImportAuditEntry> result = null; try { - Thread.sleep(5000); - result = auditService.get("", "", "", "", "", "", 10, 0); + result = auditService.get("", "", "", "", "", 10, 0); } catch (Exception e) { - throw new SkipException("auditService.get: failed!"); + throw new SkipException("audit entries not retrieved."); } assertNotNull(result); - assertNotNull(result.getEntities()); - assertTrue(result.getEntities().size() > 0); + assertTrue(result.size() > 0); } private String getCurrentCluster() throws AtlasException { return ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, "default"); } + + protected void pauseForIndexCreation() { + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + throw new SkipException("pause interrupted."); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java index f86a463..b2dcf44 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -60,9 +60,6 @@ public class ExportIncrementalTest extends ExportImportTestBase { ExportService exportService; @Inject - ClusterService clusterService; - - @Inject private AtlasEntityStoreV2 entityStore; private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental"; http://git-wip-us.apache.org/repos/asf/atlas/blob/03f2754d/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java index 7c62efb..c283258 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java @@ -108,7 +108,7 @@ public class ImportServiceTest extends ExportImportTestBase { } @AfterTest - public void postTest() { + public void postTest() throws InterruptedException { assertAuditEntry(auditService); }
