This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9e9f13582c1c1110e1dbeca385eebbdc62e66b45 Author: fanfanAlice <41991994+fanfanal...@users.noreply.github.com> AuthorDate: Mon Nov 6 17:46:16 2023 +0800 KYLIN-5868 Add ifBigQuery API --- .../apache/kylin/rest/interceptor/KEFilter.java | 3 +- .../org/apache/kylin/rest/request/SQLRequest.java | 2 + .../java/org/apache/kylin/common/KapConfig.java | 4 ++ .../java/org/apache/kylin/common/QueryContext.java | 9 ++- .../kylin/common/exception/BigQueryException.java | 35 ++++++++++ .../kylin/common/exception/QueryErrorCode.java | 1 + .../kylin/rest/controller/NQueryController.java | 16 +++++ .../rest/controller/NQueryControllerTest.java | 16 +++++ .../kylin/rest/response/BigQueryResponse.java | 51 ++++++++++++++ .../apache/kylin/rest/response/SQLResponse.java | 1 + .../apache/kylin/rest/service/QueryService.java | 48 +++++++++++--- .../kylin/rest/service/QueryServiceTest.java | 77 ++++++++++++++++++++++ .../org/apache/kylin/rest/QueryNodeFilter.java | 2 + .../kylin/query/runtime/plan/ResultPlan.scala | 31 ++++++--- .../kylin/query/runtime/plan/TestResultPlan.java | 30 +++++++++ 15 files changed, 308 insertions(+), 18 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java b/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java index 716f730c35..f260fd5eb7 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/interceptor/KEFilter.java @@ -58,7 +58,8 @@ public class KEFilter extends OncePerRequestFilter { ErrorSuggestion.setMsg(lang); if (("/kylin/api/query".equals(request.getRequestURI()) - || "/kylin/api/async_query".equals(request.getRequestURI()))) { + || "/kylin/api/async_query".equals(request.getRequestURI())) + || "/kylin/api/query/if_big_query".equals(request.getRequestURI())) { QueryContext.reset(); // reset it anyway QueryContext.current(); // init query context to set the timer QueryContext.currentTrace().startSpan(QueryTrace.HTTP_RECEPTION); diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java index 25e5ce2b16..e4fae4b462 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/request/SQLRequest.java @@ -72,6 +72,8 @@ public class SQLRequest implements Serializable, ProjectInsensitiveRequest, Vali @JsonProperty("include_header") private boolean includeHeader; + private boolean ifBigQuery = false; + private Map<String, String> backdoorToggles; @Size(max = 256) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java index 3da6ca9e9e..17282484f8 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java @@ -716,6 +716,10 @@ public class KapConfig { return Long.parseLong(config.getOptional("kylin.query.big-query-second", "10")); } + public boolean isBigQueryLimitEnable() { + return Boolean.parseBoolean(config.getOptional("kylin.query.big-query-limit-enabled", FALSE)); + } + public long getBigQueryThresholdUpdateIntervalSecond() { return Long.parseLong(config.getOptional("kylin.query.big-query-threshold-update-interval-second", "10800")); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index b30cce0797..c0dbfcbf33 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -152,7 +152,6 @@ public class QueryContext implements Closeable { @Setter private boolean enhancedAggPushDown; - /** * For debug purpose, will show RelNode * when dryRun is enabled @@ -169,6 +168,14 @@ public class QueryContext implements Closeable { @Getter private boolean dryRun = false; + @Getter + @Setter + private boolean ifBigQuery = false; + + @Getter + @Setter + private boolean isBigQuery = false; + private QueryContext() { // use QueryContext.current() instead queryId = RandomUtil.randomUUIDStr(); diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/BigQueryException.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/BigQueryException.java new file mode 100644 index 0000000000..e889b33d07 --- /dev/null +++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/BigQueryException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.exception; + +import static org.apache.kylin.common.exception.QueryErrorCode.BIG_QUERY_DECIDE; + +import org.apache.commons.lang3.exception.ExceptionUtils; + +public class BigQueryException extends KylinException { + + public BigQueryException(String message) { + super(BIG_QUERY_DECIDE, message); + } + + public static boolean causedByRefuse(Throwable e) { + return e instanceof BigQueryException || ExceptionUtils.getRootCause(e) instanceof BigQueryException; + } + +} diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java index a89ee7945d..9868b4e6b7 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java @@ -61,6 +61,7 @@ public enum QueryErrorCode implements ErrorCodeSupplier { // 20080XXX query limit REFUSE_NEW_QUERY("KE-020080001"), + BIG_QUERY_DECIDE("KE-020008002"), // 20090XXX forcedToTieredStorage route query FORCED_TO_TIEREDSTORAGE_AND_FORCE_TO_INDEX("KE-020090001"), diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java index 589dbd187a..43de658651 100644 --- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java @@ -83,6 +83,7 @@ import org.apache.kylin.rest.request.SQLFormatRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.request.SaveSqlRequest; import org.apache.kylin.rest.request.SyncFileSegmentsRequest; +import org.apache.kylin.rest.response.BigQueryResponse; import org.apache.kylin.rest.response.DataResult; import org.apache.kylin.rest.response.EnvelopeResponse; import org.apache.kylin.rest.response.QueryDetectResponse; @@ -727,4 +728,19 @@ public class NQueryController extends NBasicController { throw new KylinException(INVALID_TABLE_REFRESH_PARAMETER, message.getTableRefreshParamInvalid(), false); } } + + @ApiOperation(value = "ifBigQuery", tags = { + "QE" }, notes = "Update Param: query_id, accept_partial, backdoor_toggles, cache_key") + @PostMapping(value = "/if_big_query") + @ResponseBody + public EnvelopeResponse<BigQueryResponse> ifBigQuery(@Valid @RequestBody PrepareSqlRequest sqlRequest, + @RequestHeader(value = "User-Agent") String userAgent) { + sqlRequest.setIfBigQuery(true); + checkForcedToParams(sqlRequest); + checkProjectName(sqlRequest.getProject()); + sqlRequest.setUserAgent(userAgent != null ? userAgent : ""); + QueryContext.current().record("end_http_proc"); + BigQueryResponse bigQueryResponse = queryService.ifBigQuery(sqlRequest); + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, bigQueryResponse, ""); + } } diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java index ff1f5b98f3..9bf8814fe1 100644 --- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java @@ -677,4 +677,20 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase { Assert.assertEquals(modelId, realsGot.get(0).getModelId()); } } + + @Test + public void testIfBigQuery() throws Exception { + final PrepareSqlRequest sql = new PrepareSqlRequest(); + sql.setSql("SELECT * FROM empty_table"); + sql.setProject(PROJECT); + sql.setForcedToTieredStorage(1); + sql.setForcedToIndex(true); + sql.setForcedToPushDown(false); + mockMvc.perform(MockMvcRequestBuilders.post("/api/query/if_big_query").contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(sql)).header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36") + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()); + + Mockito.verify(nQueryController).ifBigQuery(Mockito.any(), Mockito.anyString()); + } } diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/response/BigQueryResponse.java b/src/query-service/src/main/java/org/apache/kylin/rest/response/BigQueryResponse.java new file mode 100644 index 0000000000..a45144edea --- /dev/null +++ b/src/query-service/src/main/java/org/apache/kylin/rest/response/BigQueryResponse.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.response; + +import java.io.Serializable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class BigQueryResponse implements Serializable { + + protected static final long serialVersionUID = 1L; + + private static final Logger logger = LoggerFactory.getLogger(BigQueryResponse.class); + + @JsonProperty("if_big_query") + private String ifBigQuery; + + @JsonProperty("scan_rows") + private long scanRows; + + protected boolean isException = false; + + // if isException, the detailed exception message + protected String exceptionMessage; + + private boolean isCache = false; +} diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index 9acc674a5f..6d67706667 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -152,6 +152,7 @@ public class SQLResponse implements Serializable { @JsonProperty("executed_plan") private String executedPlan; + private boolean isBigQuery = false; public SQLResponse() { this(new LinkedList<>(), new LinkedList<>(), 0, false, null); } diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index df66aca6ef..bfec300c71 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -64,6 +64,7 @@ import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryTrace; import org.apache.kylin.common.constant.LogConstant; import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.common.exception.BigQueryException; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.exception.NewQueryRefuseException; @@ -151,6 +152,7 @@ import org.apache.kylin.rest.model.Query; import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.QueryDetectRequest; import org.apache.kylin.rest.request.SQLRequest; +import org.apache.kylin.rest.response.BigQueryResponse; import org.apache.kylin.rest.response.QueryDetectResponse; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.response.SQLResponseTrace; @@ -509,6 +511,23 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup return log; } + public BigQueryResponse ifBigQuery(SQLRequest sqlRequest) { + SQLResponse sqlResponse = queryWithCache(sqlRequest); + BigQueryResponse bigQueryResponse = new BigQueryResponse(); + String isBigQuery = sqlResponse.isBigQuery() ? "bigQuery" : "nonBigQuery"; + if (sqlRequest.isForcedToPushDown() || sqlResponse.isException() || sqlResponse.isQueryPushDown()) { + isBigQuery = "others"; + } + if (sqlResponse.isException()) { + bigQueryResponse.setException(true); + bigQueryResponse.setExceptionMessage(sqlResponse.getExceptionMessage()); + } + bigQueryResponse.setCache(sqlResponse.isStorageCacheUsed()); + bigQueryResponse.setIfBigQuery(isBigQuery); + bigQueryResponse.setScanRows(sqlResponse.getTotalScanRows()); + return bigQueryResponse; + } + public SQLResponse queryWithCache(SQLRequest sqlRequest) { aclEvaluate.checkProjectQueryPermission(sqlRequest.getProject()); checkIfExecuteUserValid(sqlRequest); @@ -516,6 +535,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup queryContext.setProject(sqlRequest.getProject()); queryContext.setLimit(sqlRequest.getLimit()); queryContext.setOffset(sqlRequest.getOffset()); + queryContext.setIfBigQuery(sqlRequest.isIfBigQuery()); if (StringUtils.isNotEmpty(sqlRequest.getQueryId())) { // validate queryId with UUID.fromString queryContext.setQueryId(UUID.fromString(sqlRequest.getQueryId()).toString()); @@ -664,6 +684,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup sqlResponse = queryAndUpdateCache(sqlRequest, kylinConfig); } } + if (sqlRequest.isIfBigQuery()) { + return sqlResponse; + } QueryUtils.updateQueryContextSQLMetrics(rawSql.getStatementString()); QueryContext.currentTrace().amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, System.currentTimeMillis()); @@ -844,7 +867,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup } else { throw new KylinException(PERMISSION_DENIED, MsgPicker.getMsg().getnotSupportedSql()); } - if (checkCondition(queryCacheEnabled, "query cache is disabled")) { + if (checkCondition(queryCacheEnabled, "query cache is disabled") && !sqlRequest.isIfBigQuery()) { // set duration for caching condition checking sqlResponse.setDuration(QueryContext.currentMetrics().duration()); queryCacheManager.cacheSuccessQuery(sqlRequest, sqlResponse); @@ -864,13 +887,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup queryContext.getMetrics().setQueryMsg(errMsg); queryContext.getQueryTagInfo().setPushdown(false); - if (e.getCause() != null && NewQueryRefuseException.causedByRefuse(e)) { - queryContext.getQueryTagInfo().setRefused(true); - } - - if (e.getCause() != null && KylinTimeoutException.causedByTimeout(e)) { - queryContext.getQueryTagInfo().setTimeout(true); - } + applyExceptionResponse(sqlResponse, e, queryContext); if (UserStopQueryException.causedByUserStop(e)) { sqlResponse.setStopByUser(true); @@ -890,6 +907,21 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup return sqlResponse; } + private static void applyExceptionResponse(SQLResponse sqlResponse, Throwable e, QueryContext queryContext) { + if (e.getCause() != null) { + if (NewQueryRefuseException.causedByRefuse(e)) { + queryContext.getQueryTagInfo().setRefused(true); + } else if (BigQueryException.causedByRefuse(e)) { + sqlResponse.setException(false); + sqlResponse.setExceptionMessage(""); + sqlResponse.setScanRows(queryContext.getMetrics().getScanRows()); + sqlResponse.setBigQuery(queryContext.isBigQuery()); + } else if (KylinTimeoutException.causedByTimeout(e)) { + queryContext.getQueryTagInfo().setTimeout(true); + } + } + } + @VisibleForTesting public void putIntoExceptionCache(SQLRequest sqlRequest, SQLResponse sqlResponse, Throwable e) { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index c7744100e5..fe74d64e22 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -65,6 +65,7 @@ import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryTrace; +import org.apache.kylin.common.exception.BigQueryException; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.KylinRuntimeException; import org.apache.kylin.common.exception.KylinTimeoutException; @@ -137,6 +138,7 @@ import org.apache.kylin.rest.model.Query; import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.QueryDetectRequest; import org.apache.kylin.rest.request.SQLRequest; +import org.apache.kylin.rest.response.BigQueryResponse; import org.apache.kylin.rest.response.QueryDetectResponse; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.security.AclEntityFactory; @@ -3028,4 +3030,79 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase { Assert.assertTrue(queryContext.getQueryTagInfo().isAsyncQuery()); } } + + @Test + public void testIfBigQuery() throws Exception { + final String sql = "select count(1) from KYLIN_SALES"; + final String project = "default"; + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setSql(sql); + sqlRequest.setProject(project); + sqlRequest.setForcedToIndex(true); + sqlRequest.setIfBigQuery(true); + Mockito.doThrow(new SQLException(new BigQueryException("is nonBigQuery"))).when(queryService) + .query(Mockito.any(SQLRequest.class)); + final BigQueryResponse response = queryService.ifBigQuery(sqlRequest); + Assert.assertFalse(response.isException()); + } + + @Test + public void testPushDownIfBigQuery() { + final String sql = "select count(1) from KYLIN_SALES"; + final String project = "default"; + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setSql(sql); + sqlRequest.setProject(project); + sqlRequest.setForcedToIndex(true); + sqlRequest.setIfBigQuery(true); + sqlRequest.setForcedToPushDown(true); + final BigQueryResponse response = queryService.ifBigQuery(sqlRequest); + Assert.assertTrue(response.isException()); + } + + @Test + public void testIfBigQueryException() { + final String sql = "select aa from KYLIN_SALES"; + final String project = "default"; + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setSql(sql); + sqlRequest.setProject(project); + sqlRequest.setForcedToIndex(true); + sqlRequest.setIfBigQuery(true); + Mockito.doThrow(new RuntimeException(new KylinTimeoutException(""))).when(queryService) + .queryAndUpdateCache(Mockito.any(SQLRequest.class), Mockito.any(KylinConfig.class)); + BigQueryResponse bigQueryResponse = queryService.ifBigQuery(sqlRequest); + Assert.assertTrue(bigQueryResponse.isException()); + } + + @Test + public void testQueryDetectWhenIndexPlanIsNullIfBigQuery() { + QueryDetectRequest queryDetectRequest = new QueryDetectRequest("sql", "default", 0, 500); + // build sqlRequest + SQLRequest sqlRequest = new SQLRequest(); + sqlRequest.setSql(queryDetectRequest.getSql()); + String project = queryDetectRequest.getProject(); + sqlRequest.setLimit(queryDetectRequest.getLimit()); + sqlRequest.setOffset(queryDetectRequest.getOffset()); + sqlRequest.setProject(project); + + // build sqlResponse + SQLResponse sqlResponse = new SQLResponse(); + sqlResponse.setException(false); + sqlResponse.setQueryId("queryId"); + sqlResponse.setQueryPushDown(true); + List<NativeQueryRealization> nativeRealizations = Lists.newArrayList(); + sqlResponse.setNativeRealizations(nativeRealizations); + + // mock NIndexPlanManager and IndexPlan + PowerMockito.mockStatic(NIndexPlanManager.class); + NIndexPlanManager nIndexPlanManager = Mockito.mock(NIndexPlanManager.class); + + // mock method return value + Mockito.doReturn(sqlResponse).when(queryService).queryWithCache(sqlRequest); + + BigQueryResponse bigQueryResponse = queryService.ifBigQuery(sqlRequest); + + Assert.assertEquals("others", bigQueryResponse.getIfBigQuery()); + } } diff --git a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java index 1f9ab4db76..4aecad0164 100644 --- a/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java +++ b/src/server/src/main/java/org/apache/kylin/rest/QueryNodeFilter.java @@ -86,10 +86,12 @@ public class QueryNodeFilter extends BaseFilter { // jdbc, odbc, query, maintain notRoutePostApiSet.add("/kylin/api/query"); notRoutePostApiSet.add("/kylin/api/async_query"); + notRoutePostApiSet.add("/kylin/api/query/if_big_query"); notRoutePostApiSet.add("/kylin/api/query/prestate"); notRoutePostApiSet.add("/kylin/api/user/authentication"); notRoutePostApiSet.add("/kylin/api/system/maintenance_mode"); notRouteDeleteApiSet.add("/kylin/api/query"); + notRouteDeleteApiSet.add("/kylin/api/query/if_big_query"); notRoutePostApiSet.add("/kylin/api/kg/health/instance_info"); notRoutePostApiSet.add("/kylin/api/kg/health/instance_service/query_up_grade"); diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala index 940b884282..ccdce04faf 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala @@ -26,7 +26,7 @@ import java.{lang, util} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.Path -import org.apache.kylin.common.exception.NewQueryRefuseException +import org.apache.kylin.common.exception.{BigQueryException, NewQueryRefuseException} import org.apache.kylin.common.util.{HadoopUtil, RandomUtil} import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext} import org.apache.kylin.engine.spark.utils.LogEx @@ -110,13 +110,9 @@ object ResultPlan extends LogEx { // judge whether to refuse the new big query logDebug(s"Total source scan rows: $sumOfSourceScanRows") - if (QueryShareStateManager.isShareStateSwitchEnabled - && sumOfSourceScanRows >= bigQueryThreshold - && SparkQueryJobManager.isNewBigQueryRefuse) { - QueryContext.current().getQueryTagInfo.setRefused(true) - throw new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is " + sumOfSourceScanRows - + ", refuse query threshold is " + bigQueryThreshold + ". Current step: Collecting dataset for sparder. ") - } + val sourceScanRows = Array(new lang.Long(sumOfSourceScanRows)).toList.asJava + val ifBigQuery: Boolean = QueryContext.current().isIfBigQuery + ifRefuseQuery(sumOfSourceScanRows, bigQueryThreshold, sourceScanRows, ifBigQuery) QueryContext.current.record("executed_plan") QueryContext.currentTrace().endLastSpan() @@ -166,6 +162,25 @@ object ResultPlan extends LogEx { } } + private def ifRefuseQuery(sumOfSourceScanRows: Long, bigQueryThreshold: Long, sourceScanRows: util.List[lang.Long], ifBigQuery: Boolean): Unit = { + if (QueryShareStateManager.isShareStateSwitchEnabled + && sumOfSourceScanRows >= bigQueryThreshold + && (SparkQueryJobManager.isNewBigQueryRefuse || KapConfig.getInstanceFromEnv.isBigQueryLimitEnable)) { + if (ifBigQuery) { + QueryContext.current().setBigQuery(true) + QueryContext.current().getMetrics.setScanRows(sourceScanRows) + throw new BigQueryException("This query is bigquery.") + } else { + QueryContext.current().getQueryTagInfo.setRefused(true) + throw new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is " + sumOfSourceScanRows + + ", refuse query threshold is " + bigQueryThreshold + ". Current step: Collecting dataset for sparder. ") + } + } else if (ifBigQuery) { + QueryContext.current().getMetrics.setScanRows(sourceScanRows) + QueryContext.current().setBigQuery(false) + throw new BigQueryException("This query is non bigquery.") + } + } def readResultRow(resultRows: util.Iterator[Row], resultTypes: mutable.Buffer[RelDataTypeField]): lang.Iterable[util.List[String]] = { () => diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java index 7fee8e8d81..0781c6415a 100644 --- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java +++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.BigQueryException; import org.apache.kylin.common.exception.NewQueryRefuseException; import org.apache.kylin.common.state.StateSwitchConstant; import org.apache.kylin.common.util.AddressUtil; @@ -249,4 +250,33 @@ public class TestResultPlan extends NLocalFileMetadataTestCase { ResultPlan.getResult(ss.sql(sql), resultType); } + @Test + public void testBigQueryException() { + QueryShareStateManager.getInstance().setState(Collections.singletonList(AddressUtil.concatInstanceName()), + StateSwitchConstant.QUERY_LIMIT_STATE, "true"); + QueryContext queryContext = QueryContext.current(); + queryContext.setIfBigQuery(true); + queryContext.getMetrics() + .addAccumSourceScanRows(KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold() + 1); + String sql = "select * from TEST_KYLIN_FACT"; + try { + ResultPlan.getResult(ss.sql(sql), null); + } catch (Exception e) { + Assert.assertTrue(e instanceof BigQueryException); + } + } + + @Test + public void testNoBigQueryException() { + QueryContext queryContext = QueryContext.current(); + queryContext.setIfBigQuery(true); + queryContext.getMetrics() + .addAccumSourceScanRows(KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold() + 1); + String sql = "select * from TEST_KYLIN_FACT"; + try { + ResultPlan.getResult(ss.sql(sql), null); + } catch (Exception e) { + Assert.assertTrue(e instanceof BigQueryException); + } + } }