This is an automated email from the ASF dual-hosted git repository.

caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new a03fbf8e7e KYLIN-6044 Removed ProjectAggregateMergeRule to avoid 
aggregate index mismatches
a03fbf8e7e is described below

commit a03fbf8e7eb1b438b921aec9eb92aed8b8ac4075
Author: Guoliang Sun <[email protected]>
AuthorDate: Tue Aug 20 17:08:09 2024 +0800

    KYLIN-6044 Removed ProjectAggregateMergeRule to avoid aggregate index 
mismatches
---
 pom.xml                                            |  4 +--
 .../apache/kylin/query/engine/SqlToRelNodeTest.xml | 12 +++----
 .../query/sql_agg_not_pushdown/query01.sql         | 39 ++++++++++++++++++++++
 .../apache/kylin/query/engine/PlannerFactory.java  |  2 ++
 .../kylin/query/runtime/plan/ResultPlan.scala      |  4 +--
 .../gluten/KapExpressionTransformer.scala          |  5 ++-
 src/spark-project/spark-common/pom.xml             | 33 +-----------------
 .../ConvertKylinFileSourceToGlutenRule.scala       | 16 ++++-----
 .../apache/spark/sql/common/GlutenTestUtil.scala   | 12 +++----
 9 files changed, 68 insertions(+), 59 deletions(-)

diff --git a/pom.xml b/pom.xml
index ab3f4a710d..ca26aee4ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,12 +128,12 @@
         <kafka.version>2.8.2</kafka.version>
 
         <!-- Spark versions -->
-        <spark.version>3.3.0-kylin-4.6.26.0</spark.version>
+        <spark.version>3.3.0-kylin-5.2.2-SNAPSHOT</spark.version>
         <delta.version>2.3.0</delta.version>
         <delta-standalone.version>0.6.0</delta-standalone.version>
         <iceberg.version>1.4.1</iceberg.version>
 
-        <gluten.version>1.3.0-kylin-240829-SNAPSHOT</gluten.version>
+        <gluten.version>1.3.0-kylin-250110-SNAPSHOT</gluten.version>
         <gluten.deps.scope>compile</gluten.deps.scope>
         <substrait.version>0.5.0</substrait.version>
         <celeborn.version>0.3.0-incubating</celeborn.version>
diff --git 
a/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
 
b/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
index c60960910e..c9bd19a7da 100644
--- 
a/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
+++ 
b/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
@@ -31,8 +31,8 @@ OlapToEnumerableConverter
               OlapJoinRel(condition=[=($1, $4)], joinType=[inner], ctx=[])
                 OlapAggregateRel(group-set=[[0, 1, 2, 3]], groups=[null], 
ctx=[])
                   OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], TRANS_ID=[$3], 
SLR_SEGMENT_CD=[$4], ctx=[])
-                    OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], 
groups=[null], ctx=[])
-                      OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], 
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], ctx=[])
+                    OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], 
groups=[null], agg#0=[SUM($5)], ctx=[])
+                      OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], 
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], SUM_PRICE=[$6], ctx=[])
                         OlapLimitRel(ctx=[], fetch=[10])
                           OlapSortRel(sort0=[$1], sort1=[$2], 
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], ctx=[])
                             OlapAggregateRel(group-set=[[0, 1, 2, 3, 4, 5]], 
groups=[null], SUM_PRICE=[SUM($6)], ctx=[])
@@ -71,8 +71,8 @@ OlapToEnumerableConverter
               OlapJoinRel(condition=[=($1, $4)], joinType=[inner], ctx=[])
                 OlapAggregateRel(group-set=[[0, 1, 2, 3]], groups=[null], 
ctx=[])
                   OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], TRANS_ID=[$3], 
SLR_SEGMENT_CD=[$4], ctx=[])
-                    OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], 
groups=[null], ctx=[])
-                      OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], 
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], ctx=[])
+                    OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], 
groups=[null], agg#0=[SUM($5)], ctx=[])
+                      OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], 
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], SUM_PRICE=[$6], ctx=[])
                         OlapLimitRel(ctx=[], fetch=[10])
                           OlapSortRel(sort0=[$1], sort1=[$2], 
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], ctx=[])
                             OlapAggregateRel(group-set=[[0, 1, 2, 3, 4, 5]], 
groups=[null], SUM_PRICE=[SUM($6)], ctx=[])
@@ -111,8 +111,8 @@ OlapToEnumerableConverter
             OlapJoinRel(condition=[=($1, $4)], joinType=[inner], ctx=[])
               OlapAggregateRel(group-set=[[0, 1, 2, 3]], groups=[null], ctx=[])
                 OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], TRANS_ID=[$3], 
