This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 5086968 ATLAS-4254 : Basic Search : Optimize pagination 5086968 is described below commit 50869680e4cc5c27d80738101f2a122c52b0000f Author: Pinal <pinal-shah> AuthorDate: Mon May 10 21:55:10 2021 +0530 ATLAS-4254 : Basic Search : Optimize pagination Signed-off-by: Pinal <pinal-shah> --- .../atlas/model/discovery/AtlasSearchResult.java | 11 +- .../atlas/model/discovery/SearchParameters.java | 20 +- .../discovery/ClassificationSearchProcessor.java | 81 ++++-- .../atlas/discovery/EntityDiscoveryService.java | 5 + .../atlas/discovery/EntitySearchProcessor.java | 79 +++--- .../atlas/discovery/FreeTextSearchProcessor.java | 28 +- .../atlas/discovery/FullTextSearchProcessor.java | 25 +- .../org/apache/atlas/discovery/SearchContext.java | 91 ++++++- .../apache/atlas/discovery/SearchProcessor.java | 83 ++++-- .../atlas/discovery/TermSearchProcessor.java | 52 ++-- .../atlas/discovery/AtlasDiscoveryServiceTest.java | 301 ++++++++++++++------- .../ClassificationSearchProcessorTest.java | 57 ++++ .../org/apache/atlas/web/rest/DiscoveryREST.java | 4 +- 13 files changed, 603 insertions(+), 234 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java index e1c550e..ce0f84b 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java @@ -53,6 +53,7 @@ public class AtlasSearchResult implements Serializable { private List<AtlasFullTextResult> fullTextResult; private Map<String, AtlasEntityHeader> referredEntities; private long approximateCount = -1; + private String nextMarker; public AtlasSearchResult() {} @@ -131,8 +132,12 @@ public class AtlasSearchResult implements Serializable { public void setApproximateCount(long approximateCount) { this.approximateCount = approximateCount; } + public String getNextMarker() { return nextMarker; } + + public void setNextMarker(String nextMarker) { this.nextMarker = nextMarker; } + @Override - public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities); } + public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities, nextMarker); } @Override public boolean equals(Object o) { @@ -147,7 +152,8 @@ public class AtlasSearchResult implements Serializable { Objects.equals(entities, that.entities) && Objects.equals(attributes, that.attributes) && Objects.equals(fullTextResult, that.fullTextResult) && - Objects.equals(referredEntities, that.referredEntities); + Objects.equals(referredEntities, that.referredEntities) && + Objects.equals(nextMarker, that.nextMarker); } public void addEntity(AtlasEntityHeader newEntity) { @@ -190,6 +196,7 @@ public class AtlasSearchResult implements Serializable { ", fullTextResult=" + fullTextResult + ", referredEntities=" + referredEntities + ", approximateCount=" + approximateCount + + ", nextMarker=" + nextMarker + '}'; } diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java index 9d2cd4f..78fb4a4 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java @@ -52,6 +52,7 @@ public class SearchParameters implements Serializable { private boolean includeSubClassifications = true; private int limit; private int offset; + private String marker; private FilterCriteria entityFilters; private FilterCriteria tagFilters; @@ -216,6 +217,16 @@ public class SearchParameters implements Serializable { } /** + * @return marker (offset of the next page) + */ + public String getMarker() { return marker; } + + /** + * @param marker + */ + public void setMarker(String marker) { this.marker = marker; } + + /** * Entity attribute filters for the type (if type name is specified) * @return */ @@ -294,6 +305,8 @@ public class SearchParameters implements Serializable { SearchParameters that = (SearchParameters) o; return excludeDeletedEntities == that.excludeDeletedEntities && includeClassificationAttributes == that.includeClassificationAttributes && + includeSubTypes == that.includeSubTypes && + includeSubClassifications == that.includeSubClassifications && limit == that.limit && offset == that.offset && Objects.equals(query, that.query) && @@ -309,8 +322,9 @@ public class SearchParameters implements Serializable { @Override public int hashCode() { - return Objects.hash(query, typeName, classification, termName, excludeDeletedEntities, includeClassificationAttributes, - limit, offset, entityFilters, tagFilters, attributes, sortBy, sortOrder); + return Objects.hash(query, typeName, classification, termName, includeSubTypes, includeSubClassifications, + excludeDeletedEntities, includeClassificationAttributes, limit, offset, entityFilters, + tagFilters, attributes, sortBy, sortOrder); } public StringBuilder toString(StringBuilder sb) { @@ -323,6 +337,8 @@ public class SearchParameters implements Serializable { sb.append(", typeName='").append(typeName).append('\''); sb.append(", classification='").append(classification).append('\''); sb.append(", termName='").append(termName).append('\''); + sb.append(", includeSubTypes='").append(includeSubTypes).append('\''); + sb.append(", includeSubClassifications='").append(includeSubClassifications).append('\''); sb.append(", excludeDeletedEntities=").append(excludeDeletedEntities); sb.append(", includeClassificationAttributes=").append(includeClassificationAttributes); sb.append(", limit=").append(limit); diff --git a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java index 647ff9c..dfcc441 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java @@ -27,6 +27,7 @@ import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.util.SearchPredicateUtil; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.Predicate; import org.apache.commons.collections.PredicateUtils; import org.apache.commons.lang3.StringUtils; @@ -212,26 +213,32 @@ public class ClassificationSearchProcessor extends SearchProcessor { } try { - final int startIdx = context.getSearchParameters().getOffset(); final int limit = context.getSearchParameters().getLimit(); + Integer marker = context.getMarker(); // query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could // have been dropped: like non-active-entities or duplicate-entities (same entity pointed to by multiple // classifications in the result) // // first 'startIdx' number of entries will be ignored - int qryOffset = 0; + //marker functionality will not work when there is need to fetch classificationVertices and get entities from it + if (indexQuery == null) { + marker = null; + } + // if marker is provided, start query with marker offset + int startIdx = marker != null ? marker : context.getSearchParameters().getOffset(); + int qryOffset = marker != null ? marker : 0; int resultIdx = qryOffset; - final Set<String> processedGuids = new HashSet<>(); - final List<AtlasVertex> entityVertices = new ArrayList<>(); - final List<AtlasVertex> classificationVertices = new ArrayList<>(); + final Set<String> processedGuids = new HashSet<>(); + LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>(); + final List<AtlasVertex> classificationVertices = new ArrayList<>(); final String sortBy = context.getSearchParameters().getSortBy(); final SortOrder sortOrder = context.getSearchParameters().getSortOrder(); for (; ret.size() < limit; qryOffset += limit) { - entityVertices.clear(); + offsetEntityVertexMap.clear(); classificationVertices.clear(); if (context.terminateSearch()) { @@ -251,12 +258,12 @@ public class ClassificationSearchProcessor extends SearchProcessor { queryResult = indexQuery.vertices(qryOffset, limit); } - getVerticesFromIndexQueryResult(queryResult, entityVertices); - isLastResultPage = entityVertices.size() < limit; + offsetEntityVertexMap = getVerticesFromIndexQueryResult(queryResult, offsetEntityVertexMap, qryOffset); + isLastResultPage = offsetEntityVertexMap.size() < limit; // Do in-memory filtering - CollectionUtils.filter(entityVertices, traitPredicate); - CollectionUtils.filter(entityVertices, isEntityPredicate); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate); } else { if (classificationIndexQuery != null) { @@ -283,11 +290,14 @@ public class ClassificationSearchProcessor extends SearchProcessor { // Since tag filters are present, we need to collect the entity vertices after filtering the classification // vertex results (as these might be lower in number) if (CollectionUtils.isNotEmpty(classificationVertices)) { + int resultCount = 0; + for (AtlasVertex classificationVertex : classificationVertices) { Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN, Constants.CLASSIFICATION_LABEL); for (AtlasEdge edge : edges) { AtlasVertex entityVertex = edge.getOutVertex(); + resultCount++; String guid = AtlasGraphUtilsV2.getIdFromVertex(entityVertex); @@ -295,7 +305,7 @@ public class ClassificationSearchProcessor extends SearchProcessor { continue; } - entityVertices.add(entityVertex); + offsetEntityVertexMap.put((qryOffset + resultCount) - 1, entityVertex); processedGuids.add(guid); } @@ -303,22 +313,28 @@ public class ClassificationSearchProcessor extends SearchProcessor { } if (whiteSpaceFilter) { - filterWhiteSpaceClassification(entityVertices); + offsetEntityVertexMap = filterWhiteSpaceClassification(offsetEntityVertexMap); } - // Do in-memory filtering - CollectionUtils.filter(entityVertices, isEntityPredicate); + // Do in-memory filtering + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate); if (activePredicate != null) { - CollectionUtils.filter(entityVertices, activePredicate); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, activePredicate); } - super.filter(entityVertices); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); - resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices); + resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker); if (isLastResultPage) { + resultIdx = SearchContext.MarkerUtil.MARKER_END - 1; break; } } + + if (marker != null) { + nextOffset = resultIdx + 1; + } + } finally { AtlasPerfTracer.log(perf); } @@ -331,20 +347,23 @@ public class ClassificationSearchProcessor extends SearchProcessor { } @Override - public void filter(List<AtlasVertex> entityVertices) { + public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) { if (LOG.isDebugEnabled()) { - LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size()); + LOG.debug("==> ClassificationSearchProcessor.filter({})", offsetEntityVertexMap.size()); } if (inMemoryPredicate != null) { //in case of classification type + index attributes - CollectionUtils.filter(entityVertices, traitPredicate); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate); //filter attributes (filterCriteria). Find classification vertex(typeName = classification) from entity vertex (traitName = classification) final Set<String> processedGuids = new HashSet<>(); - List<AtlasVertex> matchEntityVertices = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(entityVertices)) { - for (AtlasVertex entityVertex : entityVertices) { + LinkedHashMap<Integer, AtlasVertex> matchEntityVertices = new LinkedHashMap<>(); + + if (MapUtils.isNotEmpty(offsetEntityVertexMap)) { + for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) { + + AtlasVertex entityVertex = offsetToEntity.getValue(); Iterable<AtlasEdge> edges = entityVertex.getEdges(AtlasEdgeDirection.OUT, Constants.CLASSIFICATION_LABEL); for (AtlasEdge edge : edges) { @@ -358,7 +377,7 @@ public class ClassificationSearchProcessor extends SearchProcessor { continue; } - matchEntityVertices.add(entityVertex); + matchEntityVertices.put(offsetToEntity.getKey(), entityVertex); processedGuids.add(guid); break; @@ -366,20 +385,22 @@ public class ClassificationSearchProcessor extends SearchProcessor { } } } - entityVertices.clear(); - entityVertices.addAll(matchEntityVertices); + offsetEntityVertexMap.clear(); + offsetEntityVertexMap.putAll(matchEntityVertices); } else { //in case of only classsification type - CollectionUtils.filter(entityVertices, traitPredicate); - CollectionUtils.filter(entityVertices, isEntityPredicate); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate); } - super.filter(entityVertices); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); if (LOG.isDebugEnabled()) { - LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", entityVertices.size()); + LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size()); } + + return offsetEntityVertexMap; } @Override diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index a3ab6e3..f2290c6 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -463,6 +463,11 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { ret.setApproximateCount(searchContext.getSearchProcessor().getResultCount()); + String nextMarker = searchContext.getSearchProcessor().getNextMarker(); + if (StringUtils.isNotEmpty(nextMarker)) { + ret.setNextMarker(nextMarker); + } + // By default any attribute that shows up in the search parameter should be sent back in the response // If additional values are requested then the entityAttributes will be a superset of the all search attributes // and the explicitly requested attribute(s) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java index 5dcff3b..f45ccaf 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java @@ -240,32 +240,26 @@ public class EntitySearchProcessor extends SearchProcessor { } try { - final int startIdx = context.getSearchParameters().getOffset(); - final int limit = context.getSearchParameters().getLimit(); + final int limit = context.getSearchParameters().getLimit(); + final Integer marker = context.getMarker(); + final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset(); // when subsequent filtering stages are involved, query should start at 0 even though startIdx can be higher // // first 'startIdx' number of entries will be ignored - int qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx; - int resultIdx = qryOffset; - - final List<AtlasVertex> entityVertices = new ArrayList<>(); - - SortOrder sortOrder = context.getSearchParameters().getSortOrder(); - String sortBy = context.getSearchParameters().getSortBy(); - - final AtlasEntityType entityType = context.getEntityTypes().iterator().next(); - AtlasAttribute sortByAttribute = entityType.getAttribute(sortBy); - if (sortByAttribute == null) { - sortBy = null; + // if marker is provided, start query with marker offset + int qryOffset; + if (marker != null) { + qryOffset = marker; } else { - sortBy = sortByAttribute.getVertexPropertyName(); + qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx; } + int resultIdx = qryOffset; - if (sortOrder == null) { sortOrder = ASCENDING; } + LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>(); for (; ret.size() < limit; qryOffset += limit) { - entityVertices.clear(); + offsetEntityVertexMap.clear(); if (context.terminateSearch()) { LOG.warn("query terminated: {}", context.getSearchParameters()); @@ -277,41 +271,36 @@ public class EntitySearchProcessor extends SearchProcessor { if (indexQuery != null) { Iterator<AtlasIndexQuery.Result> idxQueryResult = executeIndexQuery(context, indexQuery, qryOffset, limit); + offsetEntityVertexMap = getVerticesFromIndexQueryResult(idxQueryResult, offsetEntityVertexMap, qryOffset); - getVerticesFromIndexQueryResult(idxQueryResult, entityVertices); - - isLastResultPage = entityVertices.size() < limit; - - // Do in-memory filtering before the graph query - CollectionUtils.filter(entityVertices, inMemoryPredicate); - - if (graphQueryPredicate != null) { - CollectionUtils.filter(entityVertices, graphQueryPredicate); - } } else { Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator(); + offsetEntityVertexMap = getVertices(queryResult, offsetEntityVertexMap, qryOffset); + } - getVertices(queryResult, entityVertices); - - isLastResultPage = entityVertices.size() < limit; - - // Do in-memory filtering - CollectionUtils.filter(entityVertices, inMemoryPredicate); + isLastResultPage = offsetEntityVertexMap.size() < limit; + // Do in-memory filtering + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, inMemoryPredicate); - //incase when operator is NEQ in pipeSeperatedSystemAttributes - if (graphQueryPredicate != null) { - CollectionUtils.filter(entityVertices, graphQueryPredicate); - } + //incase when operator is NEQ in pipeSeperatedSystemAttributes + if (graphQueryPredicate != null) { + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, graphQueryPredicate); } - super.filter(entityVertices); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); - resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices); + resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker); if (isLastResultPage) { + resultIdx = MarkerUtil.MARKER_END - 1; break; } } + + if (marker != null) { + nextOffset = resultIdx + 1; + } + } finally { AtlasPerfTracer.log(perf); } @@ -324,23 +313,25 @@ public class EntitySearchProcessor extends SearchProcessor { } @Override - public void filter(List<AtlasVertex> entityVertices) { + public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) { if (LOG.isDebugEnabled()) { - LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size()); + LOG.debug("==> EntitySearchProcessor.filter({})", offsetEntityVertexMap.size()); } // Since we already have the entity vertices, a in-memory filter will be faster than fetching the same // vertices again with the required filtering if (filterGraphQueryPredicate != null) { LOG.debug("Filtering in-memory"); - CollectionUtils.filter(entityVertices, filterGraphQueryPredicate); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap, filterGraphQueryPredicate); } - super.filter(entityVertices); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); if (LOG.isDebugEnabled()) { - LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", entityVertices.size()); + LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size()); } + + return offsetEntityVertexMap; } @Override diff --git a/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java index 92152ff..86f2cea 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/FreeTextSearchProcessor.java @@ -96,20 +96,23 @@ public class FreeTextSearchProcessor extends SearchProcessor { } try { - final int startIdx = context.getSearchParameters().getOffset(); - final int limit = context.getSearchParameters().getLimit(); + final int limit = context.getSearchParameters().getLimit(); + final Integer marker = context.getMarker(); + final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset(); // query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could // have been dropped: like vertices of non-entity or non-active-entity // // first 'startIdx' number of entries will be ignored - int qryOffset = 0; + // if marker is provided, start query with marker offset + int qryOffset = marker != null ? marker : 0; int resultIdx = qryOffset; - final List<AtlasVertex> entityVertices = new ArrayList<>(); + LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>(); + try { for (; ret.size() < limit; qryOffset += limit) { - entityVertices.clear(); + offsetEntityVertexMap.clear(); if (context.terminateSearch()) { LOG.warn("query terminated: {}", context.getSearchParameters()); @@ -150,22 +153,27 @@ public class FreeTextSearchProcessor extends SearchProcessor { } } - entityVertices.add(vertex); + offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex); } - isLastResultPage = resultCount < limit; - - super.filter(entityVertices); + isLastResultPage = resultCount < limit; - resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); + resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker); if (isLastResultPage) { + resultIdx = SearchContext.MarkerUtil.MARKER_END - 1; break; } } } catch (Throwable t) { throw t; } + + if (marker != null) { + nextOffset = resultIdx + 1; + } + } finally { AtlasPerfTracer.log(perf); } diff --git a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java index b37d93a..2d8a448 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_NOT_CLASSIFIED; @@ -96,21 +97,23 @@ public class FullTextSearchProcessor extends SearchProcessor { } try { - final int startIdx = context.getSearchParameters().getOffset(); final int limit = context.getSearchParameters().getLimit(); final boolean activeOnly = context.getSearchParameters().getExcludeDeletedEntities(); + final Integer marker = context.getMarker(); + final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset(); // query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could // have been dropped: like vertices of non-entity or non-active-entity // // first 'startIdx' number of entries will be ignored - int qryOffset = 0; + // if marker is provided, start query with marker offset + int qryOffset = marker != null ? marker : 0; int resultIdx = qryOffset; - final List<AtlasVertex> entityVertices = new ArrayList<>(); + LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>(); for (; ret.size() < limit; qryOffset += limit) { - entityVertices.clear(); + offsetEntityVertexMap.clear(); if (context.terminateSearch()) { LOG.warn("query terminated: {}", context.getSearchParameters()); @@ -141,19 +144,25 @@ public class FullTextSearchProcessor extends SearchProcessor { continue; } - entityVertices.add(vertex); + offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex); } - isLastResultPage = resultCount < limit; + isLastResultPage = resultCount < limit; - super.filter(entityVertices); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); - resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices); + resultIdx = collectResultVertices(ret,startIdx, limit, resultIdx, offsetEntityVertexMap, marker); if (isLastResultPage) { + resultIdx = SearchContext.MarkerUtil.MARKER_END - 1 ; break; } } + + if (marker != null) { + nextOffset = resultIdx + 1; + } + } finally { AtlasPerfTracer.log(perf); } diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java index aa49121..01954d0 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java @@ -18,10 +18,10 @@ package org.apache.atlas.discovery; +import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.SearchParameters; -import org.apache.atlas.model.discovery.SearchParameters.*; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.repository.Constants; @@ -43,11 +43,25 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.atlas.discovery.SearchProcessor.ALL_TYPE_QUERY; -import static org.apache.atlas.model.discovery.SearchParameters.*; +import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATIONS; +import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATION_TYPES; +import static org.apache.atlas.model.discovery.SearchParameters.ALL_ENTITY_TYPES; +import static org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; +import static org.apache.atlas.model.discovery.SearchParameters.NO_CLASSIFICATIONS; +import static org.apache.atlas.model.discovery.SearchParameters.WILDCARD_CLASSIFICATIONS; /* * Search context captures elements required for performing a basic search @@ -71,6 +85,7 @@ public class SearchContext { private final String classificationTypeAndSubTypesQryStr; private boolean terminateSearch = false; private SearchProcessor searchProcessor; + private Integer marker; public final static AtlasClassificationType MATCH_ALL_WILDCARD_CLASSIFICATION = new AtlasClassificationType(new AtlasClassificationDef(WILDCARD_CLASSIFICATIONS)); public final static AtlasClassificationType MATCH_ALL_CLASSIFIED = new AtlasClassificationType(new AtlasClassificationDef(ALL_CLASSIFICATIONS)); @@ -124,6 +139,10 @@ public class SearchContext { } } + if (StringUtils.isNotEmpty(searchParameters.getMarker())) { + marker = MarkerUtil.decodeMarker(searchParameters); + } + //remove other types if builtin type is present filterStructTypes(); @@ -231,6 +250,8 @@ public class SearchContext { public Set<String> getClassificationNames() {return classificationNames;} + public Integer getMarker() { return marker; } + public boolean includeEntityType(String entityType) { return typeAndSubTypes.isEmpty() || typeAndSubTypes.contains(entityType); } @@ -238,9 +259,7 @@ public class SearchContext { public boolean includeClassificationTypes(Collection<String> traitNames) { final boolean ret; - if (CollectionUtils.isEmpty(classificationTypes) || classificationTypeAndSubTypes.isEmpty()) { - ret = true; - } else if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) { + if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) { ret = CollectionUtils.isEmpty(traitNames); } else if (classificationTypes.iterator().next() == MATCH_ALL_CLASSIFICATION_TYPES) { ret = CollectionUtils.isNotEmpty(traitNames); @@ -503,4 +522,64 @@ public class SearchContext { private AtlasEntityType getTermEntityType() { return typeRegistry.getEntityTypeByName(TermSearchProcessor.ATLAS_GLOSSARY_TERM_ENTITY_TYPE); } + + public static class MarkerUtil { + private final static int IDX_HASH_CODE = 0; + private final static int IDX_OFFSET = 1; + + private final static String MARKER_DELIMITER = ":"; + + @VisibleForTesting + final static String MARKER_START = "*"; + + @VisibleForTesting + final static int MARKER_END = -1; + + public static String getNextEncMarker(SearchParameters searchParameters, Integer nextOffset) { + if (nextOffset == null) { + return null; + } + + if (nextOffset == MARKER_END) { + return String.valueOf(nextOffset); + } + + String value = searchParameters.hashCode() + MARKER_DELIMITER + nextOffset; + return Base64.getEncoder().encodeToString(value.getBytes()); + } + + public static Integer decodeMarker(SearchParameters searchParameters) throws AtlasBaseException { + if (searchParameters == null || searchParameters.getOffset() > 0) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Marker can be used only if offset=0."); + } + + String encodedMarker = searchParameters.getMarker(); + if (StringUtils.equals(encodedMarker, MARKER_START)) { + return 0; + } + + try { + byte[] inputMarkerBytes = Base64.getDecoder().decode(encodedMarker); + String inputMarker = new String(inputMarkerBytes); + if (StringUtils.isEmpty(inputMarker) || !inputMarker.contains(MARKER_DELIMITER)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Marker does not contain delimiter: " + MARKER_DELIMITER); + } + + String[] str = inputMarker.split(MARKER_DELIMITER); + if (str == null || str.length != 2) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Decoding using delimiter did not yield correct result!"); + } + + int hashCode = Integer.parseInt(str[IDX_HASH_CODE]); + int currentHashCode = searchParameters.hashCode(); + if (hashCode == currentHashCode && Integer.parseInt(str[IDX_OFFSET]) >= 0) { + return Integer.parseInt(str[IDX_OFFSET]); + } + + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid Marker! Parsing resulted in error."); + } catch (Exception e) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker!"); + } + } + } } diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java index f9832c3..f69dc42 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java @@ -39,6 +39,7 @@ import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.atlas.util.SearchPredicateUtil; import org.apache.atlas.util.SearchPredicateUtil.*; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.Predicate; import org.apache.commons.collections.PredicateUtils; import org.apache.commons.lang.StringUtils; @@ -52,6 +53,7 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.*; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.atlas.SortOrder.ASCENDING; import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_CLASSIFICATION_TYPES; @@ -137,6 +139,7 @@ public abstract class SearchProcessor { protected SearchProcessor nextProcessor; protected Predicate inMemoryPredicate; protected GraphIndexQueryBuilder graphIndexQueryBuilder; + protected Integer nextOffset; protected SearchProcessor(SearchContext context) { this.context = context; @@ -151,6 +154,10 @@ public abstract class SearchProcessor { } } + public String getNextMarker() { + return SearchContext.MarkerUtil.getNextEncMarker(context.getSearchParameters(), nextOffset); + } + public abstract List<AtlasVertex> execute(); public abstract long getResultCount(); @@ -181,28 +188,41 @@ public abstract class SearchProcessor { StringUtils.equals(attrName, CUSTOM_ATTRIBUTES_PROPERTY_KEY); } - protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final List<AtlasVertex> entityVertices) { - for (AtlasVertex entityVertex : entityVertices) { - resultIdx++; + protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final Map<Integer, AtlasVertex> offsetEntityVertexMap, Integer marker) { + int lastOffset = resultIdx; - if (resultIdx <= startIdx) { - continue; - } + for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) { + resultIdx++; + + if (resultIdx <= startIdx) { + continue; + } - ret.add(entityVertex); + lastOffset = offsetToEntity.getKey(); + ret.add(offsetToEntity.getValue()); - if (ret.size() == limit) { - break; + if (ret.size() == limit) { + break; + } } - } + return marker == null ? resultIdx : lastOffset; + } - return resultIdx; + public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) { + if (nextProcessor != null && MapUtils.isNotEmpty(offsetEntityVertexMap)) { + return nextProcessor.filter(offsetEntityVertexMap); + } + return offsetEntityVertexMap; } - public void filter(List<AtlasVertex> entityVertices) { - if (nextProcessor != null && CollectionUtils.isNotEmpty(entityVertices)) { - nextProcessor.filter(entityVertices); + public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, Predicate predicate) { + if (predicate != null) { + offsetEntityVertexMap = offsetEntityVertexMap.entrySet() + .stream() + .filter(x -> predicate.evaluate(x.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new)); } + return offsetEntityVertexMap; } protected Predicate buildTraitPredict(Set<AtlasClassificationType> classificationTypes) { @@ -361,13 +381,13 @@ public abstract class SearchProcessor { return ret; } - protected void filterWhiteSpaceClassification(List<AtlasVertex> entityVertices) { - if (CollectionUtils.isNotEmpty(entityVertices)) { - final Iterator<AtlasVertex> it = entityVertices.iterator(); - final Set<String> typeAndSubTypes = context.getClassificationTypeNames(); + protected LinkedHashMap<Integer,AtlasVertex> filterWhiteSpaceClassification(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) { + if (offsetEntityVertexMap != null) { + final Iterator<Map.Entry<Integer, AtlasVertex>> it = offsetEntityVertexMap.entrySet().iterator(); + final Set<String> typeAndSubTypes = context.getClassificationTypeNames(); while (it.hasNext()) { - AtlasVertex entityVertex = it.next(); + AtlasVertex entityVertex = it.next().getValue(); List<String> classificationNames = AtlasGraphUtilsV2.getClassificationNames(entityVertex); if (CollectionUtils.isNotEmpty(classificationNames)) { @@ -387,6 +407,7 @@ public abstract class SearchProcessor { it.remove(); } } + return offsetEntityVertexMap; } protected void constructFilterQuery(StringBuilder indexQuery, Set<? extends AtlasStructType> structTypes, FilterCriteria filterCriteria, Set<String> indexAttributes) { @@ -1205,6 +1226,18 @@ public abstract class SearchProcessor { return vertices; } + protected LinkedHashMap<Integer, AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) { + if (idxQueryResult != null) { + while (idxQueryResult.hasNext()) { + AtlasVertex vertex = idxQueryResult.next().getVertex(); + + offsetEntityVertexMap.put(qryOffset++, vertex); + } + } + + return offsetEntityVertexMap; + } + protected Collection<AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, Collection<AtlasVertex> vertices) { if (iterator != null) { while (iterator.hasNext()) { @@ -1217,6 +1250,18 @@ public abstract class SearchProcessor { return vertices; } + protected LinkedHashMap<Integer, AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) { + if (iterator != null) { + while (iterator.hasNext()) { + AtlasVertex vertex = iterator.next(); + + offsetEntityVertexMap.put(qryOffset++, vertex); + } + } + + return offsetEntityVertexMap; + } + protected Set<String> getGuids(List<AtlasVertex> vertices) { Set<String> ret = new HashSet<>(); diff --git a/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java index 45a8158..b8a507e 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/TermSearchProcessor.java @@ -20,12 +20,15 @@ package org.apache.atlas.discovery; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; - +import java.util.Map; +import java.util.stream.Collectors; public class TermSearchProcessor extends SearchProcessor { @@ -58,15 +61,22 @@ public class TermSearchProcessor extends SearchProcessor { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TermSearchProcessor.execute(" + context + ")"); } + //marker functionality will not work when there is need to fetch Term vertices and get entities from it try { if (CollectionUtils.isNotEmpty(assignedEntities)) { + LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>(); + final int startIdx = context.getSearchParameters().getOffset(); final int limit = context.getSearchParameters().getLimit(); final List<AtlasVertex> tmpList = new ArrayList<>(assignedEntities); - super.filter(tmpList); + for (int i = 0; i < tmpList.size(); i++) { + offsetEntityVertexMap.put(i, tmpList.get(i)); + } + + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); - collectResultVertices(ret, startIdx, limit, 0, tmpList); + collectResultVertices(ret, startIdx, limit, 0, offsetEntityVertexMap, null); } } finally { AtlasPerfTracer.log(perf); @@ -79,37 +89,41 @@ public class TermSearchProcessor extends SearchProcessor { return ret; } + //this filter is never used @Override - public void filter(List<AtlasVertex> entityVertices) { + public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) { if (LOG.isDebugEnabled()) { - LOG.debug("==> TermSearchProcessor.filter({})", entityVertices.size()); + LOG.debug("==> TermSearchProcessor.filter({})", offsetEntityVertexMap.size()); } - if (CollectionUtils.isNotEmpty(entityVertices)) { + if (MapUtils.isNotEmpty(offsetEntityVertexMap)) { if (CollectionUtils.isEmpty(assignedEntities)) { - entityVertices.clear(); + offsetEntityVertexMap.clear(); } else { - CollectionUtils.filter(entityVertices, o -> { - if (o instanceof AtlasVertex) { - AtlasVertex entityVertex = (AtlasVertex) o; - - for (AtlasVertex assignedEntity : assignedEntities) { - if (assignedEntity.getId().equals(entityVertex.getId())) { - return true; + offsetEntityVertexMap.entrySet().stream(). + filter(o -> { + if (o instanceof AtlasVertex) { + AtlasVertex entityVertex = (AtlasVertex) o; + + for (AtlasVertex assignedEntity : assignedEntities) { + if (assignedEntity.getId().equals(entityVertex.getId())) { + return true; + } + } } - } - } return false; - }); + }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new)); } } - super.filter(entityVertices); + offsetEntityVertexMap = super.filter(offsetEntityVertexMap); if (LOG.isDebugEnabled()) { - LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", entityVertices.size()); + LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size()); } + + return offsetEntityVertexMap; } @Override diff --git a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java index 027827a..a9fbd43 100644 --- a/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/discovery/AtlasDiscoveryServiceTest.java @@ -34,6 +34,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; import org.testng.Assert; import org.testng.annotations.*; @@ -73,67 +74,51 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup { SearchParameters params = new SearchParameters(); params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 10); + assertSearchProcessorWithoutMarker(params, 10); } // TSP execute and CSP,ESP filter @Test - public void term_tag() throws AtlasBaseException { + public void termTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY); params.setClassification(METRIC_CLASSIFICATION); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - for(AtlasEntityHeader e : entityHeaders){ - System.out.println(e.toString()); - } - assertEquals(entityHeaders.size(), 4); + assertSearchProcessorWithoutMarker(params, 4); } @Test - public void term_entity() throws AtlasBaseException { + public void termEntity() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY); params.setTypeName(HIVE_TABLE_TYPE); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 10); + assertSearchProcessorWithoutMarker(params, 10); } @Test - public void term_entity_tag() throws AtlasBaseException { + public void termEntityTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY); params.setTypeName(HIVE_TABLE_TYPE); params.setClassification(DIMENSIONAL_CLASSIFICATION); List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - Assert.assertTrue(CollectionUtils.isEmpty(entityHeaders)); } //FSP execute and CSP,ESP filter @Test - public void query_ALLTag() throws AtlasBaseException { + public void queryALLTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setClassification(ALL_CLASSIFICATION_TYPES); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 5); + assertSearchProcessorWithoutMarker(params, 5); } @Test - public void query_ALLTag_tagFilter() throws AtlasBaseException { + public void queryALLTagTagFilter() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setClassification(ALL_CLASSIFICATION_TYPES); //typeName will check for only classification name not propogated classification @@ -141,103 +126,79 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup { params.setTagFilters(fc); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 4); + assertSearchProcessorWithoutMarker(params, 4); } @Test - public void query_NOTCLASSIFIEDTag() throws AtlasBaseException { + public void queryNOTCLASSIFIEDTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setClassification(NO_CLASSIFICATIONS); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 11); + assertSearchProcessorWithoutMarker(params, 11); } @Test - public void query_ALLWildcardTag() throws AtlasBaseException { + public void queryALLWildcardTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setClassification("*"); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 5); + assertSearchProcessorWithoutMarker(params, 5); } @Test - public void query_wildcardTag() throws AtlasBaseException { + public void queryWildcardTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setClassification("Dimen*on"); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 2); + assertSearchProcessorWithoutMarker(params, 2); } @Test - public void query_tag() throws AtlasBaseException { + public void queryTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setClassification(METRIC_CLASSIFICATION); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 3); + assertSearchProcessorWithoutMarker(params, 3); } @Test - public void query_tag_tagFilter() throws AtlasBaseException { + public void queryTagTagFilter() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setClassification(METRIC_CLASSIFICATION); SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis())); params.setTagFilters(fc); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 3); + assertSearchProcessorWithoutMarker(params, 3); } @Test - public void query_entity() throws AtlasBaseException { + public void queryEntity() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 4); + assertSearchProcessorWithoutMarker(params, 4); } @Test - public void query_entity_entityFilter() throws AtlasBaseException { + public void queryEntityEntityFilter() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.NOT_NULL, "null"); params.setEntityFilters(fc); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 3); + assertSearchProcessorWithoutMarker(params, 3); } @Test - public void query_entity_entityFilter_tag() throws AtlasBaseException { + public void queryEntityEntityFilterTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.IS_NULL, "null"); @@ -245,14 +206,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup { params.setClassification(DIMENSIONAL_CLASSIFICATION); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 1); + assertSearchProcessorWithoutMarker(params, 1); } @Test - public void query_entity_entityFilter_tag_tagFilter() throws AtlasBaseException { + public void queryEntityEntityFilterTagTagFilter() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.IS_NULL, "null"); @@ -262,14 +220,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup { SearchParameters.FilterCriteria fcC = getSingleFilterCondition("attr1", Operator.EQ, "value1"); params.setTagFilters(fcC); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 1); + assertSearchProcessorWithoutMarker(params, 1); } @Test - public void query_entity_tag_tagFilter() throws AtlasBaseException { + public void queryEntityTagTagFilter() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); params.setClassification(METRIC_CLASSIFICATION); @@ -277,29 +232,22 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup { params.setTagFilters(fc); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 2); - + assertSearchProcessorWithoutMarker(params, 2); } @Test - public void query_entity_tag() throws AtlasBaseException { + public void queryEntityTag() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); params.setClassification(METRIC_CLASSIFICATION); params.setQuery("sales"); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 2); + assertSearchProcessorWithoutMarker(params, 2); } // CSP Execute and ESP filter @Test - public void entity_entityFilter_tag_tagFilter() throws AtlasBaseException { + public void entityEntityFilterTagTagFilter() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed"); @@ -308,27 +256,171 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup { SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis())); params.setTagFilters(fcC); - List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); - - Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 4); - + assertSearchProcessorWithoutMarker(params, 4); } @Test - public void entity_tag_tagFilter() throws AtlasBaseException { + public void entityTagTagFilter() throws AtlasBaseException { SearchParameters params = new SearchParameters(); params.setTypeName(HIVE_TABLE_TYPE); params.setClassification(METRIC_CLASSIFICATION); SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis())); params.setTagFilters(fc); + assertSearchProcessorWithoutMarker(params, 4); + } + + @Test + public void searchWith0offsetMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setOffset(0); + params.setMarker(SearchContext.MarkerUtil.MARKER_START); + params.setLimit(5); + + assertSearchProcessorWithMarker(params, 5); + } + + @Test + public void searchWithNoOffsetMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setMarker(SearchContext.MarkerUtil.MARKER_START); + params.setLimit(5); + + assertSearchProcessorWithMarker(params, 5); + } + + @Test + public void searchWithGreaterThan0OffsetBlankMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setOffset(1); + params.setMarker(""); + params.setLimit(5); + + assertSearchProcessorWithoutMarker(params, 5); + } + + @Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Marker can be used only if offset=0.") + public void searchWithGreaterThan0OffsetMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setOffset(1); + params.setMarker(SearchContext.MarkerUtil.MARKER_START); + params.setLimit(5); List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + assertNotNull(entityHeaders); + } + + @Test + public void searchWithMarkerSet() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setMarker(SearchContext.MarkerUtil.MARKER_START); + params.setLimit(5); + AtlasSearchResult searchResult = discoveryService.searchWithParameters(params); + List<AtlasEntityHeader> entityHeaders = searchResult.getEntities(); + + Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); + assertEquals(entityHeaders.size(), 5); + Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker())); + + //get next marker and set in marker of subsequent request + params.setMarker(searchResult.getNextMarker()); + AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params); + List<AtlasEntityHeader> nextentityHeaders = nextsearchResult.getEntities(); + + Assert.assertTrue(CollectionUtils.isNotEmpty(nextentityHeaders)); + Assert.assertTrue(StringUtils.isNotEmpty(nextsearchResult.getNextMarker())); + + if (entityHeaders.size() < params.getLimit()) { + Assert.assertTrue(nextsearchResult.getNextMarker() == String.valueOf(-1)); + } + } + + @Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Invalid marker!") + public void searchWithInvalidMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setMarker(SearchContext.MarkerUtil.MARKER_START); + params.setLimit(5); + AtlasSearchResult searchResult = discoveryService.searchWithParameters(params); + List<AtlasEntityHeader> entityHeaders = searchResult.getEntities(); + + Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); + assertEquals(entityHeaders.size(), 5); + Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker())); + + //get next marker and set in marker of subsequent request + params.setMarker(searchResult.getNextMarker()+"abc"); + AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params); + + } + + @Test + public void searchWithLastPageMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setExcludeDeletedEntities(true); + params.setMarker(SearchContext.MarkerUtil.MARKER_START); + params.setLimit(5); + AtlasSearchResult searchResult = discoveryService.searchWithParameters(params); + List<AtlasEntityHeader> entityHeaders = searchResult.getEntities(); + Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); - assertEquals(entityHeaders.size(), 4); + assertEquals(entityHeaders.size(), 5); + Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker())); + + long maxEntities = searchResult.getApproximateCount(); + + //get next marker and set in marker of subsequent request + params.setMarker(SearchContext.MarkerUtil.MARKER_START); + params.setLimit((int)maxEntities + 10); + AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params); + + Assert.assertTrue(nextsearchResult.getNextMarker().equals("-1")); } + + @Test //marker functionality is not supported + public void termMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY); + params.setMarker("*"); + + assertSearchProcessorWithoutMarker(params, 10); + + } + + @Test + public void queryEntityTagMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + params.setClassification(METRIC_CLASSIFICATION); + params.setQuery("sales"); + params.setMarker("*"); + params.setLimit(5); + + assertSearchProcessorWithMarker(params, 2); + } + + // CSP Execute and ESP filter + @Test + public void entityEntityFilterTagTagFilterMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(HIVE_TABLE_TYPE); + SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed"); + params.setEntityFilters(fcE); + params.setClassification(METRIC_CLASSIFICATION); + SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis())); + params.setTagFilters(fcC); + params.setMarker("*"); + assertSearchProcessorWithoutMarker(params, 4); + } + + String spChar1 = "default.test_dot_name"; String spChar2 = "default.test_dot_name@db.test_db"; String spChar3 = "default.test_dot_name_12.col1@db1"; @@ -794,6 +886,29 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup { entityStore.addClassification(Arrays.asList(guid), new AtlasClassification(DIMENSIONAL_CLASSIFICATION, attr)); } + + private void assertSearchProcessorWithoutMarker(SearchParameters params, int expected) throws AtlasBaseException { + assertSearchProcessor(params, expected, false); + } + + private void assertSearchProcessorWithMarker(SearchParameters params, int expected) throws AtlasBaseException { + assertSearchProcessor(params, expected, true); + } + + private void assertSearchProcessor(SearchParameters params, int expected, boolean checkMarker) throws AtlasBaseException { + AtlasSearchResult searchResult = discoveryService.searchWithParameters(params); + List<AtlasEntityHeader> entityHeaders = searchResult.getEntities(); + + Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); + assertEquals(entityHeaders.size(), expected); + + if (checkMarker) { + Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker())); + } else { + Assert.assertTrue(StringUtils.isEmpty(searchResult.getNextMarker())); + } + } + @AfterClass public void teardown() throws Exception { AtlasGraphProvider.cleanup(); diff --git a/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java b/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java index e1ebbfc..121dca9 100644 --- a/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java +++ b/repository/src/test/java/org/apache/atlas/discovery/ClassificationSearchProcessorTest.java @@ -33,6 +33,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -263,6 +264,62 @@ public class ClassificationSearchProcessorTest extends BasicTestSetup { } + @Test + public void searchByWildcardTagMarker() throws AtlasBaseException { + final String LAST_MARKER = "-1"; + SearchParameters params = new SearchParameters(); + params.setClassification("*"); + int limit = 5; + String marker = "*"; + params.setLimit(limit); + + while (!StringUtils.equals(marker, LAST_MARKER)) { + params.setMarker(marker); + SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys()); + ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context); + List<AtlasVertex> vertices = processor.execute(); + long totalCount = vertices.size(); + marker = processor.getNextMarker(); + + if (totalCount < limit) { + assertEquals(marker, LAST_MARKER); + break; + } else { + Assert.assertNotNull(marker); + assertEquals(vertices.size(), 5); + } + } + } + + @Test //marker functionality is not supported in this case + public void searchByTagAndGraphSysFiltersMarker() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(DIMENSION_CLASSIFICATION); + FilterCriteria filterCriteria = getSingleFilterCondition("__entityStatus", Operator.EQ, "DELETED"); + params.setTagFilters(filterCriteria); + params.setExcludeDeletedEntities(false); + params.setLimit(20); + params.setMarker("*"); + + SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys()); + ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context); + List<AtlasVertex> vertices = processor.execute(); + + Assert.assertTrue(CollectionUtils.isNotEmpty(vertices)); + assertEquals(vertices.size(), 1); + List<String> guids = vertices.stream().map(g -> { + try { + return entityRetriever.toAtlasEntityHeader(g).getGuid(); + } catch (AtlasBaseException e) { + fail("Failure in mapping vertex to AtlasEntityHeader"); + } + return ""; + }).collect(Collectors.toList()); + Assert.assertTrue(guids.contains(dimensionTagDeleteGuid)); + + Assert.assertNull(processor.getNextMarker()); + } + private void createDimensionTaggedEntityAndDelete() throws AtlasBaseException { AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE); entityToDelete.setAttribute("name", "entity to be deleted"); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java index 4c7b622..e4c74a9 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java @@ -197,7 +197,8 @@ public class DiscoveryREST { @QueryParam("sortOrder") SortOrder sortOrder, @QueryParam("excludeDeletedEntities") boolean excludeDeletedEntities, @QueryParam("limit") int limit, - @QueryParam("offset") int offset) throws AtlasBaseException { + @QueryParam("offset") int offset, + @QueryParam("marker") String marker) throws AtlasBaseException { Servlets.validateQueryParamLength("typeName", typeName); Servlets.validateQueryParamLength("classification", classification); Servlets.validateQueryParamLength("sortBy", sortByAttribute); @@ -220,6 +221,7 @@ public class DiscoveryREST { searchParameters.setExcludeDeletedEntities(excludeDeletedEntities); searchParameters.setLimit(limit); searchParameters.setOffset(offset); + searchParameters.setMarker(marker); searchParameters.setSortBy(sortByAttribute); searchParameters.setSortOrder(sortOrder);