This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 64c7941daa20e140d58fae1d442599fb7b14c011 Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com> AuthorDate: Fri Nov 10 11:35:26 2023 +0800 KYLIN-5870 Fix async query status for out of segment range situation --- .../java/org/apache/kylin/common/QueryContext.java | 4 ++ .../common/exception/code/ErrorCodeServer.java | 15 ++++++++ .../resources/kylin_error_msg_conf_cn.properties | 1 + .../resources/kylin_error_msg_conf_en.properties | 2 +- .../main/resources/kylin_errorcode_conf.properties | 1 + .../kylin/common/exception/code/ErrorCodeTest.java | 16 ++++++++ .../apache/kylin/query/util/AsyncQueryUtil.java | 44 ++++++++++++++++++++++ .../rest/controller/NAsyncQueryController.java | 7 +++- .../rest/controller/NAsyncQueryControllerTest.java | 7 ++++ .../controller/NAsyncQueryControllerV2Test.java | 2 + .../kylin/query/engine/exec/SparderPlanExec.java | 10 +++-- .../kylin/query/runtime/plan/ResultPlan.scala | 6 ++- 12 files changed, 107 insertions(+), 8 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index c0dbfcbf33..2bf371fc0e 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -176,6 +176,10 @@ public class QueryContext implements Closeable { @Setter private boolean isBigQuery = false; + @Getter + @Setter + private boolean outOfSegmentRange = false; + private QueryContext() { // use QueryContext.current() instead queryId = RandomUtil.randomUUIDStr(); diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java index 16f290a6f1..4dcac02280 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java +++ 
b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java @@ -17,6 +17,8 @@ */ package org.apache.kylin.common.exception.code; +import org.apache.commons.lang3.StringUtils; + public enum ErrorCodeServer implements ErrorCodeProducer { // 100012XX project @@ -165,6 +167,7 @@ public enum ErrorCodeServer implements ErrorCodeProducer { ASYNC_QUERY_PROJECT_NAME_EMPTY("KE-010031302"), ASYNC_QUERY_TIME_FORMAT_ERROR("KE-010031303"), ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY("KE-010031304"), + ASYNC_QUERY_OUT_OF_DATA_RANGE("KE-010031305"), // 400272XX resource group RESOURCE_GROUP_DISABLE_FAILED("KE-040027201"), @@ -202,6 +205,18 @@ public enum ErrorCodeServer implements ErrorCodeProducer { CUSTOM_PARSER_ALREADY_EXISTS_PARSER("KE-010042215"), CUSTOM_PARSER_ALREADY_EXISTS_JAR("KE-010042216"); + public static ErrorCodeServer of(String keCode) { + if (StringUtils.isBlank(keCode)) { + return null; + } + for (ErrorCodeServer value : values()) { + if (value.getErrorCode().getCode().equals(keCode)) { + return value; + } + } + return null; + } + private final ErrorCode errorCode; private final ErrorMsg errorMsg; private final ErrorSuggestion errorSuggestion; diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties index ed5a7c517f..c2e4ae4f1c 100644 --- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties +++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties @@ -191,6 +191,7 @@ KE-010031301=该项目下无法找到该 Query ID 对应的异步查询。请检 KE-010031302=项目名称不能为空。请检查后重试。 KE-010031303=无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。 KE-010031304=在当前版本中,"include header"参数被移至提交异步查询的API,因此您在下载结果中的"include header"参数将不起作用。请参考产品手册以了解更多细节。 +KE-010031305=因查询范围超出模型服务的数据范围,查询结果为空,请构建相应 Segment 或子分区数据。 # System ## 400052XX password diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties 
b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties index 635092107a..a9e6235a2a 100644 --- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties +++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties @@ -189,7 +189,7 @@ KE-010031301=Can’t find the query by this query ID in this project. Please che KE-010031302=The project name can’t be empty. Please check and try again. KE-010031303=The time format is invalid. Please enter the date in the format “yyyy-MM-dd HH:mm:ss”. KE-010031304=Notice:Now we move the "include_header" parameter to Submit Async Query API, so the parameter here doesn't work.Please read user manual for details. - +KE-010031305=Because the query is out of the data range of served models, the query result is empty. Please build segments accordingly. ### batch 3 # System diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties index 68e5388772..cb664d837d 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties @@ -126,6 +126,7 @@ KE-010031301 KE-010031302 KE-010031303 KE-010031304 +KE-010031305 ## 100102XX computed column KE-010010201 diff --git a/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java b/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java index 476f1d4b13..adf9e3c7ee 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java @@ -37,4 +37,20 @@ public class ErrorCodeTest { Assert.assertEquals("KE-060100201: An Exception occurred outside Kylin 5.0.", nonKeException.getCodeMsg()); } + + @Test + public void testConstructFromString() { + ErrorCodeServer code = null; + // blank + code = 
ErrorCodeServer.of(null); + Assert.assertNull(code); + + // KE-010001201 + code = ErrorCodeServer.of("KE-010001201"); + Assert.assertEquals(ErrorCodeServer.PROJECT_NOT_EXIST, code); + + // No such code + code = ErrorCodeServer.of("No-Such-Code"); + Assert.assertNull(code); + } } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java index fba00e5708..f8ebc8a0ed 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java @@ -27,19 +27,34 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + public class AsyncQueryUtil { + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class SuccessFileContent { + private String code; + } + public static final String ASYNC_QUERY_JOB_ID_PRE = "ASYNC-QUERY-"; private static final Logger logger = LoggerFactory.getLogger(AsyncQueryUtil.class); @@ -123,6 +138,35 @@ public class AsyncQueryUtil { } } + public static void createSuccessFlagWithContent(String project, String queryId, SuccessFileContent content) + throws IOException { + 
FileSystem fileSystem = getFileSystem(); + Path asyncQueryResultDir = getAsyncQueryResultDir(project, queryId); + try (FSDataOutputStream os = fileSystem.create(new Path(asyncQueryResultDir, getSuccessFlagFileName()))) { + if (content != null) { + os.writeUTF(JsonUtil.writeValueAsString(content)); + } + os.hflush(); + } + } + + public static SuccessFileContent getSuccessFileContent(String project, String queryId) throws IOException { + FileSystem fileSystem = getFileSystem(); + Path asyncQueryResultDir = getAsyncQueryResultDir(project, queryId); + Path successFilePath = new Path(asyncQueryResultDir, getSuccessFlagFileName()); + FileStatus successFileStatus = fileSystem.getFileStatus(successFilePath); + if (successFileStatus.getLen() == 0) { + return null; + } + try (FSDataInputStream in = fileSystem.open(successFilePath)) { + String content = in.readUTF(); + if (StringUtils.isNotBlank(content)) { + return JsonUtil.readValue(content, SuccessFileContent.class); + } + } + return null; + } + public static Path getAsyncQueryResultDir(String project, String queryId) { return new Path(KapConfig.getInstanceFromEnv().getAsyncResultBaseDir(project), queryId); } diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java index b7b0da24e2..620069975b 100644 --- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java @@ -44,6 +44,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.QueryErrorCode; +import org.apache.kylin.common.exception.code.ErrorCodeServer; import org.apache.kylin.common.msg.Message; import org.apache.kylin.common.msg.MsgPicker; import 
org.apache.kylin.common.persistence.transaction.StopQueryBroadcastEventNotifier; @@ -312,8 +313,10 @@ public class NAsyncQueryController extends NBasicController { AsyncQueryResponse asyncQueryResponse; switch (queryStatus) { case SUCCESS: - asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.SUCCESSFUL, - "await fetching results"); + AsyncQueryUtil.SuccessFileContent content = AsyncQueryUtil.getSuccessFileContent(project, queryId); + ErrorCodeServer code = content == null ? null : ErrorCodeServer.of(content.getCode()); + String info = code == null ? "await fetching results" : code.getErrorMsg().getLocalizedString(); + asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.SUCCESSFUL, info); break; case RUNNING: asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.RUNNING, "still running"); diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java index e26e2c3c2a..bfad277739 100644 --- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java @@ -21,6 +21,7 @@ package org.apache.kylin.rest.controller; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; import static org.apache.kylin.common.exception.QueryErrorCode.TOO_MANY_ASYNC_QUERY; import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_OUT_OF_DATA_RANGE; import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY; import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND; import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR; @@ -40,6 +41,7 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.AsyncQuerySQLRequest; import org.apache.kylin.rest.response.AsyncQueryResponse; @@ -639,6 +641,8 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { public void testInqueryStatusSuccess() throws Exception { Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString()); Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(), Mockito.anyString()); + AsyncQueryUtil.createSuccessFlagWithContent(PROJECT, "123", + new AsyncQueryUtil.SuccessFileContent(ASYNC_QUERY_OUT_OF_DATA_RANGE.getErrorCode().getCode())); mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id}/status", "123") .contentType(MediaType.APPLICATION_JSON) @@ -647,6 +651,9 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { .andExpect(MockMvcResultMatchers.status().isOk()); Mockito.verify(nAsyncQueryController).inqueryStatus(Mockito.any(), Mockito.anyString(), Mockito.any()); + AsyncQueryUtil.SuccessFileContent successFileContent = AsyncQueryUtil.getSuccessFileContent(PROJECT, "123"); + Assert.assertNotNull(successFileContent); + Assert.assertEquals(ASYNC_QUERY_OUT_OF_DATA_RANGE.getErrorCode().getCode(), successFileContent.getCode()); } @Test diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java index 08b83cf108..38a6f6bed6 100644 --- 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java @@ -32,6 +32,7 @@ import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2; import org.apache.kylin.rest.response.SQLResponse; @@ -190,6 +191,7 @@ public class NAsyncQueryControllerV2Test extends NLocalFileMetadataTestCase { public void testInqueryStatusSuccess() throws Exception { Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(), Mockito.anyString()); Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(), Mockito.anyString()); + AsyncQueryUtil.createSuccessFlag(PROJECT, "123"); mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id}/status", "123") .contentType(MediaType.APPLICATION_JSON) diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java index d1fcfe16df..71b8333113 100644 --- a/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java +++ b/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java @@ -94,10 +94,12 @@ public class SparderPlanExec implements QueryPlanExec { if (!(dataContext instanceof SimpleDataContext) || !((SimpleDataContext) dataContext).isContentQuery() || KapConfig.wrap(((SimpleDataContext) dataContext).getKylinConfig()).runConstantQueryLocally()) { for (OlapContext context : contexts) { - if (context.getOlapSchema() != null && context.getStorageContext().isEmptyLayout() - && 
!context.isHasAgg()) { - QueryContext.fillEmptyResultSetMetrics(); - return new ExecuteResult(Lists.newArrayList(), 0); + if (context.getOlapSchema() != null && context.getStorageContext().isEmptyLayout()) { + QueryContext.current().setOutOfSegmentRange(true); + if (!QueryContext.current().getQueryTagInfo().isAsyncQuery() && !context.isHasAgg()) { + QueryContext.fillEmptyResultSetMetrics(); + return new ExecuteResult(Lists.newArrayList(), 0); + } } } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala index ccdce04faf..354478abde 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala @@ -26,6 +26,7 @@ import java.{lang, util} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.Path +import org.apache.kylin.common.exception.code.ErrorCodeServer import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException} import org.apache.kylin.common.util.{HadoopUtil, RandomUtil} import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext} @@ -343,7 +344,10 @@ object ResultPlan extends LogEx { normalizeSchema(df).write.option("timestampFormat", dateTimeFormat).option("encoding", encode) .option("charset", "utf-8").mode(SaveMode.Append).parquet(path) } - AsyncQueryUtil.createSuccessFlag(QueryContext.current().getProject, QueryContext.current().getQueryId) + val successFileContent = if (QueryContext.current().isOutOfSegmentRange) { + new AsyncQueryUtil.SuccessFileContent(ErrorCodeServer.ASYNC_QUERY_OUT_OF_DATA_RANGE.getErrorCode.getCode) + } else null + AsyncQueryUtil.createSuccessFlagWithContent(QueryContext.current().getProject, QueryContext.current().getQueryId, 
successFileContent) if (kapConfig.isQuerySparkJobTraceEnabled) { jobTrace.jobFinished() }