This is an automated email from the ASF dual-hosted git repository.
caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new a03fbf8e7e KYLIN-6044 Removed ProjectAggregateMergeRule to avoid
aggregate index mismatches
a03fbf8e7e is described below
commit a03fbf8e7eb1b438b921aec9eb92aed8b8ac4075
Author: Guoliang Sun <[email protected]>
AuthorDate: Tue Aug 20 17:08:09 2024 +0800
KYLIN-6044 Removed ProjectAggregateMergeRule to avoid aggregate index
mismatches
---
pom.xml | 4 +--
.../apache/kylin/query/engine/SqlToRelNodeTest.xml | 12 +++----
.../query/sql_agg_not_pushdown/query01.sql | 39 ++++++++++++++++++++++
.../apache/kylin/query/engine/PlannerFactory.java | 2 ++
.../kylin/query/runtime/plan/ResultPlan.scala | 4 +--
.../gluten/KapExpressionTransformer.scala | 5 ++-
src/spark-project/spark-common/pom.xml | 33 +-----------------
.../ConvertKylinFileSourceToGlutenRule.scala | 16 ++++-----
.../apache/spark/sql/common/GlutenTestUtil.scala | 12 +++----
9 files changed, 68 insertions(+), 59 deletions(-)
diff --git a/pom.xml b/pom.xml
index ab3f4a710d..ca26aee4ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,12 +128,12 @@
<kafka.version>2.8.2</kafka.version>
<!-- Spark versions -->
- <spark.version>3.3.0-kylin-4.6.26.0</spark.version>
+ <spark.version>3.3.0-kylin-5.2.2-SNAPSHOT</spark.version>
<delta.version>2.3.0</delta.version>
<delta-standalone.version>0.6.0</delta-standalone.version>
<iceberg.version>1.4.1</iceberg.version>
- <gluten.version>1.3.0-kylin-240829-SNAPSHOT</gluten.version>
+ <gluten.version>1.3.0-kylin-250110-SNAPSHOT</gluten.version>
<gluten.deps.scope>compile</gluten.deps.scope>
<substrait.version>0.5.0</substrait.version>
<celeborn.version>0.3.0-incubating</celeborn.version>
diff --git
a/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
b/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
index c60960910e..c9bd19a7da 100644
---
a/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
+++
b/src/kylin-it/src/test/resources/org/apache/kylin/query/engine/SqlToRelNodeTest.xml
@@ -31,8 +31,8 @@ OlapToEnumerableConverter
OlapJoinRel(condition=[=($1, $4)], joinType=[inner], ctx=[])
OlapAggregateRel(group-set=[[0, 1, 2, 3]], groups=[null],
ctx=[])
OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], TRANS_ID=[$3],
SLR_SEGMENT_CD=[$4], ctx=[])
- OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]],
groups=[null], ctx=[])
- OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1],
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], ctx=[])
+ OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]],
groups=[null], agg#0=[SUM($5)], ctx=[])
+ OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1],
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], SUM_PRICE=[$6], ctx=[])
OlapLimitRel(ctx=[], fetch=[10])
OlapSortRel(sort0=[$1], sort1=[$2],
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], ctx=[])
OlapAggregateRel(group-set=[[0, 1, 2, 3, 4, 5]],
groups=[null], SUM_PRICE=[SUM($6)], ctx=[])
@@ -71,8 +71,8 @@ OlapToEnumerableConverter
OlapJoinRel(condition=[=($1, $4)], joinType=[inner], ctx=[])
OlapAggregateRel(group-set=[[0, 1, 2, 3]], groups=[null],
ctx=[])
OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], TRANS_ID=[$3],
SLR_SEGMENT_CD=[$4], ctx=[])
- OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]],
groups=[null], ctx=[])
- OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1],
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], ctx=[])
+ OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]],
groups=[null], agg#0=[SUM($5)], ctx=[])
+ OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1],
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], SUM_PRICE=[$6], ctx=[])
OlapLimitRel(ctx=[], fetch=[10])
OlapSortRel(sort0=[$1], sort1=[$2],
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], ctx=[])
OlapAggregateRel(group-set=[[0, 1, 2, 3, 4, 5]],
groups=[null], SUM_PRICE=[SUM($6)], ctx=[])
@@ -111,8 +111,8 @@ OlapToEnumerableConverter
OlapJoinRel(condition=[=($1, $4)], joinType=[inner], ctx=[])
OlapAggregateRel(group-set=[[0, 1, 2, 3]], groups=[null], ctx=[])
OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1], TRANS_ID=[$3],
SLR_SEGMENT_CD=[$4], ctx=[])
- OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], groups=[null],
ctx=[])
- OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1],
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], ctx=[])
+ OlapAggregateRel(group-set=[[0, 1, 2, 3, 4]], groups=[null],
agg#0=[SUM($5)], ctx=[])
+ OlapProjectRel(WEEK_BEG_DT=[$0], CAL_DT=[$1],
ITEM_COUNT=[$2], TRANS_ID=[$4], SLR_SEGMENT_CD=[$5], SUM_PRICE=[$6], ctx=[])
OlapLimitRel(ctx=[], fetch=[10])
OlapSortRel(sort0=[$1], sort1=[$2],
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], ctx=[])
OlapAggregateRel(group-set=[[0, 1, 2, 3, 4, 5]],
groups=[null], SUM_PRICE=[SUM($6)], ctx=[])
diff --git
a/src/kylin-it/src/test/resources/query/sql_agg_not_pushdown/query01.sql
b/src/kylin-it/src/test/resources/query/sql_agg_not_pushdown/query01.sql
new file mode 100644
index 0000000000..c045ba39a8
--- /dev/null
+++ b/src/kylin-it/src/test/resources/query/sql_agg_not_pushdown/query01.sql
@@ -0,0 +1,39 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT
+ count(1)
+FROM
+ (SELECT
+ 1
+ FROM
+ (SELECT
+ sum(SELLER_ID) as acct_bal_CURR,
+ 1 AS DT
+ FROM
+ TEST_KYLIN_FACT) a
+ left JOIN
+ (
+ SELECT
+ sum(ITEM_COUNT) acct_id_CURR,
+ 1 AS DT
+ FROM
+ TEST_KYLIN_FACT
+ ) b
+ ON a.DT = b.DT
+ )
\ No newline at end of file
diff --git
a/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java
b/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java
index 9b6c755660..bca8d5e1e1 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/PlannerFactory.java
@@ -244,6 +244,8 @@ public class PlannerFactory {
// see KAP#16036
planner.removeRule(CoreRules.UNION_MERGE);
planner.removeRule(CoreRules.PROJECT_REMOVE);
+ // ProjectAggregateMergeRule prunes aggregate calls not used by the Project above; that pruning
can cause aggregate index matching to fail. See AL-9852.
+ planner.removeRule(CoreRules.PROJECT_AGGREGATE_MERGE);
// skip corr expansion during model suggestion
if (!KylinConfig.getInstanceFromEnv().getSkipCorrReduceRule()) {
diff --git
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 6c1f5fabe2..1d1bcfb0cf 100644
---
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -25,7 +25,7 @@ import java.{lang, util}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
import org.apache.commons.io.IOUtils
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.hadoop.fs.Path
import org.apache.kylin.common.exception.code.ErrorCodeServer
import org.apache.kylin.common.exception.{BigQueryException,
NewQueryRefuseException}
@@ -274,7 +274,7 @@ object ResultPlan extends LogEx {
def getResult(df: DataFrame, rowType: RelDataType): ExecuteResult =
withScope(df) {
if (!ContextUtil.getNativeRealizations.isEmpty &&
!KylinConfig.getInstanceFromEnv.queryIndexUseGluten()) {
-
df.sparkSession.sparkContext.setLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+
df.sparkSession.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
}
val queryTagInfo = QueryContext.current().getQueryTagInfo
if (queryTagInfo.isAsyncQuery) {
diff --git
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
index 5bb40713c2..6deaf1ff53 100644
---
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
+++
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/gluten/KapExpressionTransformer.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression._
import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.KapFunctions.TRUNCATE
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.types._
@@ -58,7 +57,7 @@ case class CustomerExpressionTransformer() extends
ExpressionExtensionTrait {
Sig[KapAddMonths]("kap_add_months"),
Sig[KapSubtractMonths]("kap_months_between"),
Sig[YMDintBetween]("kap_ymd_int_between"),
- Sig[TRUNCATE]("truncate"),
+ Sig[Truncate]("truncate"),
Sig[KylinSplitPart]("kylin_split_part"),
Sig[KylinInstr]("kylin_instr"),
Sig[PreciseCardinality]("ke_bitmap_cardinality"),
@@ -141,7 +140,7 @@ case class CustomerExpressionTransformer() extends
ExpressionExtensionTrait {
ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.left,
attributeSeq),
ExpressionConverter.replaceWithExpressionTransformer(kapYmdIntBetween.right,
attributeSeq)),
kapYmdIntBetween)
- case truncate: TRUNCATE =>
+ case truncate: Truncate =>
GenericExpressionTransformer(
"truncate",
Seq(
diff --git a/src/spark-project/spark-common/pom.xml
b/src/spark-project/spark-common/pom.xml
index 6dc98894c8..95bea79f34 100644
--- a/src/spark-project/spark-common/pom.xml
+++ b/src/spark-project/spark-common/pom.xml
@@ -135,33 +135,8 @@
<dependency>
<groupId>org.apache.gluten</groupId>
- <artifactId>gluten-core</artifactId>
- <version>${gluten.version}</version>
- <scope>${gluten.deps.scope}</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.gluten</groupId>
- <artifactId>spark-sql-columnar-shims-spark32</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.gluten</groupId>
- <artifactId>spark-sql-columnar-shims-spark33</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.gluten</groupId>
- <artifactId>spark-sql-columnar-shims-spark34</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.gluten</groupId>
- <artifactId>spark-sql-columnar-shims-spark35</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.gluten</groupId>
- <artifactId>gluten-core</artifactId>
+ <artifactId>gluten-substrait</artifactId>
<version>${gluten.version}</version>
- <type>test-jar</type>
<scope>${gluten.deps.scope}</scope>
<exclusions>
<exclusion>
@@ -218,12 +193,6 @@
<artifactId>backends-clickhouse</artifactId>
<version>${gluten.version}</version>
<scope>${gluten.deps.scope}</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.gluten</groupId>
- <artifactId>gluten-core</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
diff --git
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
index 94565faa8d..e1b59b8831 100644
---
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
+++
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/gluten/ConvertKylinFileSourceToGlutenRule.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql.execution.gluten
-import org.apache.gluten.execution.FileSourceScanExecTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenPlan,
ValidatablePlan}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.utils.PushDownUtil
@@ -28,12 +27,13 @@ import
org.apache.spark.sql.execution.{KylinFileSourceScanExec, LayoutFileSource
class ConvertKylinFileSourceToGlutenRule(val session: SparkSession) extends
Rule[SparkPlan] {
private def tryReturnGlutenPlan(glutenPlan: GlutenPlan, originPlan:
SparkPlan): SparkPlan = {
- if (glutenPlan.doValidate().ok()) {
- logDebug(s"Columnar Processing for ${originPlan.getClass} is currently
supported.")
- glutenPlan
- } else {
- logDebug(s"Columnar Processing for ${originPlan.getClass} is currently
unsupported.")
- originPlan
+ glutenPlan match {
+ case plan: ValidatablePlan if plan.doValidate().ok() =>
+ logDebug(s"Columnar Processing for ${originPlan.getClass} is currently
supported.")
+ glutenPlan
+ case _ =>
+ logDebug(s"Columnar Processing for ${originPlan.getClass} is currently
unsupported.")
+ originPlan
}
}
diff --git
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
index 6774c60f51..7d57fe0ccf 100644
---
a/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
+++
b/src/spark-project/spark-common/src/test/java/org/apache/spark/sql/common/GlutenTestUtil.scala
@@ -17,11 +17,11 @@
*/
package org.apache.spark.sql.common
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.gluten.test.FallbackUtil
import org.apache.gluten.test.FallbackUtil.collectWithSubqueries
-import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -53,13 +53,13 @@ object GlutenTestUtil {
def glutenEnabled(spark: SparkSession): Boolean = {
spark.conf.get("spark.plugins", "").contains("GlutenPlugin") &&
spark.conf.get(
- GlutenConfig.GLUTEN_ENABLE_KEY,
- GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString).toBoolean &&
glutenEnabledForCurrentThread(spark)
+ GlutenConfig.GLUTEN_ENABLED_KEY,
+ GlutenConfig.GLUTEN_ENABLED_BY_DEFAULT.toString).toBoolean &&
glutenEnabledForCurrentThread(spark)
}
private def glutenEnabledForCurrentThread(spark: SparkSession): Boolean = {
val enabled =
-
spark.sparkContext.getLocalProperty(QueryPlanSelector.GLUTEN_ENABLE_FOR_THREAD_KEY)
+
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
if (enabled != null) {
enabled.toBoolean
} else {