This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new d4cd28d  KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduceLeft' when there are cast expressions in the filters of FilePruner
d4cd28d is described below

commit d4cd28d108c8f16aa1546434a9f311002d2d8010
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Mon Oct 19 16:13:13 2020 +0800

    KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduceLeft'
    when there are cast expressions in the filters of FilePruner

    When the 'pruneSegments' function of FilePruner is executed and the
    filters contain cast expressions, it throws
    'UnsupportedOperationException: empty.reduceLeft'.

    Solution: convert the cast expressions in the filters to attributes
    before translating the filters.
---
 .../resources/query/sql_castprunesegs/query01.sql  | 22 +++++
 .../resources/query/sql_castprunesegs/query02.sql  | 22 +++++
 .../resources/query/sql_castprunesegs/query03.sql  | 31 +++++++
 .../sql/execution/datasource/FilePruner.scala      | 98 +++++++++++++-------
 .../kylin/engine/spark2/NBuildAndQueryTest.java    |  1 +
 5 files changed, 145 insertions(+), 29 deletions(-)
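For context on the failure: in Spark 2.x, DataSourceStrategy.translateFilter returns an Option and yields None for any predicate it cannot translate, such as a comparison whose column side is wrapped in a cast. If every filter is dropped that way, the flatMap result is empty and reduceLeft throws. A minimal, self-contained sketch of that failure mode, using hypothetical stand-in types rather than the real Kylin/Spark classes:

    // Sketch of the KYLIN-4791 failure mode (stand-in types, not Kylin code).
    object EmptyReduceLeftRepro {
      // Stand-in for DataSourceStrategy.translateFilter: pretend no
      // cast-wrapped predicate is translatable, so it always returns None.
      def translateFilter(predicate: String): Option[String] = None

      def main(args: Array[String]): Unit = {
        val filters = Seq("CAST(CAL_DT AS DATE) > DATE '2013-06-01'")
        val translated = filters.flatMap(translateFilter)
        // Throws java.lang.UnsupportedOperationException: empty.reduceLeft,
        // the exception reported in KYLIN-4791.
        translated.reduceLeft((l, r) => s"And($l, $r)")
      }
    }

The patch below guards against this by checking translatedFilter.isEmpty before reducing, and by stripping casts first so that fewer filters are lost in translation.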
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql
new file mode 100644
index 0000000..c0e64e3
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+SELECT sum(price) as sum_price
+ FROM TEST_KYLIN_FACT
+ WHERE CAL_DT > cast(TIMESTAMPADD(Day, -15000, CURRENT_DATE) as DATE)
+GROUP BY CAL_DT
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql
new file mode 100644
index 0000000..5ea4572
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql
@@ -0,0 +1,22 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+SELECT sum(price) as sum_price
+ FROM TEST_KYLIN_FACT
+ WHERE CAL_DT > '2013-06-01'
+GROUP BY CAL_DT
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql b/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql
new file mode 100644
index 0000000..99c92b2
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql
@@ -0,0 +1,31 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select test_cal_dt.cal_dt, sum(test_kylin_fact.price) as GMV
+ , count(*) as TRANS_CNT
+ from test_kylin_fact
+inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id
+ where
+ extract(DAY from test_cal_dt.cal_dt) = 12
+ group by test_cal_dt.cal_dt
+;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 2ecd4fd..70b7956 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -260,47 +260,54 @@ class FilePruner(
     val filteredStatuses = if (filters.isEmpty) {
       segDirs
     } else {
-      val reducedFilter = filters.flatMap(DataSourceStrategy.translateFilter).reduceLeft(And)
-      segDirs.filter {
-        e => {
-          val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
-          SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
-            case Trivial(true) => true
-            case Trivial(false) => false
+      val translatedFilter = filters.map(filter => convertCastFilter(filter))
+        .flatMap(DataSourceStrategy.translateFilter)
+      if (translatedFilter.isEmpty) {
+        logInfo("Can not use filters to prune segments.")
+        segDirs
+      } else {
+        val reducedFilter = translatedFilter.reduceLeft(And)
+        val pruned = segDirs.filter {
+          e => {
+            val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
+            SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
+              case Trivial(true) => true
+              case Trivial(false) => false
+            }
           }
         }
+        logInfo(s"Selected files after segments pruning:" + pruned.map(_.segmentName))
+        pruned
       }
     }
-    logInfo(s"Selected files after segments pruning:" + filteredStatuses.map(_.segmentName))
     filteredStatuses
   }

-  private def pruneShards(
-      filters: Seq[Expression],
-      segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
-    val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
-      segDirs
-    } else {
-      val normalizedFiltersAndExpr = filters.reduce(expressions.And)
+  private def pruneShards(filters: Seq[Expression],
+                          segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+    val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
+      segDirs
+    } else {
+      val normalizedFiltersAndExpr = filters.reduce(expressions.And)

-      val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
-        val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
-        val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
-        require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
+      val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
+        val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
+        val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
+        require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")

-        val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
+        val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)

-      val selected = files.filter(f => {
-        val partitionId = FilePruner.getPartitionId(f.getPath)
-        bitSet.get(partitionId)
-      })
-      SegmentDirectory(segName, segIdentifier, selected)
-    }
-    logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
-    pruned
+        val selected = files.filter(f => {
+          val partitionId = FilePruner.getPartitionId(f.getPath)
+          bitSet.get(partitionId)
+        })
+        SegmentDirectory(segName, segIdentifier, selected)
       }
-    filteredStatuses
+      logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
+      pruned
     }
+    filteredStatuses
+  }

   override lazy val inputFiles: Array[String] = Array.empty[String]

@@ -358,6 +365,39 @@ class FilePruner(
       matchedShards
     }
   }
+
+  // translate for filter type match
+  private def convertCastFilter(filter: Expression): Expression = {
+    filter match {
+      case expressions.EqualTo(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.EqualTo(a, Literal(v, t))
+      case expressions.EqualTo(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.EqualTo(Literal(v, t), a)
+      case expressions.GreaterThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.GreaterThan(a, Literal(v, t))
+      case expressions.GreaterThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.GreaterThan(Literal(v, t), a)
+      case expressions.LessThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.LessThan(a, Literal(v, t))
+      case expressions.LessThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.LessThan(Literal(v, t), a)
+      case expressions.GreaterThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.GreaterThanOrEqual(a, Literal(v, t))
+      case expressions.GreaterThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.GreaterThanOrEqual(Literal(v, t), a)
+      case expressions.LessThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
+        expressions.LessThanOrEqual(a, Literal(v, t))
+      case expressions.LessThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
+        expressions.LessThanOrEqual(Literal(v, t), a)
+      case expressions.Or(left, right) =>
+        expressions.Or(convertCastFilter(left), convertCastFilter(right))
+      case expressions.And(left, right) =>
+        expressions.And(convertCastFilter(left), convertCastFilter(right))
+      case expressions.Not(child) =>
+        expressions.Not(convertCastFilter(child))
+      case _ => filter
+    }
+  }
 }

 object FilePruner {
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index fd70f9b..df72e8a 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -153,6 +153,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
         tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql"));
         tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_lookup"));
         tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_casewhen"));
+        tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_castprunesegs"));
         tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_like"));
         tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_cache"));
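The core of the fix is convertCastFilter above: it strips the Cast wrapped around the attribute side of a comparison so DataSourceStrategy.translateFilter can recognize the predicate, and it recurses through And/Or/Not. A toy illustration of that rewrite, using hypothetical simplified expression types in place of Spark's Catalyst classes:

    // Hypothetical stand-ins for Catalyst's Attribute/Cast/Literal expressions.
    sealed trait Expr
    case class Attr(name: String) extends Expr
    case class Cast(child: Expr) extends Expr
    case class Lit(value: String) extends Expr
    case class GreaterThan(left: Expr, right: Expr) extends Expr
    case class And(left: Expr, right: Expr) extends Expr

    object StripCasts {
      // Drop the cast around the column side of a comparison and recurse
      // into conjunctions, mirroring the shape of convertCastFilter above.
      def strip(e: Expr): Expr = e match {
        case GreaterThan(Cast(a: Attr), l: Lit) => GreaterThan(a, l)
        case GreaterThan(l: Lit, Cast(a: Attr)) => GreaterThan(l, a)
        case And(left, right) => And(strip(left), strip(right))
        case other => other
      }

      def main(args: Array[String]): Unit = {
        val filter = GreaterThan(Cast(Attr("CAL_DT")), Lit("2013-06-01"))
        println(strip(filter)) // GreaterThan(Attr(CAL_DT),Lit(2013-06-01))
      }
    }

Note that the rewrite only feeds segment pruning here (SegFilters compared against each segment's time range), not the actual data filtering, which is why dropping the cast is acceptable in this context.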