This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 41d541a6cf599c8dac68f9f265fcbd327aa44362
Author: Jiawei Li <1019037...@qq.com>
AuthorDate: Tue Nov 7 18:26:31 2023 +0800

    KYLIN-5879 Update spark version to 3.3.0-kylin-4.6.18.0
    
    Use the Dataset iterator API to reduce driver memory usage.
    Add an interceptor to clean up query result blocks.
    Set `kylin.query.use-iterable-collect` to false by default.
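    
    The memory saving comes from streaming result partitions to the driver
    instead of materializing the whole result set at once. As a rough
    illustration with stock Spark APIs (collectToIterator()/toIterator() in
    this patch come from Kylin's Spark fork; the sketch below uses the
    vanilla Dataset.toLocalIterator() analogue, not the patched code itself):
    
        import java.util.Iterator;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
    
        public class IteratorCollectSketch {
            public static void main(String[] args) {
                SparkSession spark = SparkSession.builder()
                        .master("local[1]").appName("sketch").getOrCreate();
                Dataset<Row> df = spark.range(0, 1000000L).toDF("id");
    
                // collect() would pull every row into driver memory at once;
                // toLocalIterator() fetches one partition at a time, so the
                // driver only ever holds the largest partition.
                Iterator<Row> rows = df.toLocalIterator();
                long count = 0;
                while (rows.hasNext()) {
                    rows.next();
                    count++;
                }
                System.out.println("streamed " + count + " rows");
                spark.stop();
            }
        }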
---
 build/release/download-spark.sh                    |  1 -
 pom.xml                                            |  2 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../apache/kylin/newten/SlowQueryDetectorTest.java |  2 +
 .../apache/kylin/rest/service/QueryService.java    |  2 +
 .../kylin/rest/QueryBlockCleanInterceptor.java     | 47 ++++++++++++++++++++++
 .../kylin/rest/config/KylinWebMvcConfig.java       | 40 ++++++++++++++++++
 .../kylin/rest/QueryBlockCleanInterceptorTest.java | 38 +++++++++++++++++
 .../kylin/query/pushdown/SparkSqlClient.scala      |  7 ++++-
 .../kylin/query/runtime/plan/ResultPlan.scala      | 10 +++--
 .../scala/org/apache/spark/sql/SparderEnv.scala    |  6 ++-
 .../kylin/query/runtime/plan/TestResultPlan.java   | 44 ++++++++++++++++++++
 12 files changed, 196 insertions(+), 7 deletions(-)

diff --git a/build/release/download-spark.sh b/build/release/download-spark.sh
index 6d4a21bad0..b552e5d14c 100755
--- a/build/release/download-spark.sh
+++ b/build/release/download-spark.sh
@@ -25,7 +25,6 @@ source build/release/functions.sh
 rm -rf build/spark
 
 spark_version_pom=`mvn -f pom.xml help:evaluate -Dexpression=spark.version | grep -E '^[0-9]+\.[0-9]+\.[0-9]+' `
-spark_version_pom=3.2.0-kylin-4.6.9.0
 spark_pkg_name=spark-newten-"`echo ${spark_version_pom}| sed "s/-kylin//g"`"
 spark_pkg_file_name="${spark_pkg_name}.tgz"
 
diff --git a/pom.xml b/pom.xml
index a0cd96eef0..b3476cff46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
         <kafka.version>2.8.2</kafka.version>
 
         <!-- Spark versions -->
-        <spark.version>3.3.0-kylin-4.6.17.0</spark.version>
+        <spark.version>3.3.0-kylin-4.6.18.0</spark.version>
         <delta.version>2.1.0</delta.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1ba4118e77..dc3186020e 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -4373,4 +4373,8 @@ public abstract class KylinConfigBase implements Serializable {
     public boolean isPrintQueryPlanEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.print-logical-plan", FALSE));
     }
+
+    public boolean isQueryUseIterableCollectApi() {
+        return Boolean.parseBoolean(getOptional("kylin.query.use-iterable-collect", FALSE));
+    }
 }
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
index e9e9f1ed01..954100e9d4 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
@@ -228,6 +228,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
     public void testSparderTimeoutCancelJob() throws Exception {
         val df = SparderEnv.getSparkSession().emptyDataFrame();
         val mockDf = Mockito.spy(df);
+        Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).collectToIterator();
         Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).toIterator();
         slowQueryDetector.queryStart("");
         try {
@@ -255,6 +256,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
     public void testPushdownTimeoutCancelJob() {
         val df = SparderEnv.getSparkSession().emptyDataFrame();
         val mockDf = Mockito.spy(df);
+        Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).collectToIterator();
         Mockito.doAnswer(new AnswersWithDelay(TIMEOUT_MS * 3, new Returns(null))).when(mockDf).toIterator();
         slowQueryDetector.queryStart("");
         try {
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4ec19ef910..55d65f1bce 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -563,7 +563,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             return response;
         } finally {
             QueryLimiter.release();
+            String queryExecutionId = QueryContext.current().getExecutionID();
             QueryContext.current().close();
+            QueryContext.current().setExecutionID(queryExecutionId);
         }
     }
 
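Aside on the two added lines above: QueryContext.current().close() wipes all
per-query state, but the interceptor introduced below still needs the Spark
execution ID after the HTTP response is written, so the ID is captured before
close() and restored afterwards. A sketch of the intended lifecycle, reusing
names from this patch (this is my reading of the wiring, not verbatim Kylin
code):

    // in QueryService: keep the execution ID alive across close()
    String executionId = QueryContext.current().getExecutionID();
    QueryContext.current().close();                     // drops per-query state
    QueryContext.current().setExecutionID(executionId); // restore the one field
                                                        // the interceptor needs

    // in QueryBlockCleanInterceptor.afterCompletion(): free the result blocks
    // cached by the iterable-collect path once the response has been sent
    String id = QueryContext.current().getExecutionID();
    if (!Strings.isNullOrEmpty(id)) {
        SparderEnv.deleteQueryTaskResultBlock(id);
    }
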
diff --git a/src/server/src/main/java/org/apache/kylin/rest/QueryBlockCleanInterceptor.java b/src/server/src/main/java/org/apache/kylin/rest/QueryBlockCleanInterceptor.java
new file mode 100644
index 0000000000..3175d137ff
--- /dev/null
+++ b/src/server/src/main/java/org/apache/kylin/rest/QueryBlockCleanInterceptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.spark.sql.SparderEnv;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.HandlerInterceptor;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+@Order(-200)
+public class QueryBlockCleanInterceptor implements HandlerInterceptor {
+    @Override
+    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
+            throws Exception {
+        String queryExecutionID = QueryContext.current().getExecutionID();
+        if (!Strings.isNullOrEmpty(queryExecutionID)) {
+            SparderEnv.deleteQueryTaskResultBlock(queryExecutionID);
+        }
+        QueryContext.current().close();
+
+    }
+}
diff --git a/src/server/src/main/java/org/apache/kylin/rest/config/KylinWebMvcConfig.java b/src/server/src/main/java/org/apache/kylin/rest/config/KylinWebMvcConfig.java
new file mode 100644
index 0000000000..f9b243780d
--- /dev/null
+++ b/src/server/src/main/java/org/apache/kylin/rest/config/KylinWebMvcConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.config;
+
+
+import org.apache.kylin.rest.QueryBlockCleanInterceptor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@Configuration
+public class KylinWebMvcConfig implements WebMvcConfigurer {
+
+    @Bean
+    public QueryBlockCleanInterceptor getQueryBlockCleanInterceptor() {
+        return new QueryBlockCleanInterceptor();
+    }
+
+    @Override
+    public void addInterceptors(InterceptorRegistry registry) {
+        registry.addInterceptor(getQueryBlockCleanInterceptor()).addPathPatterns("/api/query");
+    }
+
+}
diff --git a/src/server/src/test/java/org/apache/kylin/rest/QueryBlockCleanInterceptorTest.java b/src/server/src/test/java/org/apache/kylin/rest/QueryBlockCleanInterceptorTest.java
new file mode 100644
index 0000000000..939d96fdcf
--- /dev/null
+++ b/src/server/src/test/java/org/apache/kylin/rest/QueryBlockCleanInterceptorTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest;
+
+import org.apache.kylin.common.QueryContext;
+import org.apache.spark.SparkEnv;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class QueryBlockCleanInterceptorTest {
+    @Test
+    public void testAfterCompletion() throws Exception {
+        QueryBlockCleanInterceptor interceptor = new QueryBlockCleanInterceptor();
+        QueryContext.current().setExecutionID("1");
+        SparkEnv mockEnv = Mockito.mock(SparkEnv.class);
+        SparkEnv.set(mockEnv);
+        interceptor.afterCompletion(null, null, null, null);
+        Mockito.verify(mockEnv, Mockito.times(1)).deleteAllBlockForQueryResult("1");
+        interceptor.afterCompletion(null, null, null, null);
+        Mockito.verify(mockEnv, Mockito.times(1)).deleteAllBlockForQueryResult("1");
+
+    }
+}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index f9f4acc818..367784d3dd 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -186,7 +186,12 @@ object SparkSqlClient {
       val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace()
         , QueryContext.current().getQueryId, SparderEnv.getSparkSession.sparkContext)
 
-      val results = df.toIterator()
+      val results = if (NProjectManager.getProjectConfig(QueryContext.current().getProject)
+        .isQueryUseIterableCollectApi) {
+        df.collectToIterator()
+      } else {
+        df.toIterator()
+      }
       val resultRows = results._1
       val resultSize = results._2
       if (config.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 354478abde..5c80d379d3 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -22,7 +22,6 @@ import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicLong
 import java.{lang, util}
-
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
@@ -49,8 +48,8 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`iterator asScala`
 import scala.collection.mutable
-
 import io.kyligence.kap.secondstorage.SecondStorageUtil
+import org.apache.kylin.metadata.project.NProjectManager
 
 // scalastyle:off
 object ResultType extends Enumeration {
@@ -118,7 +117,12 @@ object ResultPlan extends LogEx {
       QueryContext.current.record("executed_plan")
       QueryContext.currentTrace().endLastSpan()
       val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace(), QueryContext.current().getQueryId, sparkContext)
-      val results = df.toIterator()
+      val results = if (NProjectManager.getProjectConfig(QueryContext.current().getProject)
+        .isQueryUseIterableCollectApi) {
+        df.collectToIterator()
+      } else {
+        df.toIterator()
+      }
       val resultRows = results._1
       val resultSize = results._2
       if (kapConfig.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 85dcc3b583..9c8c56ff3f 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
 import org.apache.spark.sql.hive.HiveStorageRule
 import org.apache.spark.sql.udf.UdfManager
 import org.apache.spark.util.{ThreadUtils, Utils}
-import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext, SparkEnv}
 
 import java.lang.{Boolean => JBoolean, String => JString}
 import java.security.PrivilegedAction
@@ -415,4 +415,8 @@ object SparderEnv extends Logging {
   def getActiveExecutorIds(): Seq[String] = {
     getSparkSession.sparkContext.getExecutorIds()
   }
+
+  def deleteQueryTaskResultBlock(queryExecutionID: String): Unit = {
+    SparkEnv.get.deleteAllBlockForQueryResult(queryExecutionID)
+  }
 }
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
index 0781c6415a..70b50a0487 100644
--- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
@@ -19,10 +19,13 @@
 package org.apache.kylin.query.runtime.plan;
 
 import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
@@ -31,10 +34,13 @@ import org.apache.kylin.common.exception.NewQueryRefuseException;
 import org.apache.kylin.common.state.StateSwitchConstant;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.metadata.state.QueryShareStateManager;
 import org.apache.kylin.query.MockContext;
+import org.apache.kylin.query.engine.data.QueryResult;
 import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.pushdown.SparkSqlClient;
 import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.spark.SparkConf;
 import org.apache.spark.scheduler.JobFailed;
@@ -65,6 +71,7 @@ public class TestResultPlan extends NLocalFileMetadataTestCase {
         getTestConfig().setProperty("kylin.query.share-state-switch-implement", "jdbc");
         getTestConfig().setProperty("kylin.query.big-query-source-scan-rows-threshold", "100000000");
         ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+        SparderEnv.registerListener(ss.sparkContext());
         SparderEnv.setSparkSession(ss);
         StructType schema = new StructType();
         schema = schema.add("TRANS_ID", DataTypes.LongType, false);
@@ -109,6 +116,7 @@ public class TestResultPlan extends NLocalFileMetadataTestCase {
 
     @Test
     public void testCancelQuery() throws InterruptedException {
+        overwriteSystemProp("kylin.query.use-iterable-collect", "true");
         AtomicReference<SparkListenerJobEnd> sparkJobEnd = new AtomicReference<>();
         CountDownLatch isJobEnd = new CountDownLatch(1);
         ss.sparkContext().addSparkListener(new SparkListener() {
@@ -279,4 +287,40 @@ public class TestResultPlan extends NLocalFileMetadataTestCase {
             Assert.assertTrue(e instanceof BigQueryException);
         }
     }
+    @Test
+    public void testSparkSqlClient() {
+
+        QueryContext queryContext = QueryContext.current();
+        NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).createProject("test", "ADMIN", "des",
+                new LinkedHashMap<String, String>());
+        String sql = "select * from TEST_KYLIN_FACT";
+        overwriteSystemProp("kylin.query.use-iterable-collect", "true");
+        val result1 = SparkSqlClient.executeSqlToIterable(ss, sql, UUID.randomUUID(), "test");
+        val queryResult1 = new QueryResult(result1._1(), (int) result1._2(), result1._3());
+
+        overwriteSystemProp("kylin.query.use-iterable-collect", "false");
+        val result2 = SparkSqlClient.executeSqlToIterable(ss, sql, UUID.randomUUID(), "test");
+        val queryResult2 = new QueryResult(result2._1(), (int) result2._2(), result2._3());
+        Assert.assertTrue(CollectionUtils.isEqualCollection(queryResult1.getRows(), queryResult2.getRows()));
+    }
+
+    @Test
+    public void testSparkEnvDeleteBlock() {
+        QueryContext queryContext = QueryContext.current();
+        NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).createProject("test", "ADMIN", "des",
+                new LinkedHashMap<String, String>());
+        String sql = "select * from TEST_KYLIN_FACT t1 left join (select * from TEST_KYLIN_FACT limit 10) t2 on 1=1  ";
+        overwriteSystemProp("kylin.query.use-iterable-collect", "true");
+        val result1 = SparkSqlClient.executeSqlToIterable(ss, sql, UUID.randomUUID(), "test");
+        SparderEnv.deleteQueryTaskResultBlock(queryContext.getExecutionID());
+        try {
+            val rowIterator = result1._1().iterator();
+            while (rowIterator.hasNext()) {
+                rowIterator.next();
+            }
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Failed to fetch block after 1 fetch failures"));
+        }
+    }
+
 }
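
Note for anyone trying this out: the iterator-based path is off by default. A
minimal kylin.properties snippet to opt in (standard deployment assumed):

    # enable the iterator-based collect path for query results
    kylin.query.use-iterable-collect=true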
