This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 41d541a6cf599c8dac68f9f265fcbd327aa44362
Author: Jiawei Li <1019037...@qq.com>
AuthorDate: Tue Nov 7 18:26:31 2023 +0800

    KYLIN-5879 Update spark version to 3.3.0-kylin-4.6.18.0

    Use dataset iterator api to reduce driver memory use.
    Add interceptor to clean query result block.
    Set `kylin.query.use-iterable-collect` to false by default.
---
 build/release/download-spark.sh                     |  1 -
 pom.xml                                             |  2 +-
 .../org/apache/kylin/common/KylinConfigBase.java    |  4 ++
 .../apache/kylin/newten/SlowQueryDetectorTest.java  |  2 +
 .../apache/kylin/rest/service/QueryService.java     |  2 +
 .../kylin/rest/QueryBlockCleanInterceptor.java      | 47 ++++++++++++++++++++++
 .../kylin/rest/config/KylinWebMvcConfig.java        | 40 ++++++++++++++++++
 .../kylin/rest/QueryBlockCleanInterceptorTest.java  | 38 +++++++++++++++++
 .../kylin/query/pushdown/SparkSqlClient.scala       |  9 ++++-
 .../kylin/query/runtime/plan/ResultPlan.scala       | 10 +++--
 .../scala/org/apache/spark/sql/SparderEnv.scala     |  6 ++-
 .../kylin/query/runtime/plan/TestResultPlan.java    | 43 ++++++++++++++++++++
 12 files changed, 197 insertions(+), 7 deletions(-)

diff --git a/build/release/download-spark.sh b/build/release/download-spark.sh
index 6d4a21bad0..b552e5d14c 100755
--- a/build/release/download-spark.sh
+++ b/build/release/download-spark.sh
@@ -25,7 +25,6 @@ source build/release/functions.sh
 rm -rf build/spark

 spark_version_pom=`mvn -f pom.xml help:evaluate -Dexpression=spark.version | grep -E '^[0-9]+\.[0-9]+\.[0-9]+' `
-spark_version_pom=3.2.0-kylin-4.6.9.0
 spark_pkg_name=spark-newten-"`echo ${spark_version_pom}| sed "s/-kylin//g"`"
 spark_pkg_file_name="${spark_pkg_name}.tgz"

diff --git a/pom.xml b/pom.xml
index a0cd96eef0..b3476cff46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
         <kafka.version>2.8.2</kafka.version>

         <!-- Spark versions -->
-        <spark.version>3.3.0-kylin-4.6.17.0</spark.version>
+        <spark.version>3.3.0-kylin-4.6.18.0</spark.version>

         <delta.version>2.1.0</delta.version>
         <roaring.version>0.9.2-kylin-r4</roaring.version>

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1ba4118e77..dc3186020e 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -4373,4 +4373,8 @@ public abstract class KylinConfigBase implements Serializable {
     public boolean isPrintQueryPlanEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.print-logical-plan", FALSE));
     }
+
+    public boolean isQueryUseIterableCollectApi() {
+        return Boolean.parseBoolean(getOptional("kylin.query.use-iterable-collect", FALSE));
+    }
 }

diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
index e9e9f1ed01..954100e9d4 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
@@ -228,6 +228,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
     public void testSparderTimeoutCancelJob() throws Exception {
         val df = SparderEnv.getSparkSession().emptyDataFrame();
         val mockDf = Mockito.spy(df);
+        Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).collectToIterator();
         Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).toIterator();
         slowQueryDetector.queryStart("");
         try {
@@ -255,6 +256,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
     public void testPushdownTimeoutCancelJob() {
         val df = SparderEnv.getSparkSession().emptyDataFrame();
         val mockDf = Mockito.spy(df);
+        Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).collectToIterator();
         Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).toIterator();
         slowQueryDetector.queryStart("");
         try {

diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4ec19ef910..55d65f1bce 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -563,7 +563,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             return response;
         } finally {
             QueryLimiter.release();
+            String queryExecutionId = QueryContext.current().getExecutionID();
             QueryContext.current().close();
+            QueryContext.current().setExecutionID(queryExecutionId);
         }
     }

diff --git a/src/server/src/main/java/org/apache/kylin/rest/QueryBlockCleanInterceptor.java b/src/server/src/main/java/org/apache/kylin/rest/QueryBlockCleanInterceptor.java
new file mode 100644
index 0000000000..3175d137ff
--- /dev/null
+++ b/src/server/src/main/java/org/apache/kylin/rest/QueryBlockCleanInterceptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.spark.sql.SparderEnv;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.HandlerInterceptor;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+@Order(-200)
+public class QueryBlockCleanInterceptor implements HandlerInterceptor {
+
+    @Override
+    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
+            throws Exception {
+        String queryExecutionID = QueryContext.current().getExecutionID();
+        if (!Strings.isNullOrEmpty(queryExecutionID)) {
+            SparderEnv.deleteQueryTaskResultBlock(queryExecutionID);
+        }
+        QueryContext.current().close();
+    }
+}

diff --git a/src/server/src/main/java/org/apache/kylin/rest/config/KylinWebMvcConfig.java b/src/server/src/main/java/org/apache/kylin/rest/config/KylinWebMvcConfig.java
new file mode 100644
index 0000000000..f9b243780d
--- /dev/null
+++ b/src/server/src/main/java/org/apache/kylin/rest/config/KylinWebMvcConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.config;
+
+import org.apache.kylin.rest.QueryBlockCleanInterceptor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@Configuration
+public class KylinWebMvcConfig implements WebMvcConfigurer {
+
+    @Bean
+    public QueryBlockCleanInterceptor getQueryBlockCleanInterceptor() {
+        return new QueryBlockCleanInterceptor();
+    }
+
+    @Override
+    public void addInterceptors(InterceptorRegistry registry) {
+        registry.addInterceptor(getQueryBlockCleanInterceptor()).addPathPatterns("/api/query");
+    }
+}

diff --git a/src/server/src/test/java/org/apache/kylin/rest/QueryBlockCleanInterceptorTest.java b/src/server/src/test/java/org/apache/kylin/rest/QueryBlockCleanInterceptorTest.java
new file mode 100644
index 0000000000..939d96fdcf
--- /dev/null
+++ b/src/server/src/test/java/org/apache/kylin/rest/QueryBlockCleanInterceptorTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest;
+
+import org.apache.kylin.common.QueryContext;
+import org.apache.spark.SparkEnv;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class QueryBlockCleanInterceptorTest {
+
+    @Test
+    public void testAfterCompletionCleansResultBlocks() throws Exception {
+        QueryBlockCleanInterceptor interceptor = new QueryBlockCleanInterceptor();
+        QueryContext.current().setExecutionID("1");
+        SparkEnv mockEnv = Mockito.mock(SparkEnv.class);
+        SparkEnv.set(mockEnv);
+        // The first completion deletes the result blocks for execution "1".
+        interceptor.afterCompletion(null, null, null, null);
+        Mockito.verify(mockEnv, Mockito.times(1)).deleteAllBlockForQueryResult("1");
+        // The QueryContext was closed, so a second completion must not delete again.
+        interceptor.afterCompletion(null, null, null, null);
+        Mockito.verify(mockEnv, Mockito.times(1)).deleteAllBlockForQueryResult("1");
+    }
+}

diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index f9f4acc818..367784d3dd 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -186,7 +186,12 @@ object SparkSqlClient {
       val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace()
         , QueryContext.current().getQueryId, SparderEnv.getSparkSession.sparkContext)
-      val results = df.toIterator()
+      val results = if (NProjectManager.getProjectConfig(QueryContext.current().getProject)
+        .isQueryUseIterableCollectApi) {
+        df.collectToIterator()
+      } else {
+        df.toIterator()
+      }
       val resultRows = results._1
       val resultSize = results._2
       if (config.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()

diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 354478abde..5c80d379d3 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -22,7 +22,6 @@ import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicLong
 import java.{lang, util}
-
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
@@ -49,8 +48,8 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`iterator asScala`
 import scala.collection.mutable
-
 import io.kyligence.kap.secondstorage.SecondStorageUtil
+import org.apache.kylin.metadata.project.NProjectManager

 // scalastyle:off
 object ResultType extends Enumeration {
@@ -118,7 +117,12 @@ object ResultPlan extends LogEx {
     QueryContext.current.record("executed_plan")
     QueryContext.currentTrace().endLastSpan()
     val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace(), QueryContext.current().getQueryId, sparkContext)
-    val results = df.toIterator()
+    val results = if (NProjectManager.getProjectConfig(QueryContext.current().getProject)
+      .isQueryUseIterableCollectApi) {
+      df.collectToIterator()
+    } else {
+      df.toIterator()
+    }
     val resultRows = results._1
     val resultSize = results._2
     if (kapConfig.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()

diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 85dcc3b583..9c8c56ff3f 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
 import org.apache.spark.sql.hive.HiveStorageRule
 import org.apache.spark.sql.udf.UdfManager
 import org.apache.spark.util.{ThreadUtils, Utils}
-import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext, SparkEnv}

 import java.lang.{Boolean => JBoolean, String => JString}
 import java.security.PrivilegedAction
@@ -415,4 +415,8 @@ object SparderEnv extends Logging {
   def getActiveExecutorIds(): Seq[String] = {
     getSparkSession.sparkContext.getExecutorIds()
   }
+
+  def deleteQueryTaskResultBlock(queryExecutionID: String): Unit = {
+    SparkEnv.get.deleteAllBlockForQueryResult(queryExecutionID)
+  }
 }

diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
index 0781c6415a..70b50a0487 100644
--- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
@@ -19,10 +19,13 @@
 package org.apache.kylin.query.runtime.plan;

 import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;

+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
@@ -31,10 +34,13 @@ import org.apache.kylin.common.exception.NewQueryRefuseException;
 import org.apache.kylin.common.state.StateSwitchConstant;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.metadata.state.QueryShareStateManager;
 import org.apache.kylin.query.MockContext;
+import org.apache.kylin.query.engine.data.QueryResult;
 import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.pushdown.SparkSqlClient;
 import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.spark.SparkConf;
 import org.apache.spark.scheduler.JobFailed;
@@ -65,6 +71,7 @@ public class TestResultPlan extends NLocalFileMetadataTestCase {
         getTestConfig().setProperty("kylin.query.share-state-switch-implement", "jdbc");
         getTestConfig().setProperty("kylin.query.big-query-source-scan-rows-threshold", "100000000");
         ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+        SparderEnv.registerListener(ss.sparkContext());
         SparderEnv.setSparkSession(ss);
         StructType schema = new StructType();
         schema = schema.add("TRANS_ID", DataTypes.LongType, false);
@@ -109,6 +116,7 @@ public class TestResultPlan extends NLocalFileMetadataTestCase {

     @Test
     public void testCancelQuery() throws InterruptedException {
+        overwriteSystemProp("kylin.query.use-iterable-collect", "true");
         AtomicReference<SparkListenerJobEnd> sparkJobEnd = new AtomicReference<>();
         CountDownLatch isJobEnd = new CountDownLatch(1);
         ss.sparkContext().addSparkListener(new SparkListener() {
@@ -279,4 +287,40 @@ public class TestResultPlan extends NLocalFileMetadataTestCase {
             Assert.assertTrue(e instanceof BigQueryException);
         }
     }
+
+    @Test
+    public void testSparkSqlClient() {
+        NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).createProject("test", "ADMIN", "des",
+                new LinkedHashMap<String, String>());
+        String sql = "select * from TEST_KYLIN_FACT";
+        overwriteSystemProp("kylin.query.use-iterable-collect", "true");
+        val result1 = SparkSqlClient.executeSqlToIterable(ss, sql, UUID.randomUUID(), "test");
+        val queryResult1 = new QueryResult(result1._1(), (int) result1._2(), result1._3());
+
+        overwriteSystemProp("kylin.query.use-iterable-collect", "false");
+        val result2 = SparkSqlClient.executeSqlToIterable(ss, sql, UUID.randomUUID(), "test");
+        val queryResult2 = new QueryResult(result2._1(), (int) result2._2(), result2._3());
+        // Both collect paths must return the same rows.
+        Assert.assertTrue(CollectionUtils.isEqualCollection(queryResult1.getRows(), queryResult2.getRows()));
+    }
+
+    @Test
+    public void testSparkEnvDeleteBlock() {
+        QueryContext queryContext = QueryContext.current();
+        NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).createProject("test", "ADMIN", "des",
+                new LinkedHashMap<String, String>());
+        String sql = "select * from TEST_KYLIN_FACT t1 left join (select * from TEST_KYLIN_FACT limit 10) t2 on 1=1 ";
+        overwriteSystemProp("kylin.query.use-iterable-collect", "true");
+        val result1 = SparkSqlClient.executeSqlToIterable(ss, sql, UUID.randomUUID(), "test");
+        // Deleting the result blocks makes any further fetch from the iterator fail.
+        SparderEnv.deleteQueryTaskResultBlock(queryContext.getExecutionID());
+        try {
+            val iterator = result1._1().iterator();
+            while (iterator.hasNext()) {
+                iterator.next();
+            }
+        } catch (Exception e) {
+            assert e.getMessage().contains("Failed to fetch block after 1 fetch failures");
+        }
+    }
 }
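For reviewers, a minimal sketch of how the pieces above fit together. This assumes the Kylin-patched Spark build (3.3.0-kylin-4.6.18.0): `Dataset.collectToIterator()`, `Dataset.toIterator()` and `SparkEnv.deleteAllBlockForQueryResult(...)` come from that fork, not stock Spark, and the wrapper names `fetchResults`/`cleanUpAfterResponse` are illustrative only, not part of the commit.

import org.apache.kylin.common.QueryContext
import org.apache.kylin.metadata.project.NProjectManager
import org.apache.spark.SparkEnv
import org.apache.spark.sql.DataFrame

object IterableCollectSketch {

  // Mirrors the branch added to ResultPlan.scala and SparkSqlClient.scala:
  // with kylin.query.use-iterable-collect=true, result partitions are pulled
  // to the driver block by block and exposed through an iterator, instead of
  // being materialized all at once. The flag defaults to false (see
  // KylinConfigBase.isQueryUseIterableCollectApi above).
  def fetchResults(df: DataFrame) = {
    val useIterableCollect = NProjectManager
      .getProjectConfig(QueryContext.current().getProject)
      .isQueryUseIterableCollectApi
    // Both calls return a tuple whose _1 is the row iterator and _2 the
    // result size, matching the resultRows/resultSize usage in the diff.
    if (useIterableCollect) df.collectToIterator() else df.toIterator()
  }

  // Mirrors QueryBlockCleanInterceptor#afterCompletion: once the HTTP
  // response for /api/query has been written, the blocks backing the
  // iterator are freed. This is also why QueryService restores the
  // execution ID after QueryContext.current().close() -- the interceptor
  // runs later and still needs the ID to locate the blocks.
  def cleanUpAfterResponse(): Unit = {
    val executionId = QueryContext.current().getExecutionID
    if (executionId != null && executionId.nonEmpty) {
      SparkEnv.get.deleteAllBlockForQueryResult(executionId)
    }
    QueryContext.current().close()
  }
}

To try the streaming path, set `kylin.query.use-iterable-collect=true` in kylin.properties; it stays off by default so existing deployments keep the collect-at-once behavior.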