This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1e08d51b758c5f8975a743c14fecc42c39931ba8
Author: Dorris Zhang <ruixuan.zh...@kyligence.io>
AuthorDate: Mon Nov 28 13:23:01 2022 +0800

    KYLIN-5430 skip shard pruning for in expr
---
 .../main/java/org/apache/kylin/common/KylinConfigBase.java |  4 ++++
 .../apache/spark/sql/execution/datasource/FilePruner.scala | 14 +++++++++++---
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 53ca8c6e00..4087654ce8 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3741,4 +3741,8 @@ public abstract class KylinConfigBase implements Serializable {
     public int getProjectMergeRuleBloatThreshold() {
         return Integer.parseInt(getOptional("kylin.query.project-merge-bloat-threshold", "0"));
     }
+
+    public boolean skipShardPruningForInExpr() {
+        return Boolean.parseBoolean(getOptional("kylin.query.skip-shard-pruning-for-in", FALSE));
+    }
 }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 8e8c046410..b658a0bd7e 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -455,8 +455,14 @@ class FilePruner(val session: SparkSession,
 
   def getShardSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = {
     val matchedShards = new BitSet(numShards)
-    iter.map(v => getShardNumber(attr, v))
-      .foreach(shardNum => matchedShards.set(shardNum))
+    val prj = options.getOrElse("project", sys.error("project option is required"))
+    val skipShardPruning = NProjectManager.getProjectConfig(prj).skipShardPruningForInExpr && iter.size > 256
+    if (skipShardPruning) {
+      matchedShards.setUntil(matchedShards.capacity)
+    } else {
+      iter.map(v => getShardNumber(attr, v))
+        .foreach(shardNum => matchedShards.set(shardNum))
+    }
     matchedShards
   }
 
@@ -738,7 +744,9 @@ case class SegDimFilters(dimRange: java.util.Map[String, DimensionRangeInfo], di
    * blocks are always non-empty.
    */
 
-  def escapeQuote(colName: String): String = {s"${colName.replace("`", "")}"}
+  def escapeQuote(colName: String): String = {
+    s"${colName.replace("`", "")}"
+  }
 
   def foldFilter(filter: Filter): Filter = {
     filter match {