SLR_SEGMENT_CD=[$4], ctx=[])
-                  OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], groups=[null], 
ctx=[])
-                    OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], 
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], ctx=[])
+                  OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], groups=[null], 
agg#0=[SUM($5)], ctx=[])
+                    OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], 
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], SUM_PRICE=[$6], ctx=[])
                       OlapLimitRel(ctx=[], fetch=[10])
                         OlapSortRel(sort0=[$1], sort1=[$2], 
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], ctx=[])
                           OlapAggregateRel(group-set=[[0, 1, 2, 3, 4, 5]], 
groups=[null], SUM_PRICE=[SUM($6)], ctx=[])
diff --git 
a/src/kylin-it/src/test/resources/query/sql_agg_not_pushdown/query01.sql 
b/src/kylin-it/src/test/resources/query/sql_agg_not_pushdown/query01.sql
new file mode 100644
index 0000000000..c045ba39a8
--- /dev/null
+++ b/src/kylin-it/src/test/resources/query/sql_agg_not_pushdown/query01.sql
@@ -0,0 +1,39 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT
+    count(1)
+FROM
+    (SELECT
+         1
+     FROM
+         (SELECT
+              sum(SELLER_ID) as acct_bal_CURR,
+              1 AS DT
+          FROM
+              TEST_KYLIN_FACT) a
+             left JOIN
+         (
+             SELECT
+                 sum(ITEM_COUNT) acct_id_CURR,
+                 1 AS DT
+             FROM
+                 TEST_KYLIN_FACT
+         ) b
+         ON a.DT = b.DT
+    )
\ No newline at end of file
diff --git 
a/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java 
b/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java
index 9b6c755660..bca8d5e1e1 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java
@@ -244,6 +244,8 @@ public class PlannerFactory {
         // see KAP#16036
         planner.removeRule(CoreRules.UNION_MERGE);
         planner.removeRule(CoreRules.PROJECT_REMOVE);
+        // This rule's pruning of Aggregate operators may cause aggregate index 
matching failures. See AL-9852
+        planner.removeRule(CoreRules.PROJECT_AGGREGATE_MERGE);
 
         // skip corr expansion during model suggestion
         if (!KylinConfig.getInstanceFromEnv().getSkipCorrReduceRule()) {
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 6c1f5fabe2..1d1bcfb0cf 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -25,7 +25,7 @@ import java.{lang, util}
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.exception.code.ErrorCodeServer
 import org.apache.kylin.common.exception.{BigQueryException, 
NewQueryRefuseException}
@@ -274,7 +274,7 @@ object ResultPlan extends LogEx {
 
   def getResult(df: DataFrame, rowType: RelDataType): ExecuteResult = 
withScope(df) {
     if (!ContextUtil.getNativeRealizations.isEmpty && 
!KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) {
-      
df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+      
df.sparkSession.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
     }
     val queryTagInfo = QueryContext.current().getQueryTagInfo
     if (queryTagInfo.isAsyncQuery) {
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
index 5bb40713c2..6deaf1ff53 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.expression._
 import org.apache.gluten.extension.ExpressionExtensionTrait
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.KapFunctions.TRUNCATE
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.types._
@@ -58,7 +57,7 @@ case class CustomerExpressionTransformer() extends 
ExpressionExtensionTrait {
     Sig[KapAddMonths]("kap_add_months"),
     Sig[KapSubtractMonths]("kap_months_between"),
     Sig[YMDintBetween]("kap_ymd_int_between"),
-    Sig[TRUNCATE]("truncate"),
+    Sig[Truncate]("truncate"),
     Sig[KylinSplitPart]("kylin_split_part"),
     Sig[KylinInstr]("kylin_instr"),
     Sig[PreciseCardinality]("ke_bitmap_cardinality"),
@@ -141,7 +140,7 @@ case class CustomerExpressionTransformer() extends 
ExpressionExtensionTrait {
           
ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.left, 
attributeSeq),
           
ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.right, 
attributeSeq)),
         kapYmdIntBetween)
-    case truncate: TRUNCATE =>
+    case truncate: Truncate =>
       GenericExpressionTransformer(
         "truncate",
         Seq(
diff --git a/src/spark-project/spark-common/pom.xml 
b/src/spark-project/spark-common/pom.xml
index 6dc98894c8..95bea79f34 100644
--- a/src/spark-project/spark-common/pom.xml
+++ b/src/spark-project/spark-common/pom.xml
@@ -135,33 +135,8 @@
 
         <dependency>
             <groupId>org.apache.gluten</groupId>
-            <artifactId>gluten-core</artifactId>
-            <version>${gluten.version}</version>
-            <scope>${gluten.deps.scope}</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.gluten</groupId>
-                    <artifactId>spark-sql-columnar-shims-spark32</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.gluten</groupId>
-                    <artifactId>spark-sql-columnar-shims-spark33</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.gluten</groupId>
-                    <artifactId>spark-sql-columnar-shims-spark34</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.gluten</groupId>
-                    <artifactId>spark-sql-columnar-shims-spark35</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.gluten</groupId>
-            <artifactId>gluten-core</artifactId>
+            <artifactId>gluten-substrait</artifactId>
             <version>${gluten.version}</version>
-            <type>test-jar</type>
             <scope>${gluten.deps.scope}</scope>
             <exclusions>
                 <exclusion>
@@ -218,12 +193,6 @@
             <artifactId>backends-clickhouse</artifactId>
             <version>${gluten.version}</version>
             <scope>${gluten.deps.scope}</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.gluten</groupId>
-                    <artifactId>gluten-core</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.gluten</groupId>
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
index 94565faa8d..e1b59b8831 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
@@ -18,8 +18,7 @@
 
 package org.apache.spark.sql.execution.gluten
 
-import org.apache.gluten.execution.FileSourceScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenPlan, 
ValidatablePlan}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.utils.PushDownUtil
@@ -28,12 +27,13 @@ import 
org.apache.spark.sql.execution.{KylinFileSourceScanExec, LayoutFileSource
 class ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends 
Rule[SparkPlan] {
 
   private def tryReturnGlutenPlan(glutenPlan: GlutenPlan, originPlan: 
SparkPlan): SparkPlan = {
-    if (glutenPlan.doValidate().ok()) {
-      logDebug(s"Columnar Processing for ${originPlan.getClass} is currently 
supported.")
-      glutenPlan
-    } else {
-      logDebug(s"Columnar Processing for ${originPlan.getClass} is currently 
unsupported.")
-      originPlan
+    glutenPlan match {
+      case plan: ValidatablePlan if plan.doValidate().ok() =>
+        logDebug(s"Columnar Processing for ${originPlan.getClass} is currently 
supported.")
+        glutenPlan
+      case _ =>
+        logDebug(s"Columnar Processing for ${originPlan.getClass} is currently 
unsupported.")
+        originPlan
     }
   }
 
diff --git 
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
 
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
index 6774c60f51..7d57fe0ccf 100644
--- 
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
+++ 
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
@@ -17,11 +17,11 @@
  */
 package org.apache.spark.sql.common
 
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.gluten.test.FallbackUtil
 import org.apache.gluten.test.FallbackUtil.collectWithSubqueries
-import org.apache.gluten.utils.QueryPlanSelector
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -53,13 +53,13 @@ object GlutenTestUtil {
   def glutenEnabled(spark: SparkSession): Boolean = {
     spark.conf.get("spark.plugins", "").contains("GlutenPlugin") &&
       spark.conf.get(
-        GlutenConfig.GLUTEN_ENABLE_KEY,
-        GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString).toBoolean && 
glutenEnabledForCurrentThread(spark)
+        GlutenConfig.GLUTEN_ENABLED_KEY,
+        GlutenConfig.GLUTEN_ENABLED_BY_DEFAULT.toString).toBoolean && 
glutenEnabledForCurrentThread(spark)
   }
 
   private def glutenEnabledForCurrentThread(spark: SparkSession): Boolean = {
     val enabled =
-      
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+      
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
     if (enabled != null) {
       enabled.toBoolean
     } else {

Reply via email to