This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 64e9365 KYLIN-4842 Supports grouping sets function for Kylin 4 64e9365 is described below commit 64e93654cdc4ed56bed790fe3a86e57ab78ca02c Author: Zhichao Zhang <441586...@qq.com> AuthorDate: Tue Dec 15 23:45:13 2020 +0800 KYLIN-4842 Supports grouping sets function for Kylin 4 --- .../test/resources/query/sql_grouping/query03.sql | 26 ++++++++++++++++++++++ .../test/resources/query/sql_grouping/query04.sql | 26 ++++++++++++++++++++++ .../test/resources/query/sql_grouping/query05.sql | 26 ++++++++++++++++++++++ .../test/resources/query/sql_grouping/query06.sql | 26 ++++++++++++++++++++++ .../test/resources/query/sql_grouping/query07.sql | 24 ++++++++++++++++++++ .../test/resources/query/sql_grouping/query08.sql | 24 ++++++++++++++++++++ .../test/resources/query/sql_grouping/query09.sql | 24 ++++++++++++++++++++ .../apache/spark/sql/common/SparkQueryTest.scala | 4 ++-- .../kylin/query/runtime/plans/AggregatePlan.scala | 15 +++++++------ .../kylin/query/runtime/plans/ValuesPlan.scala | 3 +-- .../runtime => spark/sql}/SparkOperation.scala | 16 ++++++++++--- .../kylin/engine/spark2/NBuildAndQueryTest.java | 5 +---- 12 files changed, 201 insertions(+), 18 deletions(-) diff --git a/kylin-it/src/test/resources/query/sql_grouping/query03.sql b/kylin-it/src/test/resources/query/sql_grouping/query03.sql new file mode 100644 index 0000000..2adcca5 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_grouping/query03.sql @@ -0,0 +1,26 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +select +sum(price) as GMV, +cal_dt as dt, +(case grouping(slr_segment_cd) when 1 then 'ALL' else cast(slr_segment_cd as varchar(256)) end) as cd, +(case grouping(lstg_format_name) when 1 then 'ALL' else lstg_format_name end) as name, +count(*) as TRANS_CNT from test_kylin_fact +where cal_dt between '2012-01-01' and '2012-02-01' +group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd), (cal_dt, slr_segment_cd), (lstg_format_name, slr_segment_cd)) +;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]} \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_grouping/query04.sql b/kylin-it/src/test/resources/query/sql_grouping/query04.sql new file mode 100644 index 0000000..fea6da4 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_grouping/query04.sql @@ -0,0 +1,26 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +select +sum(price) as GMV, +cal_dt as dt, +slr_segment_cd as cd, +lstg_format_name as name, +count(*) as TRANS_CNT from test_kylin_fact +where cal_dt between '2012-01-01' and '2012-02-01' +group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd), (cal_dt, slr_segment_cd), (lstg_format_name, slr_segment_cd)) +;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]} \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_grouping/query05.sql b/kylin-it/src/test/resources/query/sql_grouping/query05.sql new file mode 100644 index 0000000..1a0eb4d --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_grouping/query05.sql @@ -0,0 +1,26 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +select +sum(price) as GMV, +grouping(cal_dt) as dt, +grouping(slr_segment_cd) as cd, +grouping(lstg_format_name) as name, +count(*) as TRANS_CNT from test_kylin_fact +where cal_dt between '2012-01-01' and '2012-02-01' +group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd), (cal_dt, slr_segment_cd), (lstg_format_name, slr_segment_cd)) +;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]} \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_grouping/query06.sql b/kylin-it/src/test/resources/query/sql_grouping/query06.sql new file mode 100644 index 0000000..1fe8be2 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_grouping/query06.sql @@ -0,0 +1,26 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +select +sum(price) as GMV, +cal_dt as dt, +slr_segment_cd as cd, +lstg_format_name as name, +count(*) as TRANS_CNT from test_kylin_fact +where cal_dt between '2012-01-01' and '2012-02-01' +group by grouping sets((lstg_format_name, cal_dt, slr_segment_cd)) +;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]} \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_grouping/query07.sql b/kylin-it/src/test/resources/query/sql_grouping/query07.sql new file mode 100644 index 0000000..b0f9923 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_grouping/query07.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +select +sum(price) as GMV, +lstg_format_name as name, +count(*) as TRANS_CNT from test_kylin_fact +where cal_dt between '2012-01-01' and '2012-02-01' +group by grouping sets(lstg_format_name) +;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]} \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_grouping/query08.sql b/kylin-it/src/test/resources/query/sql_grouping/query08.sql new file mode 100644 index 0000000..923d99f --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_grouping/query08.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +select +lstg_format_name as name, +sum(price) as GMV, +count(*) as TRANS_CNT from test_kylin_fact +where cal_dt between '2012-01-01' and '2012-02-01' +group by cube(lstg_format_name) +;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]} \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_grouping/query09.sql b/kylin-it/src/test/resources/query/sql_grouping/query09.sql new file mode 100644 index 0000000..55da2bc --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_grouping/query09.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +select +slr_segment_cd as cd, +sum(price) as GMV, +count(*) as TRANS_CNT from test_kylin_fact +where cal_dt between '2012-01-01' and '2012-02-01' +group by rollup(slr_segment_cd) +;{"scanRowCount":9287,"scanBytes":0,"scanFiles":1,"cuboidId":[276480]} \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala index dbf9beb..76fc1fc 100644 --- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala +++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/SparkQueryTest.scala @@ -116,9 +116,9 @@ object SparkQueryTest { |== Results == |${ sideBySide( - s"== Kylin Answer - ${kylinAnswer.size} ==" +: + s"== Expected Answer - ${kylinAnswer.size} ==" +: kylinResults.map(_.toString()), - s"== Spark Answer - ${sparkAnswer.size} ==" +: + s"== Kylin Answer - ${sparkAnswer.size} ==" +: sparkResults.map(_.toString()) ).mkString("\n") } diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala index d1d81b9..4907af3 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala @@ -18,11 +18,12 @@ package org.apache.kylin.query.runtime.plans import org.apache.calcite.DataContext +import org.apache.calcite.rel.core.Aggregate import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.sql.SqlKind import org.apache.kylin.metadata.model.FunctionDesc import org.apache.kylin.query.relnode.{KylinAggregateCall, OLAPAggregateRel} -import 
org.apache.kylin.query.runtime.{AggArgc, RuntimeHelper, SparkOperation} +import org.apache.kylin.query.runtime.RuntimeHelper import org.apache.kylin.query.SchemaProcessor import org.apache.spark.sql.KylinFunctions._ import org.apache.spark.sql._ @@ -61,7 +62,11 @@ object AggregatePlan extends LogEx { } else { dataFrame = genFiltersWhenIntersectCount(rel, dataFrame) val aggList = buildAgg(dataFrame.schema, rel) - SparkOperation.agg(AggArgc(dataFrame, groupList, aggList)) + val groupSets = rel.getGroupSets.asScala + .map(groupSet => groupSet.asScala.map(groupId => col(schemaNames.apply(groupId))).toList) + .toList + SparkOperation.agg(AggArgc(dataFrame, groupList, aggList, groupSets, + rel.getGroupType() == Aggregate.Group.SIMPLE)) } } @@ -167,11 +172,7 @@ object AggregatePlan extends LogEx { case SqlKind.SINGLE_VALUE.sql => first(argNames.head).alias(aggName) case FunctionDesc.FUNC_GROUPING => - if (rel.getGroupSet.get(call.getArgList.get(0))) { - lit(0).alias(aggName) - } else { - lit(1).alias(aggName) - } + grouping(argNames.head).alias(aggName) case _ => throw new IllegalArgumentException( s"""Unsupported function name $funcName""") diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala index 3271f39..3d8fa02 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ValuesPlan.scala @@ -18,8 +18,7 @@ package org.apache.kylin.query.runtime.plans import org.apache.kylin.query.relnode.OLAPValuesRel -import org.apache.kylin.query.runtime.SparkOperation -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{DataFrame, Row, SparkOperation} import org.apache.spark.sql.types.{StructField, StructType} import 
org.apache.spark.sql.utils.SparkTypeUtil diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkOperation.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparkOperation.scala similarity index 75% rename from kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkOperation.scala rename to kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparkOperation.scala index 52d407e..44791df 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkOperation.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparkOperation.scala @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kylin.query.runtime +package org.apache.spark.sql import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -39,7 +39,16 @@ object SparkOperation { } def agg(aggArgc: AggArgc): DataFrame = { - if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty) { + if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty && + !aggArgc.isSimpleGroup && aggArgc.groupSets.nonEmpty) { + // for grouping sets, group by cube or group by rollup + val groupingSets = GroupingSets(aggArgc.groupSets.map(groupSet => groupSet.map(_.expr)), + aggArgc.group.map(_.expr), + aggArgc.dataFrame.queryExecution.logical, + aggArgc.group.map(_.named) ++ aggArgc.agg.map(_.named) + ) + Dataset.ofRows(aggArgc.dataFrame.sparkSession, groupingSets) + } else if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty) { aggArgc.dataFrame .groupBy(aggArgc.group: _*) .agg(aggArgc.agg.head, aggArgc.agg.drop(1): _*) @@ -53,4 +62,5 @@ object SparkOperation { } } -case class AggArgc(dataFrame: DataFrame, group: List[Column], agg: List[Column]) \ No newline at end of file +case class AggArgc(dataFrame: DataFrame, group: List[Column], agg: 
List[Column], + groupSets: List[List[Column]], isSimpleGroup: Boolean) \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java index 02c6e8a..ce27ff2 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java @@ -172,10 +172,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest { //tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_extended_column")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_function")); - - // Not support yet - //tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_grouping")); - + tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_grouping")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_h2")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_hive")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_intersect_count"));