This is an automated email from the ASF dual-hosted git repository. radhikakundam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 79ef3cce1 ATLAS-4768: Implement aging for audits stored by Atlas. 79ef3cce1 is described below commit 79ef3cce1be4874c17b296aff8b200c9edb42bcb Author: radhikakundam <radhikakun...@apache.org> AuthorDate: Fri Sep 29 10:39:51 2023 -0700 ATLAS-4768: Implement aging for audits stored by Atlas. Signed-off-by: radhikakundam <radhikakun...@apache.org> --- .../main/java/org/apache/atlas/AtlasClientV2.java | 10 + .../org/apache/atlas/repository/Constants.java | 23 ++ .../java/org/apache/atlas/AtlasConfiguration.java | 18 +- .../atlas/model/audit/AuditReductionCriteria.java | 226 +++++++++++++ .../atlas/discovery/AtlasDiscoveryService.java | 18 + .../atlas/discovery/EntityDiscoveryService.java | 60 ++++ .../audit/AbstractStorageBasedAuditRepository.java | 11 +- .../audit/AtlasAuditReductionService.java | 363 +++++++++++++++++++++ .../repository/audit/EntityAuditRepository.java | 12 + .../audit/HBaseBasedAuditRepository.java | 158 ++++++++- .../audit/InMemoryEntityAuditRepository.java | 53 ++- .../audit/NoopEntityAuditRepository.java | 7 + .../repository/graph/GraphBackedSearchIndexer.java | 3 + .../tasks/AuditReductionEntityRetrievalTask.java | 231 +++++++++++++ .../store/graph/v2/tasks/AuditReductionTask.java | 150 +++++++++ .../graph/v2/tasks/AuditReductionTaskFactory.java | 113 +++++++ .../repository/audit/AuditRepositoryTestBase.java | 67 +++- .../resources/solr/core-template/solrconfig.xml | 2 +- .../atlas/web/filters/ActiveServerFilter.java | 2 +- .../apache/atlas/web/resources/AdminResource.java | 46 ++- .../atlas/web/resources/AdminResourceTest.java | 4 +- 21 files changed, 1543 insertions(+), 34 deletions(-) diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java index 6f97da192..6477b4091 100644 --- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java +++ 
b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java @@ -31,6 +31,7 @@ import com.sun.jersey.multipart.file.StreamDataBodyPart; import org.apache.atlas.bulkimport.BulkImportResponse; import org.apache.atlas.model.SearchFilter; import org.apache.atlas.model.audit.AtlasAuditEntry; +import org.apache.atlas.model.audit.AuditReductionCriteria; import org.apache.atlas.model.audit.AuditSearchParameters; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.discovery.AtlasQuickSearchResult; @@ -838,6 +839,14 @@ public class AtlasClientV2 extends AtlasBaseClient { }); } + public void ageoutAtlasAudits(AuditReductionCriteria auditReductionCriteria, boolean useAuditConfig) throws AtlasServiceException { + MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl(); + + queryParams.add("useAuditConfig", String.valueOf(useAuditConfig)); + + callAPI(API_V2.AGEOUT_ATLAS_AUDITS, List.class, auditReductionCriteria, queryParams); + } + // Glossary APIs public List<AtlasGlossary> getAllGlossaries(String sortByAttribute, int limit, int offset) throws AtlasServiceException { @@ -1269,6 +1278,7 @@ public class AtlasClientV2 extends AtlasBaseClient { // Admin APIs public static final API_V2 GET_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API, HttpMethod.POST, Response.Status.OK); + public static final API_V2 AGEOUT_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API + "ageout/", HttpMethod.POST, Response.Status.OK); // Glossary APIs public static final API_V2 GET_ALL_GLOSSARIES = new API_V2(GLOSSARY_URI, HttpMethod.GET, Response.Status.OK); diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 51b093284..2134133f5 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -239,6 +239,15 @@ public final class Constants { public static final String 
PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime"); public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime"); + /** + * Audit Reduction vertex property keys. + */ + public static final String AUDIT_REDUCTION_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "auditReduction_"; + public static final String PROPERTY_KEY_AUDIT_REDUCTION_NAME = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "name"); + public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "default"); + public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "custom"); + public static final String PROPERTY_KEY_GUIDS_TO_SWEEPOUT = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "sweepout"); + public static final String SQOOP_SOURCE = "sqoop"; public static final String FALCON_SOURCE = "falcon"; public static final String HBASE_SOURCE = "hbase"; @@ -248,6 +257,20 @@ public final class Constants { public static final String STORM_SOURCE = "storm"; public static final String FILE_SPOOL_SOURCE = "file_spool"; + /** + * Audit Reduction related constants + */ + public enum AtlasAuditAgingType { DEFAULT, CUSTOM, SWEEP } + public static final String AUDIT_REDUCTION_TYPE_NAME = "__auditReductionInfo"; + public static final String AUDIT_AGING_TYPE_KEY = "auditAgingType"; + public static final String AUDIT_AGING_TTL_KEY = "ttl"; + public static final String AUDIT_AGING_COUNT_KEY = "auditCount"; + public static final String AUDIT_AGING_ENTITY_TYPES_KEY = "entityTypes"; + public static final String AUDIT_AGING_ACTION_TYPES_KEY = "actionTypes"; + public static final String AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY = "excludeEntityTypes"; + public static final String CREATE_EVENTS_AGEOUT_ALLOWED_KEY = "createEventsAgeoutAllowed"; + public static final String AUDIT_AGING_SUBTYPES_INCLUDED_KEY = "subTypesIncluded"; + /* * All 
supported file-format extensions for Bulk Imports through file upload */ diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 58a2fa725..090889e1c 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -93,7 +93,23 @@ public enum AtlasConfiguration { SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1), UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true), METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days default - SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10 days default + SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240), //10 days default + + ATLAS_AUDIT_AGING_ENABLED("atlas.audit.aging.enabled", false), + ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED("atlas.audit.default.ageout.enabled", true), + ATLAS_AUDIT_DEFAULT_AGEOUT_TTL("atlas.audit.default.ageout.ttl.in.days", 90), + ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT("atlas.audit.default.ageout.count", 0), + ATLAS_AUDIT_CUSTOM_AGEOUT_TTL("atlas.audit.custom.ageout.ttl.in.days", 0), + ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT("atlas.audit.custom.ageout.count", 0), + ATLAS_AUDIT_SWEEP_OUT("atlas.audit.sweep.out.enabled", false), + ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED("atlas.audit.create.events.ageout.allowed", false), + ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY("atlas.audit.aging.scheduler.frequency.in.days", 30), + ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED("atlas.audit.aging.subtypes.included", true), + MIN_TTL_TO_MAINTAIN("atlas.audit.min.ttl.to.maintain", 7), + MIN_AUDIT_COUNT_TO_MAINTAIN("atlas.audit.min.count.to.maintain", 50), + ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT("atlas.audit.aging.search.maxlimit", 10000), + ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL("atlas.audit.default.ageout.ignore.ttl", false), + ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION("atlas.audit.aging.ttl.test.automation", false); //Only 
for test automation private static final Configuration APPLICATION_PROPERTIES; diff --git a/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java b/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java new file mode 100644 index 000000000..c956b3ac8 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.model.audit; + + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.Serializable; +import java.util.*; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AuditReductionCriteria implements Serializable { + private static final long serialVersionUID = 1L; + + private boolean auditAgingEnabled = false; + private boolean defaultAgeoutEnabled = false; + private boolean auditSweepoutEnabled = false; + private boolean createEventsAgeoutAllowed = false; + private boolean subTypesIncluded = true; + private boolean ignoreDefaultAgeoutTTL = false; + + private int defaultAgeoutAuditCount; + private int defaultAgeoutTTLInDays; + private int customAgeoutAuditCount; + private int customAgeoutTTLInDays; + + private String customAgeoutEntityTypes; + private String customAgeoutActionTypes; + + private String sweepoutEntityTypes; + private String sweepoutActionTypes; + + public boolean isAuditAgingEnabled() { + return auditAgingEnabled; + } + + public void setAuditAgingEnabled(boolean auditAgingEnabled) { + this.auditAgingEnabled = auditAgingEnabled; + } + + public boolean isDefaultAgeoutEnabled() { + return defaultAgeoutEnabled; + } + + public void setDefaultAgeoutEnabled(boolean defaultAgeoutEnabled) { + this.defaultAgeoutEnabled = defaultAgeoutEnabled; + } + + public boolean isAuditSweepoutEnabled() { + return auditSweepoutEnabled; + } + + public void setAuditSweepoutEnabled(boolean auditSweepoutEnabled) { + this.auditSweepoutEnabled = auditSweepoutEnabled; + } + 
+ public boolean isCreateEventsAgeoutAllowed() { + return createEventsAgeoutAllowed; + } + + public void setCreateEventsAgeoutAllowed(boolean createEventsAgeoutAllowed) { + this.createEventsAgeoutAllowed = createEventsAgeoutAllowed; + } + + public boolean isSubTypesIncluded() { + return subTypesIncluded; + } + + public void setSubTypesIncluded(boolean subTypesIncluded) { + this.subTypesIncluded = subTypesIncluded; + } + + public boolean ignoreDefaultAgeoutTTL() { + return ignoreDefaultAgeoutTTL; + } + + public void setIgnoreDefaultAgeoutTTL(boolean ignoreDefaultAgeoutTTL) { + this.ignoreDefaultAgeoutTTL = ignoreDefaultAgeoutTTL; + } + + public int getDefaultAgeoutTTLInDays() { + return defaultAgeoutTTLInDays; + } + + public void setDefaultAgeoutTTLInDays(int defaultAgeoutTTLInDays) { + this.defaultAgeoutTTLInDays = defaultAgeoutTTLInDays; + } + + public int getDefaultAgeoutAuditCount() { + return defaultAgeoutAuditCount; + } + + public void setDefaultAgeoutAuditCount(int defaultAgeoutAuditCount) { + this.defaultAgeoutAuditCount = defaultAgeoutAuditCount; + } + + public int getCustomAgeoutTTLInDays() { + return customAgeoutTTLInDays; + } + + public void setCustomAgeoutTTLInDays(int customAgeoutTTLInDays) { + this.customAgeoutTTLInDays = customAgeoutTTLInDays; + } + + public int getCustomAgeoutAuditCount() { + return customAgeoutAuditCount; + } + + public void setCustomAgeoutAuditCount(int customAgeoutAuditCount) { + this.customAgeoutAuditCount = customAgeoutAuditCount; + } + + public String getCustomAgeoutEntityTypes() { + return customAgeoutEntityTypes; + } + + public void setCustomAgeoutEntityTypes(String customAgeoutEntityTypes) { + this.customAgeoutEntityTypes = customAgeoutEntityTypes; + } + + public String getCustomAgeoutActionTypes() { + return customAgeoutActionTypes; + } + + public void setCustomAgeoutActionTypes(String customAgeoutActionTypes) { + this.customAgeoutActionTypes = customAgeoutActionTypes; + } + + public String getSweepoutEntityTypes() { + 
return sweepoutEntityTypes; + } + + public void setSweepoutEntityTypes(String sweepoutEntityTypes) { + this.sweepoutEntityTypes = sweepoutEntityTypes; + } + + public String getSweepoutActionTypes() { + return sweepoutActionTypes; + } + + public void setSweepoutActionTypes(String sweepoutActionTypes) { + this.sweepoutActionTypes = sweepoutActionTypes; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AuditReductionCriteria that = (AuditReductionCriteria) o; + return auditAgingEnabled == that.auditAgingEnabled && + defaultAgeoutEnabled == that.defaultAgeoutEnabled && + auditSweepoutEnabled == that.auditSweepoutEnabled && + createEventsAgeoutAllowed == that.createEventsAgeoutAllowed && + subTypesIncluded == that.subTypesIncluded && + ignoreDefaultAgeoutTTL == that.ignoreDefaultAgeoutTTL && + defaultAgeoutAuditCount == that.defaultAgeoutAuditCount && + defaultAgeoutTTLInDays == that.defaultAgeoutTTLInDays && + customAgeoutAuditCount == that.customAgeoutAuditCount && + customAgeoutTTLInDays == that.customAgeoutTTLInDays && + Objects.equals(customAgeoutEntityTypes, that.customAgeoutEntityTypes) && + Objects.equals(customAgeoutActionTypes, that.customAgeoutActionTypes) && + Objects.equals(sweepoutEntityTypes, that.sweepoutEntityTypes) && + Objects.equals(sweepoutActionTypes, that.sweepoutActionTypes); + } + + @Override + public int hashCode() { + return Objects.hash(auditAgingEnabled, defaultAgeoutEnabled, auditSweepoutEnabled, createEventsAgeoutAllowed, subTypesIncluded, ignoreDefaultAgeoutTTL, defaultAgeoutAuditCount, defaultAgeoutTTLInDays, customAgeoutAuditCount, customAgeoutTTLInDays, + customAgeoutEntityTypes, customAgeoutActionTypes, sweepoutEntityTypes, sweepoutActionTypes); + } + + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append('{'); + 
sb.append("auditAgingEnabled='").append(auditAgingEnabled).append('\''); + sb.append(", createEventsAgeoutAllowed='").append(createEventsAgeoutAllowed).append('\''); + sb.append(", subTypesIncluded='").append(subTypesIncluded).append('\''); + sb.append(", defaultAgeoutEnabled='").append(defaultAgeoutEnabled).append('\''); + sb.append(", ignoreDefaultAgeoutTTL='").append(ignoreDefaultAgeoutTTL).append('\''); + sb.append(", defaultAgeoutTTLInDays='").append(defaultAgeoutTTLInDays).append('\''); + sb.append(", defaultAgeoutAuditCount='").append(defaultAgeoutAuditCount).append('\''); + sb.append(", auditSweepoutEnabled='").append(auditSweepoutEnabled).append('\''); + sb.append(", customAgeoutAuditCount='").append(customAgeoutAuditCount).append('\''); + sb.append(", customAgeoutTTLInDays='").append(customAgeoutTTLInDays).append('\''); + sb.append(", customAgeoutEntityTypes=").append(customAgeoutEntityTypes); + sb.append(", customAgeoutActionTypes=").append(customAgeoutActionTypes); + sb.append(", sweepoutEntityTypes=").append(sweepoutEntityTypes); + sb.append(", sweepoutActionTypes=").append(sweepoutActionTypes); + sb.append('}'); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); + } + + +} diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java index f8e55b886..acab4ce0a 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java @@ -22,10 +22,13 @@ package org.apache.atlas.discovery; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.*; import org.apache.atlas.model.profile.AtlasUserSavedSearch; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import 
java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; public interface AtlasDiscoveryService { /** @@ -79,6 +82,14 @@ public interface AtlasDiscoveryService { */ AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException; + /** + * Search for guids of entities matching the search criteria + * @param searchParameters Search criteria + * @return GUIDs of matching entities + * @throws AtlasBaseException + */ + Set<String> searchGUIDsWithParameters(AtlasAuditAgingType auditAgingType, Set<String> entityTypes, SearchParameters searchParameters) throws AtlasBaseException; + /** * Search for relations (edges) matching the search criteria * @param searchParameters Search criteria @@ -173,4 +184,11 @@ public interface AtlasDiscoveryService { * @throws IOException */ AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException; + + /** + * Creates task to age out audits + * @param taskParams parameters of AtlasTask + * @return Task created to perform audit aging + */ + AtlasTask createAndQueueAuditReductionTask(Map<String, Object> taskParams, String taskType) throws AtlasBaseException; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 5b4395355..9be6517e9 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -50,6 +50,7 @@ import org.apache.atlas.query.executors.DSLQueryExecutor; import org.apache.atlas.query.executors.ScriptEngineBasedExecutor; import org.apache.atlas.query.executors.TraversalBasedExecutor; import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import 
org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasEdge; @@ -60,6 +61,7 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory; import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask; import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTaskFactory; import org.apache.atlas.repository.userprofile.UserProfileService; @@ -467,6 +469,12 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return searchWithSearchContext(new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys())); } + @Override + @GraphTransaction + public Set<String> searchGUIDsWithParameters(AtlasAuditAgingType auditAgingType, Set<String> entityTypes, SearchParameters searchParameters) throws AtlasBaseException { + return searchEntityGUIDs(auditAgingType, entityTypes, new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys())); + } + @Override @GraphTransaction public void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException { @@ -509,6 +517,19 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return result; } + @Override + @GraphTransaction + public AtlasTask createAndQueueAuditReductionTask(Map<String, Object> taskParams, String taskType) throws AtlasBaseException { + List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(taskType); + if (CollectionUtils.isNotEmpty(pendingTasks) && pendingTasks.size() > AuditReductionTaskFactory.MAX_PENDING_TASKS_ALLOWED) { + throw new AtlasBaseException(PENDING_TASKS_ALREADY_IN_PROGRESS, 
String.valueOf(pendingTasks.size()));
+        }
+        AtlasTask task = taskManagement.createTask(taskType, RequestContext.getCurrentUser(), taskParams);
+        RequestContext.get().queueTask(task);
+
+        return task;
+    }
+
     @Override
     @GraphTransaction
     public AtlasSearchResult searchRelationsWithParameters(RelationshipSearchParameters searchParameters) throws AtlasBaseException {
@@ -549,6 +570,45 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
         return ret;
     }
 
