This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
     new 0c3ad9f42a KYLIN-5271 Query memory leaks
0c3ad9f42a is described below

commit 0c3ad9f42a81a15bf115a8d2cadfe58deba116a4
Author: zhaoliu4 <zhaol...@iflytek.com>
AuthorDate: Fri Sep 23 00:05:53 2022 +0800

    KYLIN-5271 Query memory leaks
---
 .../kylin/query/pushdown/SparkSubmitter.java    |  9 ++++++---
 .../apache/kylin/query/runtime/SparkEngine.java | 26 +++++++++++++++++++---------
 .../kylin/query/runtime/plans/ResultPlan.scala  |  1 -
 .../org/apache/spark/sql/SparderContext.scala   |  1 -
 .../apache/spark/sql/SparderContextFacade.scala |  1 +
 5 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
index 29259aa4bd..0379a1727c 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
@@ -30,9 +30,12 @@ public class SparkSubmitter {
     public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
 
     public static PushdownResponse submitPushDownTask(String sql) {
-        Pair<List<List<String>>, List<StructField>> pair =
-                SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
-        SparderContext.closeThreadSparkSession();
+        Pair<List<List<String>>, List<StructField>> pair = null;
+        try {
+            pair = SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
+        } finally {
+            SparderContext.closeThreadSparkSession();
+        }
         return new PushdownResponse(pair.getSecond(), pair.getFirst());
     }
 
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index 3a504ae073..05d2888e94 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -29,6 +29,7 @@ import org.apache.kylin.query.runtime.plans.ResultPlan;
 import org.apache.kylin.query.runtime.plans.ResultType;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,21 +39,28 @@ public class SparkEngine implements QueryEngine {
 
     @Override
     public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode relNode, RelDataType resultType) {
-        Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        if (System.getProperty("calcite.debug") != null) {
-            log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        try {
+            Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
+            if (System.getProperty("calcite.debug") != null) {
+                log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+            }
+            return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get();
+        } finally {
+            SparderContext.closeThreadSparkSession();
         }
-        return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get();
-
     }
 
     @Override
     public Enumerable<Object[]> compute(DataContext dataContext, RelNode relNode, RelDataType resultType) {
-        Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        if (System.getProperty("calcite.debug") != null) {
-            log.info("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        try {
+            Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
+            if (System.getProperty("calcite.debug") != null) {
+                log.info("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+            }
+            return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get();
+        } finally {
+            SparderContext.closeThreadSparkSession();
         }
-        return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get();
     }
 
     private Dataset<Row> toSparkPlan(DataContext dataContext, RelNode relNode) {
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 7c95636e31..3f98f47f39 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -190,7 +190,6 @@ object ResultPlan extends Logging {
       }
     }
     SparderContext.cleanQueryInfo()
-    SparderContext.closeThreadSparkSession()
     result
   }
 }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index e6b73d66cc..6be695112d 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -61,7 +61,6 @@ object SparderContext extends Logging {
   }
 
   def getSparkSession: SparkSession = {
-    logInfo(s"Current thread ${Thread.currentThread().getId} create a SparkSession.")
     SparderContextFacade.current().getFirst
   }
 
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
index 386e9fcfc0..ede8607607 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
@@ -32,6 +32,7 @@ object SparderContextFacade extends Logging {
   def current(): Pair[SparkSession, UdfManager] = {
     if (CURRENT_SPARKSESSION.get() == null) {
       val spark = SparderContext.getOriginalSparkSession.cloneSession()
+      logInfo(s"Current thread ${Thread.currentThread().getId} create a SparkSession.")
       CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
         UdfManager.createWithoutBuildInFunc(spark)))
     }
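
The common thread in the changes above: SparderContextFacade caches one cloned SparkSession per query thread in a ThreadLocal, and the old code released it only on the success path, so any query that threw beforehand stranded its session and leaked memory. The fix moves every SparderContext.closeThreadSparkSession() call into a finally block at the query entry points (SparkSubmitter, SparkEngine) and drops the now-redundant close in ResultPlan. Below is a minimal self-contained sketch of the idiom; the class and method names in it are hypothetical stand-ins for illustration, not Kylin or Spark APIs.

// ThreadLocalCleanupSketch.java -- illustrative sketch only, not part of the commit.
// A plain AutoCloseable stands in for the cloned SparkSession held per thread.
public class ThreadLocalCleanupSketch {

    // Analogue of SparderContextFacade.CURRENT_SPARKSESSION (hypothetical name).
    private static final ThreadLocal<AutoCloseable> SESSION = new ThreadLocal<>();

    // Lazily create the per-thread "session", as SparderContextFacade.current() does.
    static AutoCloseable getSession() {
        if (SESSION.get() == null) {
            SESSION.set(() -> System.out.println("session closed"));
        }
        return SESSION.get();
    }

    // Analogue of SparderContext.closeThreadSparkSession(): close the resource
    // and, crucially, remove() the ThreadLocal entry so pooled query threads
    // do not keep stale sessions reachable.
    static void closeThreadSession() {
        AutoCloseable s = SESSION.get();
        if (s == null) {
            return;
        }
        try {
            s.close();
        } catch (Exception e) {
            // In real code this is logged; cleanup must not mask the query's
            // own exception.
        } finally {
            SESSION.remove();
        }
    }

    // The shape of the fixed entry points: cleanup runs on success and failure.
    static String runQuery(String sql) {
        try {
            getSession();
            if (sql.isEmpty()) {
                throw new IllegalArgumentException("empty query");
            }
            return "ok: " + sql;
        } finally {
            closeThreadSession();
        }
    }

    public static void main(String[] args) {
        System.out.println(runQuery("select 1"));
    }
}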