This is an automated email from the ASF dual-hosted git repository. mandarambawane pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new dd6b17bd8 ATLAS-4733 : Download Basic and DSL search results dd6b17bd8 is described below commit dd6b17bd8f654f61fc5b739b581824f265dbfbd6 Author: Mandar Ambawane <mandar.ambaw...@freestoneinfotech.com> AuthorDate: Fri Apr 14 13:26:59 2023 +0530 ATLAS-4733 : Download Basic and DSL search results Signed-off-by: Mandar Ambawane <mandar.ambaw...@freestoneinfotech.com> (cherry picked from commit 205b975cceee0e642fbeb4d5e9d6ffe5620c001c) --- .../main/java/org/apache/atlas/AtlasErrorCode.java | 1 + .../discovery/AtlasSearchResultDownloadStatus.java | 134 ++++++++++ .../atlas/discovery/AtlasDiscoveryService.java | 17 +- .../atlas/discovery/EntityDiscoveryService.java | 60 ++++- .../searchdownload/SearchResultDownloadTask.java | 270 +++++++++++++++++++++ .../SearchResultDownloadTaskFactory.java | 152 ++++++++++++ .../org/apache/atlas/tasks/TaskManagement.java | 4 + .../java/org/apache/atlas/tasks/TaskRegistry.java | 25 ++ .../org/apache/atlas/web/rest/DiscoveryREST.java | 189 +++++++++++++-- 9 files changed, 825 insertions(+), 27 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 608342433..21ac7f78e 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -219,6 +219,7 @@ public enum AtlasErrorCode { GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with qualifiedName {0} already exists"), GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"), METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"), + PENDING_TASKS_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-013", "There are already {0} pending tasks in queue"), // All internal errors go here INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"), diff --git 
a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResultDownloadStatus.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResultDownloadStatus.java new file mode 100644 index 000000000..f2f73e6c4 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResultDownloadStatus.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.model.discovery; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.model.tasks.AtlasTask; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.Date; +import java.util.List; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AtlasSearchResultDownloadStatus implements Serializable { + + private List<AtlasSearchDownloadRecord> searchDownloadRecords; + + public List<AtlasSearchDownloadRecord> getSearchDownloadRecords() { + return searchDownloadRecords; + } + + public void setSearchDownloadRecords(List<AtlasSearchDownloadRecord> searchDownloadRecords) { + this.searchDownloadRecords = searchDownloadRecords; + } + + @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.PROPERTY) + public static class AtlasSearchDownloadRecord implements Serializable { + private AtlasTask.Status status; + private String fileName; + private String createdBy; + private Date createdTime; + private Date startTime; + + + public AtlasSearchDownloadRecord(AtlasTask.Status status, String fileName, String createdBy, Date createdTime, Date startTime) { + this.status = status; + this.fileName = fileName; + this.createdBy = 
createdBy; + this.createdTime = createdTime; + this.startTime = startTime; + } + + public AtlasSearchDownloadRecord(AtlasTask.Status status, String fileName, String createdBy, Date createdTime) { + this(status, fileName, createdBy, createdTime, null); + } + + public AtlasTask.Status getStatus() { + return status; + } + + public void setStatus(AtlasTask.Status status) { + this.status = status; + } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public String getCreatedBy() { + return createdBy; + } + + public void setCreatedBy(String createdBy) { + this.createdBy = createdBy; + } + + public Date getCreatedTime() { + return createdTime; + } + + public void setCreatedTime(Date createdTime) { + this.createdTime = createdTime; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + sb.append("AtlasSearchDownloadRecord{"); + sb.append("status=").append(status); + sb.append(", fileName=").append(fileName); + sb.append(", createdBy=").append(createdBy); + sb.append(", createTime=").append(createdTime); + sb.append(", startTime=").append(startTime); + sb.append("}"); + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java index d94110004..f8e55b886 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java @@ -19,13 +19,13 @@ package org.apache.atlas.discovery; -import org.apache.atlas.SortOrder; import 
org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.*; import org.apache.atlas.model.profile.AtlasUserSavedSearch; +import java.io.IOException; import java.util.List; -import java.util.Set; +import java.util.Map; public interface AtlasDiscoveryService { /** @@ -160,4 +160,17 @@ public interface AtlasDiscoveryService { * @return top 5 suggestion strings for the given prefix. */ AtlasSuggestionsResult getSuggestions(String prefixString, String fieldName); + + /** + * Creates task to search and download the results of Basic and DSL search + * @param taskParams parameters of AtlasTask + */ + void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException; + + /** + * + * @return AtlasSearchResultDownloadStatus + * @throws IOException + */ + AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 582d97542..5b4395355 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -22,6 +22,7 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; import org.apache.atlas.SortOrder; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.AtlasAuthorizationUtils; @@ -32,6 +33,8 @@ import org.apache.atlas.model.discovery.AtlasQuickSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult; import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType; +import 
org.apache.atlas.model.discovery.AtlasSearchResultDownloadStatus; +import org.apache.atlas.model.discovery.AtlasSearchResultDownloadStatus.AtlasSearchDownloadRecord; import org.apache.atlas.model.discovery.AtlasSuggestionsResult; import org.apache.atlas.model.discovery.QuickSearchParameters; import org.apache.atlas.model.discovery.RelationshipSearchParameters; @@ -41,6 +44,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelationshipHeader; import org.apache.atlas.model.profile.AtlasUserSavedSearch; +import org.apache.atlas.model.tasks.AtlasTask; import org.apache.atlas.query.QueryParams; import org.apache.atlas.query.executors.DSLQueryExecutor; import org.apache.atlas.query.executors.ScriptEngineBasedExecutor; @@ -56,7 +60,10 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask; +import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTaskFactory; import org.apache.atlas.repository.userprofile.UserProfileService; +import org.apache.atlas.tasks.TaskManagement; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; import org.apache.atlas.type.AtlasClassificationType; @@ -83,15 +90,21 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import javax.script.ScriptEngine; import javax.script.ScriptException; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import 
java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*; import static org.apache.atlas.SortOrder.ASCENDING; @@ -120,13 +133,15 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { private final UserProfileService userProfileService; private final SuggestionsProvider suggestionsProvider; private final DSLQueryExecutor dslQueryExecutor; + private final TaskManagement taskManagement; @Inject EntityDiscoveryService(AtlasTypeRegistry typeRegistry, AtlasGraph graph, GraphBackedSearchIndexer indexer, SearchTracker searchTracker, - UserProfileService userProfileService) throws AtlasException { + UserProfileService userProfileService, + TaskManagement taskManagement) throws AtlasException { this.graph = graph; this.entityRetriever = new EntityGraphRetriever(this.graph, typeRegistry); this.indexer = indexer; @@ -142,6 +157,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { this.dslQueryExecutor = AtlasConfiguration.DSL_EXECUTOR_TRAVERSAL.getBoolean() ? 
new TraversalBasedExecutor(typeRegistry, graph, entityRetriever) : new ScriptEngineBasedExecutor(typeRegistry, graph, entityRetriever); + this.taskManagement = taskManagement; LOG.info("DSL Executor: {}", this.dslQueryExecutor.getClass().getSimpleName()); } @@ -451,6 +467,48 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return searchWithSearchContext(new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys())); } + @Override + @GraphTransaction + public void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException { + + List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD); + if (CollectionUtils.isNotEmpty(pendingTasks) && pendingTasks.size() > SearchResultDownloadTaskFactory.MAX_PENDING_TASKS_ALLOWED) { + throw new AtlasBaseException(PENDING_TASKS_ALREADY_IN_PROGRESS, String.valueOf(pendingTasks.size())); + } + AtlasTask task = taskManagement.createTask(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD, RequestContext.getCurrentUser(), taskParams); + RequestContext.get().queueTask(task); + } + + @Override + public AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException { + List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD); + List<AtlasTask> currentUserPendingTasks = pendingTasks.stream().filter(task -> task.getCreatedBy() + .equals(RequestContext.getCurrentUser())).collect(Collectors.toList()); + + List<AtlasSearchDownloadRecord> searchDownloadRecords = new ArrayList<>(); + for (AtlasTask pendingTask : currentUserPendingTasks) { + String fileName = (String) pendingTask.getParameters().get(SearchResultDownloadTask.CSV_FILE_NAME_KEY); + AtlasSearchDownloadRecord searchDownloadRecord = new AtlasSearchDownloadRecord(pendingTask.getStatus(), fileName, pendingTask.getCreatedBy(), 
pendingTask.getCreatedTime(), pendingTask.getStartTime()); + searchDownloadRecords.add(searchDownloadRecord); + } + + File fileDir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser()); + if (fileDir.exists()) { + File[] currentUserFiles = fileDir.listFiles(); + if (currentUserFiles != null) { + for (File file : currentUserFiles) { + BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class); + Date createdTime = new Date(attr.creationTime().toMillis()); + AtlasSearchDownloadRecord searchDownloadRecord = new AtlasSearchDownloadRecord(AtlasTask.Status.COMPLETE, file.getName(), RequestContext.getCurrentUser(), createdTime); + searchDownloadRecords.add(searchDownloadRecord); + } + } + } + AtlasSearchResultDownloadStatus result = new AtlasSearchResultDownloadStatus(); + result.setSearchDownloadRecords(searchDownloadRecords); + return result; + } + @Override @GraphTransaction public AtlasSearchResult searchRelationsWithParameters(RelationshipSearchParameters searchParameters) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTask.java new file mode 100644 index 000000000..fd90fd440 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTask.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.tasks.searchdownload; + +import com.opencsv.CSVWriter; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.AtlasDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.tasks.AbstractTask; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasJson; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.BASIC; +import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.DSL; +import static 
org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE; +import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED; + +public class SearchResultDownloadTask extends AbstractTask { + private static final Logger LOG = LoggerFactory.getLogger(SearchResultDownloadTask.class); + + public static final String SEARCH_PARAMETERS_JSON_KEY = "search_parameters_json"; + public static final String CSV_FILE_NAME_KEY = "csv_file_Name"; + public static final String SEARCH_TYPE_KEY = "search_type"; + public static final String ATTRIBUTE_LABEL_MAP_KEY = "attribute_label_map"; + public static final String QUERY_KEY = "query"; + public static final String TYPE_NAME_KEY = "type_name"; + public static final String CLASSIFICATION_KEY = "classification"; + public static final String LIMIT_KEY = "limit"; + public static final String OFFSET_KEY = "offset"; + public static final String CSV_FILE_EXTENSION = ".csv"; + public static String DOWNLOAD_DIR_PATH; + private static final String EMPTY_STRING = ""; + private static final String DOWNLOAD_DIR_PATH_KEY = "atlas.download.search.dir.path"; + private static final String DOWNLOAD_DIR_PATH_DEFAULT = StringUtils.isEmpty(System.getProperty("atlas.home")) ? 
System.getProperty("user.dir") : System.getProperty("atlas.home"); + private static final String CSV_DOWNLOAD_DIR = "search_result_downloads"; + + private static Configuration configuration; + + static { + try { + configuration = ApplicationProperties.get(); + } catch (AtlasException e) { + LOG.error("Failed to load application properties", e); + } + if (configuration != null) { + DOWNLOAD_DIR_PATH = configuration.getString(DOWNLOAD_DIR_PATH_KEY, DOWNLOAD_DIR_PATH_DEFAULT) + File.separator + CSV_DOWNLOAD_DIR; + } else { + DOWNLOAD_DIR_PATH = DOWNLOAD_DIR_PATH_DEFAULT + File.separator + CSV_DOWNLOAD_DIR; + } + } + + private final AtlasDiscoveryService discoveryService; + private final AtlasTypeRegistry typeRegistry; + + public SearchResultDownloadTask(AtlasTask task, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) { + super(task); + this.discoveryService = discoveryService; + this.typeRegistry = typeRegistry; + } + + @Override + public AtlasTask.Status perform() throws Exception { + RequestContext.clear(); + Map<String, Object> params = getTaskDef().getParameters(); + + if (MapUtils.isEmpty(params)) { + LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid()); + + return FAILED; + } + + String userName = getTaskDef().getCreatedBy(); + + if (StringUtils.isEmpty(userName)) { + LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid()); + + return FAILED; + } + + RequestContext.get().setUser(userName, null); + + try { + run(params); + + setStatus(COMPLETE); + } catch (Exception e) { + LOG.error("Task: {}: Error performing task!", getTaskGuid(), e); + + setStatus(FAILED); + + throw e; + } finally { + RequestContext.clear(); + } + + return getStatus(); + } + + protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException { + Map<String, String> attributeLabelMap; + AtlasSearchResult searchResult = null; + AtlasSearchResult.AtlasQueryType queryType = null; + + if 
(parameters.get(SEARCH_TYPE_KEY) == BASIC) { + String searchParametersJson = (String) parameters.get(SEARCH_PARAMETERS_JSON_KEY); + SearchParameters searchParameters = AtlasJson.fromJson(searchParametersJson, SearchParameters.class); + searchParameters.setLimit(AtlasConfiguration.SEARCH_MAX_LIMIT.getInt()); + searchResult = discoveryService.searchWithParameters(searchParameters); + queryType = BASIC; + + } else if (parameters.get(SEARCH_TYPE_KEY) == DSL) { + String query = (String) parameters.get(QUERY_KEY); + String typeName = (String) parameters.get(TYPE_NAME_KEY); + String classification = (String) parameters.get(CLASSIFICATION_KEY); + int offset = (int) parameters.get(OFFSET_KEY); + String queryStr = discoveryService.getDslQueryUsingTypeNameClassification(query, typeName, classification); + searchResult = discoveryService.searchUsingDslQuery(queryStr, AtlasConfiguration.SEARCH_MAX_LIMIT.getInt(), offset); + queryType = DSL; + } + + String attributeLabelMapJson = (String) parameters.get(ATTRIBUTE_LABEL_MAP_KEY); + attributeLabelMap = AtlasJson.fromJson(attributeLabelMapJson, Map.class); + + generateCSVFileFromSearchResult(searchResult, attributeLabelMap, queryType); + } + + private void generateCSVFileFromSearchResult(AtlasSearchResult searchResult, Map<String, String> attributeLabelMap, AtlasSearchResult.AtlasQueryType queryType) throws IOException { + + List<AtlasEntityHeader> allEntityHeaders = searchResult.getEntities(); + AtlasSearchResult.AttributeSearchResult attributeSearchResult = searchResult.getAttributes(); + String fileName = (String) getTaskDef().getParameters().get(CSV_FILE_NAME_KEY); + + if ((queryType == BASIC && CollectionUtils.isEmpty(allEntityHeaders)) + || (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult == null))) { + LOG.info("No result found. 
Not generating csv file: {}", fileName); + return; + } + + File dir = new File(DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser()); + File csvFile; + if (!dir.exists()) { + dir.mkdirs(); + } + + csvFile = new File(dir, fileName); + try (FileWriter fileWriter = new FileWriter(csvFile); + CSVWriter csvWriter = new CSVWriter(fileWriter)) { + + String[] defaultHeaders = new String[]{"Type name", "Name", "Classifications", "Terms"}; + String[] attributeHeaders; + int attrSize = 0; + + if (attributeLabelMap == null) { + attributeLabelMap = new HashMap<>(); + } + attributeLabelMap.put("Owner", "owner"); + attributeLabelMap.put("Description", "description"); + + Collection<String> attributeHeaderLabels = attributeLabelMap.keySet(); + + if (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult != null)) { + attributeHeaderLabels = attributeSearchResult.getName(); + defaultHeaders = new String[0]; + } + + attrSize = (attributeHeaderLabels == null) ? 0 : attributeHeaderLabels.size(); + attributeHeaders = new String[attrSize]; + if (attributeHeaderLabels != null) { + attributeHeaders = attributeHeaderLabels.toArray(attributeHeaders); + } + + int headerSize = attrSize + defaultHeaders.length; + String[] headers = new String[headerSize]; + System.arraycopy(defaultHeaders, 0, headers, 0, defaultHeaders.length); + if (ArrayUtils.isNotEmpty(attributeHeaders)) { + System.arraycopy(attributeHeaders, 0, headers, defaultHeaders.length, attrSize); + } + + csvWriter.writeNext(headers); + + String[] entityRecords = new String[headerSize]; + if (CollectionUtils.isNotEmpty(allEntityHeaders)) { + for (AtlasEntityHeader entityHeader : allEntityHeaders) { + + entityRecords[0] = entityHeader.getTypeName(); + entityRecords[1] = entityHeader.getDisplayText() != null ? 
entityHeader.getDisplayText() : entityHeader.getGuid(); + entityRecords[2] = String.join(",", entityHeader.getClassificationNames()); + entityRecords[3] = String.join(",", entityHeader.getMeaningNames()); + + if (MapUtils.isNotEmpty(entityHeader.getAttributes())) { + + for (int i = defaultHeaders.length; i < headerSize; i++) { + + Object attrValue = entityHeader.getAttribute(attributeLabelMap.get(headers[i])); + if (attrValue instanceof AtlasObjectId) { + entityRecords[i] = String.valueOf(((AtlasObjectId) attrValue).getUniqueAttributes().get("qualifiedName")); + + } else if (attrValue instanceof List) { + + if (CollectionUtils.isNotEmpty((List<?>) attrValue)) { + List<String> valueList = new ArrayList<>(); + for (Object attrVal : (List) attrValue) { + if (attrVal instanceof AtlasObjectId) { + String value = String.valueOf(((AtlasObjectId) attrVal).getUniqueAttributes().get("qualifiedName")); + valueList.add(value); + } else { + valueList.add(String.valueOf(attrVal)); + } + } + entityRecords[i] = String.join(",", valueList); + } + } else { + entityRecords[i] = attrValue == null ? EMPTY_STRING : String.valueOf(attrValue); + } + } + } + csvWriter.writeNext(entityRecords); + } + } + + if (queryType == DSL && attributeSearchResult != null) { + for (List<Object> resultSet : attributeSearchResult.getValues()) { + for (int i = defaultHeaders.length; i < headerSize; i++) { + entityRecords[i] = resultSet.get(i) == null ? 
EMPTY_STRING : String.valueOf(resultSet.get(i)); + } + csvWriter.writeNext(entityRecords); + } + } + } + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTaskFactory.java new file mode 100644 index 000000000..dda694883 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTaskFactory.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph.v2.tasks.searchdownload; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.discovery.AtlasDiscoveryService; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.tasks.AbstractTask; +import org.apache.atlas.tasks.TaskFactory; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@Singleton +@Service +@EnableScheduling +public class SearchResultDownloadTaskFactory implements TaskFactory { + private static final Logger LOG = LoggerFactory.getLogger(SearchResultDownloadTaskFactory.class); + + public static final String SEARCH_RESULT_DOWNLOAD = "SEARCH_RESULT_DOWNLOAD"; + public static int MAX_PENDING_TASKS_ALLOWED; + private static final int MAX_PENDING_TASKS_ALLOWED_DEFAULT = 50; + private static final String MAX_PENDING_TASKS_ALLOWED_KEY = "atlas.download.search.max.pending.tasks"; + private static final String FILES_CLEANUP_INTERVAL = "0 0/1 * * * *"; + private static final long FILE_EXP_DURATION_IN_MILLIS_DEFAULT = 24 * 60 * 60 * 1000; + private static long FILE_EXP_DURATION_IN_MILLIS; + private static final String FILE_EXP_DURATION_IN_MILLIS_KEY = "atlas.download.search.file.expiry.millis"; + private static Configuration configuration; + + static { + try { + configuration = ApplicationProperties.get(); + } catch 
(Exception e) { + LOG.info("Failed to load application properties", e); + } + if (configuration != null) { + MAX_PENDING_TASKS_ALLOWED = configuration.getInt(MAX_PENDING_TASKS_ALLOWED_KEY, MAX_PENDING_TASKS_ALLOWED_DEFAULT); + FILE_EXP_DURATION_IN_MILLIS = configuration.getLong(FILE_EXP_DURATION_IN_MILLIS_KEY, FILE_EXP_DURATION_IN_MILLIS_DEFAULT); + } else { + MAX_PENDING_TASKS_ALLOWED = MAX_PENDING_TASKS_ALLOWED_DEFAULT; + FILE_EXP_DURATION_IN_MILLIS = FILE_EXP_DURATION_IN_MILLIS_DEFAULT; + } + } + + private static final List<String> supportedTypes = new ArrayList<String>() {{ + add(SEARCH_RESULT_DOWNLOAD); + }}; + + private final AtlasDiscoveryService discoveryService; + private final AtlasTypeRegistry typeRegistry; + + @Inject + public SearchResultDownloadTaskFactory(AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) { + this.discoveryService = discoveryService; + this.typeRegistry = typeRegistry; + } + + @Override + public AbstractTask create(AtlasTask task) { + String taskType = task.getType(); + String taskGuid = task.getGuid(); + + switch (taskType) { + case SEARCH_RESULT_DOWNLOAD: + return new SearchResultDownloadTask(task, discoveryService, typeRegistry); + + default: + LOG.warn("Type: {} - {} not found!. 
The task will be ignored.", taskType, taskGuid); + return null; + } + } + + @Override + public List<String> getSupportedTypes() { + return this.supportedTypes; + } + + @Scheduled(cron = "#{getCronExpressionForCleanup}") + public void cleanupExpiredFiles() { + File csvDir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH); + deleteFiles(csvDir); + } + + @Bean + private String getCronExpressionForCleanup() { + return FILES_CLEANUP_INTERVAL; + } + + private void deleteFiles(File downloadDir) { + + File[] subDirs = downloadDir.listFiles(); + + if (ArrayUtils.isNotEmpty(subDirs)) { + for (File subDir : subDirs) { + File[] csvFiles = subDir.listFiles(); + + if (ArrayUtils.isNotEmpty(csvFiles)) { + for (File csv : csvFiles) { + BasicFileAttributes attr; + + try { + attr = Files.readAttributes(csv.toPath(), BasicFileAttributes.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + + Date now = new Date(); + long fileAgeInMillis = now.getTime() - attr.creationTime().toMillis(); + + if (FILE_EXP_DURATION_IN_MILLIS < fileAgeInMillis) { + if (LOG.isDebugEnabled()) { + LOG.debug("deleting file: {}", csv.getName()); + } + csv.delete(); + } + } + } + } + } + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java index 5b4bf71cc..a8a1b9e69 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java @@ -120,6 +120,10 @@ public class TaskManagement implements Service, ActiveStateChangeHandler { return this.registry.getPendingTasks(); } + public List<AtlasTask> getPendingTasksByType(String type) { + return this.registry.getPendingTasksByType(type); + } + public List<AtlasTask> getAll() { return this.registry.getAll(); } diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java 
b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
index 6f770edb1..5d1f50f5b 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
@@ -84,6 +84,35 @@ public class TaskRegistry {
         return ret;
     }
 
+    /**
+     * Fetches the tasks of the given type that are in PENDING status,
+     * ordered by creation time (oldest first).
+     *
+     * @param type task type to filter on
+     * @return matching pending tasks; when the graph query fails the error is logged
+     *         and whatever was collected so far (possibly nothing) is returned
+     */
+    @GraphTransaction
+    public List<AtlasTask> getPendingTasksByType(String type) {
+        List<AtlasTask> pendingTasks = new ArrayList<>();
+
+        try {
+            AtlasGraphQuery pendingByTypeQuery = graph.query()
+                    .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+                    .has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)
+                    .has(Constants.TASK_TYPE, type)
+                    .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
+
+            for (Iterator<AtlasVertex> it = pendingByTypeQuery.vertices().iterator(); it.hasNext(); ) {
+                pendingTasks.add(toAtlasTask(it.next()));
+            }
+        } catch (Exception exception) {
+            LOG.error("Error fetching pending tasks by type!", exception);
+        }
+
+        return pendingTasks;
+    }
+
     @GraphTransaction
     public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
         if (taskVertex == null) {
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
index a6ca04f3c..d1d19075f 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
@@ -19,6 +19,7 @@ package org.apache.atlas.web.rest;
 
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.SortOrder;
 import org.apache.atlas.annotation.Timed;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@@ -29,9 +30,11 @@ import org.apache.atlas.model.discovery.*;
 import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
 import org.apache.atlas.model.profile.AtlasUserSavedSearch;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.collections.CollectionUtils;
@@ -54,10 +57,21 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.BASIC;
+import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.DSL;
+import static org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask.*;
+
 /**
  * REST interface for data discovery using dsl or full text search
  */
@@ -107,18 +121,10 @@ public class DiscoveryREST {
                                 @QueryParam("classification") String classification,
                                 @QueryParam("limit") int limit,
                                 @QueryParam("offset") int offset) throws AtlasBaseException {
-        Servlets.validateQueryParamLength("typeName", typeName);
-        Servlets.validateQueryParamLength("classification", classification);
-        if (StringUtils.isNotEmpty(query)) {
-            if (query.length() > maxDslQueryLength) {
-                throw new AtlasBaseException(AtlasErrorCode.INVALID_QUERY_LENGTH, Constants.MAX_DSL_QUERY_STR_LENGTH);
-            }
-            query = Servlets.decodeQueryString(query);
-        }
+        query = validateDSLSearchParameters(query, typeName, classification);
 
         AtlasPerfTracer perf = null;
-
         try {
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchUsingDSL(" + query + "," + typeName
@@ -134,6 +140,24 @@ public class DiscoveryREST {
     }
 
 
+    /**
+     * Queues a search-result download task for a DSL search.
+     *
+     * @param parameterMap request body; its "searchParameters" entry holds the DSL query, typeName,
+     *                     classification, limit and offset
+     * @throws AtlasBaseException if "searchParameters" is missing or the parameters fail validation
+     */
+    @POST
+    @Timed
+    @Path("dsl/download/create_file")
+    public void dslSearchCreateFile(Map<String, Object> parameterMap) throws AtlasBaseException {
+        SearchParameters parameters = getSearchParametersFromRequest(parameterMap);
+        String query = validateDSLSearchParameters(parameters.getQuery(), parameters.getTypeName(), parameters.getClassification());
+
+        Map<String, Object> taskParams = populateTaskParams(query, parameters.getTypeName(), parameters.getClassification(), parameters.getLimit(), parameters.getOffset());
+
+        discoveryService.createAndQueueSearchResultDownloadTask(taskParams);
+    }
 
     /**
      * Retrieve data for the specified fulltext query
@@ -330,31 +354,90 @@ public class DiscoveryREST {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchWithParameters(" + parameters + ")");
             }
 
-            if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
-                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
-            }
+            validateBasicSearchParameters(parameters);
 
-            if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
-                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
-            }
+            return discoveryService.searchWithParameters(parameters);
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
 
-            if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
-                throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
-            }
+    /**
+     * Queues a search-result download task for a basic search.
+     *
+     * @param parameterMap request body: "searchParameters" holds the basic-search parameters,
+     *                     "attributeLabelMap" the map handed to the task under ATTRIBUTE_LABEL_MAP_KEY
+     * @throws AtlasBaseException if "searchParameters" is missing or the parameters fail validation
+     */
+    @POST
+    @Timed
+    @Path("basic/download/create_file")
+    public void basicSearchCreateFile(Map<String, Object> parameterMap) throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
 
-            if (StringUtils.isEmpty(parameters.getTypeName()) && StringUtils.isEmpty(parameters.getClassification()) &&
-                    StringUtils.isEmpty(parameters.getQuery()) && StringUtils.isEmpty(parameters.getTermName())) {
-                throw new AtlasBaseException(AtlasErrorCode.INVALID_SEARCH_PARAMS);
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.basicSearchCreateFile(" + parameterMap + ")");
             }
 
-            validateSearchParameters(parameters);
+            SearchParameters parameters = getSearchParametersFromRequest(parameterMap);
+            Map<String, String> attributeLabelMap = (Map<String, String>) parameterMap.get("attributeLabelMap");
+
+            validateBasicSearchParameters(parameters);
+
+            Map<String, Object> taskParams = populateTaskParams(parameters, attributeLabelMap);
+
+            discoveryService.createAndQueueSearchResultDownloadTask(taskParams);
+
-            return discoveryService.searchWithParameters(parameters);
         } finally {
             AtlasPerfTracer.log(perf);
         }
     }
 
+    /**
+     * Reports the status of search-result downloads, as provided by the discovery service.
+     *
+     * @return status returned by discoveryService.getSearchResultDownloadStatus()
+     * @throws IOException if the discovery service raises it
+     */
+    @GET
+    @Timed
+    @Path("download/status")
+    public AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException {
+        return discoveryService.getSearchResultDownloadStatus();
+    }
+
+    /**
+     * Downloads a search-result file from the current user's download directory.
+     *
+     * @param fileName name of a file located directly inside the current user's download directory
+     * @return the file as an attachment; 204 (no content) when no such file exists; 400 (bad request)
+     *         when the name resolves to anything other than a direct child of that directory
+     */
+    @GET
+    @Timed
+    @Path("download/{filename}")
+    @Produces(MediaType.APPLICATION_OCTET_STREAM)
+    public Response downloadSearchResultFile(@PathParam("filename") String fileName) {
+        File dir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
+        File csvFile = new File(dir, fileName);
+
+        // fileName is caller-supplied: refuse anything (e.g. "..", nested or absolute paths)
+        // that does not resolve to a direct child of the current user's directory
+        if (!dir.toPath().normalize().equals(csvFile.toPath().normalize().getParent())) {
+            return Response.status(Response.Status.BAD_REQUEST).build();
+        }
+
+        if (!csvFile.isFile()) {
+            return Response.noContent().build();
+        }
+
+        Response.ResponseBuilder response = Response.ok(csvFile);
+        response.header("Content-Disposition", "attachment; filename=\"" + csvFile.getName() + "\"");
+
+        return response.build();
+    }
+
     /**
      * Relationship search to search for relations(edges)
      *
@@ -827,4 +910,126 @@ public class DiscoveryREST {
             validateSearchParameters(EntityDiscoveryService.createSearchParameters(parameters));
         }
     }
-}
+
+    /**
+     * Checks the parameters of a basic search request.
+     *
+     * @param parameters basic-search parameters to check
+     * @throws AtlasBaseException if limit/offset is negative, filters are given without their type or
+     *                            classification name, or none of typeName, classification, query and
+     *                            termName is specified
+     */
+    private void validateBasicSearchParameters(SearchParameters parameters) throws AtlasBaseException {
+        if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
+            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
+        }
+
+        if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
+            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
+        }
+
+        if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
+            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
+        }
+
+        if (StringUtils.isEmpty(parameters.getTypeName()) && StringUtils.isEmpty(parameters.getClassification()) &&
+                StringUtils.isEmpty(parameters.getQuery()) && StringUtils.isEmpty(parameters.getTermName())) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_SEARCH_PARAMS);
+        }
+
+        validateSearchParameters(parameters);
+    }
+
+    /**
+     * Checks the inputs of a DSL search and decodes the query through Servlets.decodeQueryString().
+     *
+     * @return the decoded query; the query as given when it is null or empty
+     * @throws AtlasBaseException if the query is longer than maxDslQueryLength
+     */
+    private String validateDSLSearchParameters(String query, String typeName, String classification) throws AtlasBaseException {
+        Servlets.validateQueryParamLength("typeName", typeName);
+        Servlets.validateQueryParamLength("classification", classification);
+
+        if (StringUtils.isEmpty(query)) {
+            return query;
+        }
+
+        if (query.length() > maxDslQueryLength) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_QUERY_LENGTH, Constants.MAX_DSL_QUERY_STR_LENGTH);
+        }
+
+        // strings are immutable: the decoded value has to be handed back to the caller,
+        // calling decodeQueryString() without using its result leaves the query un-decoded
+        return Servlets.decodeQueryString(query);
+    }
+
+    /**
+     * Extracts the "searchParameters" entry of a download request body.
+     *
+     * @param parameterMap request body, possibly null
+     * @return the search parameters, never null
+     * @throws AtlasBaseException with BAD_REQUEST when the body or the entry is missing
+     */
+    private SearchParameters getSearchParametersFromRequest(Map<String, Object> parameterMap) throws AtlasBaseException {
+        Object rawParameters = (parameterMap != null) ? parameterMap.get("searchParameters") : null;
+        SearchParameters parameters = (rawParameters != null) ? AtlasJson.fromLinkedHashMap(rawParameters, SearchParameters.class) : null;
+
+        if (parameters == null) {
+            throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "searchParameters not specified");
+        }
+
+        return parameters;
+    }
+
+    /**
+     * Builds the task parameters for downloading the result of a basic search.
+     *
+     * @param parameters        basic-search parameters, stored as JSON
+     * @param attributeLabelMap map stored as JSON under ATTRIBUTE_LABEL_MAP_KEY
+     * @return task parameters, including the generated CSV file name
+     */
+    private Map<String, Object> populateTaskParams(SearchParameters parameters, Map<String, String> attributeLabelMap) {
+        String searchParametersJson = AtlasJson.toJson(parameters);
+        String attrLabelMapJson = AtlasJson.toJson(attributeLabelMap);
+
+        Map<String, Object> taskParams = new HashMap<>();
+        taskParams.put(SEARCH_TYPE_KEY, BASIC);
+        taskParams.put(SEARCH_PARAMETERS_JSON_KEY, searchParametersJson);
+        taskParams.put(ATTRIBUTE_LABEL_MAP_KEY, attrLabelMapJson);
+
+        String csvFileName = RequestContext.getCurrentUser() + "_" + BASIC + "_" + getDateTimeString() + CSV_FILE_EXTENSION;
+        taskParams.put(CSV_FILE_NAME_KEY, csvFileName);
+
+        return taskParams;
+    }
+
+    /**
+     * Builds the task parameters for downloading the result of a DSL search.
+     *
+     * @return task parameters, including the generated CSV file name
+     */
+    private Map<String, Object> populateTaskParams(String query, String typeName, String classification, int limit, int offset) {
+        Map<String, Object> taskParams = new HashMap<>();
+        taskParams.put(SEARCH_TYPE_KEY, DSL);
+        taskParams.put(QUERY_KEY, query);
+        taskParams.put(TYPE_NAME_KEY, typeName);
+        taskParams.put(CLASSIFICATION_KEY, classification);
+        taskParams.put(LIMIT_KEY, limit);
+        taskParams.put(OFFSET_KEY, offset);
+
+        String csvFileName = RequestContext.getCurrentUser() + "_" + DSL + "_" + getDateTimeString() + CSV_FILE_EXTENSION;
+        taskParams.put(CSV_FILE_NAME_KEY, csvFileName);
+
+        return taskParams;
+    }
+
+    /**
+     * Formats the current local date-time for use in generated file names.
+     */
+    private String getDateTimeString() {
+        LocalDateTime localDateTime = LocalDateTime.now();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss.SSS");
+
+        return formatter.format(localDateTime);
+    }
+}
\ No newline at end of file