+    public Set<String> searchEntityGUIDs(AtlasAuditAgingType auditAgingType, Set<String> entityTypes, SearchContext searchContext) throws AtlasBaseException { // pages through the search and collects GUIDs of vertices accepted by checkVertexMatchesSearchCriteria
+        SearchParameters searchParameters = searchContext.getSearchParameters();
+        final QueryParams params = QueryParams.getNormalizedParams(searchParameters.getLimit(),searchParameters.getOffset());
+        String searchID = searchTracker.add(searchContext); // For future cancellations
+
+        searchParameters.setLimit(params.limit());
+        searchParameters.setOffset(params.offset());
+
+        Set<String> guids = new HashSet<>();
+        try {
+            List<AtlasVertex> resultList = searchContext.getSearchProcessor().execute();
+            do {
+                for (AtlasVertex atlasVertex : resultList) {
+                    if (atlasVertex != null && checkVertexMatchesSearchCriteria(atlasVertex, auditAgingType, entityTypes)) {
+                        guids.add(atlasVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class));
+                    }
+                }
+                searchParameters.setOffset(searchParameters.getOffset() + searchParameters.getLimit()); // advance to the next page
+                resultList = searchContext.getSearchProcessor().execute();
+            } while (CollectionUtils.isNotEmpty(resultList)); // stop at the first empty page
+            LOG.info("Total {} entities are eligible for Audit aging", guids.size());
+        } catch (Throwable t) { // best-effort: the failure is not propagated, GUIDs gathered so far are still returned
+            LOG.error("Error while retrieving eligible entities for audit aging", t); // pass the cause so the stack trace is logged
+        } finally {
+            searchTracker.remove(searchID); // always unregister the search, even on failure
+        }
+
+        return guids;
+    }
+
+    private boolean checkVertexMatchesSearchCriteria(AtlasVertex vertex, AtlasAuditAgingType auditAgingType, Set<String> entityTypes) {
+        if
(CollectionUtils.isEmpty(entityTypes)) { + return true; + } + String typeName = vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class); + boolean typeNameMatchesCriteria = entityTypes.contains(typeName); + return (auditAgingType == AtlasAuditAgingType.DEFAULT) ? !typeNameMatchesCriteria : typeNameMatchesCriteria; + } + private AtlasSearchResult searchWithSearchContext(SearchContext searchContext) throws AtlasBaseException { SearchParameters searchParameters = searchContext.getSearchParameters(); AtlasSearchResult ret = new AtlasSearchResult(searchParameters); diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java index 1aac37577..1b3dd478e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java @@ -24,6 +24,7 @@ import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import org.apache.atlas.service.Service; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; @@ -31,10 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * This abstract base class should be used when adding support for an audit storage backend. 
@@ -162,4 +160,9 @@ public abstract class AbstractStorageBasedAuditRepository implements Service, En return Bytes.toBytes(keyStr); } + @Override + public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException { + return null; + } + } diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java new file mode 100644 index 000000000..3dcfe73fe --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.audit; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasException; +import org.apache.atlas.discovery.AtlasDiscoveryService; +import org.apache.atlas.model.audit.AuditReductionCriteria; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.DateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.IntervalTask; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL; + +@Component +public class AtlasAuditReductionService implements SchedulingConfigurer { + private static final Logger LOG = LoggerFactory.getLogger(AtlasAuditReductionService.class); + + private final AtlasGraph graph; + private final AtlasDiscoveryService discoveryService; + private final AtlasTypeRegistry typeRegistry; + + private static final String VALUE_DELIMITER = ","; + + private static final String ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES = "atlas.audit.sweep.out.entity.types"; + private static final String ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES = "atlas.audit.sweep.out.action.types"; + 
private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES = "atlas.audit.custom.ageout.entity.types";
private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES = "atlas.audit.custom.ageout.action.types";

private static final String ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY = "atlas.audit.aging.scheduler.initial.delay.in.min";

private static final int MIN_TTL_TO_MAINTAIN         = AtlasConfiguration.MIN_TTL_TO_MAINTAIN.getInt();
private static final int MIN_AUDIT_COUNT_TO_MAINTAIN = AtlasConfiguration.MIN_AUDIT_COUNT_TO_MAINTAIN.getInt();

private Configuration              atlasConfiguration;
// Both values below are computed on the first configuration-driven run and reused afterwards.
private AuditReductionCriteria     ageoutCriteriaByConfig;
private List<Map<String, Object>>  ageoutTypeCriteriaMap;

@Inject
public AtlasAuditReductionService(Configuration config, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
    this.atlasConfiguration = config;
    this.graph              = graph;
    this.discoveryService   = discoveryService;
    this.typeRegistry       = typeRegistry;
}

/**
 * Runs audit aging using the criteria derived from the Atlas configuration.
 * The criteria object and the per-aging-type criteria maps are built on the
 * first call and cached in instance fields.
 *
 * @return the tasks returned by {@link #startAuditAgingByCriteria(List)}, or
 *         {@code null} when any exception is raised (the exception is logged)
 */
public List<AtlasTask> startAuditAgingByConfig() {
    try {
        if (ageoutCriteriaByConfig == null) {
            ageoutCriteriaByConfig = convertConfigToAuditReductionCriteria();
            LOG.info("Audit aging is enabled by configuration");
        }
        LOG.info("Audit aging is triggered with configuration: {}", ageoutCriteriaByConfig.toString());

        if (ageoutTypeCriteriaMap == null) {
            ageoutTypeCriteriaMap = buildAgeoutCriteriaForAllAgingTypes(ageoutCriteriaByConfig);
        }

        return startAuditAgingByCriteria(ageoutTypeCriteriaMap);
    } catch (Exception e) {
        LOG.error("Error while aging out audits by configuration: ", e);
        return null;
    }
}

/**
 * Creates and queues one audit-reduction task per criteria map.
 *
 * @param ageoutTypeCriteriaMap criteria maps, one per aging type
 * @return {@code null} when the input is null or empty; otherwise the tasks
 *         queued so far (possibly partial if an exception interrupted the loop)
 */
public List<AtlasTask> startAuditAgingByCriteria(List<Map<String, Object>> ageoutTypeCriteriaMap) {
    if (CollectionUtils.isEmpty(ageoutTypeCriteriaMap)) {
        return null;
    }

    final List<AtlasTask> queuedTasks = new ArrayList<>();

    try {
        for (Map<String, Object> criteria : ageoutTypeCriteriaMap) {
            AtlasTask queued = discoveryService.createAndQueueAuditReductionTask(criteria, ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL);

            if (queued == null) {
                continue;
            }
            queuedTasks.add(queued);
        }
    } catch (Exception e) {
        LOG.error("Error while aging out audits by criteria: ", e);
    }

    return queuedTasks;
}

/**
 * Reads the audit-aging settings from {@code AtlasConfiguration} (and the
 * entity/action type lists from the injected configuration) into a new
 * {@code AuditReductionCriteria}.
 *
 * The default TTL is always raised to the configured minimum; the other
 * numeric settings are raised to their minimum only when they are positive,
 * and passed through unchanged otherwise.
 */
private AuditReductionCriteria convertConfigToAuditReductionCriteria() {
    final int configuredDefaultTtl   = AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt();
    final int configuredDefaultCount = AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT.getInt();
    final int configuredCustomTtl    = AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL.getInt();
    final int configuredCustomCount  = AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT.getInt();

    // Evaluation order below is kept as default TTL, default count, custom TTL, custom count
    // because getGuaranteedMinValueOf logs when it has to raise a value.
    final int effectiveDefaultTtl = getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL, configuredDefaultTtl, MIN_TTL_TO_MAINTAIN);

    final int effectiveDefaultCount;
    if (configuredDefaultCount > 0) {
        effectiveDefaultCount = getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT, configuredDefaultCount, MIN_AUDIT_COUNT_TO_MAINTAIN);
    } else {
        effectiveDefaultCount = configuredDefaultCount;
    }

    final int effectiveCustomTtl;
    if (configuredCustomTtl > 0) {
        effectiveCustomTtl = getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL, configuredCustomTtl, MIN_TTL_TO_MAINTAIN);
    } else {
        effectiveCustomTtl = configuredCustomTtl;
    }

    final int effectiveCustomCount;
    if (configuredCustomCount > 0) {
        effectiveCustomCount = getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT, configuredCustomCount, MIN_AUDIT_COUNT_TO_MAINTAIN);
    } else {
        effectiveCustomCount = configuredCustomCount;
    }

    final String customEntityTypes = getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES);
    final String customActionTypes = getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES);

    final AuditReductionCriteria criteria = new AuditReductionCriteria();

    criteria.setAuditAgingEnabled(AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean());
    criteria.setCreateEventsAgeoutAllowed(AtlasConfiguration.ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED.getBoolean());
    criteria.setSubTypesIncluded(AtlasConfiguration.ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED.getBoolean());
    criteria.setIgnoreDefaultAgeoutTTL(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getBoolean());

    criteria.setDefaultAgeoutEnabled(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED.getBoolean());
    criteria.setDefaultAgeoutTTLInDays(effectiveDefaultTtl);
    criteria.setDefaultAgeoutAuditCount(effectiveDefaultCount);

    criteria.setCustomAgeoutTTLInDays(effectiveCustomTtl);
    criteria.setCustomAgeoutAuditCount(effectiveCustomCount);
    criteria.setCustomAgeoutEntityTypes(customEntityTypes);
    criteria.setCustomAgeoutActionTypes(customActionTypes);

    final boolean sweepOutEnabled = AtlasConfiguration.ATLAS_AUDIT_SWEEP_OUT.getBoolean();

    criteria.setAuditSweepoutEnabled(sweepOutEnabled);

    // Sweep-out type lists are only read when sweep-out is switched on.
    if (sweepOutEnabled) {
        criteria.setSweepoutEntityTypes(getStringOf(ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES));
        criteria.setSweepoutActionTypes(getStringOf(ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES));
    }

    return criteria;
}
+ + public List<Map<String, Object>> buildAgeoutCriteriaForAllAgingTypes(AuditReductionCriteria auditReductionCriteria) { + if (auditReductionCriteria == null || !auditReductionCriteria.isAuditAgingEnabled()) { + return null; + } + List<Map<String, Object>> auditAgeoutCriteriaByType = new ArrayList<>(); + Set<String> defaultAgeoutEntityTypesToExclude = new HashSet<>(); + Set<String> defaultAgeoutActionTypes = Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x -> x.toString()).collect(Collectors.toSet()); + + boolean createEventsAgeoutAllowed = auditReductionCriteria.isCreateEventsAgeoutAllowed(); + boolean subTypesIncluded = auditReductionCriteria.isSubTypesIncluded(); + boolean ignoreDefaultAgeoutTTL = auditReductionCriteria.ignoreDefaultAgeoutTTL(); + + boolean defaultAgeoutEnabled = auditReductionCriteria.isDefaultAgeoutEnabled(); + int defaultAgeoutTTL = ignoreDefaultAgeoutTTL ? 0 : getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL, auditReductionCriteria.getDefaultAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN); + int defaultAgeoutAuditCount = auditReductionCriteria.getDefaultAgeoutAuditCount() <= 0 ? auditReductionCriteria.getDefaultAgeoutAuditCount() + : getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT, auditReductionCriteria.getDefaultAgeoutAuditCount(), MIN_AUDIT_COUNT_TO_MAINTAIN); + + int customAgeoutTTL = auditReductionCriteria.getCustomAgeoutTTLInDays() <= 0 ? auditReductionCriteria.getCustomAgeoutTTLInDays() + : getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL, auditReductionCriteria.getCustomAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN); + int customAgeoutAuditCount = auditReductionCriteria.getCustomAgeoutAuditCount() <= 0 ? 
auditReductionCriteria.getCustomAgeoutAuditCount() + : getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT, auditReductionCriteria.getCustomAgeoutAuditCount(), MIN_AUDIT_COUNT_TO_MAINTAIN); + Set<String> customAgeoutEntityTypes = getUniqueListOf(auditReductionCriteria.getCustomAgeoutEntityTypes()); + Set<String> customAgeoutActionTypes = getValidActionTypes(AtlasAuditAgingType.CUSTOM, getUniqueListOf(auditReductionCriteria.getCustomAgeoutActionTypes())); + + defaultAgeoutEntityTypesToExclude.addAll(customAgeoutEntityTypes); + if (CollectionUtils.isEmpty(customAgeoutEntityTypes)) { + defaultAgeoutActionTypes.removeAll(customAgeoutActionTypes); + } + + boolean isSweepOutEnabled = auditReductionCriteria.isAuditSweepoutEnabled(); + if (isSweepOutEnabled) { + Set<String> sweepOutEntityTypes = getUniqueListOf(auditReductionCriteria.getSweepoutEntityTypes()); + Set<String> sweepOutActionTypes = getValidActionTypes(AtlasAuditAgingType.SWEEP, getUniqueListOf(auditReductionCriteria.getSweepoutActionTypes())); + + if (CollectionUtils.isNotEmpty(sweepOutEntityTypes) || CollectionUtils.isNotEmpty(sweepOutActionTypes)) { + Map<String, Object> sweepAgeoutCriteria = getAgeoutCriteriaMap(AtlasAuditAgingType.SWEEP, 0, 0, sweepOutEntityTypes, sweepOutActionTypes, createEventsAgeoutAllowed, subTypesIncluded); + auditAgeoutCriteriaByType.add(sweepAgeoutCriteria); + } else { + LOG.error("Sweepout of audits is skipped.At least one of two properties: entity types/action types should be configured."); + } + + defaultAgeoutEntityTypesToExclude.addAll(sweepOutEntityTypes); + customAgeoutEntityTypes.removeAll(sweepOutEntityTypes); + + if (CollectionUtils.isEmpty(sweepOutEntityTypes)) { + defaultAgeoutActionTypes.removeAll(sweepOutActionTypes); + customAgeoutActionTypes.removeAll(sweepOutActionTypes); + } + } + + if ((customAgeoutTTL > 0 || customAgeoutAuditCount > 0) && (CollectionUtils.isNotEmpty(customAgeoutEntityTypes) || 
CollectionUtils.isNotEmpty(customAgeoutActionTypes))) { + Map<String, Object> customAgeoutCriteria = getAgeoutCriteriaMap(AtlasAuditAgingType.CUSTOM, customAgeoutTTL, customAgeoutAuditCount, customAgeoutEntityTypes, customAgeoutActionTypes, createEventsAgeoutAllowed, subTypesIncluded); + auditAgeoutCriteriaByType.add(customAgeoutCriteria); + } else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0 && CollectionUtils.isEmpty(customAgeoutEntityTypes) && CollectionUtils.isEmpty(customAgeoutActionTypes)) { + //Do Nothing + } else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0) { + LOG.error("Custom Audit aging is skipped.At least one of two properties: TTL/Audit Count should be configured."); + } else { + LOG.error("Custom Audit aging is skipped.At least one of two properties: entity types/action types should be configured."); + } + + if (defaultAgeoutEnabled) { + if (ignoreDefaultAgeoutTTL) { + LOG.info("'{}' config property or 'ignoreDefaultAgeoutTTL' property in API configured as: {}, Default audit aging will be done by audit count only", AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getPropertyName(), ignoreDefaultAgeoutTTL); + } + + /**In case of default ageout with all available audit actions, query to ATLAS_ENTITY_AUDIT_EVENTS table + * without any action type provides data for all audit actions and is more performant than + * multiple queries to ATLAS_ENTITY_AUDIT_EVENTS with each action type + */ + if (defaultAgeoutActionTypes.size() == EntityAuditEventV2.EntityAuditActionV2.values().length) { + defaultAgeoutActionTypes.clear(); + } + if (!ignoreDefaultAgeoutTTL || defaultAgeoutAuditCount > 0) { + Map<String, Object> defaultAgeoutCriteria = getAgeoutCriteriaMap(AtlasAuditAgingType.DEFAULT, defaultAgeoutTTL, defaultAgeoutAuditCount, defaultAgeoutEntityTypesToExclude, defaultAgeoutActionTypes, createEventsAgeoutAllowed, subTypesIncluded); + auditAgeoutCriteriaByType.add(defaultAgeoutCriteria); + } else { + LOG.error("Default Audit aging 
is skipped. Valid audit count should be configured when TTL criteria is ignored."); + } + } + return auditAgeoutCriteriaByType; + } + + private Map<String, Object> getAgeoutCriteriaMap(AtlasAuditAgingType agingOption, int ttl, int minCount, Set<String> entityTypes, Set<String> actionTypes, boolean createEventsAgeoutAllowed, boolean subTypesIncluded) { + Map<String, Object> auditAgingOptions = new HashMap<>(); + auditAgingOptions.put(AUDIT_AGING_TYPE_KEY, agingOption); + auditAgingOptions.put(AUDIT_AGING_TTL_KEY, ttl); + auditAgingOptions.put(AUDIT_AGING_COUNT_KEY, minCount); + auditAgingOptions.put(AUDIT_AGING_ENTITY_TYPES_KEY, entityTypes); + auditAgingOptions.put(AUDIT_AGING_ACTION_TYPES_KEY, actionTypes); + auditAgingOptions.put(CREATE_EVENTS_AGEOUT_ALLOWED_KEY, createEventsAgeoutAllowed); + auditAgingOptions.put(AUDIT_AGING_SUBTYPES_INCLUDED_KEY, subTypesIncluded); + + return auditAgingOptions; + } + + private int getGuaranteedMinValueOf(AtlasConfiguration configuration, int configuredValue, int minValueToMaintain) { + if (configuredValue < minValueToMaintain) { + LOG.info("Minimum value for '{}' should be {}", configuration.getPropertyName(), minValueToMaintain); + } + return configuredValue < minValueToMaintain ? minValueToMaintain : configuredValue; + } + + private String getStringOf(String configProperty) { + String configuredValue = null; + + if (StringUtils.isNotEmpty(configProperty)) { + configuredValue = String.join(VALUE_DELIMITER , (List) atlasConfiguration.getList(configProperty)); + } + + return configuredValue; + } + + private Set<String> getUniqueListOf(String value) { + Set<String> configuredValues = null; + + if (StringUtils.isNotEmpty(value)) { + configuredValues = Stream.of(value.split(VALUE_DELIMITER)).map(String::trim).collect(Collectors.toSet()); + } + + return configuredValues == null ? 
new HashSet<>() : configuredValues; + } + + private Set<String> getValidActionTypes(AtlasAuditAgingType auditAgingType, Set<String> actionTypes) { + if (CollectionUtils.isEmpty(actionTypes)) { + return Collections.emptySet(); + } + Set<String> allActionTypes = Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x -> x.toString()).collect(Collectors.toSet()); + Set<String> entityAuditActions = new HashSet<>(); + Set<String> invalidActionTypes = new HashSet<>(); + + for (String actionType : actionTypes) { + Set<String> matchedActionTypes; + final String actionTypeToMatch = actionType.contains("*") ? actionType.replace("*", "") : actionType; + if (actionTypeToMatch.startsWith("*")) { + matchedActionTypes = allActionTypes.stream().filter(x -> x.contains(actionTypeToMatch)).collect(Collectors.toSet()); + } else { + matchedActionTypes = allActionTypes.stream().filter(x -> x.startsWith(actionTypeToMatch)).collect(Collectors.toSet()); + } + + if (CollectionUtils.isEmpty(matchedActionTypes)) { + invalidActionTypes.add(actionType); + } else { + entityAuditActions.addAll(matchedActionTypes); + } + } + + if (CollectionUtils.isNotEmpty(actionTypes) && CollectionUtils.isEmpty(entityAuditActions)) { + throw new IllegalArgumentException("No enum constant " + EntityAuditEventV2.EntityAuditActionV2.class.getCanonicalName() + "." 
+ String.join(VALUE_DELIMITER, invalidActionTypes)); + } else { + LOG.info("Action type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, entityAuditActions), auditAgingType); + } + + if (CollectionUtils.isNotEmpty(invalidActionTypes)){ + LOG.warn("Invalid action type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, invalidActionTypes), auditAgingType); + } + + return entityAuditActions; + } + + @Override + public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { + if (!AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean()) { + LOG.warn("Audit aging is not enabled"); + return; + } + IntervalTask task = new IntervalTask(new Runnable() { + @Override + public void run() { + startAuditAgingByConfig(); + } + }, getAuditAgingFrequencyInMillis(), getAuditAgingInitialDelayInMillis()); + + taskRegistrar.addFixedRateTask(task); + } + + private long getAuditAgingFrequencyInMillis() { + int frequencyInDays = getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY, AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY.getInt(), 1); + return frequencyInDays * DateUtils.MILLIS_PER_DAY; + } + + private long getAuditAgingInitialDelayInMillis() { + int initialDelayInMins = 1; + try { + initialDelayInMins = ApplicationProperties.get().getInt(ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY, 1); + } catch (AtlasException ex) { + LOG.error("Error while fetching application properties", ex); + } + return (initialDelayInMins < 1 ? 
1 : initialDelayInMins) * DateUtils.MILLIS_PER_MINUTE; + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java index bad2a89fc..1fdacab29 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java @@ -22,6 +22,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import java.util.List; import java.util.Set; @@ -92,6 +93,17 @@ public interface EntityAuditRepository { */ List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException; + /** + * Delete all events for the given entity id by keeping only auditCount number of events with entityAuditActions + * @param entityId entity id + * @param entityAuditActions operation(s) to be used to filter at HBase column + * @param auditCount Max numbers of events to keep without deleting + * @param ttlInDays time-to-live of events + * @return list of events + * @throws AtlasBaseException + */ + List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException; + /*** * List events for given time range where classifications have been added, deleted or updated. 
* @param fromTimestamp from timestamp diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index a99f9b383..b7f0dd5e2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.audit; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.RequestContext; @@ -28,10 +29,12 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.DateUtils; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -39,6 +42,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -48,6 +52,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator; import 
org.apache.hadoop.hbase.filter.BinaryPrefixComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; @@ -63,14 +68,18 @@ import org.springframework.core.annotation.Order; import javax.inject.Singleton; import java.io.Closeable; import java.io.IOException; +import java.sql.Timestamp; +import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.HashSet; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; /** @@ -329,8 +338,13 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito @Override public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException { + return listEventsV2(entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, false, true, false, null); + } + + private List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit, boolean isAgeoutTransaction, boolean createEventsAgeoutAllowed, boolean allowAgeoutByAuditCount, List<EntityAuditEventV2> eventsToKeep) throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", entityId, auditAction, sortByColumn, offset, limit); + LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, 
sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", + entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit); } AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listEventsV2"); @@ -343,7 +357,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito offset = 0; } - if (limit < 0) { + if (!isAgeoutTransaction && limit < 0) { limit = 100; } @@ -356,19 +370,29 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito * MultiRowRangeFilter and then again scan the table to get all the columns from the table this time. */ Scan scan = new Scan().setReversed(true) - .setCaching(DEFAULT_CACHING) - .setSmall(true) - .setStopRow(Bytes.toBytes(entityId)) - .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE)) - .addColumn(COLUMN_FAMILY, COLUMN_ACTION) - .addColumn(COLUMN_FAMILY, COLUMN_USER); - + .setCaching(DEFAULT_CACHING) + .setSmall(true) + .setStopRow(Bytes.toBytes(entityId)) + .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE)) + .addColumn(COLUMN_FAMILY, COLUMN_ACTION) + .addColumn(COLUMN_FAMILY, COLUMN_USER); + FilterList filterList = new FilterList(); if (auditAction != null) { Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString()))); + filterList.addFilter(filterAction); + } - scan.setFilter(filterAction); + if (!createEventsAgeoutAllowed) { + FilterList createEventFilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + Filter filterByCreateActionType = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_CREATE.toString()))); + Filter filterByImportCreateActionType = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new 
BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_IMPORT_CREATE.toString()))); + createEventFilterList.addFilter(filterByCreateActionType); + createEventFilterList.addFilter(filterByImportCreateActionType); + filterList.addFilter(createEventFilterList); } + scan.setFilter(filterList); + List<EntityAuditEventV2> events = new ArrayList<>(); try (ResultScanner scanner = table.getScanner(scan)) { @@ -383,10 +407,25 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito } EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc); + int fromIndex = Math.min(events.size(), offset); + int endIndex = events.size(); + if (limit > 0) { + endIndex = Math.min(events.size(), offset + limit); + } + + if (isAgeoutTransaction) { + if (!allowAgeoutByAuditCount) { //No audit events allowed to age-out by audit count + eventsToKeep.addAll(events); + return Collections.emptyList(); + } + eventsToKeep.addAll(events.subList(0, fromIndex)); + } + + events = events.subList(fromIndex, endIndex); - events = events.subList(Math.min(events.size(), offset), Math.min(events.size(), offset + limit)); if (events.size() > 0) { + List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>(); events.forEach(e -> { @@ -394,11 +433,11 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito }); scan = new Scan().setReversed(true) - .setCaching(DEFAULT_CACHING) - .setSmall(true) - .setStopRow(Bytes.toBytes(entityId)) - .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE)) - .setFilter(new MultiRowRangeFilter(ranges)); + .setCaching(DEFAULT_CACHING) + .setSmall(true) + .setStopRow(Bytes.toBytes(entityId)) + .setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE)) + .setFilter(new MultiRowRangeFilter(ranges)); try (ResultScanner scanner = table.getScanner(scan)) { events = new ArrayList<>(); @@ -426,7 +465,8 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito } if 
(LOG.isDebugEnabled()) { - LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #recored returned {}", entityId, auditAction, sortByColumn, offset, limit, events.size()); + LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}", + entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, events.size()); } return events; @@ -452,6 +492,88 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito return ret; } + @Override + public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short allowedAuditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException { + final String SORT_BY_COLUMN = EntityAuditEventV2.SORT_COLUMN_TIMESTAMP; + final boolean SORT_ORDER_DESC = true; + + if (LOG.isDebugEnabled()) { + LOG.debug("==> HBaseBasedAuditRepository.deleteEventsV2(entityId={}, auditActions={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", + entityId, Arrays.toString(entityAuditActions.toArray()), SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1); + } + + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEventsV2"); + + Table table = null; + List<EntityAuditEventV2> eventsEligibleForAgeout = new ArrayList<>(); + try { + table = connection.getTable(tableName); + List<EntityAuditEventV2> eventsToKeep = new ArrayList<>(); + boolean allowAgeoutByAuditCount = allowedAuditCount > 0 || (auditAgingType == AtlasAuditAgingType.SWEEP); + if (CollectionUtils.isEmpty(entityAuditActions)) { + eventsEligibleForAgeout.addAll(listEventsV2(entityId, null, SORT_BY_COLUMN, SORT_ORDER_DESC, + allowedAuditCount, (short) -1, true, createEventsAgeoutAllowed, 
allowAgeoutByAuditCount, eventsToKeep)); + } else { + for (EntityAuditActionV2 eachAuditAction : entityAuditActions) { + List<EntityAuditEventV2> eventsByEachAuditAction = listEventsV2(entityId, eachAuditAction, SORT_BY_COLUMN, SORT_ORDER_DESC, + allowedAuditCount, (short) -1, true, createEventsAgeoutAllowed, allowAgeoutByAuditCount, eventsToKeep); + + if (CollectionUtils.isNotEmpty(eventsByEachAuditAction)) { + eventsEligibleForAgeout.addAll(eventsByEachAuditAction); + } + } + } + + if (CollectionUtils.isNotEmpty(eventsToKeep)) { + //Limit events based on configured audit count by grouping events of all action types + if (allowAgeoutByAuditCount && (auditAgingType == AtlasAuditAgingType.DEFAULT || CollectionUtils.isEmpty(entityAuditActions))) { + LOG.debug("Aging out audit events by audit count for entity: {}", entityId); + EntityAuditEventV2.sortEvents(eventsToKeep, SORT_BY_COLUMN, SORT_ORDER_DESC); + if (allowedAuditCount < eventsToKeep.size()) { + eventsEligibleForAgeout.addAll(eventsToKeep.subList(allowedAuditCount, eventsToKeep.size())); + eventsToKeep = eventsToKeep.subList(0, allowedAuditCount); + } + } + //TTL based aging + LocalDateTime now = LocalDateTime.now(); + boolean isTTLTestAutomation = AtlasConfiguration.ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION.getBoolean(); + if (ttlInDays > 0) { + LOG.debug("Aging out audit events by TTL for entity: {}", entityId); + long ttlTimestamp = Timestamp.valueOf(isTTLTestAutomation ? 
now.minusMinutes(ttlInDays) : now.minusDays(ttlInDays)).getTime(); + eventsToKeep.forEach(e -> { + if (e.getTimestamp() < ttlTimestamp) { + eventsEligibleForAgeout.add(e); + } + }); + } + } + + List<Delete> eventsToDelete = new ArrayList<>(); + for (EntityAuditEventV2 event : eventsEligibleForAgeout) { + Delete delete = new Delete(Bytes.toBytes(event.getEventKey())); + eventsToDelete.add(delete); + } + + if (CollectionUtils.isNotEmpty(eventsToDelete)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting events from table:{} are {}", tableName, Arrays.toString(eventsToDelete.toArray())); + } + table.delete(eventsToDelete); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HBaseBasedAuditRepository.deleteEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): ", + entityId, Arrays.toString(entityAuditActions.toArray()), SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1); + } + } catch (IOException e) { + LOG.error("Failed deleting audit events for guid:{}", entityId); + } finally { + RequestContext.get().endMetricRecord(metric); + close(table); + } + return eventsEligibleForAgeout; + } + private <T> void addColumn(Put put, byte[] columnName, T columnValue) { if (columnValue != null && !columnValue.toString().isEmpty()) { put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString())); diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java index 51efff1d3..6d31e9727 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java @@ -23,10 +23,13 @@ import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.exception.AtlasBaseException; import 
org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; import javax.inject.Singleton; +import java.sql.Timestamp; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -103,17 +106,59 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { @Override public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException { + return listEventsV2(entityId, auditAction, sortByColumn, sortOrderDesc, 0, offset, limit, true, true); + } + + private List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int ttlInDays, int offset, short limit, boolean allowMaxResults, boolean createEventsAgeoutAllowed) throws AtlasBaseException { List<EntityAuditEventV2> events = new ArrayList<>(); SortedMap<String, EntityAuditEventV2> subMap = auditEventsV2.tailMap(entityId); for (EntityAuditEventV2 event : subMap.values()) { if (event.getEntityId().equals(entityId)) { - events.add(event); + if (auditAction == null || event.getAction() == auditAction) { + if (event.getAction() == EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE && !createEventsAgeoutAllowed) { + continue; + } + events.add(event); + } } } + + if (allowMaxResults && limit == -1) { + limit = (short) events.size(); + } EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc); - events = events.subList( - Math.min(events.size(), offset), - Math.min(events.size(), offset + limit)); + int fromIndex = Math.min(events.size(), offset); + int endIndex = Math.min(events.size(), offset + limit); + + List<EntityAuditEventV2> possibleExpiredEvents = 
events.subList(0, fromIndex); + + events = new ArrayList<>(events.subList(fromIndex, endIndex)); + + // This is only for Audit Aging, including expired audit events to result + if (CollectionUtils.isNotEmpty(possibleExpiredEvents) && ttlInDays > 0 ) { + LocalDateTime now = LocalDateTime.now(); + long ttlTimestamp = Timestamp.valueOf(now.minusDays(ttlInDays)).getTime(); + possibleExpiredEvents.removeIf(e -> (auditAction!= null && e.getAction() != auditAction) || e.getTimestamp() > ttlTimestamp); + if (CollectionUtils.isNotEmpty(possibleExpiredEvents)) { + events.addAll(possibleExpiredEvents); + } + } + return events; + } + + @Override + public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException { + List<EntityAuditEventV2> events = new ArrayList<>(); + if (CollectionUtils.isEmpty(entityAuditActions)) { + events = listEventsV2(entityId, null, "timestamp", true, ttlInDays, auditCount, (short) -1, true, createEventsAgeoutAllowed); + } else { + for (EntityAuditEventV2.EntityAuditActionV2 auditAction : entityAuditActions) { + List<EntityAuditEventV2> eventsByAction = listEventsV2(entityId, auditAction, "timestamp", true, ttlInDays, auditCount, (short) -1, true, createEventsAgeoutAllowed); + if (CollectionUtils.isNotEmpty(eventsByAction)) { + events.addAll(eventsByAction); + } + } + } return events; } diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java index c62bb7eaa..c357c0f59 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java @@ -18,10 +18,12 @@ package 
org.apache.atlas.repository.audit; +import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import org.springframework.stereotype.Component; import javax.inject.Singleton; @@ -67,6 +69,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository { return Collections.emptyList(); } + @Override + public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException { + return null; + } + @Override public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) { return Collections.emptyList(); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 5780f2ea0..01aa22f69 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -398,6 +398,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang // index recovery createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); + // audit reduction + createCommonVertexIndex(management, PROPERTY_KEY_AUDIT_REDUCTION_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); + //metrics createCommonVertexIndex(management," __AtlasMetricsStat.metricsId", 
UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); createCommonVertexIndex(management," __AtlasMetricsStat.__u_metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java new file mode 100644 index 000000000..e53416f6b --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2.tasks; + +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.annotation.GraphTransaction; +import org.apache.atlas.discovery.AtlasDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.Constants.*; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.tasks.AbstractTask; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE; +import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED; +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; +import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP; +import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION; + + +public class AuditReductionEntityRetrievalTask extends AbstractTask { + private static final Logger LOG = LoggerFactory.getLogger(AuditReductionEntityRetrievalTask.class); + + private static final String VALUE_DELIMITER = ","; + + private final AtlasDiscoveryService discoveryService; + private final AtlasTypeRegistry typeRegistry; + 
private final AtlasGraph graph; + + public AuditReductionEntityRetrievalTask(AtlasTask task, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) { + super(task); + this.graph = graph; + this.discoveryService = discoveryService; + this.typeRegistry = typeRegistry; + } + + @Override + public AtlasTask.Status perform() throws Exception { + RequestContext.clear(); + Map<String, Object> params = getTaskDef().getParameters(); + + if (MapUtils.isEmpty(params)) { + LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid()); + return FAILED; + } + + String userName = getTaskDef().getCreatedBy(); + + if (StringUtils.isEmpty(userName)) { + LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid()); + + return FAILED; + } + + RequestContext.get().setUser(userName, null); + + try { + run(params); + + setStatus(COMPLETE); + } catch (Exception e) { + LOG.error("Task: {}: Error performing task!", getTaskGuid(), e); + + setStatus(FAILED); + + throw e; + } finally { + RequestContext.clear(); + } + return getStatus(); + } + + protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException, AtlasException { + try { + AtlasTask auditAgingTask = createAgingTaskWithEligibleGUIDs(parameters); + if (auditAgingTask != null) { + LOG.info("{} task created for audit aging type-{}", ATLAS_AUDIT_REDUCTION, parameters.get(AUDIT_AGING_TYPE_KEY)); + } + } catch (Exception e) { + LOG.error("Error while retrieving entities eligible for audit aging and creating audit aging tasks", e.getMessage()); + } + } + + protected AtlasTask createAgingTaskWithEligibleGUIDs(Map<String, Object> parameters) throws AtlasBaseException { + final String ALL_ENTITY_TYPES = "_ALL_ENTITY_TYPES"; + final int SEARCH_OFFSET = 0; + final int SEARCH_LIMIT = AtlasConfiguration.ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT.getInt(); + + Set<String> entityTypes = ((Collection<String>) 
parameters.get(AUDIT_AGING_ENTITY_TYPES_KEY)).stream().collect(Collectors.toSet()); + AtlasAuditAgingType auditAgingType = (AtlasAuditAgingType)parameters.get(AUDIT_AGING_TYPE_KEY); + boolean subTypesIncluded = (boolean)parameters.get(AUDIT_AGING_SUBTYPES_INCLUDED_KEY); + + SearchParameters searchEntitiesToReduceAudit = new SearchParameters(); + searchEntitiesToReduceAudit.setTypeName(ALL_ENTITY_TYPES); + searchEntitiesToReduceAudit.setOffset(SEARCH_OFFSET); + searchEntitiesToReduceAudit.setLimit(SEARCH_LIMIT); + searchEntitiesToReduceAudit.setIncludeSubTypes(subTypesIncluded); + + if (CollectionUtils.isNotEmpty(entityTypes)) { + if (!validateTypesAndIncludeSubTypes(entityTypes, auditAgingType, subTypesIncluded)) { + LOG.error("All entity type names provided for audit aging type-{} are invalid", auditAgingType); + return null; + } + + String queryString = String.join(VALUE_DELIMITER, entityTypes); + if (auditAgingType == AtlasAuditAgingType.DEFAULT && StringUtils.isNotEmpty(queryString)) { + queryString = new StringBuilder().append("!").append(queryString).toString(); + } + searchEntitiesToReduceAudit.setQuery(queryString); + } + + LOG.info("Getting GUIDs eligible for Audit aging type-{} with SearchParameters: {}", auditAgingType.toString(), searchEntitiesToReduceAudit.toString()); + + Set<String> guids = discoveryService.searchGUIDsWithParameters(auditAgingType, entityTypes, searchEntitiesToReduceAudit); + + AtlasVertex auditReductionVertex = getOrCreateVertex(); + + AtlasTask ageoutTask = updateVertexWithGuidsAndCreateAgingTask(auditReductionVertex, AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType), guids, parameters); + + /** For DEFAULT audit aging, "entityTypes" should be excluded from _ALL_ENTITY_TYPES i.e., negating the queryString + * Including AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY to indicate the same in AtlasTask response to user + */ + if (ageoutTask != null) { + if (auditAgingType == AtlasAuditAgingType.DEFAULT && CollectionUtils.isNotEmpty(entityTypes)) { 
+ ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, true); + } else { + ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, false); + } + } + + return ageoutTask; + } + + private boolean validateTypesAndIncludeSubTypes(Set<String> entityTypes, AtlasAuditAgingType auditAgingType, boolean subTypesIncluded) throws AtlasBaseException { + Collection<String> allEntityTypeNames = typeRegistry.getAllEntityDefNames(); + Set<String> entityTypesToSearch = new HashSet<>(); + Set<String> invalidEntityTypeNames = new HashSet<>(); + + entityTypes.stream().forEach(entityType -> { + if (entityType.endsWith("*")) { + String suffix = entityType.replace("*", ""); + entityTypesToSearch.addAll(allEntityTypeNames.stream().filter(e -> e.startsWith(suffix)).collect(Collectors.toSet())); + } else if (allEntityTypeNames.contains(entityType)) { + entityTypesToSearch.add(entityType); + } else { + invalidEntityTypeNames.add(entityType); + } + }); + + if (auditAgingType != AtlasAuditAgingType.DEFAULT) { + if (CollectionUtils.isNotEmpty(invalidEntityTypeNames)) { + LOG.warn("Invalid entity type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, invalidEntityTypeNames), auditAgingType); + } + + if (CollectionUtils.isEmpty(entityTypesToSearch)) { + return false; + } + } + + entityTypes.clear(); + entityTypes.addAll(subTypesIncluded ? 
AtlasEntityType.getEntityTypesAndAllSubTypes(entityTypesToSearch, typeRegistry) : entityTypesToSearch); + + return true; + } + + @GraphTransaction + private AtlasTask updateVertexWithGuidsAndCreateAgingTask(AtlasVertex vertex, String vertexProperty, Set<String> guids, Map<String, Object> params) throws AtlasBaseException { + List<String> guidsEligibleForAuditReduction = vertex.getProperty(vertexProperty, List.class); + if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction) && CollectionUtils.isEmpty(guids)) { + return null; + } + + if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction)) { + guidsEligibleForAuditReduction = new ArrayList<>(); + } + + if (CollectionUtils.isNotEmpty(guids)) { + guidsEligibleForAuditReduction.addAll(guids); + setEncodedProperty(vertex, vertexProperty, guidsEligibleForAuditReduction); + } + + return discoveryService.createAndQueueAuditReductionTask(params, ATLAS_AUDIT_REDUCTION); + } + + private AtlasVertex getOrCreateVertex() { + + AtlasGraphQuery query = graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME); + Iterator<AtlasVertex> results = query.vertices().iterator(); + + AtlasVertex auditReductionVertex = results.hasNext() ? 
results.next() : null; + + if (auditReductionVertex == null) { + auditReductionVertex = graph.addVertex(); + setEncodedProperty(auditReductionVertex, PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME); + } + return auditReductionVertex; + } + +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java new file mode 100644 index 000000000..e8d7f6806 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2.tasks; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; +import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.tasks.AbstractTask; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE; +import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED; +import static org.apache.atlas.repository.Constants.*; + +import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP; + + +public class AuditReductionTask extends AbstractTask { + private static final Logger LOG = LoggerFactory.getLogger(AuditReductionTask.class); + + private static final int GUID_BATCH_SIZE_PER_AGE_OUT_TASK = 100; + + private final EntityAuditRepository auditRepository; + private final AtlasGraph graph; + + public AuditReductionTask(AtlasTask task, EntityAuditRepository auditRepository, AtlasGraph graph) { + super(task); + this.auditRepository = auditRepository; + this.graph = graph; + } + + @Override + public AtlasTask.Status perform() throws Exception { + RequestContext.clear(); + Map<String, Object> params = getTaskDef().getParameters(); + + if (MapUtils.isEmpty(params)) { + LOG.warn("Task: {}: Unable to 
process task: Parameters is not readable!", getTaskGuid()); + return FAILED; + } + + String userName = getTaskDef().getCreatedBy(); + + if (StringUtils.isEmpty(userName)) { + LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid()); + + return FAILED; + } + + RequestContext.get().setUser(userName, null); + + try { + run(params); + + setStatus(COMPLETE); + } catch (Exception e) { + LOG.error("Task: {}: Error performing task!", getTaskGuid(), e); + + setStatus(FAILED); + + throw e; + } finally { + RequestContext.clear(); + } + return getStatus(); + } + + protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException, AtlasException { + AtlasVertex vertex = findVertex(); + + if (vertex == null) { + return; + } + + Map<String, List<EntityAuditEventV2>> entitiesWithSucceededAgeout = new HashMap<>(); + + AtlasAuditAgingType auditAgingType = AtlasAuditAgingType.valueOf(String.valueOf(parameters.get(AUDIT_AGING_TYPE_KEY))); + Set<String> actionTypes = ((Collection<String>) parameters.get(AUDIT_AGING_ACTION_TYPES_KEY)).stream().collect(Collectors.toSet()); + int auditCountInput = (int) parameters.get(AUDIT_AGING_COUNT_KEY); + short auditCount = auditCountInput > Short.MAX_VALUE ? Short.MAX_VALUE : auditCountInput < Short.MIN_VALUE ? Short.MIN_VALUE : (short)auditCountInput; + int ttl = (int) parameters.get(AUDIT_AGING_TTL_KEY); + boolean createEventsAgeoutAllowed = (boolean) parameters.get(CREATE_EVENTS_AGEOUT_ALLOWED_KEY); + String vertexPropertyKeyForGuids = AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType); + + List<String> entityGuidsEligibleForAuditAgeout = vertex.getProperty(vertexPropertyKeyForGuids, List.class); + int guidsCount = CollectionUtils.isNotEmpty(entityGuidsEligibleForAuditAgeout) ? 
entityGuidsEligibleForAuditAgeout.size() : 0; + int batchIndex = 1; + + Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions = actionTypes.stream().map(x -> EntityAuditEventV2.EntityAuditActionV2.fromString(x)).collect(Collectors.toSet()); + + for (int startIndex = 0; startIndex < guidsCount; ) { + int endIndex = startIndex + GUID_BATCH_SIZE_PER_AGE_OUT_TASK < guidsCount ? startIndex + GUID_BATCH_SIZE_PER_AGE_OUT_TASK : guidsCount; + List<String> guidsBatch = entityGuidsEligibleForAuditAgeout.subList(startIndex, endIndex); + + for (String guid : guidsBatch) { + List<EntityAuditEventV2> deletedAuditEvents = auditRepository.deleteEventsV2(guid, entityAuditActions, auditCount, ttl, createEventsAgeoutAllowed, auditAgingType); + entitiesWithSucceededAgeout.put(guid, deletedAuditEvents); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("{} Audit aging completed for batch-{} with guids: {}", auditAgingType.toString(), batchIndex, Arrays.toString(entitiesWithSucceededAgeout.keySet().toArray())); + } + + entitiesWithSucceededAgeout.clear(); + startIndex = endIndex; + batchIndex++; + List<String> remainingGuids = startIndex < guidsCount ? new ArrayList<>(entityGuidsEligibleForAuditAgeout.subList(startIndex, guidsCount)) : null; + vertex.setProperty(vertexPropertyKeyForGuids, remainingGuids); + } + + + } + + public AtlasVertex findVertex() { + AtlasGraphQuery query = graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME); + Iterator<AtlasVertex> results = query.vertices().iterator(); + + return results.hasNext() ? 
results.next() : null; + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java new file mode 100644 index 000000000..7e6aca1ab --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2.tasks; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.discovery.AtlasDiscoveryService; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; +import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.tasks.AbstractTask; +import org.apache.atlas.tasks.TaskFactory; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.*; + +import static org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT; +import static org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM; +import static org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_SWEEPOUT; + + +@Component +public class AuditReductionTaskFactory implements TaskFactory { + private static final Logger LOG = LoggerFactory.getLogger(AuditReductionTaskFactory.class); + + private static Configuration configuration; + public static final int MAX_PENDING_TASKS_ALLOWED; + private static final int MAX_PENDING_TASKS_ALLOWED_DEFAULT = 50; + private static final String MAX_PENDING_TASKS_ALLOWED_KEY = "atlas.audit.reduction.max.pending.tasks"; + public static final String ATLAS_AUDIT_REDUCTION = "ATLAS_AUDIT_REDUCTION"; + public static final String ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL = "AUDIT_REDUCTION_ENTITY_RETRIEVAL"; + + static { + try { + configuration = ApplicationProperties.get(); + } catch (Exception e) { + LOG.info("Failed to load application properties", e); + } + if (configuration != null) { + MAX_PENDING_TASKS_ALLOWED = configuration.getInt(MAX_PENDING_TASKS_ALLOWED_KEY, MAX_PENDING_TASKS_ALLOWED_DEFAULT); + } else { + 
MAX_PENDING_TASKS_ALLOWED = MAX_PENDING_TASKS_ALLOWED_DEFAULT; + } + } + + private static final List<String> supportedTypes = new ArrayList<String>() {{ + add(ATLAS_AUDIT_REDUCTION); + add(ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL); + }}; + + public static final Map<AtlasAuditAgingType, String> AGING_TYPE_PROPERTY_KEY_MAP = new HashMap<AtlasAuditAgingType, String>() { + { + put(AtlasAuditAgingType.DEFAULT, PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT); + put(AtlasAuditAgingType.SWEEP, PROPERTY_KEY_GUIDS_TO_SWEEPOUT); + put(AtlasAuditAgingType.CUSTOM, PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM); + } + }; + + private final EntityAuditRepository auditRepository; + private final AtlasGraph graph; + private final AtlasDiscoveryService discoveryService; + private final AtlasTypeRegistry typeRegistry; + + @Inject + public AuditReductionTaskFactory(EntityAuditRepository auditRepository, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) { + this.auditRepository = auditRepository; + this.graph = graph; + this.discoveryService = discoveryService; + this.typeRegistry = typeRegistry; + } + + @Override + public AbstractTask create(AtlasTask task) { + String taskType = task.getType(); + String taskGuid = task.getGuid(); + switch (taskType) { + case ATLAS_AUDIT_REDUCTION: + return new AuditReductionTask(task, auditRepository, graph); + + case ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL: + return new AuditReductionEntityRetrievalTask(task, graph, discoveryService, typeRegistry); + + default: + LOG.warn("Type: {} - {} not found!. 
The task will be ignored.", taskType, taskGuid); + return null; + } + } + + @Override + public List<String> getSupportedTypes() { + return this.supportedTypes; + } +} \ No newline at end of file diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java index 679df3c5c..24a8bda23 100644 --- a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java @@ -22,15 +22,20 @@ import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.TestUtilsV2; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.Constants.AtlasAuditAgingType; import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.commons.lang.time.DateUtils; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.stream.Collectors; + +import static org.testng.Assert.*; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; public class AuditRepositoryTestBase { protected EntityAuditRepository eventRepository; @@ -203,4 +208,62 @@ public class AuditRepositoryTestBase { assertEquals(actual.getDetails(), expected.getDetails()); } + @Test + public void testDeleteEventsV2() throws Exception { + String id1 = "id1" + rand(); + int ttlInDays = 1; + long ts = System.currentTimeMillis() - (ttlInDays * DateUtils.MILLIS_PER_DAY); + AtlasEntity entity = new AtlasEntity(rand()); + + int j = 0; + List<EntityAuditEventV2> expectedEvents = new ArrayList<>(); + expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user-a", EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, 
"details" + j, entity)); + expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user-C", EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + j, entity)); + for (int i = 0; i < 5; i++) { + expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user" + i, EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + j, entity)); + expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user" + i, EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD, "details" + j, entity)); + } + expectedEvents.add(new EntityAuditEventV2(id1, ts+ j++, "User-b", EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + j, entity)); + for(EntityAuditEventV2 event : expectedEvents) { + eventRepository.putEventsV2(event); + } + + List<EntityAuditEventV2> events = eventRepository.listEventsV2(id1, null, "timestamp", false, 0, (short) -1); + assertEquals(events.size(), 13); + assertEventV2Equals(events.get(0), expectedEvents.get(0)); + assertEventV2Equals(events.get(1), expectedEvents.get(1)); + + short expectedUpdateEventsCount = 2; + List<EntityAuditEventV2> deletedUpdateEvents = eventRepository.deleteEventsV2(id1, new HashSet<>(Arrays.asList(EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE)), expectedUpdateEventsCount , 0, false, AtlasAuditAgingType.CUSTOM); + List<EntityAuditEventV2> remainingEvents = events.stream().filter(x -> !deletedUpdateEvents.contains(x)).collect(Collectors.toList()); + List<EntityAuditEventV2> remainingUpdateEvents = remainingEvents.stream().filter(x -> x.getAction() == EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE).collect(Collectors.toList()); + + assertEquals(remainingUpdateEvents.size(), expectedUpdateEventsCount); + + short expectedEventsCount = 4; + List<EntityAuditEventV2> deletedEvents = eventRepository.deleteEventsV2(id1, null, expectedEventsCount , 0, false, AtlasAuditAgingType.DEFAULT); + remainingEvents = events.stream().filter(x -> !deletedEvents.contains(x)).collect(Collectors.toList()); + + 
assertEquals(remainingEvents.size(), expectedEventsCount + 1); + assertEventV2Equals(remainingEvents.get(0), events.get(0)); + assertTrue(remainingEvents.contains(events.get(0))); + + List<EntityAuditEventV2> deletedEventsIncludingCreate = eventRepository.deleteEventsV2(id1, null, expectedEventsCount , 0, true, AtlasAuditAgingType.DEFAULT); + remainingEvents = events.stream().filter(x -> !deletedEventsIncludingCreate.contains(x)).collect(Collectors.toList()); + + assertEquals(remainingEvents.size(), expectedEventsCount); + assertNotEquals(remainingEvents.get(3), events.get(0)); + assertFalse(remainingEvents.contains(events.get(0))); + + EntityAuditEventV2 latestEvent = new EntityAuditEventV2(id1, System.currentTimeMillis(), "User-b", EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + j++, entity); + eventRepository.putEventsV2(latestEvent); + List<EntityAuditEventV2> allEvents = eventRepository.listEventsV2(id1, null, "timestamp", false, 0, (short) -1); + + List<EntityAuditEventV2> deletedEventsByTTL = eventRepository.deleteEventsV2(id1, null, expectedUpdateEventsCount , ttlInDays, true, AtlasAuditAgingType.DEFAULT); + assertEquals(deletedEventsByTTL.size(), allEvents.size() - 1); + + List<EntityAuditEventV2> remainingEventsByTTL = allEvents.stream().filter(x -> !deletedEventsByTTL.contains(x)).collect(Collectors.toList()); + assertEquals(remainingEventsByTTL.size(), 1); + assertEquals(latestEvent, remainingEventsByTTL.get(0)); + } } diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml b/test-tools/src/main/resources/solr/core-template/solrconfig.xml index 1550052b4..7a0e8dd16 100644 --- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml +++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml @@ -445,7 +445,7 @@ --> <lst name="defaults"> <str name="defType">edismax</str> - <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 
14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 1ssl_t 1udh_t 1wqt_t 4eth_t 4rgl_s 4pvp_s 4nid_s 4lxh_s 4p39_t 4wzp_t 4t1h_t 4umd_l 4vet_t 51qd_t 505h_t 53b9_t 5af9_t 5fyd_t 5kp1_t 5ibp_t 5j45_t 5hj9_t 5r0l_t 5q85_t 5y4l_l 5zph_l 5rt1_t 5xc5_t 68ed_l 69z9_l 66th_t 658l_t 6bk5_t 61ad_t 622t_t 63np_t 6ltx_t 6mmd_ [...] + <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 1ssl_t 1v5x_t 1wqt_t 1z45_t 4h6t_t 4ttx_s 4s91_s 4pvp_s 4oat_s 4rgl_t 4zd1_t 4vet_t 4wzp_l 4xs5_t 543p_t 52it_t 55ol_t 5csl_t 5ibp_t 5n2d_t 5kp1_t 5lhh_t 5jwl_t 5tdx_t 5slh_t 60hx_l 622t_l 5u6d_t 5zph_t 6arp_l 6ccl_l 696t_t 67lx_t 6dxh_t 63np_t 64g5_t 6611_t 6o79_ [...] <str name="hl.fl">*</str> <bool name="hl.requireFieldMatch">true</bool> <bool name="lowercaseOperators">true</bool> diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java index a25a51b5a..e19beccb1 100644 --- a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java +++ b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java @@ -111,7 +111,7 @@ public class ActiveServerFilter implements Filter { final String adminUriNotFiltered[] = { "/admin/export", "/admin/import", "/admin/importfile", "/admin/audits", "/admin/purge", "/admin/expimp/audit", "/admin/metrics", "/admin/server", "/admin/audit/", "admin/tasks", - "/admin/debug/metrics"}; + "/admin/debug/metrics", "/admin/audits/ageout"}; private boolean isFilteredURI(ServletRequest servletRequest) { HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest; String requestURI = httpServletRequest.getRequestURI(); diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java 
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index b19095b48..4d59fa3d6 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -31,6 +31,7 @@ import org.apache.atlas.discovery.SearchContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.AtlasAuditEntry; import org.apache.atlas.model.audit.AtlasAuditEntry.AuditOperation; +import org.apache.atlas.model.audit.AuditReductionCriteria; import org.apache.atlas.model.audit.AuditSearchParameters; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2; @@ -52,6 +53,7 @@ import org.apache.atlas.model.metrics.AtlasMetricsStat; import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches; import org.apache.atlas.model.tasks.AtlasTask; import org.apache.atlas.repository.audit.AtlasAuditService; +import org.apache.atlas.repository.audit.AtlasAuditReductionService; import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.impexp.AtlasServerService; import org.apache.atlas.repository.impexp.ExportImportAuditService; @@ -187,6 +189,8 @@ public class AdminResource { private final boolean isOnDemandLineageEnabled; private final int defaultLineageNodeCount; + private AtlasAuditReductionService auditReductionService; + static { try { atlasProperties = ApplicationProperties.get(); @@ -202,7 +206,7 @@ public class AdminResource { AtlasServerService serverService, ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore, AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository, - TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink) { + TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink, AtlasAuditReductionService 
atlasAuditReductionService) { this.serviceState = serviceState; this.metricsService = metricsService; this.exportService = exportService; @@ -219,6 +223,7 @@ public class AdminResource { this.auditRepository = auditRepository; this.taskManagement = taskManagement; this.debugMetricsRESTSink = debugMetricsRESTSink; + this.auditReductionService = atlasAuditReductionService; if (atlasProperties != null) { this.defaultUIVersion = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2); @@ -820,6 +825,45 @@ public class AdminResource { } } + @POST + @Path("/audits/ageout") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public List<AtlasTask> ageoutAuditData(AuditReductionCriteria auditReductionCriteria, @QueryParam("useAuditConfig") @DefaultValue("false") Boolean useAuditConfig) throws AtlasBaseException { + AtlasPerfTracer perf = null; + try { + AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_AUDITS), "Admin Audits Ageout"); + + if (useAuditConfig) { + return auditReductionService.startAuditAgingByConfig(); + } + + if (!auditReductionCriteria.isAuditAgingEnabled()) { + LOG.warn("Audit aging should be enabled"); + return null; + } + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.ageoutAuditData(" + auditReductionCriteria + ")"); + } + + updateCriteriaWithDefaultValues(auditReductionCriteria); + + List<Map<String, Object>> ageoutTypeCriteriaMap = auditReductionService.buildAgeoutCriteriaForAllAgingTypes(auditReductionCriteria); + + return auditReductionService.startAuditAgingByCriteria(ageoutTypeCriteriaMap); + + } finally { + AtlasPerfTracer.log(perf); + } + } + + private void updateCriteriaWithDefaultValues(AuditReductionCriteria auditReductionCriteria) { + if (auditReductionCriteria.getDefaultAgeoutTTLInDays() <= 0) { + 
auditReductionCriteria.setDefaultAgeoutTTLInDays(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt()); + } + } + @POST @Path("/audits") @Consumes(Servlets.JSON_MEDIA_TYPE) diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java index a4d794615..5b16ba149 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java @@ -51,7 +51,7 @@ public class AdminResourceTest { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); assertEquals(response.getStatus(), HttpServletResponse.SC_OK); JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity()); @@ -62,7 +62,7 @@ public class AdminResourceTest { public void testResourceGetsValueFromServiceState() throws IOException { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); verify(serviceState).getState();