This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9e9f13582c1c1110e1dbeca385eebbdc62e66b45
Author: fanfanAlice <41991994+fanfanal...@users.noreply.github.com>
AuthorDate: Mon Nov 6 17:46:16 2023 +0800

    KYLIN-5868 Add ifBigQuery API
---
 .../apache/kylin/rest/interceptor/KEFilter.java    |  3 +-
 .../org/apache/kylin/rest/request/SQLRequest.java  |  2 +
 .../java/org/apache/kylin/common/KapConfig.java    |  4 ++
 .../java/org/apache/kylin/common/QueryContext.java |  9 ++-
 .../kylin/common/exception/BigQueryException.java  | 35 ++++++++++
 .../kylin/common/exception/QueryErrorCode.java     |  1 +
 .../kylin/rest/controller/NQueryController.java    | 16 +++++
 .../rest/controller/NQueryControllerTest.java      | 16 +++++
 .../kylin/rest/response/BigQueryResponse.java      | 51 ++++++++++++++
 .../apache/kylin/rest/response/SQLResponse.java    |  1 +
 .../apache/kylin/rest/service/QueryService.java    | 48 +++++++++++---
 .../kylin/rest/service/QueryServiceTest.java       | 77 ++++++++++++++++++++++
 .../org/apache/kylin/rest/QueryNodeFilter.java     |  2 +
 .../kylin/query/runtime/plan/ResultPlan.scala      | 31 ++++++---
 .../kylin/query/runtime/plan/TestResultPlan.java   | 30 +++++++++
 15 files changed, 308 insertions(+), 18 deletions(-)

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java b/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java
index 716f730c35..f260fd5eb7 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java
@@ -58,7 +58,8 @@ public class KEFilter extends OncePerRequestFilter {
         ErrorSuggestion.setMsg(lang);
 
         if (("/kylin/api/query".equals(request.getRequestURI())
-                || "/kylin/api/async_query".equals(request.getRequestURI()))) {
+                || "/kylin/api/async_query".equals(request.getRequestURI()))
+                || 
"/kylin/api/query/if_big_query".equals(request.getRequestURI())) {
             QueryContext.reset(); // reset it anyway
             QueryContext.current(); // init query context to set the timer
             QueryContext.currentTrace().startSpan(QueryTrace.HTTP_RECEPTION);
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
index 25e5ce2b16..e4fae4b462 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java
@@ -72,6 +72,8 @@ public class SQLRequest implements Serializable, ProjectInsensitiveRequest, Vali
     @JsonProperty("include_header")
     private boolean includeHeader;
 
+    private boolean ifBigQuery = false;
+
     private Map<String, String> backdoorToggles;
 
     @Size(max = 256)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
index 3da6ca9e9e..17282484f8 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
@@ -716,6 +716,10 @@ public class KapConfig {
         return 
Long.parseLong(config.getOptional("kylin.query.big-query-second", "10"));
     }
 
+    public boolean isBigQueryLimitEnable() {
+        return 
Boolean.parseBoolean(config.getOptional("kylin.query.big-query-limit-enabled", 
FALSE));
+    }
+
     public long getBigQueryThresholdUpdateIntervalSecond() {
         return 
Long.parseLong(config.getOptional("kylin.query.big-query-threshold-update-interval-second",
 "10800"));
     }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index b30cce0797..c0dbfcbf33 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -152,7 +152,6 @@ public class QueryContext implements Closeable {
     @Setter
     private boolean enhancedAggPushDown;
 
-
     /**
      * For debug purpose, will show RelNode
      * when dryRun is enabled
@@ -169,6 +168,14 @@ public class QueryContext implements Closeable {
     @Getter
     private boolean dryRun = false;
 
+    @Getter
+    @Setter
+    private boolean ifBigQuery = false;
+
+    @Getter
+    @Setter
+    private boolean isBigQuery = false;
+
     private QueryContext() {
         // use QueryContext.current() instead
         queryId = RandomUtil.randomUUIDStr();
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/BigQueryException.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/BigQueryException.java
new file mode 100644
index 0000000000..e889b33d07
--- /dev/null
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/BigQueryException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.exception;
+
+import static 
org.apache.kylin.common.exception.QueryErrorCode.BIG_QUERY_DECIDE;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+public class BigQueryException extends KylinException {
+
+    public BigQueryException(String message) {
+        super(BIG_QUERY_DECIDE, message);
+    }
+
+    public static boolean causedByRefuse(Throwable e) {
+        return e instanceof BigQueryException || 
ExceptionUtils.getRootCause(e) instanceof BigQueryException;
+    }
+
+}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java
index a89ee7945d..9868b4e6b7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java
@@ -61,6 +61,7 @@ public enum QueryErrorCode implements ErrorCodeSupplier {
 
     // 20080XXX query limit
     REFUSE_NEW_QUERY("KE-020080001"),
+    BIG_QUERY_DECIDE("KE-020080002"), // carried by BigQueryException; numbered within the 20080XXX query-limit group
 
     // 20090XXX forcedToTieredStorage route query
     FORCED_TO_TIEREDSTORAGE_AND_FORCE_TO_INDEX("KE-020090001"),
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
index 589dbd187a..43de658651 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
@@ -83,6 +83,7 @@ import org.apache.kylin.rest.request.SQLFormatRequest;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.request.SaveSqlRequest;
 import org.apache.kylin.rest.request.SyncFileSegmentsRequest;
+import org.apache.kylin.rest.response.BigQueryResponse;
 import org.apache.kylin.rest.response.DataResult;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.QueryDetectResponse;
@@ -727,4 +728,19 @@ public class NQueryController extends NBasicController {
             throw new KylinException(INVALID_TABLE_REFRESH_PARAMETER, 
message.getTableRefreshParamInvalid(), false);
         }
     }
+
+    @ApiOperation(value = "ifBigQuery", tags = {
+            "QE" }, notes = "Update Param: query_id, accept_partial, 
backdoor_toggles, cache_key")
+    @PostMapping(value = "/if_big_query")
+    @ResponseBody
+    public EnvelopeResponse<BigQueryResponse> ifBigQuery(@Valid @RequestBody 
PrepareSqlRequest sqlRequest,
+            @RequestHeader(value = "User-Agent") String userAgent) {
+        sqlRequest.setIfBigQuery(true);
+        checkForcedToParams(sqlRequest);
+        checkProjectName(sqlRequest.getProject());
+        sqlRequest.setUserAgent(userAgent != null ? userAgent : "");
+        QueryContext.current().record("end_http_proc");
+        BigQueryResponse bigQueryResponse = 
queryService.ifBigQuery(sqlRequest);
+        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, 
bigQueryResponse, "");
+    }
 }
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
index ff1f5b98f3..9bf8814fe1 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
@@ -677,4 +677,20 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
             Assert.assertEquals(modelId, realsGot.get(0).getModelId());
         }
     }
+
+    @Test
+    public void testIfBigQuery() throws Exception {
+        final PrepareSqlRequest sql = new PrepareSqlRequest();
+        sql.setSql("SELECT * FROM empty_table");
+        sql.setProject(PROJECT);
+        sql.setForcedToTieredStorage(1);
+        sql.setForcedToIndex(true);
+        sql.setForcedToPushDown(false);
+        
mockMvc.perform(MockMvcRequestBuilders.post("/api/query/if_big_query").contentType(MediaType.APPLICATION_JSON)
+                        
.content(JsonUtil.writeValueAsString(sql)).header("User-Agent", 
"Chrome/89.0.4389.82 Safari/537.36")
+                        
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk());
+
+        Mockito.verify(nQueryController).ifBigQuery(Mockito.any(), 
Mockito.anyString());
+    }
 }
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/response/BigQueryResponse.java b/src/query-service/src/main/java/org/apache/kylin/rest/response/BigQueryResponse.java
new file mode 100644
index 0000000000..a45144edea
--- /dev/null
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/response/BigQueryResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.response;
+
+import java.io.Serializable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class BigQueryResponse implements Serializable {
+
+    protected static final long serialVersionUID = 1L;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BigQueryResponse.class);
+
+    @JsonProperty("if_big_query")
+    private String ifBigQuery;
+
+    @JsonProperty("scan_rows")
+    private long scanRows;
+
+    protected boolean isException = false;
+
+    // if isException, the detailed exception message
+    protected String exceptionMessage;
+
+    private boolean isCache = false;
+}
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 9acc674a5f..6d67706667 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -152,6 +152,7 @@ public class SQLResponse implements Serializable {
     @JsonProperty("executed_plan")
     private String executedPlan;
 
+    private boolean isBigQuery = false;
     public SQLResponse() {
         this(new LinkedList<>(), new LinkedList<>(), 0, false, null);
     }
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index df66aca6ef..bfec300c71 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -64,6 +64,7 @@ import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryTrace;
 import org.apache.kylin.common.constant.LogConstant;
 import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.common.exception.BigQueryException;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.common.exception.NewQueryRefuseException;
@@ -151,6 +152,7 @@ import org.apache.kylin.rest.model.Query;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
 import org.apache.kylin.rest.request.QueryDetectRequest;
 import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.rest.response.BigQueryResponse;
 import org.apache.kylin.rest.response.QueryDetectResponse;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.response.SQLResponseTrace;
@@ -509,6 +511,23 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         return log;
     }
 
+    public BigQueryResponse ifBigQuery(SQLRequest sqlRequest) {
+        SQLResponse sqlResponse = queryWithCache(sqlRequest);
+        BigQueryResponse bigQueryResponse = new BigQueryResponse();
+        String isBigQuery = sqlResponse.isBigQuery() ? "bigQuery" : 
"nonBigQuery";
+        if (sqlRequest.isForcedToPushDown() || sqlResponse.isException() || 
sqlResponse.isQueryPushDown()) {
+            isBigQuery = "others";
+        }
+        if (sqlResponse.isException()) {
+            bigQueryResponse.setException(true);
+            
bigQueryResponse.setExceptionMessage(sqlResponse.getExceptionMessage());
+        }
+        bigQueryResponse.setCache(sqlResponse.isStorageCacheUsed());
+        bigQueryResponse.setIfBigQuery(isBigQuery);
+        bigQueryResponse.setScanRows(sqlResponse.getTotalScanRows());
+        return bigQueryResponse;
+    }
+
     public SQLResponse queryWithCache(SQLRequest sqlRequest) {
         aclEvaluate.checkProjectQueryPermission(sqlRequest.getProject());
         checkIfExecuteUserValid(sqlRequest);
@@ -516,6 +535,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         queryContext.setProject(sqlRequest.getProject());
         queryContext.setLimit(sqlRequest.getLimit());
         queryContext.setOffset(sqlRequest.getOffset());
+        queryContext.setIfBigQuery(sqlRequest.isIfBigQuery());
         if (StringUtils.isNotEmpty(sqlRequest.getQueryId())) {
             // validate queryId with UUID.fromString
             
queryContext.setQueryId(UUID.fromString(sqlRequest.getQueryId()).toString());
@@ -664,6 +684,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                     sqlResponse = queryAndUpdateCache(sqlRequest, kylinConfig);
                 }
             }
+            if (sqlRequest.isIfBigQuery()) {
+                return sqlResponse;
+            }
 
             
QueryUtils.updateQueryContextSQLMetrics(rawSql.getStatementString());
             
QueryContext.currentTrace().amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, 
System.currentTimeMillis());
@@ -844,7 +867,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             } else {
                 throw new KylinException(PERMISSION_DENIED, 
MsgPicker.getMsg().getnotSupportedSql());
             }
-            if (checkCondition(queryCacheEnabled, "query cache is disabled")) {
+            if (checkCondition(queryCacheEnabled, "query cache is disabled") 
&& !sqlRequest.isIfBigQuery()) {
                 // set duration for caching condition checking
                 
sqlResponse.setDuration(QueryContext.currentMetrics().duration());
                 queryCacheManager.cacheSuccessQuery(sqlRequest, sqlResponse);
@@ -864,13 +887,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             queryContext.getMetrics().setQueryMsg(errMsg);
             queryContext.getQueryTagInfo().setPushdown(false);
 
-            if (e.getCause() != null && 
NewQueryRefuseException.causedByRefuse(e)) {
-                queryContext.getQueryTagInfo().setRefused(true);
-            }
-
-            if (e.getCause() != null && 
KylinTimeoutException.causedByTimeout(e)) {
-                queryContext.getQueryTagInfo().setTimeout(true);
-            }
+            applyExceptionResponse(sqlResponse, e, queryContext);
 
             if (UserStopQueryException.causedByUserStop(e)) {
                 sqlResponse.setStopByUser(true);
@@ -890,6 +907,21 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         return sqlResponse;
     }
 
+    private static void applyExceptionResponse(SQLResponse sqlResponse, 
Throwable e, QueryContext queryContext) {
+        if (e.getCause() != null) {
+            if (NewQueryRefuseException.causedByRefuse(e)) {
+                queryContext.getQueryTagInfo().setRefused(true);
+            } else if (BigQueryException.causedByRefuse(e)) {
+                sqlResponse.setException(false);
+                sqlResponse.setExceptionMessage("");
+                
sqlResponse.setScanRows(queryContext.getMetrics().getScanRows());
+                sqlResponse.setBigQuery(queryContext.isBigQuery());
+            } else if (KylinTimeoutException.causedByTimeout(e)) {
+                queryContext.getQueryTagInfo().setTimeout(true);
+            }
+        }
+    }
+
     @VisibleForTesting
     public void putIntoExceptionCache(SQLRequest sqlRequest, SQLResponse 
sqlResponse, Throwable e) {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index c7744100e5..fe74d64e22 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -65,6 +65,7 @@ import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryTrace;
+import org.apache.kylin.common.exception.BigQueryException;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.exception.KylinTimeoutException;
@@ -137,6 +138,7 @@ import org.apache.kylin.rest.model.Query;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
 import org.apache.kylin.rest.request.QueryDetectRequest;
 import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.rest.response.BigQueryResponse;
 import org.apache.kylin.rest.response.QueryDetectResponse;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.security.AclEntityFactory;
@@ -3028,4 +3030,79 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
             Assert.assertTrue(queryContext.getQueryTagInfo().isAsyncQuery());
         }
     }
+
+    @Test
+    public void testIfBigQuery() throws Exception {
+        final String sql = "select count(1) from KYLIN_SALES";
+        final String project = "default";
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql(sql);
+        sqlRequest.setProject(project);
+        sqlRequest.setForcedToIndex(true);
+        sqlRequest.setIfBigQuery(true);
+        Mockito.doThrow(new SQLException(new BigQueryException("is 
nonBigQuery"))).when(queryService)
+                .query(Mockito.any(SQLRequest.class));
+        final BigQueryResponse response = queryService.ifBigQuery(sqlRequest);
+        Assert.assertFalse(response.isException());
+    }
+
+    @Test
+    public void testPushDownIfBigQuery() {
+        final String sql = "select count(1) from KYLIN_SALES";
+        final String project = "default";
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql(sql);
+        sqlRequest.setProject(project);
+        sqlRequest.setForcedToIndex(true);
+        sqlRequest.setIfBigQuery(true);
+        sqlRequest.setForcedToPushDown(true);
+        final BigQueryResponse response = queryService.ifBigQuery(sqlRequest);
+        Assert.assertTrue(response.isException());
+    }
+
+    @Test
+    public void testIfBigQueryException() {
+        final String sql = "select aa from KYLIN_SALES";
+        final String project = "default";
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql(sql);
+        sqlRequest.setProject(project);
+        sqlRequest.setForcedToIndex(true);
+        sqlRequest.setIfBigQuery(true);
+        Mockito.doThrow(new RuntimeException(new 
KylinTimeoutException(""))).when(queryService)
+                .queryAndUpdateCache(Mockito.any(SQLRequest.class), 
Mockito.any(KylinConfig.class));
+        BigQueryResponse bigQueryResponse = 
queryService.ifBigQuery(sqlRequest);
+        Assert.assertTrue(bigQueryResponse.isException());
+    }
+
+    @Test
+    public void testQueryDetectWhenIndexPlanIsNullIfBigQuery() {
+        QueryDetectRequest queryDetectRequest = new QueryDetectRequest("sql", 
"default", 0, 500);
+        // build sqlRequest
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql(queryDetectRequest.getSql());
+        String project = queryDetectRequest.getProject();
+        sqlRequest.setLimit(queryDetectRequest.getLimit());
+        sqlRequest.setOffset(queryDetectRequest.getOffset());
+        sqlRequest.setProject(project);
+
+        // build sqlResponse
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setException(false);
+        sqlResponse.setQueryId("queryId");
+        sqlResponse.setQueryPushDown(true);
+        List<NativeQueryRealization> nativeRealizations = Lists.newArrayList();
+        sqlResponse.setNativeRealizations(nativeRealizations);
+
+        // mock NIndexPlanManager and IndexPlan
+        PowerMockito.mockStatic(NIndexPlanManager.class);
+        NIndexPlanManager nIndexPlanManager = 
Mockito.mock(NIndexPlanManager.class);
+
+        // mock method return value
+        
Mockito.doReturn(sqlResponse).when(queryService).queryWithCache(sqlRequest);
+
+        BigQueryResponse bigQueryResponse = 
queryService.ifBigQuery(sqlRequest);
+
+        Assert.assertEquals("others", bigQueryResponse.getIfBigQuery());
+    }
 }
diff --git a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java
index 1f9ab4db76..4aecad0164 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java
@@ -86,10 +86,12 @@ public class QueryNodeFilter extends BaseFilter {
         // jdbc, odbc, query, maintain
         notRoutePostApiSet.add("/kylin/api/query");
         notRoutePostApiSet.add("/kylin/api/async_query");
+        notRoutePostApiSet.add("/kylin/api/query/if_big_query");
         notRoutePostApiSet.add("/kylin/api/query/prestate");
         notRoutePostApiSet.add("/kylin/api/user/authentication");
         notRoutePostApiSet.add("/kylin/api/system/maintenance_mode");
         notRouteDeleteApiSet.add("/kylin/api/query");
+        notRouteDeleteApiSet.add("/kylin/api/query/if_big_query");
 
         notRoutePostApiSet.add("/kylin/api/kg/health/instance_info");
         
notRoutePostApiSet.add("/kylin/api/kg/health/instance_service/query_up_grade");
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 940b884282..ccdce04faf 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -26,7 +26,7 @@ import java.{lang, util}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
-import org.apache.kylin.common.exception.NewQueryRefuseException
+import org.apache.kylin.common.exception.{BigQueryException, 
NewQueryRefuseException}
 import org.apache.kylin.common.util.{HadoopUtil, RandomUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
 import org.apache.kylin.engine.spark.utils.LogEx
@@ -110,13 +110,9 @@ object ResultPlan extends LogEx {
 
       // judge whether to refuse the new big query
       logDebug(s"Total source scan rows: $sumOfSourceScanRows")
-      if (QueryShareStateManager.isShareStateSwitchEnabled
-        && sumOfSourceScanRows >= bigQueryThreshold
-        && SparkQueryJobManager.isNewBigQueryRefuse) {
-        QueryContext.current().getQueryTagInfo.setRefused(true)
-        throw new NewQueryRefuseException("Refuse new big query, sum of 
source_scan_rows is " + sumOfSourceScanRows
-          + ", refuse query threshold is " + bigQueryThreshold + ". Current 
step: Collecting dataset for sparder. ")
-      }
+      val sourceScanRows = Array(new 
lang.Long(sumOfSourceScanRows)).toList.asJava
+      val ifBigQuery: Boolean = QueryContext.current().isIfBigQuery
+      ifRefuseQuery(sumOfSourceScanRows, bigQueryThreshold, sourceScanRows, 
ifBigQuery)
 
       QueryContext.current.record("executed_plan")
       QueryContext.currentTrace().endLastSpan()
@@ -166,6 +162,25 @@ object ResultPlan extends LogEx {
     }
   }
 
+  private def ifRefuseQuery(sumOfSourceScanRows: Long, bigQueryThreshold: 
Long, sourceScanRows: util.List[lang.Long], ifBigQuery: Boolean): Unit = {
+    if (QueryShareStateManager.isShareStateSwitchEnabled
+      && sumOfSourceScanRows >= bigQueryThreshold
+      && (SparkQueryJobManager.isNewBigQueryRefuse || 
KapConfig.getInstanceFromEnv.isBigQueryLimitEnable)) {
+      if (ifBigQuery) {
+        QueryContext.current().setBigQuery(true)
+        QueryContext.current().getMetrics.setScanRows(sourceScanRows)
+        throw new BigQueryException("This query is bigquery.")
+      } else {
+        QueryContext.current().getQueryTagInfo.setRefused(true)
+        throw new NewQueryRefuseException("Refuse new big query, sum of 
source_scan_rows is " + sumOfSourceScanRows
+          + ", refuse query threshold is " + bigQueryThreshold + ". Current 
step: Collecting dataset for sparder. ")
+      }
+    } else if (ifBigQuery) {
+      QueryContext.current().getMetrics.setScanRows(sourceScanRows)
+      QueryContext.current().setBigQuery(false)
+      throw new BigQueryException("This query is non bigquery.")
+    }
+  }
 
   def readResultRow(resultRows: util.Iterator[Row], resultTypes: 
mutable.Buffer[RelDataTypeField]): lang.Iterable[util.List[String]] = {
     () =>
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
index 7fee8e8d81..0781c6415a 100644
--- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.BigQueryException;
 import org.apache.kylin.common.exception.NewQueryRefuseException;
 import org.apache.kylin.common.state.StateSwitchConstant;
 import org.apache.kylin.common.util.AddressUtil;
@@ -249,4 +250,33 @@ public class TestResultPlan extends NLocalFileMetadataTestCase {
         ResultPlan.getResult(ss.sql(sql), resultType);
     }
 
+    @Test
+    public void testBigQueryException() {
+        
QueryShareStateManager.getInstance().setState(Collections.singletonList(AddressUtil.concatInstanceName()),
+                StateSwitchConstant.QUERY_LIMIT_STATE, "true");
+        QueryContext queryContext = QueryContext.current();
+        queryContext.setIfBigQuery(true);
+        queryContext.getMetrics()
+                
.addAccumSourceScanRows(KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold()
 + 1);
+        String sql = "select * from TEST_KYLIN_FACT";
+        try {
+            ResultPlan.getResult(ss.sql(sql), null);
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof BigQueryException);
+        }
+    }
+
+    @Test
+    public void testNoBigQueryException() {
+        QueryContext queryContext = QueryContext.current();
+        queryContext.setIfBigQuery(true);
+        queryContext.getMetrics()
+                
.addAccumSourceScanRows(KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold()
 + 1);
+        String sql = "select * from TEST_KYLIN_FACT";
+        try {
+            ResultPlan.getResult(ss.sql(sql), null);
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof BigQueryException);
+        }
+    }
 }

Reply via email to