This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 64c7941daa20e140d58fae1d442599fb7b14c011
Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com>
AuthorDate: Fri Nov 10 11:35:26 2023 +0800

    KYLIN-5870 Fix async query status for out of segment range situation
---
 .../java/org/apache/kylin/common/QueryContext.java |  4 ++
 .../common/exception/code/ErrorCodeServer.java     | 15 ++++++++
 .../resources/kylin_error_msg_conf_cn.properties   |  1 +
 .../resources/kylin_error_msg_conf_en.properties   |  2 +-
 .../main/resources/kylin_errorcode_conf.properties |  1 +
 .../kylin/common/exception/code/ErrorCodeTest.java | 16 ++++++++
 .../apache/kylin/query/util/AsyncQueryUtil.java    | 44 ++++++++++++++++++++++
 .../rest/controller/NAsyncQueryController.java     |  7 +++-
 .../rest/controller/NAsyncQueryControllerTest.java |  7 ++++
 .../controller/NAsyncQueryControllerV2Test.java    |  2 +
 .../kylin/query/engine/exec/SparderPlanExec.java   | 10 +++--
 .../kylin/query/runtime/plan/ResultPlan.scala      |  6 ++-
 12 files changed, 107 insertions(+), 8 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java 
b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index c0dbfcbf33..2bf371fc0e 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -176,6 +176,10 @@ public class QueryContext implements Closeable {
     @Setter
     private boolean isBigQuery = false;
 
+    @Getter
+    @Setter
+    private boolean outOfSegmentRange = false;
+
     private QueryContext() {
         // use QueryContext.current() instead
         queryId = RandomUtil.randomUUIDStr();
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
index 16f290a6f1..4dcac02280 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.kylin.common.exception.code;
 
+import org.apache.commons.lang3.StringUtils;
+
 public enum ErrorCodeServer implements ErrorCodeProducer {
 
     // 100012XX project
@@ -165,6 +167,7 @@ public enum ErrorCodeServer implements ErrorCodeProducer {
     ASYNC_QUERY_PROJECT_NAME_EMPTY("KE-010031302"),
     ASYNC_QUERY_TIME_FORMAT_ERROR("KE-010031303"),
     ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY("KE-010031304"),
+    ASYNC_QUERY_OUT_OF_DATA_RANGE("KE-010031305"),
 
     // 400272XX resource group
     RESOURCE_GROUP_DISABLE_FAILED("KE-040027201"),
@@ -202,6 +205,18 @@ public enum ErrorCodeServer implements ErrorCodeProducer {
     CUSTOM_PARSER_ALREADY_EXISTS_PARSER("KE-010042215"),
     CUSTOM_PARSER_ALREADY_EXISTS_JAR("KE-010042216");
 
+    public static ErrorCodeServer of(String keCode) {
+        if (StringUtils.isBlank(keCode)) {
+            return null;
+        }
+        for (ErrorCodeServer value : values()) {
+            if (value.getErrorCode().getCode().equals(keCode)) {
+                return value;
+            }
+        }
+        return null;
+    }
+
     private final ErrorCode errorCode;
     private final ErrorMsg errorMsg;
     private final ErrorSuggestion errorSuggestion;
diff --git 
a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties 
b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index ed5a7c517f..c2e4ae4f1c 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -191,6 +191,7 @@ KE-010031301=该项目下无法找到该 Query ID 对应的异步查询。请检
 KE-010031302=项目名称不能为空。请检查后重试。
 KE-010031303=无效的时间格式。请按 “yyyy-MM-dd HH:mm:ss” 格式填写。
 KE-010031304=在当前版本中,"include header"参数被移至提交异步查询的API,因此您在下载结果中的"include 
header"参数将不起作用。请参考产品手册以了解更多细节。
+KE-010031305=因查询范围超出模型服务的数据范围,查询结果为空,请构建相应 Segment 或子分区数据。
 
 # System
 ## 400052XX password
diff --git 
a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties 
b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index 635092107a..a9e6235a2a 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -189,7 +189,7 @@ KE-010031301=Can’t find the query by this query ID in this 
project. Please che
 KE-010031302=The project name can’t be empty. Please check and try again.
 KE-010031303=The time format is invalid. Please enter the date in the format 
“yyyy-MM-dd HH:mm:ss”.
 KE-010031304=Notice:Now we move the "include_header" parameter to  Submit 
Async Query API, so the parameter here doesn't work.Please read user manual for 
details.
-
+KE-010031305=Because the query is out of the data range of served models, the 
query result is empty. Please build segments accordingly.
 
 ### batch 3
 # System
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties 
b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
index 68e5388772..cb664d837d 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
@@ -126,6 +126,7 @@ KE-010031301
 KE-010031302
 KE-010031303
 KE-010031304
+KE-010031305
 
 ## 100102XX computed column
 KE-010010201
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java
index 476f1d4b13..adf9e3c7ee 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/exception/code/ErrorCodeTest.java
@@ -37,4 +37,20 @@ public class ErrorCodeTest {
         Assert.assertEquals("KE-060100201: An Exception occurred outside Kylin 
5.0.",
                 nonKeException.getCodeMsg());
     }
+
+    @Test
+    public void testConstructFromString() {
+        ErrorCodeServer code = null;
+        // blank
+        code = ErrorCodeServer.of(null);
+        Assert.assertNull(code);
+
+        // KE-010001201
+        code = ErrorCodeServer.of("KE-010001201");
+        Assert.assertEquals(ErrorCodeServer.PROJECT_NOT_EXIST, code);
+
+        // No such code
+        code = ErrorCodeServer.of("No-Such-Code");
+        Assert.assertNull(code);
+    }
 }
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
index fba00e5708..f8ebc8a0ed 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/AsyncQueryUtil.java
@@ -27,19 +27,34 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 public class AsyncQueryUtil {
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class SuccessFileContent {
+        private String code;
+    }
+
     public static final String ASYNC_QUERY_JOB_ID_PRE = "ASYNC-QUERY-";
     private static final Logger logger = 
LoggerFactory.getLogger(AsyncQueryUtil.class);
 
@@ -123,6 +138,35 @@ public class AsyncQueryUtil {
         }
     }
 
+    public static void createSuccessFlagWithContent(String project, String 
queryId, SuccessFileContent content)
+            throws IOException {
+        FileSystem fileSystem = getFileSystem();
+        Path asyncQueryResultDir = getAsyncQueryResultDir(project, queryId);
+        try (FSDataOutputStream os = fileSystem.create(new 
Path(asyncQueryResultDir, getSuccessFlagFileName()))) {
+            if (content != null) {
+                os.writeUTF(JsonUtil.writeValueAsString(content));
+            }
+            os.hflush();
+        }
+    }
+
+    public static SuccessFileContent getSuccessFileContent(String project, 
String queryId) throws IOException {
+        FileSystem fileSystem = getFileSystem();
+        Path asyncQueryResultDir = getAsyncQueryResultDir(project, queryId);
+        Path successFilePath = new Path(asyncQueryResultDir, 
getSuccessFlagFileName());
+        FileStatus successFileStatus = 
fileSystem.getFileStatus(successFilePath);
+        if (successFileStatus.getLen() == 0) {
+            return null;
+        }
+        try (FSDataInputStream in = fileSystem.open(successFilePath)) {
+            String content = in.readUTF();
+            if (StringUtils.isNotBlank(content)) {
+                return JsonUtil.readValue(content, SuccessFileContent.class);
+            }
+        }
+        return null;
+    }
+
     public static Path getAsyncQueryResultDir(String project, String queryId) {
         return new 
Path(KapConfig.getInstanceFromEnv().getAsyncResultBaseDir(project), queryId);
     }
diff --git 
a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
 
b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index b7b0da24e2..620069975b 100644
--- 
a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ 
b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -44,6 +44,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
+import org.apache.kylin.common.exception.code.ErrorCodeServer;
 import org.apache.kylin.common.msg.Message;
 import org.apache.kylin.common.msg.MsgPicker;
 import 
org.apache.kylin.common.persistence.transaction.StopQueryBroadcastEventNotifier;
@@ -312,8 +313,10 @@ public class NAsyncQueryController extends 
NBasicController {
         AsyncQueryResponse asyncQueryResponse;
         switch (queryStatus) {
         case SUCCESS:
-            asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.SUCCESSFUL,
-                    "await fetching results");
+            AsyncQueryUtil.SuccessFileContent content = 
AsyncQueryUtil.getSuccessFileContent(project, queryId);
+            ErrorCodeServer code = content == null ? null : 
ErrorCodeServer.of(content.getCode());
+            String info = code == null ? "await fetching results" : 
code.getErrorMsg().getLocalizedString();
+            asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.SUCCESSFUL, info);
             break;
         case RUNNING:
             asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.RUNNING, "still running");
diff --git 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
index e26e2c3c2a..bfad277739 100644
--- 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
+++ 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.controller;
 import static 
org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
 import static 
org.apache.kylin.common.exception.QueryErrorCode.TOO_MANY_ASYNC_QUERY;
 import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
+import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_OUT_OF_DATA_RANGE;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR;
@@ -40,6 +41,7 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
 import org.apache.kylin.rest.response.AsyncQueryResponse;
@@ -639,6 +641,8 @@ public class NAsyncQueryControllerTest extends 
NLocalFileMetadataTestCase {
     public void testInqueryStatusSuccess() throws Exception {
         
Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(),
 Mockito.anyString());
         
Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(),
 Mockito.anyString());
+        AsyncQueryUtil.createSuccessFlagWithContent(PROJECT, "123",
+                new 
AsyncQueryUtil.SuccessFileContent(ASYNC_QUERY_OUT_OF_DATA_RANGE.getErrorCode().getCode()));
 
         
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id}/status",
 "123")
                 .contentType(MediaType.APPLICATION_JSON)
@@ -647,6 +651,9 @@ public class NAsyncQueryControllerTest extends 
NLocalFileMetadataTestCase {
                 .andExpect(MockMvcResultMatchers.status().isOk());
 
         Mockito.verify(nAsyncQueryController).inqueryStatus(Mockito.any(), 
Mockito.anyString(), Mockito.any());
+        AsyncQueryUtil.SuccessFileContent successFileContent = 
AsyncQueryUtil.getSuccessFileContent(PROJECT, "123");
+        Assert.assertNotNull(successFileContent);
+        
Assert.assertEquals(ASYNC_QUERY_OUT_OF_DATA_RANGE.getErrorCode().getCode(), 
successFileContent.getCode());
     }
 
     @Test
diff --git 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
index 08b83cf108..38a6f6bed6 100644
--- 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
+++ 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerV2Test.java
@@ -32,6 +32,7 @@ import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.AsyncQuerySQLRequestV2;
 import org.apache.kylin.rest.response.SQLResponse;
@@ -190,6 +191,7 @@ public class NAsyncQueryControllerV2Test extends 
NLocalFileMetadataTestCase {
     public void testInqueryStatusSuccess() throws Exception {
         
Mockito.doReturn(true).when(asyncQueryService).hasPermission(Mockito.anyString(),
 Mockito.anyString());
         
Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(),
 Mockito.anyString());
+        AsyncQueryUtil.createSuccessFlag(PROJECT, "123");
 
         
mockMvc.perform(MockMvcRequestBuilders.get("/api/async_query/{query_id}/status",
 "123")
                 .contentType(MediaType.APPLICATION_JSON)
diff --git 
a/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java
 
b/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java
index d1fcfe16df..71b8333113 100644
--- 
a/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java
+++ 
b/src/query/src/main/java/org/apache/kylin/query/engine/exec/SparderPlanExec.java
@@ -94,10 +94,12 @@ public class SparderPlanExec implements QueryPlanExec {
         if (!(dataContext instanceof SimpleDataContext) || 
!((SimpleDataContext) dataContext).isContentQuery()
                 || KapConfig.wrap(((SimpleDataContext) 
dataContext).getKylinConfig()).runConstantQueryLocally()) {
             for (OlapContext context : contexts) {
-                if (context.getOlapSchema() != null && 
context.getStorageContext().isEmptyLayout()
-                        && !context.isHasAgg()) {
-                    QueryContext.fillEmptyResultSetMetrics();
-                    return new ExecuteResult(Lists.newArrayList(), 0);
+                if (context.getOlapSchema() != null && 
context.getStorageContext().isEmptyLayout()) {
+                    QueryContext.current().setOutOfSegmentRange(true);
+                    if 
(!QueryContext.current().getQueryTagInfo().isAsyncQuery() && 
!context.isHasAgg()) {
+                        QueryContext.fillEmptyResultSetMetrics();
+                        return new ExecuteResult(Lists.newArrayList(), 0);
+                    }
                 }
             }
         }
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index ccdce04faf..354478abde 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -26,6 +26,7 @@ import java.{lang, util}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
+import org.apache.kylin.common.exception.code.ErrorCodeServer
 import org.apache.kylin.common.exception.{BigQueryException, 
NewQueryRefuseException}
 import org.apache.kylin.common.util.{HadoopUtil, RandomUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
@@ -343,7 +344,10 @@ object ResultPlan extends LogEx {
         normalizeSchema(df).write.option("timestampFormat", 
dateTimeFormat).option("encoding", encode)
           .option("charset", "utf-8").mode(SaveMode.Append).parquet(path)
     }
-    AsyncQueryUtil.createSuccessFlag(QueryContext.current().getProject, 
QueryContext.current().getQueryId)
+    val successFileContent = if (QueryContext.current().isOutOfSegmentRange) {
+      new 
AsyncQueryUtil.SuccessFileContent(ErrorCodeServer.ASYNC_QUERY_OUT_OF_DATA_RANGE.getErrorCode.getCode)
+    } else null
+    
AsyncQueryUtil.createSuccessFlagWithContent(QueryContext.current().getProject, 
QueryContext.current().getQueryId, successFileContent)
     if (kapConfig.isQuerySparkJobTraceEnabled) {
       jobTrace.jobFinished()
     }

Reply via email to