This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit db624c3e89e5444a3462893e6c86973f3fb72790
Author: fanshu.kong <1714585...@qq.com>
AuthorDate: Wed Nov 23 15:11:15 2022 +0800

    KYLIN-5454 Downloading the async query result may cause OOM

    Co-authored-by: Dorris Zhang <ruixuan.zh...@kyligence.io>
---
 .../org/apache/kylin/rest/request/SQLRequest.java  |   2 +
 .../java/org/apache/kylin/common/QueryContext.java |   1 +
 .../common/exception/code/ErrorCodeServer.java     |   6 +
 .../org/apache/kylin/common/msg/CnMessage.java     |  10 -
 .../java/org/apache/kylin/common/msg/Message.java  |  11 +-
 .../resources/kylin_error_msg_conf_cn.properties   |   6 +-
 .../resources/kylin_error_msg_conf_en.properties   |   8 +
 .../main/resources/kylin_errorcode_conf.properties |   6 +
 .../apache/kylin/query/util/AsyncQueryUtil.java    |   9 +-
 .../rest/controller/NAsyncQueryController.java     |  39 +-
 .../rest/controller/NAsyncQueryControllerV2.java   |  24 +-
 .../rest/controller/NAsyncQueryControllerTest.java |  98 ++--
 .../controller/NAsyncQueryControllerV2Test.java    |  40 +-
 .../kylin/rest/request/AsyncQuerySQLRequestV2.java |   3 +
 .../kylin/rest/service/AsyncQueryService.java      |  83 +---
 .../org/apache/kylin/rest/service/CSVWriter.java   | 120 -----
 .../apache/kylin/rest/service/XLSXExcelWriter.java | 155 -------
 .../kylin/rest/service/AysncQueryServiceTest.java  | 496 ++++++++++++---------
 .../kylin/query/pushdown/SparkSqlClient.scala      |  51 ++-
 .../kylin/query/runtime/plan/ResultPlan.scala      | 214 +++++++--
 20 files changed, 629 insertions(+), 753 deletions(-)

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
index 597ebd52c6..4bc4ce91d8 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
@@ -69,6 +69,8 @@ public class SQLRequest implements Serializable, ProjectInsensitiveRequest, Vali
     @JsonProperty("file_name")
     private String fileName = "result";
     private Integer forcedToTieredStorage; //0:CH->DFS; 1:CH->pushDown; 2:CH->return error
+    @JsonProperty("include_header")
+    private boolean includeHeader;
 
     private Map<String, String> backdoorToggles;
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 551b66d90f..d65396e95b 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -360,6 +360,7 @@ public class QueryContext implements Closeable {
         private String fileName;
         private String separator;
         private boolean isRefused;
+        private boolean includeHeader;
     }
 
     @Getter
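The two fields above are the submit-time half of the fix: "include_header" now arrives with the query itself rather than with the download request, so the header can be written into the stored result files once, when the query runs. A minimal sketch of the intended flow, using only names that appear later in this patch (illustrative, not the full controller code):

    // At submit time the flag is copied into the per-query context, so the
    // result files on HDFS are written with the header already in place.
    queryContext.getQueryTagInfo().setIncludeHeader(sqlRequest.isIncludeHeader());

    // At download time the service only streams the stored files back; no
    // header handling (and no result materialization) remains on this path.
    asyncQueryService.retrieveSavedQueryResult(project, queryId, response, format, encode);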
ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY("KE-010031304"), + // 400272XX resource group RESOURCE_GROUP_DISABLE_FAILED("KE-040027201"), RESOURCE_GROUP_ENABLE_FAILED("KE-040027202"), diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java index 6e099d683b..1f7923622b 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java @@ -423,16 +423,6 @@ public class CnMessage extends Message { return "当前无法清理文件夹。请确保相关 HDFS 文件可以正常访问。"; } - @Override - public String getAsyncQueryTimeFormatError() { - return "无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。"; - } - - @Override - public String getAsyncQueryProjectNameEmpty() { - return "项目名称不能为空。请检查后重试。"; - } - @Override public String getUserNotFound() { return "找不到用户 '%s'"; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java index 8f349bd31f..805caa5688 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java @@ -60,8 +60,7 @@ public class Message { private static final String LICENSE_MISMATCH_LICENSE = "The license doesn’t match the current cluster information. Please upload a new license, or contact Kyligence."; private static final String LICENSE_NOT_EFFECTIVE = "License is not effective yet, please apply for a new license."; private static final String LICENSE_EXPIRED = "The license has expired. Please upload a new license, or contact Kyligence."; - private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop " - + "view`, `alter view`, `show create table`"; + private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop view`, `alter view`, `show create table`"; private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KE_"; private static final String DDL_VIEW_NAME_DUPLICATE_ERROR = "Logical View names is duplicate"; private static final String DDL_DROP_ERROR = "Only support drop view"; @@ -522,14 +521,6 @@ public class Message { return "Can’t clean file folder at the moment. Please ensure that the related file on HDFS could be accessed."; } - public String getAsyncQueryTimeFormatError() { - return "The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”."; - } - - public String getAsyncQueryProjectNameEmpty() { - return "The project name can’t be empty. 
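These removals pair with the new ErrorCodeServer entries: instead of one hard-coded getter per message per locale, async-query errors are now raised through KylinException with a code, and the localized text is resolved from kylin_error_msg_conf_en.properties / kylin_error_msg_conf_cn.properties. A condensed sketch of the resulting call pattern, built only from names visible in this patch:

    import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;

    // Condensed from the controller hunks below; the real code first falls
    // back to sqlRequest.getProject() before giving up.
    if (project == null && sqlRequest == null) {
        // KE-010031302; the message text comes from the locale-specific properties file
        throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
    }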
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index 8f349bd31f..805caa5688 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -60,8 +60,7 @@ public class Message {
     private static final String LICENSE_MISMATCH_LICENSE = "The license doesn’t match the current cluster information. Please upload a new license, or contact Kyligence.";
     private static final String LICENSE_NOT_EFFECTIVE = "License is not effective yet, please apply for a new license.";
     private static final String LICENSE_EXPIRED = "The license has expired. Please upload a new license, or contact Kyligence.";
-    private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop "
-            + "view`, `alter view`, `show create table`";
+    private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop view`, `alter view`, `show create table`";
     private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KE_";
     private static final String DDL_VIEW_NAME_DUPLICATE_ERROR = "Logical View names is duplicate";
     private static final String DDL_DROP_ERROR = "Only support drop view";
@@ -522,14 +521,6 @@ public class Message {
         return "Can’t clean file folder at the moment. Please ensure that the related file on HDFS could be accessed.";
     }
 
-    public String getAsyncQueryTimeFormatError() {
-        return "The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”.";
-    }
-
-    public String getAsyncQueryProjectNameEmpty() {
-        return "The project name can’t be empty. Please check and try again.";
-    }
-
     public String getUserNotFound() {
         return "User '%s' not found.";
     }
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index 2768232b9d..7697a01c8f 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -134,7 +134,6 @@ KE-010043219=请使用中文、英文、空格命名用户名和公司。
 KE-010043220=找不到用户组 “%s”。请检查后重试。
 KE-010043221=参数 “%s” 已存在。请检查后重试。
 
-
 ## Streaming
 KE-010035202=使用解析器 “%s” 解析Topic “%s” 的消息时发生异常,请检查后重试。
 KE-010035215=无法正确读取 Kafka 认证文件,请检查后再试。
@@ -160,6 +159,11 @@ KE-010042214=Jar文件 “%s” 不存在。
 KE-010042215=解析器 “%s” 已存在。
 KE-010042216=Jar文件 “%s” 已存在。
 
+## 100313xx async query
+KE-010031301=该项目下无法找到该 Query ID 对应的异步查询。请检查后重试。
+KE-010031302=项目名称不能为空。请检查后重试。
+KE-010031303=无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。
+KE-010031304=在当前版本中,"include header"参数被移至提交异步查询的API,因此您在下载结果中的"include header"参数将不起作用。请参考产品手册以了解更多细节。
 
 # System
 ## 400052XX password
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index fc75aa0610..6acc8373fc 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -156,6 +156,14 @@ KE-010042214=Jar "%s" does not exist.
 KE-010042215=Parser "%s" already exists.
 KE-010042216=Jar "%s" already exists.
 
+## 100313xx async query
+KE-010031301=Can’t find the query by this query ID in this project. Please check and try again.
+KE-010031302=The project name can’t be empty. Please check and try again.
+KE-010031303=The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”.
+KE-010031304=Notice: Now we move the "include_header" parameter to the Submit Async Query API, so the parameter here doesn't work. Please read the user manual for details.
+
+
+### batch 3
 # System
 ## 400052XX password
 KE-040005201=Can't find PASSWORD ENCODER. Please check configuration item kylin.security.user-password-encoder.
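Because the header decision now happens at submit time, the download endpoint no longer accepts an include-header flag at all: both the legacy snake_case parameter and the camelCase one are rejected with KE-010031304 (the message defined just above). This is exactly the guard added to NAsyncQueryController further down in this patch:

    // From the downloadQueryResult hunk below: any include-header parameter
    // on the download request is refused with KE-010031304.
    if (oldIncludeHeader != null || includeHeader != null) {
        throw new KylinException(ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY);
    }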
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
index 976e342cc9..8fb380af3f 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
@@ -102,6 +102,12 @@ KE-010032221
 KE-010031201
 KE-010031202
 
+## 100313xx async query
+KE-010031301
+KE-010031302
+KE-010031303
+KE-010031304
+
 ## 100102XX computed column
 KE-010010201
 KE-010010202
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
index b7deb09e12..09e05a4e53 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.query.util;
 
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
+
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
@@ -30,10 +32,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
-import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +80,7 @@
                 osw.write(metaString);
             }
         } else {
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
         }
     }
@@ -96,7 +97,7 @@
                 osw.write(separator);
             }
         } else {
-            throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound());
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
         }
     }
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index 30cdb088fe..3be9d9d753 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -17,9 +17,12 @@
  */
 package org.apache.kylin.rest.controller;
 
-import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -40,16 +43,15 @@
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.msg.Message;
 import
org.apache.kylin.common.msg.MsgPicker; -import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException; import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.rest.exception.ForbiddenException; +import org.apache.kylin.rest.request.AsyncQuerySQLRequest; +import org.apache.kylin.rest.response.AsyncQueryResponse; import org.apache.kylin.rest.response.EnvelopeResponse; import org.apache.kylin.rest.response.SQLResponse; +import org.apache.kylin.rest.service.AsyncQueryService; import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.rest.util.AclEvaluate; -import org.apache.kylin.rest.request.AsyncQuerySQLRequest; -import org.apache.kylin.rest.response.AsyncQueryResponse; -import org.apache.kylin.rest.service.AsyncQueryService; import org.apache.spark.sql.SparderEnv; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,6 +138,7 @@ public class NAsyncQueryController extends NBasicController { queryContext.getQueryTagInfo().setFileEncode(encode); queryContext.getQueryTagInfo().setFileName(sqlRequest.getFileName()); queryContext.getQueryTagInfo().setSeparator(sqlRequest.getSeparator()); + queryContext.getQueryTagInfo().setIncludeHeader(sqlRequest.isIncludeHeader()); queryContext.setProject(sqlRequest.getProject()); logger.info("Start a new async query with queryId: {}", queryContext.getQueryId()); String queryId = queryContext.getQueryId(); @@ -203,8 +206,8 @@ public class NAsyncQueryController extends NBasicController { MsgPicker.getMsg().getCleanFolderFail()); } } catch (ParseException e) { - logger.error(MsgPicker.getMsg().getAsyncQueryTimeFormatError(), e); - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryTimeFormatError()); + logger.error(ASYNC_QUERY_TIME_FORMAT_ERROR.getMsg(), e); + throw new KylinException(ASYNC_QUERY_TIME_FORMAT_ERROR); } } @@ -216,7 +219,7 @@ public class NAsyncQueryController extends NBasicController { @RequestParam(value = "project", required = false) String project) throws IOException { if (project == null) { if (sqlRequest == null) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty()); + throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY); } project = sqlRequest.getProject(); } @@ -242,7 +245,7 @@ public class NAsyncQueryController extends NBasicController { throws IOException { if (project == null) { if (sqlRequest == null) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty()); + throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY); } project = sqlRequest.getProject(); } @@ -283,7 +286,7 @@ public class NAsyncQueryController extends NBasicController { @RequestParam(value = "project", required = false) String project) throws IOException { if (project == null) { if (sqlRequest == null) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty()); + throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY); } project = sqlRequest.getProject(); } @@ -306,7 +309,7 @@ public class NAsyncQueryController extends NBasicController { throws IOException { if (project == null) { if (sqlRequest == null) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty()); + throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY); } project = sqlRequest.getProject(); } @@ -323,16 +326,19 @@ public class NAsyncQueryController extends NBasicController { @GetMapping(value = "/async_query/{query_id:.+}/result_download") @ResponseBody 
public void downloadQueryResult(@PathVariable("query_id") String queryId, - @RequestParam(value = "include_header", required = false, defaultValue = "false") boolean include_header, - @RequestParam(value = "includeHeader", required = false, defaultValue = "false") boolean includeHeader, + @RequestParam(value = "oldIncludeHeader", required = false) Boolean oldIncludeHeader, + @RequestParam(value = "includeHeader", required = false) Boolean includeHeader, @Valid @RequestBody(required = false) final AsyncQuerySQLRequest sqlRequest, HttpServletResponse response, @RequestParam(value = "project", required = false) String project) throws IOException { if (project == null) { if (sqlRequest == null) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty()); + throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY); } project = sqlRequest.getProject(); } + if (oldIncludeHeader != null || includeHeader != null) { + throw new KylinException(ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY); + } aclEvaluate.checkProjectQueryPermission(project); checkProjectName(project); KylinConfig config = queryService.getConfig(); @@ -356,8 +362,7 @@ public class NAsyncQueryController extends NBasicController { response.setContentType("application/" + format + ";charset=" + encode); } response.setHeader("Content-Disposition", "attachment; filename=\"" + fileName + "." + format + "\""); - asyncQueryService.retrieveSavedQueryResult(project, queryId, includeHeader || include_header, response, format, - encode, fileInfo.getSeparator()); + asyncQueryService.retrieveSavedQueryResult(project, queryId, response, format, encode); } @ApiOperation(value = "async query result path", tags = { "QE" }) @@ -368,7 +373,7 @@ public class NAsyncQueryController extends NBasicController { @RequestParam(value = "project", required = false) String project) throws IOException { if (project == null) { if (sqlRequest == null) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty()); + throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY); } project = sqlRequest.getProject(); } diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java index 498f154904..190de73c9b 100644 --- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2.java @@ -19,17 +19,21 @@ package org.apache.kylin.rest.controller; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND; +import java.io.IOException; +import java.util.List; + +import javax.servlet.http.HttpServletResponse; +import javax.validation.Valid; + +import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.rest.request.AsyncQuerySQLRequest; import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2; import org.apache.kylin.rest.response.AsyncQueryResponse; import org.apache.kylin.rest.response.AsyncQueryResponseV2; -import org.apache.kylin.rest.service.AsyncQueryService; -import io.swagger.annotations.ApiOperation; -import org.apache.kylin.common.exception.KylinException; -import org.apache.kylin.common.msg.MsgPicker; -import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException; import 
org.apache.kylin.rest.response.EnvelopeResponse; +import org.apache.kylin.rest.service.AsyncQueryService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; @@ -41,10 +45,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; -import javax.servlet.http.HttpServletResponse; -import javax.validation.Valid; -import java.io.IOException; -import java.util.List; +import io.swagger.annotations.ApiOperation; @RestController @@ -68,6 +69,7 @@ public class NAsyncQueryControllerV2 extends NBasicController { sqlRequest.setProject(asyncQuerySQLRequest.getProject()); sqlRequest.setSql(asyncQuerySQLRequest.getSql()); sqlRequest.setSeparator(asyncQuerySQLRequest.getSeparator()); + sqlRequest.setIncludeHeader(asyncQuerySQLRequest.isIncludeHeader()); sqlRequest.setFormat("csv"); sqlRequest.setEncode("utf-8"); sqlRequest.setFileName("result"); @@ -112,7 +114,7 @@ public class NAsyncQueryControllerV2 extends NBasicController { @GetMapping(value = "/async_query/{query_id:.+}/result_download") @ResponseBody public void downloadQueryResult(@PathVariable("query_id") String queryId, - @RequestParam(value = "includeHeader", required = false, defaultValue = "false") boolean includeHeader, + @RequestParam(value = "includeHeader", required = false) Boolean includeHeader, HttpServletResponse response) throws IOException { asyncQueryController.downloadQueryResult(queryId, includeHeader, includeHeader, null, response, searchProject(queryId)); } @@ -120,7 +122,7 @@ public class NAsyncQueryControllerV2 extends NBasicController { private String searchProject(String queryId) throws IOException { String project = asyncQueryService.searchQueryResultProject(queryId); if (project == null) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound()); + throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND); } return project; } diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java index 093c5e9c48..65d74a56f0 100644 --- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java @@ -19,11 +19,14 @@ package org.apache.kylin.rest.controller; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; +import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.FAILED; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.MISS; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS; -import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED; import java.io.IOException; import 
java.text.ParseException; @@ -33,10 +36,13 @@ import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.request.AsyncQuerySQLRequest; +import org.apache.kylin.rest.response.AsyncQueryResponse; import org.apache.kylin.rest.response.EnvelopeResponse; import org.apache.kylin.rest.response.SQLResponse; +import org.apache.kylin.rest.service.AsyncQueryService; import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.rest.util.AclEvaluate; import org.junit.After; @@ -56,11 +62,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import org.springframework.test.web.servlet.result.MockMvcResultMatchers; import org.springframework.test.web.servlet.setup.MockMvcBuilders; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.rest.request.AsyncQuerySQLRequest; -import org.apache.kylin.rest.response.AsyncQueryResponse; -import org.apache.kylin.rest.service.AsyncQueryService; - public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { private static final String PROJECT = "default"; @@ -108,6 +109,7 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { asyncQuerySQLRequest.setProject(PROJECT); asyncQuerySQLRequest.setSql("select PART_DT from KYLIN_SALES limit 500"); asyncQuerySQLRequest.setSeparator(","); + asyncQuerySQLRequest.setIncludeHeader(false); return asyncQuerySQLRequest; } @@ -216,8 +218,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(MockMvcResultMatchers.status().isInternalServerError()); - Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any()); } @Test @@ -225,8 +227,7 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString()); Mockito.doThrow(new IOException()).when(asyncQueryService).getFileInfo(Mockito.anyString(), Mockito.anyString()); - Mockito.doThrow(new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound())) - .when(asyncQueryService) + Mockito.doThrow(new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND)).when(asyncQueryService) .checkStatus(Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString()); mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123") @@ -238,8 +239,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { "Can’t find the query by this query ID in this project. 
Please check and try again.")); }); - Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any()); } @Test @@ -407,11 +408,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query").param("project", PROJECT) .param("older_than", "2011-11/11 11:11:11") .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))).andExpect(result -> { - Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", - ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode() - .getCodeString()); - Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryTimeFormatError(), + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals("KE-010031303", + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(ASYNC_QUERY_TIME_FORMAT_ERROR.getMsg(), result.getResolvedException().getMessage()); }); } @@ -586,8 +586,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { resolvedException.getMessage()); }); - Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any()); } @Test @@ -603,8 +603,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(MockMvcResultMatchers.status().isOk()); - Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(), - Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.verify(nAsyncQueryController).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any()); } @Test @@ -638,11 +638,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/{query_id}", "123") .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(result -> { - Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", - ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode() - .getCodeString()); - Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(), + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals("KE-010031302", + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(), result.getResolvedException().getMessage()); }); } @@ -652,11 +651,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/status", "123") 
.contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(result -> { - Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", - ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode() - .getCodeString()); - Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(), + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals("KE-010031302", + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(), result.getResolvedException().getMessage()); }); } @@ -666,11 +664,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/file_status", "123") .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(result -> { - Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", - ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode() - .getCodeString()); - Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(), + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals("KE-010031302", + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(), result.getResolvedException().getMessage()); }); } @@ -680,11 +677,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/metadata", "123") .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(result -> { - Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", - ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode() - .getCodeString()); - Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(), + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals("KE-010031302", + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(), result.getResolvedException().getMessage()); }); } @@ -694,11 +690,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123") .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(result -> { - Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", - ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode() - .getCodeString()); - Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(), + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals("KE-010031302", + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(), 
result.getResolvedException().getMessage()); }); } @@ -708,11 +703,10 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id}/result_path", "123") .contentType(MediaType.APPLICATION_JSON).accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) .andExpect(result -> { - Assert.assertTrue(result.getResolvedException() instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", - ((NAsyncQueryIllegalParamException) result.getResolvedException()).getErrorCode() - .getCodeString()); - Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryProjectNameEmpty(), + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals("KE-010031302", + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(ASYNC_QUERY_PROJECT_NAME_EMPTY.getMsg(), result.getResolvedException().getMessage()); }); } diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java index e570c0fd3e..d33135cc6f 100644 --- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java @@ -19,22 +19,27 @@ package org.apache.kylin.rest.controller; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.FAILED; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.MISS; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING; import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2; -import org.apache.kylin.rest.service.AsyncQueryService; +import java.io.IOException; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2; import org.apache.kylin.rest.response.SQLResponse; +import org.apache.kylin.rest.service.AsyncQueryService; import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.rest.util.AclEvaluate; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.InjectMocks; @@ -50,9 +55,6 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import org.springframework.test.web.servlet.result.MockMvcResultMatchers; import org.springframework.test.web.servlet.setup.MockMvcBuilders; - -import java.io.IOException; - public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase { private static final String PROJECT = "default"; @@ -80,8 +82,8 @@ public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase { public void setup() throws IOException { MockitoAnnotations.initMocks(this); - mockMvc = 
MockMvcBuilders.standaloneSetup(nAsyncQueryControllerV2).defaultRequest(MockMvcRequestBuilders.get("/")) - .build(); + mockMvc = MockMvcBuilders.standaloneSetup(nAsyncQueryControllerV2) + .defaultRequest(MockMvcRequestBuilders.get("/")).build(); SecurityContextHolder.getContext().setAuthentication(authentication); createTestMetadata(); @@ -237,7 +239,27 @@ public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase { .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON))) .andExpect(MockMvcResultMatchers.status().isOk()); - Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.anyBoolean(), Mockito.any()); + Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any()); + } + + @Test + public void testDownloadQueryResultNotIncludeHeader() throws Exception { + Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString()); + AsyncQueryService.FileInfo fileInfo = new AsyncQueryService.FileInfo("csv", "gbk", "result"); + Mockito.doReturn(fileInfo).when(asyncQueryService).getFileInfo(Mockito.anyString(), Mockito.anyString()); + Mockito.doReturn(KylinConfig.getInstanceFromEnv()).when(kapQueryService).getConfig(); + + mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id:.+}/result_download", "123") + .param("includeHeader", "false").contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON))) + .andExpect(MockMvcResultMatchers.status().isInternalServerError()).andExpect(result -> { + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals(ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY.getMsg(), + result.getResolvedException().getMessage()); + }); + + Mockito.verify(nAsyncQueryControllerV2).downloadQueryResult(Mockito.anyString(), Mockito.any(), Mockito.any()); } } diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java b/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java index 5e16dde7b1..eb8e11bd07 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/request/AsyncQuerySQLRequestV2.java @@ -18,6 +18,7 @@ package org.apache.kylin.rest.request; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -35,5 +36,7 @@ public class AsyncQuerySQLRequestV2 implements Serializable, ProjectInsensitiveR private String separator = ","; private Integer offset = 0; private Integer limit = 0; + @JsonProperty("include_header") + private boolean includeHeader; } diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java index 55faca587a..71f8ca7e9a 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryService.java @@ -18,6 +18,7 @@ package org.apache.kylin.rest.service; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND; import static org.apache.kylin.query.util.AsyncQueryUtil.getUserFileName; import static 
org.apache.kylin.rest.util.AclPermissionUtil.isAdmin; @@ -47,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.Message; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.metadata.project.NProjectManager; @@ -54,10 +56,6 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException; import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.rest.exception.NotFoundException; -import org.apache.poi.ss.usermodel.Sheet; -import org.apache.poi.ss.usermodel.Workbook; -import org.apache.poi.xssf.usermodel.XSSFWorkbook; -import org.apache.spark.sql.SparderEnv; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.core.context.SecurityContextHolder; @@ -114,8 +112,8 @@ public class AsyncQueryService extends BasicService { } } - public void retrieveSavedQueryResult(String project, String queryId, boolean includeHeader, - HttpServletResponse response, String fileFormat, String encode, String separator) throws IOException { + public void retrieveSavedQueryResult(String project, String queryId, HttpServletResponse response, + String fileFormat, String encode) throws IOException { checkStatus(queryId, QueryStatus.SUCCESS, project, MsgPicker.getMsg().getQueryResultNotFound()); FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); @@ -126,37 +124,15 @@ public class AsyncQueryService extends BasicService { } try (ServletOutputStream outputStream = response.getOutputStream()) { - String columnNames = null; - if (includeHeader) { - columnNames = processHeader(fileSystem, dataPath); - if (columnNames != null) { - logger.debug("Query:{}, columnMeta:{}", columnNames, columnNames); - if (!columnNames.endsWith(IOUtils.LINE_SEPARATOR_UNIX)) { - columnNames = columnNames + IOUtils.LINE_SEPARATOR_UNIX; - } - } else { - logger.error("Query:{}, no columnMeta found", queryId); - } - } switch (fileFormat) { case "csv": - CSVWriter csvWriter = new CSVWriter(); - processCSV(outputStream, dataPath, includeHeader, columnNames, csvWriter, separator); - break; - case "json": - processJSON(outputStream, dataPath, encode); - break; case "xlsx": - if (!includeHeader) { - processFile(outputStream, dataPath); - } else { - XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter(); - processXLSX(outputStream, dataPath, includeHeader, columnNames, xlsxExcelWriter); - } - break; case "parquet": processFile(outputStream, dataPath); break; + case "json": + processJSON(outputStream, dataPath, encode); + break; default: logger.info("Query:{}, processed", queryId); } @@ -281,7 +257,7 @@ public class AsyncQueryService extends BasicService { public boolean deleteByQueryId(String project, String queryId) throws IOException { Path resultDir = getAsyncQueryResultDir(project, queryId); if (queryStatus(project, queryId) == QueryStatus.MISS) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound()); + throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND); } logger.info("clean async query result for query id [{}]", queryId); return AsyncQueryUtil.getFileSystem().delete(resultDir, true); @@ -324,7 +300,7 @@ public class AsyncQueryService extends BasicService { public String asyncQueryResultPath(String project, String queryId) throws IOException { if 
(queryStatus(project, queryId) == QueryStatus.MISS) { - throw new NAsyncQueryIllegalParamException(MsgPicker.getMsg().getQueryResultNotFound()); + throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND); } return getAsyncQueryResultDir(project, queryId).toString(); } @@ -344,28 +320,6 @@ public class AsyncQueryService extends BasicService { return new Path(KapConfig.getInstanceFromEnv().getAsyncResultBaseDir(project), queryId); } - private String processHeader(FileSystem fileSystem, Path dataPath) throws IOException { - - FileStatus[] fileStatuses = fileSystem.listStatus(dataPath); - for (FileStatus header : fileStatuses) { - if (header.getPath().getName().equals(AsyncQueryUtil.getMetaDataFileName())) { - try (FSDataInputStream inputStream = fileSystem.open(header.getPath()); - BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(inputStream, Charset.defaultCharset()))) { - return bufferedReader.readLine(); - } - } - } - return null; - } - - private void processCSV(OutputStream outputStream, Path dataPath, boolean includeHeader, String columnNames, - CSVWriter excelWriter, String separator) throws IOException { - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - FileStatus[] fileStatuses = fileSystem.listStatus(dataPath); - excelWriter.writeData(fileStatuses, outputStream, columnNames, separator, includeHeader); - } - private void processJSON(OutputStream outputStream, Path dataPath, String encode) throws IOException { FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); FileStatus[] fileStatuses = fileSystem.listStatus(dataPath); @@ -395,25 +349,6 @@ public class AsyncQueryService extends BasicService { } } - private void processXLSX(OutputStream outputStream, Path dataPath, boolean includeHeader, String columnNames, XLSXExcelWriter excelWriter) - throws IOException { - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - FileStatus[] fileStatuses = fileSystem.listStatus(dataPath); - try (Workbook wb = new XSSFWorkbook()) { - Sheet sheet = wb.createSheet("query_result"); - // Apply column names - if (includeHeader && columnNames != null) { - org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(0); - String[] columnNameArray = columnNames.split(SparderEnv.getSeparator()); - for (int i = 0; i < columnNameArray.length; i++) { - excelRow.createCell(i).setCellValue(columnNameArray[i]); - } - } - excelWriter.writeData(fileStatuses, sheet); - wb.write(outputStream); - } - } - public enum QueryStatus { RUNNING, FAILED, SUCCESS, MISS } diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java deleted file mode 100644 index a17dcf318c..0000000000 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/CSVWriter.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.rest.service; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileStatus; -import org.apache.spark.sql.SparderEnv; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import scala.collection.JavaConverters; - -public class CSVWriter { - - private static final Logger logger = LoggerFactory.getLogger("query"); - - private static final String QUOTE_CHAR = "\""; - private static final String END_OF_LINE_SYMBOLS = IOUtils.LINE_SEPARATOR_UNIX; - - public void writeData(FileStatus[] fileStatuses, OutputStream outputStream, - String columnNames, String separator, boolean includeHeaders) throws IOException { - - try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { - if (includeHeaders) { - writer.write(columnNames.replace(",", separator)); - writer.flush(); - } - for (FileStatus fileStatus : fileStatuses) { - if (!fileStatus.getPath().getName().startsWith("_")) { - if (fileStatus.getPath().getName().endsWith("parquet")) { - writeDataByParquet(fileStatus, writer, separator); - } else { - writeDataByCsv(fileStatus, writer, separator); - } - } - } - - writer.flush(); - } - } - - public static void writeCsv(Iterator<List<Object>> rows, Writer writer, String separator) { - rows.forEachRemaining(row -> { - StringBuilder builder = new StringBuilder(); - - for (int i = 0; i < row.size(); i++) { - Object cell = row.get(i); - String column = cell == null ? 
"" : cell.toString(); - - if (i > 0) { - builder.append(separator); - } - - final String escapedCsv = encodeCell(column, separator); - builder.append(escapedCsv); - } - builder.append(END_OF_LINE_SYMBOLS); // EOL - try { - writer.write(builder.toString()); - } catch (IOException e) { - logger.error("Failed to download asyncQueryResult csvExcel by parquet", e); - } - }); - } - - private void writeDataByParquet(FileStatus fileStatus, Writer writer, String separator) { - List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read() - .parquet(fileStatus.getPath().toString()).collectAsList(); - writeCsv(rowList.stream().map(row -> JavaConverters.seqAsJavaList(row.toSeq())).iterator(), writer, separator); - } - - // the encode logic is copied from org.supercsv.encoder.DefaultCsvEncoder.encode - private static String encodeCell(String cell, String separator) { - - boolean needQuote = cell.contains(separator) || cell.contains("\r") || cell.contains("\n"); - - if (cell.contains(QUOTE_CHAR)) { - needQuote = true; - // escape - cell = cell.replace(QUOTE_CHAR, QUOTE_CHAR + QUOTE_CHAR); - } - - if (needQuote) { - return QUOTE_CHAR + cell + QUOTE_CHAR; - } else { - return cell; - } - } - - private void writeDataByCsv(FileStatus fileStatus, Writer writer, String separator) { - List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read() - .csv(fileStatus.getPath().toString()).collectAsList(); - writeCsv(rowList.stream().map(row -> JavaConverters.seqAsJavaList(row.toSeq())).iterator(), writer, separator); - } - -} diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java deleted file mode 100644 index e54678f375..0000000000 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/XLSXExcelWriter.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.rest.service; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.query.util.AsyncQueryUtil; -import org.apache.poi.ss.usermodel.CellType; -import org.apache.poi.ss.usermodel.Row; -import org.apache.poi.ss.usermodel.Sheet; -import org.apache.poi.xssf.usermodel.XSSFCell; -import org.apache.poi.xssf.usermodel.XSSFRow; -import org.apache.poi.xssf.usermodel.XSSFSheet; -import org.apache.poi.xssf.usermodel.XSSFWorkbook; -import org.apache.spark.sql.SparderEnv; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.clearspring.analytics.util.Lists; - -import lombok.val; - -public class XLSXExcelWriter { - - private static final Logger logger = LoggerFactory.getLogger("query"); - - public void writeData(FileStatus[] fileStatuses, Sheet sheet) { - for (FileStatus fileStatus : fileStatuses) { - if (!fileStatus.getPath().getName().startsWith("_")) { - if (fileStatus.getPath().getName().endsWith("parquet")) { - writeDataByParquet(fileStatus, sheet); - } else if (fileStatus.getPath().getName().endsWith("xlsx")) { - writeDataByXlsx(fileStatus, sheet); - } else { - writeDataByCsv(fileStatus, sheet); - } - } - } - } - - private void writeDataByXlsx(FileStatus f, Sheet sheet) { - boolean createTempFileStatus = false; - File file = new File("temp.xlsx"); - try { - createTempFileStatus = file.createNewFile(); - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath())); - } catch (Exception e) { - logger.error("Export excel writeDataByXlsx create exception f:{} createTempFileStatus:{} ", - f.getPath(), createTempFileStatus, e); - } - try (InputStream is = new FileInputStream(file.getAbsolutePath()); - XSSFWorkbook sheets = new XSSFWorkbook(is)) { - final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows()); - XSSFSheet sheetAt = sheets.getSheetAt(0); - for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) { - XSSFRow row = sheetAt.getRow(i); - org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(offset.get()); - offset.incrementAndGet(); - for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) { - XSSFCell cell = row.getCell(index); - excelRow.createCell(index).setCellValue(getString(cell)); - } - } - Files.delete(file.toPath()); - } catch (Exception e) { - logger.error("Export excel writeDataByXlsx handler exception f:{} createTempFileStatus:{} ", - f.getPath(), createTempFileStatus, e); - } - } - - private static String getString(XSSFCell xssfCell) { - if (xssfCell == null) { - return ""; - } - if (xssfCell.getCellType() == CellType.NUMERIC) { - return String.valueOf(xssfCell.getNumericCellValue()); - } else if (xssfCell.getCellType() == CellType.BOOLEAN) { - return String.valueOf(xssfCell.getBooleanCellValue()); - } else { - return xssfCell.getStringCellValue(); - } - } - - private void writeDataByParquet(FileStatus fileStatus, Sheet sheet) { - final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows()); - 
List<org.apache.spark.sql.Row> rowList = SparderEnv.getSparkSession().read() - .parquet(fileStatus.getPath().toString()).collectAsList(); - rowList.stream().forEach(row -> { - org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(offset.get()); - offset.incrementAndGet(); - val list = row.toSeq().toList(); - for (int i = 0; i < list.size(); i++) { - Object cell = list.apply(i); - String column = cell == null ? "" : cell.toString(); - excelRow.createCell(i).setCellValue(column); - } - }); - } - - public void writeDataByCsv(FileStatus fileStatus, Sheet sheet) { - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - List<String> rowResults = Lists.newArrayList(); - List<String[]> results = Lists.newArrayList(); - final AtomicInteger offset = new AtomicInteger(sheet.getPhysicalNumberOfRows()); - try (FSDataInputStream inputStream = fileSystem.open(fileStatus.getPath())) { - BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)); - rowResults.addAll(Lists.newArrayList(bufferedReader.lines().collect(Collectors.toList()))); - for (String row : rowResults) { - results.add(row.split(SparderEnv.getSeparator())); - } - for (int i = 0; i < results.size(); i++) { - Row row = sheet.createRow(offset.get()); - offset.incrementAndGet(); - String[] rowValues = results.get(i); - for (int j = 0; j < rowValues.length; j++) { - row.createCell(j).setCellValue(rowValues[j]); - } - } - } catch (IOException e) { - logger.error("Failed to download asyncQueryResult xlsxExcel by csv", e); - } - } -} diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java index 282824ac65..241ffd3d10 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/AysncQueryServiceTest.java @@ -37,10 +37,10 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStreamWriter; -import java.io.StringWriter; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -58,10 +58,14 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.metadata.query.QueryMetricsContext; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; +import org.apache.kylin.query.engine.QueryExec; import org.apache.kylin.query.exception.NAsyncQueryIllegalParamException; import org.apache.kylin.query.pushdown.SparkSqlClient; +import org.apache.kylin.query.runtime.plan.ResultPlan; import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.rest.response.SQLResponse; import org.apache.poi.ss.usermodel.CellType; @@ -87,6 +91,7 @@ import org.supercsv.io.ICsvListWriter; import org.supercsv.prefs.CsvPreference; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.val; @@ -163,38 +168,23 @@ public class AysncQueryServiceTest extends ServiceTestBase { } @Test - public void testAsyncQueryWithParquetSpecialCharacters() throws IOException { + public void 
testAsyncQueryAndDownloadCsvResultNotIncludeHeader() throws IOException { QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); - queryContext.getQueryTagInfo().setFileFormat("CSV"); + queryContext.getQueryTagInfo().setFileFormat("csv"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); - String sql = "select '\\(123\\)','123'"; - queryContext.setProject(PROJECT); + queryContext.getQueryTagInfo().setSeparator(","); + queryContext.getQueryTagInfo().setIncludeHeader(false); - ss.sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false"); - SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); - - await().atMost(60000, TimeUnit.MILLISECONDS).until( - () -> AsyncQueryService.QueryStatus.SUCCESS.equals(asyncQueryService.queryStatus(PROJECT, queryId))); - HttpServletResponse response = mock(HttpServletResponse.class); - ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - when(response.getOutputStream()).thenReturn(servletOutputStream); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] arguments = invocationOnMock.getArguments(); - baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]); - return null; - } - }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); + String sql = "select '123\"','123'"; + queryContext.setProject(PROJECT); + ResultPlan.getResult(ss.sql(sql), null); + assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); - SparderEnv.getSparkSession().sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false"); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, ","); List<org.apache.spark.sql.Row> rowList = ss.read() - .parquet(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); + .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); List<String> result = Lists.newArrayList(); rowList.stream().forEach(row -> { val list = row.toSeq().toList(); @@ -204,35 +194,35 @@ public class AysncQueryServiceTest extends ServiceTestBase { result.add(column); } }); - assertEquals("(123)" + "123", result.get(0) + result.get(1)); + assertEquals("123\"\"" + "123", result.get(0) + result.get(1)); + + // download asyncQuery result + HttpServletResponse response = mock(HttpServletResponse.class); + ByteArrayOutputStream baos = mockOutputStream(response); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault); + Assert.assertEquals("\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test - public void testAsyncQueryDownCsvResultByParquet() throws IOException { + public void testAsyncQueryAndDownloadCsvResultIncludeHeader() throws IOException, SQLException { QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); queryContext.getQueryTagInfo().setFileFormat("csv"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); + queryContext.getQueryTagInfo().setSeparator(","); + queryContext.getQueryTagInfo().setIncludeHeader(true); + String sql = "select '123\"','123'"; 
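The expected payload "\"123\"\"\",123\n" asserted above follows from the CSV quoting rule the new writer adopts (assumed here to match supercsv's DefaultCsvEncoder, which the Scala code later in this patch credits). A minimal sketch of that rule:

    object CsvQuoteRule {
      // Double embedded quotes first, then quote the field if it contains the
      // separator, a quote, CR, or LF.
      def encode(field: String, sep: String): String = {
        val needQuote = field.contains(sep) || field.contains("\"") ||
          field.contains("\r") || field.contains("\n")
        val doubled = field.replace("\"", "\"\"")
        if (needQuote) "\"" + doubled + "\"" else doubled
      }

      def main(args: Array[String]): Unit = {
        // select '123"','123'  ->  "123""",123\n  (the string the test expects)
        val line = Seq("123\"", "123").map(encode(_, ",")).mkString(",") + "\n"
        assert(line == "\"123\"\"\",123\n")
      }
    }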
queryContext.setProject(PROJECT); - SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); + + new QueryExec(PROJECT, getTestConfig()).executeQuery(sql); + assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); - HttpServletResponse response = mock(HttpServletResponse.class); - ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - when(response.getOutputStream()).thenReturn(servletOutputStream); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] arguments = invocationOnMock.getArguments(); - baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]); - return null; - } - }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, ","); - List<org.apache.spark.sql.Row> rowList = ss.read().csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); + + List<org.apache.spark.sql.Row> rowList = ss.read() + .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); List<String> result = Lists.newArrayList(); rowList.stream().forEach(row -> { val list = row.toSeq().toList(); @@ -242,130 +232,146 @@ public class AysncQueryServiceTest extends ServiceTestBase { result.add(column); } }); - assertEquals("123\"" + "123", result.get(0) + result.get(1)); + assertEquals("EXPR$0" + "EXPR$1", result.get(0) + result.get(1)); + assertEquals("123\"\"" + "123", result.get(2) + result.get(3)); + + // download asyncQuery result + HttpServletResponse response = mock(HttpServletResponse.class); + ByteArrayOutputStream baos = mockOutputStream(response); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault); + Assert.assertEquals("EXPR$0,EXPR$1\n\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test - public void testSuccessQueryAndDownloadXlsxWriter() throws IOException { + public void testAsyncQueryPushDownAndDownloadCsvResultNotIncludeHeader() throws IOException { QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); - queryContext.getQueryTagInfo().setFileFormat("xlsx"); + queryContext.getQueryTagInfo().setFileFormat("csv"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); - String sql = "select '123\"' as col1,'123' as col2"; + queryContext.getQueryTagInfo().setSeparator(","); + queryContext.getQueryTagInfo().setIncludeHeader(false); + + String sql = "select '123\"','123'"; queryContext.setProject(PROJECT); + SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); - HttpServletResponse response = mock(HttpServletResponse.class); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); - when(response.getOutputStream()).thenReturn(servletOutputStream); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] arguments = invocationOnMock.getArguments(); - baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]); - return null; + + 
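The EXPR$0/EXPR$1 names asserted above are Calcite's generated aliases for projections without an explicit alias; with include_header=true they become the first physical row of the saved CSV. A small self-contained sketch of that layout (paths and session setup are illustrative, not the test's real result directory):

    import java.nio.file.{Files, Paths}
    import org.apache.spark.sql.SparkSession

    object HeaderRowSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        val dir = Files.createTempDirectory("async_result").toString
        // what the saved file looks like for: select '123"','123' with a header
        Files.write(Paths.get(dir, "part-00000.csv"),
          "EXPR$0,EXPR$1\n\"123\"\"\",123\n".getBytes("UTF-8"))
        val first = spark.read.csv(dir).collect()(0)
        assert(first.getString(0) == "EXPR$0" && first.getString(1) == "EXPR$1")
        spark.stop()
      }
    }

Note the tests read the data row back as 123"" rather than 123": Spark's default CSV parser does not collapse the doubled quote, which is why the assertions above compare against the escaped form.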
List<org.apache.spark.sql.Row> rowList = ss.read() + .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); + List<String> result = Lists.newArrayList(); + rowList.stream().forEach(row -> { + val list = row.toSeq().toList(); + for (int i = 0; i < list.size(); i++) { + Object cell = list.apply(i); + String column = cell == null ? "" : cell.toString(); + result.add(column); } - }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ","); - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString())); - XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter(); - XSSFWorkbook workbook = new XSSFWorkbook(); - XSSFSheet sheet = workbook.createSheet(); - xlsxExcelWriter.writeData(fileStatuses, sheet); - XSSFRow row = sheet.getRow(0); - assertEquals("123\",123", row.getCell(0) + "," + row.getCell(1)); - assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString()); + }); + assertEquals("123\"\"" + "123", result.get(0) + result.get(1)); + + // download asyncQuery pushDown result + HttpServletResponse response = mock(HttpServletResponse.class); + ByteArrayOutputStream baos = mockOutputStream(response); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault); + Assert.assertEquals("\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test - public void testSuccessQueryAndDownloadCSV() throws IOException { + public void testAsyncQueryPushDownAndDownloadCsvResultIncludeHeader() throws IOException { QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); queryContext.getQueryTagInfo().setFileFormat("csv"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); - String sql = "select '123\"' as col1,'123' as col2"; + queryContext.getQueryTagInfo().setSeparator(","); + queryContext.getQueryTagInfo().setIncludeHeader(true); + + String sql = "select '123\"','123'"; queryContext.setProject(PROJECT); + SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); - HttpServletResponse response = mock(HttpServletResponse.class); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); - when(response.getOutputStream()).thenReturn(servletOutputStream); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] arguments = invocationOnMock.getArguments(); - baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]); - return null; + + List<org.apache.spark.sql.Row> rowList = ss.read() + .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); + List<String> result = Lists.newArrayList(); + rowList.stream().forEach(row -> { + val list = row.toSeq().toList(); + for (int i = 0; i < list.size(); i++) { + Object cell = list.apply(i); + String column = cell == null ? 
"" : cell.toString(); + result.add(column); } - }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, "xlsx", encodeDefault, ","); - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString())); - XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter(); - XSSFWorkbook workbook = new XSSFWorkbook(); - XSSFSheet sheet = workbook.createSheet(); - xlsxExcelWriter.writeData(fileStatuses, sheet); - XSSFRow row = sheet.getRow(0); - assertEquals("\"123\\\"\",123", row.getCell(0) + "," + row.getCell(1)); - assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString()); + }); + assertEquals("123\"" + "123", result.get(0) + result.get(1)); + assertEquals("123\"\"" + "123", result.get(2) + result.get(3)); + + // download asyncQuery pushDown result + HttpServletResponse response = mock(HttpServletResponse.class); + ByteArrayOutputStream baos = mockOutputStream(response); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault); + Assert.assertEquals("123\",123\n\"123\"\"\",123\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test - public void testSuccessQueryAndDownloadCSVForDateFormat() throws IOException { + public void testAsyncQueryAndDownloadCsvResultSpecialSeparator() throws IOException, SQLException { + String separator = "\n"; QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); queryContext.getQueryTagInfo().setFileFormat("csv"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); - String sql = "select '123\"' as col1,'123' as col2, date'2021-02-01' as col3"; + queryContext.getQueryTagInfo().setSeparator(separator); + queryContext.getQueryTagInfo().setIncludeHeader(false); + + String sql = "select '123\"','123'"; queryContext.setProject(PROJECT); - SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); + + new QueryExec(PROJECT, getTestConfig()).executeQuery(sql); + assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); - HttpServletResponse response = mock(HttpServletResponse.class); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); - when(response.getOutputStream()).thenReturn(servletOutputStream); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] arguments = invocationOnMock.getArguments(); - baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]); - return null; + + List<org.apache.spark.sql.Row> rowList = ss.read() + .csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); + List<String> result = Lists.newArrayList(); + rowList.stream().forEach(row -> { + val list = row.toSeq().toList(); + for (int i = 0; i < list.size(); i++) { + Object cell = list.apply(i); + String column = cell == null ? 
"" : cell.toString(); + result.add(column); } - }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, "xlsx", encodeDefault, ","); - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString())); - XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter(); - XSSFWorkbook workbook = new XSSFWorkbook(); - XSSFSheet sheet = workbook.createSheet(); - xlsxExcelWriter.writeData(fileStatuses, sheet); - XSSFRow row = sheet.getRow(0); - assertEquals("\"123\\\"\",123,2021-02-01", row.getCell(0) - + "," + row.getCell(1) + "," + row.getCell(2)); - assertEquals("[col1, col2, col3]", QueryContext.current().getColumnNames().toString()); + }); + assertEquals("123\"\"" + "123", result.get(0) + result.get(1)); + + // download asyncQuery result + HttpServletResponse response = mock(HttpServletResponse.class); + ByteArrayOutputStream baos = mockOutputStream(response); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault); + Assert.assertEquals("\"123\"\"\"\n" + "123\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test - public void testSuccessQueryAndDownloadCSVNotIncludeHeader() throws IOException { + public void testAsyncQueryWithParquetSpecialCharacters() throws IOException { QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); - queryContext.getQueryTagInfo().setFileFormat("csv"); + queryContext.getQueryTagInfo().setFileFormat("CSV"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); - String sql = "select '123\"','123'"; + String sql = "select '\\(123\\)','123'"; queryContext.setProject(PROJECT); + + ss.sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false"); SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); - assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); + + await().atMost(60000, TimeUnit.MILLISECONDS).until( + () -> AsyncQueryService.QueryStatus.SUCCESS.equals(asyncQueryService.queryStatus(PROJECT, queryId))); HttpServletResponse response = mock(HttpServletResponse.class); ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -378,9 +384,11 @@ public class AysncQueryServiceTest extends ServiceTestBase { return null; } }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "csv", encodeDefault, "#"); - List<org.apache.spark.sql.Row> rowList = ss.read().csv(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); - Assert.assertEquals("\"123\"\"\"#123\n", baos.toString(StandardCharsets.UTF_8.name())); + + SparderEnv.getSparkSession().sqlContext().setConf("spark.sql.parquet.columnNameCheck.enabled", "false"); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "csv", encodeDefault); + List<org.apache.spark.sql.Row> rowList = ss.read() + .parquet(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString()).collectAsList(); List<String> result = Lists.newArrayList(); rowList.stream().forEach(row -> { val list = row.toSeq().toList(); @@ -390,18 +398,19 @@ public class 
AysncQueryServiceTest extends ServiceTestBase { result.add(column); } }); - assertEquals("123\"" + "123", result.get(0) + result.get(1)); + assertEquals("(123)" + "123", result.get(0) + result.get(1)); } @Test - public void testSuccessQueryAndDownloadJSON() throws IOException { + public void testSuccessQueryAndDownloadCSVForDateFormat() throws IOException { QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); - queryContext.getQueryTagInfo().setFileFormat("json"); + queryContext.getQueryTagInfo().setFileFormat("csv"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); - String sql = "select '123\"' as col1,'123' as col2"; + queryContext.getQueryTagInfo().setSeparator(","); + String sql = "select '123\"' as col1,'123' as col2, date'2021-02-01' as col3"; queryContext.setProject(PROJECT); SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); @@ -417,25 +426,15 @@ public class AysncQueryServiceTest extends ServiceTestBase { return null; } }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "json", encodeDefault, ","); - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString())); - XLSXExcelWriter xlsxExcelWriter = new XLSXExcelWriter(); - XSSFWorkbook workbook = new XSSFWorkbook(); - XSSFSheet sheet = workbook.createSheet(); - xlsxExcelWriter.writeData(fileStatuses, sheet); - XSSFRow row = sheet.getRow(0); - assertEquals("{\"col1\":\"123\\\"\",\"col2\":\"123\"}", row.getCell(0) + "," + row.getCell(1)); - assertEquals("[col1, col2]", QueryContext.current().getColumnNames().toString()); } @Test - public void testSuccessQueryAndDownloadXlsxResultByParquet() throws IOException { + public void testSuccessQueryAndDownloadJSON() throws IOException { QueryContext queryContext = QueryContext.current(); String queryId = queryContext.getQueryId(); mockMetadata(queryId, true); queryContext.getQueryTagInfo().setAsyncQuery(true); - queryContext.getQueryTagInfo().setFileFormat("xlsx"); + queryContext.getQueryTagInfo().setFileFormat("json"); queryContext.getQueryTagInfo().setFileEncode("utf-8"); String sql = "select '123\"' as col1,'123' as col2"; queryContext.setProject(PROJECT); @@ -453,44 +452,63 @@ public class AysncQueryServiceTest extends ServiceTestBase { return null; } }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ","); - FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); + } + + @Test + public void testSuccessQueryAndDownloadXlsxResultNotIncludeHeader() throws IOException { + QueryContext queryContext = QueryContext.current(); + String queryId = queryContext.getQueryId(); + mockMetadata(queryId, true); + queryContext.getQueryTagInfo().setAsyncQuery(true); + queryContext.getQueryTagInfo().setFileFormat("xlsx"); + queryContext.getQueryTagInfo().setFileEncode("utf-8"); + String sql = "select '123\"' as col1,'123' as col2"; + queryContext.setProject(PROJECT); + SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); + assertSame(AsyncQueryService.QueryStatus.SUCCESS, 
asyncQueryService.queryStatus(PROJECT, queryId)); + HttpServletResponse response = mock(HttpServletResponse.class); + ByteArrayOutputStream outputStream = mockOutputStream(response); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "xlsx", encodeDefault); + File file = new File("result.xlsx"); boolean createTempFileStatus = file.createNewFile(); - ArrayList<String> list = new ArrayList<>(); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString())); - for (FileStatus f : fileStatuses) { - if (!f.getPath().getName().startsWith("_")) { - fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath())); - try(InputStream is = new FileInputStream(file.getAbsolutePath()); - XSSFWorkbook sheets = new XSSFWorkbook(is)) { - XSSFSheet sheetAt = sheets.getSheetAt(0); - for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) { - XSSFRow row = sheetAt.getRow(i); - StringBuilder builder = new StringBuilder(); - for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) { - XSSFCell cell = row.getCell(index); - if (index > 0) { - builder.append(","); - } - builder.append(getString(cell)); - } - list.add(builder.toString()); - } - } - } - } + List<String> list = getXlsxResult(queryId, file); Files.delete(file.toPath()); - logger.info("Temp File status createTempFileStatus:{}", - createTempFileStatus); + logger.info("Temp File status createTempFileStatus:{}", createTempFileStatus); assertEquals("123\",123", list.get(0)); } + @Test + public void testSuccessQueryAndDownloadXlsxResultIncludeHeader() throws IOException { + QueryContext queryContext = QueryContext.current(); + String queryId = queryContext.getQueryId(); + mockMetadata(queryId, true); + queryContext.getQueryTagInfo().setAsyncQuery(true); + queryContext.getQueryTagInfo().setFileFormat("xlsx"); + queryContext.getQueryTagInfo().setFileEncode("utf-8"); + queryContext.getQueryTagInfo().setIncludeHeader(true); + String sql = "select '123\"' as col1,'123' as col2"; + queryContext.setProject(PROJECT); + SparkSqlClient.executeSql(ss, sql, UUID.fromString(queryId), PROJECT); + assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); + HttpServletResponse response = mock(HttpServletResponse.class); + ByteArrayOutputStream outputStream = mockOutputStream(response); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "xlsx", encodeDefault); + + File file = new File("result.xlsx"); + boolean createTempFileStatus = file.createNewFile(); + List<String> list = getXlsxResult(queryId, file); + Files.delete(file.toPath()); + logger.info("Temp File status createTempFileStatus:{}", createTempFileStatus); + assertEquals("col1,col2", list.get(0)); + assertEquals("123\",123", list.get(1)); + } + private static String getString(XSSFCell xssfCell) { if (xssfCell == null) { return ""; } - if (xssfCell.getCellType()== CellType.NUMERIC) { + if (xssfCell.getCellType() == CellType.NUMERIC) { return String.valueOf(xssfCell.getNumericCellValue()); } else if (xssfCell.getCellType() == CellType.BOOLEAN) { return String.valueOf(xssfCell.getBooleanCellValue()); @@ -519,9 +537,9 @@ public class AysncQueryServiceTest extends ServiceTestBase { } }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, formatDefault, encodeDefault, ","); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, 
formatDefault, encodeDefault); - assertEquals("a1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name())); + assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test @@ -542,9 +560,9 @@ public class AysncQueryServiceTest extends ServiceTestBase { return null; }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, true, response, formatDefault, encodeDefault, ","); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault); - assertEquals("name,age,city\na1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name())); + assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test @@ -565,9 +583,9 @@ public class AysncQueryServiceTest extends ServiceTestBase { return null; }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, formatDefault, encodeDefault, ","); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, formatDefault, encodeDefault); - assertEquals("a1,b1,c1\n" + "a2,b2,c2\n", baos.toString(StandardCharsets.UTF_8.name())); + assertEquals("a1,b1,c1\r\n" + "a2,b2,c2\r\n", baos.toString(StandardCharsets.UTF_8.name())); } @Test @@ -590,35 +608,12 @@ public class AysncQueryServiceTest extends ServiceTestBase { } }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "json", encodeDefault, ","); + asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, response, "json", encodeDefault); assertEquals("[\"{'column1':'a1', 'column2':'b1'}\",\"{'column1':'a2', 'column2':'b2'}\"]", baos.toString(StandardCharsets.UTF_8.name())); } - @Test - public void testSuccessQueryAndDownloadXlsxResult() throws IOException, InterruptedException { - SQLResponse sqlResponse = mock(SQLResponse.class); - when(sqlResponse.isException()).thenReturn(false); - String queryId = RandomUtil.randomUUIDStr(); - mockResultFile(queryId, false, true); - assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); - HttpServletResponse response = mock(HttpServletResponse.class); - ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - when(response.getOutputStream()).thenReturn(servletOutputStream); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] arguments = invocationOnMock.getArguments(); - baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]); - return null; - } - }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); - - asyncQueryService.retrieveSavedQueryResult(PROJECT, queryId, false, response, "xlsx", encodeDefault, ","); - } - @Test public void testCleanFolder() throws IOException, InterruptedException { String queryId = RandomUtil.randomUUIDStr(); @@ -643,7 +638,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { try { new Path(asyncQueryService.asyncQueryResultPath(PROJECT, queryId)); } catch (Exception e) { - Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException); + Assert.assertTrue(e instanceof KylinException); Assert.assertEquals("Can’t find the query by this query ID in this project. 
Please check and try again.", e.getMessage()); } @@ -654,7 +649,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { try { asyncQueryService.deleteByQueryId(PROJECT, "123"); } catch (Exception e) { - Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException); + Assert.assertTrue(e instanceof KylinException); Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.", e.getMessage()); } @@ -678,7 +673,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { try { new Path(asyncQueryService.asyncQueryResultPath(PROJECT, queryId)); } catch (Exception e) { - Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException); + Assert.assertTrue(e instanceof KylinException); Assert.assertEquals("Can’t find the query by this query ID in this project. Please check and try again.", e.getMessage()); } @@ -808,14 +803,14 @@ public class AysncQueryServiceTest extends ServiceTestBase { try { AsyncQueryUtil.saveMetaData(PROJECT, sqlResponse.getColumnMetas(), queryId); } catch (Exception e) { - Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", ((NAsyncQueryIllegalParamException) e).getErrorCode().getCodeString()); + Assert.assertTrue(e instanceof KylinException); + Assert.assertEquals("KE-010031301", ((KylinException) e).getErrorCode().getCodeString()); } try { AsyncQueryUtil.saveFileInfo(PROJECT, formatDefault, encodeDefault, fileNameDefault, queryId, ","); } catch (Exception e) { - Assert.assertTrue(e instanceof NAsyncQueryIllegalParamException); - Assert.assertEquals("KE-020040001", ((NAsyncQueryIllegalParamException) e).getErrorCode().getCodeString()); + Assert.assertTrue(e instanceof KylinException); + Assert.assertEquals("KE-010031301", ((KylinException) e).getErrorCode().getCodeString()); } } @@ -902,7 +897,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { Path asyncQueryResultDir = AsyncQueryUtil.getAsyncQueryResultDir(PROJECT, queryId); fileSystem.delete(new Path(asyncQueryResultDir, AsyncQueryUtil.getFileInfo())); try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, AsyncQueryUtil.getFileInfo())); - OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset())) { + OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset())) { osw.write(formatDefault + "\n"); osw.write(encodeDefault + "\n"); osw.write("foo" + "\n"); @@ -924,6 +919,33 @@ public class AysncQueryServiceTest extends ServiceTestBase { assertArrayEquals(dataTypes.toArray(), metaData.get(1).toArray()); } + @Test + public void testAsyncQueryResultRowCount() throws Exception { + overwriteSystemProp("kylin.env", "DEV"); + QueryContext queryContext = QueryContext.current(); + String queryId = queryContext.getQueryId(); + mockMetadata(queryId, true); + queryContext.getQueryTagInfo().setAsyncQuery(true); + queryContext.getQueryTagInfo().setFileFormat("csv"); + queryContext.getQueryTagInfo().setFileEncode("utf-8"); + queryContext.getQueryTagInfo().setSeparator(","); + queryContext.getQueryTagInfo().setIncludeHeader(false); + queryContext.setAclInfo(new QueryContext.AclInfo("ADMIN", Sets.newHashSet("g1"), true)); + + String sql = "select '123\"','123'"; + queryContext.setProject(PROJECT); + + new QueryExec(PROJECT, getTestConfig()).executeQuery(sql); + + assertSame(AsyncQueryService.QueryStatus.SUCCESS, asyncQueryService.queryStatus(PROJECT, queryId)); + + QueryMetricsContext.start(queryId, ""); + 
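Because rows are now streamed rather than collected, the result_row_count metric this test checks has to be accumulated while rows flow past instead of being taken from a materialized list. A generic counting wrapper illustrating the idea (illustrative only, not a Kylin API):

    import java.util.concurrent.atomic.AtomicLong

    object CountingRows {
      def counting[A](underlying: Iterator[A], counter: AtomicLong): Iterator[A] =
        new Iterator[A] {
          override def hasNext: Boolean = underlying.hasNext
          override def next(): A = { counter.incrementAndGet(); underlying.next() }
        }

      def main(args: Array[String]): Unit = {
        val n = new AtomicLong()
        counting(Iterator("r1", "r2", "r3"), n).foreach(_ => ())
        assert(n.get() == 3) // one increment per result row, as the metric expects
      }
    }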
Assert.assertTrue(QueryMetricsContext.isStarted()); + QueryMetricsContext metrics = QueryMetricsContext.collect(queryContext); + Assert.assertEquals(1, metrics.getResultRowCount()); + QueryMetricsContext.reset(); + } + public Path mockResultFile(String queryId, boolean block, boolean needMeta) throws IOException, InterruptedException { @@ -939,8 +961,8 @@ public class AysncQueryServiceTest extends ServiceTestBase { } try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, "m00")); // - OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); // - ICsvListWriter csvWriter = new CsvListWriter(osw, CsvPreference.STANDARD_PREFERENCE)) { + OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8); // + ICsvListWriter csvWriter = new CsvListWriter(osw, CsvPreference.STANDARD_PREFERENCE)) { csvWriter.write(row1); csvWriter.write(row2); fileSystem.createNewFile(new Path(asyncQueryResultDir, AsyncQueryUtil.getSuccessFlagFileName())); @@ -963,7 +985,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { fileSystem.mkdirs(asyncQueryResultDir); } try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, "m00")); // - OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { + OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { osw.write(StringEscapeUtils.unescapeJson(row1)); osw.write(StringEscapeUtils.unescapeJson(row2)); fileSystem.createNewFile(new Path(asyncQueryResultDir, AsyncQueryUtil.getSuccessFlagFileName())); @@ -982,7 +1004,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { } try (FSDataOutputStream os = fileSystem .create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); // - OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { // + OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { // String metaString = String.join(",", columnNames) + "\n" + String.join(",", dataTypes); osw.write(metaString); if (needMeta) { @@ -1003,7 +1025,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { } try (FSDataOutputStream os = fileSystem .create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); // - OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { // + OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { // osw.write(formatDefault); } catch (IOException e) { @@ -1019,7 +1041,7 @@ public class AysncQueryServiceTest extends ServiceTestBase { } try (FSDataOutputStream os = fileSystem .create(new Path(asyncQueryResultDir, AsyncQueryUtil.getMetaDataFileName())); // - OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { // + OutputStreamWriter osw = new OutputStreamWriter(os, StandardCharsets.UTF_8)) { // osw.write(encodeDefault); } catch (IOException e) { @@ -1027,17 +1049,49 @@ public class AysncQueryServiceTest extends ServiceTestBase { } } - @Test - public void testCsvWriter() throws IOException { - List<List<Object>> rows = Lists.newArrayList( - Lists.newArrayList(1, 3.12, "foo"), - Lists.newArrayList(2, 3.123, "fo<>o"), - Lists.newArrayList(3, 3.124, "fo\ro") - ); - String expected = "1<>3.12<>foo\n2<>3.123<>\"fo<>o\"\n3<>3.124<>\"fo\ro\"\n"; - try (StringWriter sw = new StringWriter()) { - CSVWriter.writeCsv(rows.iterator(), sw, "<>"); - assertEquals(expected, sw.toString()); + public ByteArrayOutputStream mockOutputStream(HttpServletResponse response) throws IOException 
{ + + ServletOutputStream servletOutputStream = mock(ServletOutputStream.class); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + when(response.getOutputStream()).thenReturn(servletOutputStream); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] arguments = invocationOnMock.getArguments(); + baos.write((byte[]) arguments[0], (int) arguments[1], (int) arguments[2]); + return null; + } + }).when(servletOutputStream).write(any(byte[].class), anyInt(), anyInt()); + return baos; + } + + public List<String> getXlsxResult(String queryId, File file) throws IOException { + FileSystem fileSystem = AsyncQueryUtil.getFileSystem(); + List<String> list = new ArrayList<>(); + FileStatus[] fileStatuses = fileSystem + .listStatus(new Path(asyncQueryService.getAsyncQueryResultDir(PROJECT, queryId).toString())); + for (FileStatus f : fileStatuses) { + if (f.getPath().getName().startsWith("_")) { + continue; + } + fileSystem.copyToLocalFile(f.getPath(), new Path(file.getPath())); + try (InputStream is = new FileInputStream(file.getAbsolutePath()); + XSSFWorkbook sheets = new XSSFWorkbook(is)) { + XSSFSheet sheetAt = sheets.getSheetAt(0); + for (int i = 0; i < sheetAt.getPhysicalNumberOfRows(); i++) { + XSSFRow row = sheetAt.getRow(i); + StringBuilder builder = new StringBuilder(); + for (int index = 0; index < row.getPhysicalNumberOfCells(); index++) { + XSSFCell cell = row.getCell(index); + if (index > 0) { + builder.append(","); + } + builder.append(getString(cell)); + } + list.add(builder.toString()); + } + } } + return list; } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala index 5b6d20c715..b5640d7c29 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala @@ -33,10 +33,11 @@ import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.hive.QueryMetricUtils import org.apache.spark.sql.hive.utils.ResourceDetectUtils import org.apache.spark.sql.util.SparderTypeUtil -import org.apache.spark.sql.{DataFrame, SparderEnv, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparderEnv, SparkSession} import org.slf4j.{Logger, LoggerFactory} import java.sql.Timestamp +import java.util import java.util.{UUID, List => JList} import scala.collection.JavaConverters._ import scala.collection.{immutable, mutable} @@ -130,29 +131,8 @@ object SparkSqlClient { QueryContext.current().getMetrics.setQueryJobCount(jobCount) QueryContext.current().getMetrics.setQueryStageCount(stageCount) QueryContext.current().getMetrics.setQueryTaskCount(taskCount) - ( - () => new java.util.Iterator[JList[String]] { - /* - * After fetching a batch of 1000, checks whether the query thread is interrupted. 
-          */
-        val checkInterruptSize = 1000;
-        var readRowSize = 0;
-
-        override def hasNext: Boolean = resultRows.hasNext
-
-        override def next(): JList[String] = {
-          val row = resultRows.next()
-          readRowSize += 1;
-          if (readRowSize % checkInterruptSize == 0) {
-            QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
-              "Current step: Collecting dataset of push-down.")
-          }
-          row.toSeq.map(rawValueToString(_)).asJava
-        }
-      },
-      resultSize,
-      fieldList
-    )
+      // return result
+      (readPushDownResultRow(resultRows, true), resultSize, fieldList)
     } catch {
       case e: Throwable =>
         if (e.isInstanceOf[InterruptedException]) {
@@ -169,6 +149,29 @@ object SparkSqlClient {
     }
   }
 
+  def readPushDownResultRow(resultRows: util.Iterator[Row], checkInterrupt: Boolean): java.lang.Iterable[JList[String]] = {
+    () =>
+      new java.util.Iterator[JList[String]] {
+        /*
+         * After fetching a batch of 1000, checks whether the query thread is interrupted.
+         */
+        val checkInterruptSize = 1000;
+        var readRowSize = 0;
+
+        override def hasNext: Boolean = resultRows.hasNext
+
+        override def next(): JList[String] = {
+          val row = resultRows.next()
+          readRowSize += 1;
+          if (checkInterrupt && readRowSize % checkInterruptSize == 0) {
+            QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
+              "Current step: Collecting dataset of push-down.")
+          }
+          row.toSeq.map(rawValueToString(_)).asJava
+        }
+      }
+  }
+
   private def rawValueToString(value: Any, wrapped: Boolean = false): String = value match {
     case null => null
     case value: Timestamp => DateFormat.castTimestampToString(value.getTime)
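readPushDownResultRow re-checks the interrupt flag only once per 1000 rows, so a cancelled query stops streaming promptly without paying the check on every row. The pattern in isolation, with a plain InterruptedException standing in for Kylin's QueryUtil.checkThreadInterrupted helper:

    object InterruptAwareIterator {
      def wrap[A](underlying: Iterator[A], checkEvery: Int = 1000): Iterator[A] =
        new Iterator[A] {
          private var read = 0L
          override def hasNext: Boolean = underlying.hasNext
          override def next(): A = {
            read += 1
            if (read % checkEvery == 0 && Thread.currentThread().isInterrupted) {
              throw new InterruptedException("query cancelled while streaming rows")
            }
            underlying.next()
          }
        }
    }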
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 7ecc408674..8cc3a6ba25 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -20,7 +20,8 @@ package org.apache.kylin.query.runtime.plan
 
 import com.google.common.cache.{Cache, CacheBuilder}
 import io.kyligence.kap.secondstorage.SecondStorageUtil
-import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.exception.NewQueryRefuseException
 import org.apache.kylin.common.util.{HadoopUtil, RandomUtil}
@@ -30,19 +31,22 @@ import org.apache.kylin.metadata.query.{BigQueryThresholdUpdater, StructField}
 import org.apache.kylin.metadata.state.QueryShareStateManager
 import org.apache.kylin.query.engine.RelColumnMetaDataExtractor
 import org.apache.kylin.query.engine.exec.ExecuteResult
+import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow
 import org.apache.kylin.query.relnode.OLAPContext
 import org.apache.kylin.query.util.{AsyncQueryUtil, QueryUtil, SparkJobTrace, SparkQueryJobManager}
-import org.apache.poi.xssf.usermodel.XSSFWorkbook
+import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, SaveMode, SparderEnv}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
 
-import java.io.{File, FileOutputStream}
-import java.util
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicLong
+import java.{lang, util}
 import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions.`iterator asScala`
 import scala.collection.mutable
 
 // scalastyle:off
@@ -55,6 +59,10 @@ object ResultPlan extends LogEx {
   val PARTITION_SPLIT_BYTES: Long = KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
   val SPARK_SCHEDULER_POOL: String = "spark.scheduler.pool"
 
+  val QUOTE_CHAR = "\""
+  val END_OF_LINE_SYMBOLS = IOUtils.LINE_SEPARATOR_UNIX
+  val CHECK_WRITE_SIZE = 1000
+
   private def collectInternal(df: DataFrame, rowType: RelDataType): (java.lang.Iterable[util.List[String]], Int) = logTime("collectInternal", debug = true) {
     val jobGroup = Thread.currentThread().getName
     val sparkContext = SparderEnv.getSparkSession.sparkContext
@@ -139,20 +147,7 @@ object ResultPlan extends LogEx {
       s"Is TableIndex: ${QueryContext.current().getQueryTagInfo.isTableIndex}")
 
     val resultTypes = rowType.getFieldList.asScala
-    (() => new util.Iterator[util.List[String]] {
-
-      override def hasNext: Boolean = resultRows.hasNext
-
-      override def next(): util.List[String] = {
-        val row = resultRows.next()
-        if (Thread.interrupted()) {
-          throw new InterruptedException
-        }
-        row.toSeq.zip(resultTypes).map {
-          case (value, relField) => SparderTypeUtil.convertToStringWithCalciteType(value, relField.getType)
-        }.asJava
-      }
-    }, resultSize)
+    (readResultRow(resultRows, resultTypes), resultSize)
   } catch {
     case e: Throwable =>
       if (e.isInstanceOf[InterruptedException]) {
@@ -167,6 +162,25 @@ object ResultPlan extends LogEx {
     }
   }
 
+
+  def readResultRow(resultRows: util.Iterator[Row], resultTypes: mutable.Buffer[RelDataTypeField]): lang.Iterable[util.List[String]] = {
+    () =>
+      new util.Iterator[util.List[String]] {
+
+        override def hasNext: Boolean = resultRows.hasNext
+
+        override def next(): util.List[String] = {
+          val row = resultRows.next()
+          if (Thread.interrupted()) {
+            throw new InterruptedException
+          }
+          row.toSeq.zip(resultTypes).map {
+            case (value, relField) => SparderTypeUtil.convertToStringWithCalciteType(value, relField.getType)
+          }.asJava
+        }
+      }
+  }
+
   private def getNormalizedExplain(df: DataFrame): String = {
     df.queryExecution.executedPlan.toString.replaceAll("#\\d+", "#x")
   }
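readResultRow, like readPushDownResultRow, returns a java.lang.Iterable that is only a thin wrapper over the live row iterator, so the Calcite-typed string conversion happens one row at a time and nothing is buffered on the driver. The same lazy-mapping shape in generic form (names here are illustrative, not Kylin APIs); note also that df.toIterator(), used by processCsv/processXlsx further below, is a Kylin-side extension that pairs such an iterator with the result size:

    import java.{lang, util}

    object LazyRows {
      def lazyMap[A, B](rows: util.Iterator[A])(convert: A => B): lang.Iterable[B] =
        () => new util.Iterator[B] {
          override def hasNext: Boolean = rows.hasNext
          override def next(): B = convert(rows.next()) // converted on demand, row by row
        }
    }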
@@ -282,6 +296,8 @@ object ResultPlan extends LogEx {
     QueryContext.currentTrace().endLastSpan()
     val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace(), QueryContext.current().getQueryId, sparkContext)
     val dateTimeFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
+    val queryId = QueryContext.current().getQueryId
+    val includeHeader = QueryContext.current().getQueryTagInfo.isIncludeHeader
     format match {
       case "json" =>
         val oldColumnNames = df.columns
@@ -302,31 +318,8 @@ object ResultPlan extends LogEx {
           normalizeSchema(df).write.mode(SaveMode.Overwrite).option("encoding", encode).option("charset", "utf-8").parquet(path)
         }
         sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")
-      case "csv" =>
-        df.write
-          .option("timestampFormat", dateTimeFormat)
-          .option("encoding", encode)
-          .option("dateFormat", "yyyy-MM-dd")
-          .option("charset", "utf-8").mode(SaveMode.Append).csv(path)
-      case "xlsx" => {
-        val queryId = QueryContext.current().getQueryId
-        val file = new File(queryId + ".xlsx")
-        file.createNewFile();
-        val outputStream = new FileOutputStream(file)
-        val workbook = new XSSFWorkbook
-        val sheet = workbook.createSheet("query_result");
-        var num = 0
-        df.collect().foreach(row => {
-          val row1 = sheet.createRow(num)
-          for (i <- 0 until row.length) {
-            row1.createCell(i).setCellValue(row.apply(i).toString)
-          }
-          num = num + 1
-        })
-        workbook.write(outputStream)
-        HadoopUtil.getWorkingFileSystem
-          .copyFromLocalFile(true, true, new Path(file.getPath), new Path(path + "/" + queryId + ".xlsx"))
-      }
+      case "csv" => processCsv(df, format, rowType, path, queryId, includeHeader)
+      case "xlsx" => processXlsx(df, format, rowType, path, queryId, includeHeader)
       case _ =>
         normalizeSchema(df).write.option("timestampFormat", dateTimeFormat).option("encoding", encode)
           .option("charset", "utf-8").mode(SaveMode.Append).parquet(path)
@@ -345,11 +338,142 @@ object ResultPlan extends LogEx {
     QueryContext.current().getMetrics.setQueryJobCount(jobCount)
     QueryContext.current().getMetrics.setQueryStageCount(stageCount)
     QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
-    QueryContext.current().getMetrics.setResultRowCount(newExecution.executedPlan.metrics.get("numOutputRows")
+      setResultRowCount(newExecution.executedPlan)
+    }
+  }
+
+  def setResultRowCount(plan: SparkPlan): Unit = {
+    if (QueryContext.current().getMetrics.getResultRowCount == 0) {
+      QueryContext.current().getMetrics.setResultRowCount(plan.metrics.get("numOutputRows")
         .map(_.value).getOrElse(0))
     }
   }
 
+  def processCsv(df: DataFrame, format: String, rowType: RelDataType, path: String, queryId: String, includeHeader: Boolean) = {
+    val file = createTmpFile(queryId, format)
+    val writer = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8)
+    if (includeHeader) processCsvHeader(writer, rowType)
+    val (iterator, resultRowSize) = df.toIterator()
+    asyncQueryIteratorWriteCsv(iterator, writer, rowType)
+    uploadAsyncQueryResult(file, path, queryId, format)
+    setResultRowCount(resultRowSize)
+  }
+
+  def processXlsx(df: DataFrame, format: String, rowType: RelDataType, path: String, queryId: String, includeHeader: Boolean) = {
+    val file = createTmpFile(queryId, format)
+    val outputStream = new FileOutputStream(file)
+    val workbook = new XSSFWorkbook
+    val sheet = workbook.createSheet("query_result")
+    var num = 0
+    if (includeHeader) {
+      processXlsxHeader(sheet, rowType)
+      num += 1
+    }
+    val (iterator, resultRowSize) = df.toIterator()
+    iterator.foreach(row => {
+      val row1 = sheet.createRow(num)
+      row.toSeq.zipWithIndex.foreach(it => row1.createCell(it._2).setCellValue(it._1.toString))
+      num += 1
+    })
+    workbook.write(outputStream)
+    uploadAsyncQueryResult(file, path, queryId, format)
+    setResultRowCount(resultRowSize)
+  }
+
+  private def setResultRowCount(resultRowSize: Int) = {
+    if (!KylinConfig.getInstanceFromEnv.isUTEnv) {
+      QueryContext.current().getMetrics.setResultRowCount(resultRowSize)
+    }
+  }
+
+  def processCsvHeader(writer: OutputStreamWriter, rowType: RelDataType): Unit = {
+    val separator = QueryContext.current().getQueryTagInfo.getSeparator
+    rowType match {
+      case null =>
+        val columnNames = QueryContext.current().getColumnNames.asScala.mkString(separator)
+        writer.write(columnNames + END_OF_LINE_SYMBOLS)
+      case _ =>
+        val builder = new StringBuilder
+        rowType.getFieldList.asScala.map(t => t.getName).foreach(column => builder.append(separator + column))
+        builder.deleteCharAt(0)
+        writer.write(builder.toString() + END_OF_LINE_SYMBOLS)
+    }
+    writer.flush()
+  }
+
+  def processXlsxHeader(sheet: XSSFSheet, rowType: RelDataType): Unit = {
+    val excelRow = sheet.createRow(0)
+
+    rowType match {
+      case null =>
+        val columnNameArray = QueryContext.current().getColumnNames
+        columnNameArray.asScala.zipWithIndex
+          .foreach(it => excelRow.createCell(it._2).setCellValue(it._1))
+      case _ =>
+        val columnArray = rowType.getFieldList.asScala.map(t => t.getName)
+        columnArray.zipWithIndex.foreach(it => excelRow.createCell(it._2).setCellValue(it._1))
+    }
+  }
+
+  def createTmpFile(queryId: String, format: String): File = {
+    val file = new File(queryId + format)
+    file.createNewFile()
+    file
+  }
+
+  def uploadAsyncQueryResult(file: File, path: String, queryId: String, format: String): Unit = {
+    HadoopUtil.getWorkingFileSystem
+      .copyFromLocalFile(true, true, new Path(file.getPath), new Path(path + "/" + queryId + "." + format))
+    if (file.exists()) file.delete()
+  }
+
+  def asyncQueryIteratorWriteCsv(resultRows: util.Iterator[Row], outputStream: OutputStreamWriter, rowType: RelDataType): Unit = {
+    var asyncQueryRowSize = 0
+    val separator = QueryContext.current().getQueryTagInfo.getSeparator
+    val asyncQueryResult = if (rowType != null) {
+      val resultTypes = rowType.getFieldList.asScala
+      readResultRow(resultRows, resultTypes)
+    } else {
+      readPushDownResultRow(resultRows, false)
+    }
+
+    asyncQueryResult.forEach(row => {
+
+      asyncQueryRowSize += 1
+      val builder = new StringBuilder
+
+      for (i <- 0 until row.size()) {
+        val column = if (row.get(i) == null) "" else row.get(i)
+
+        if (i > 0) builder.append(separator)
+
+        val escapedCsv = encodeCell(column, separator)
+        builder.append(escapedCsv)
+      }
+      builder.append(END_OF_LINE_SYMBOLS)
+      outputStream.write(builder.toString())
+      if (asyncQueryRowSize % CHECK_WRITE_SIZE == 0) {
+        outputStream.flush()
+      }
+    })
+    outputStream.flush()
+  }
+
+  // the encode logic is copied from org.supercsv.encoder.DefaultCsvEncoder.encode
+  def encodeCell(column1: String, separator: String): String = {
+
+    var column = column1
+    var needQuote = column.contains(separator) || column.contains("\r") || column.contains("\n")
+
+    if (column.contains(QUOTE_CHAR)) {
+      needQuote = true
+      column = column.replace(QUOTE_CHAR, QUOTE_CHAR + QUOTE_CHAR)
+    }
+
+    if (needQuote) QUOTE_CHAR + column + QUOTE_CHAR
+    else column
+  }
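A few sanity checks for the encodeCell rule above, runnable from a Scala REPL with the object on the classpath: embedded quotes are doubled, and quoting kicks in when the cell contains the separator, a quote, CR, or LF, including a custom separator:

    import org.apache.kylin.query.runtime.plan.ResultPlan.encodeCell

    assert(encodeCell("plain", ",") == "plain")                   // untouched
    assert(encodeCell("a,b", ",") == "\"a,b\"")                   // separator -> quoted
    assert(encodeCell("123\"", ",") == "\"123\"\"\"")             // quote doubled, then quoted
    assert(encodeCell("line1\nline2", ",") == "\"line1\nline2\"") // LF -> quoted
    assert(encodeCell("a#b", "#") == "\"a#b\"")                   // custom separator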
+
   /**
    * Normalize column name by replacing invalid characters with underscore
    * and strips accents