Repository: spark Updated Branches: refs/heads/master 9eb095243 -> 8b5b2e272
[SPARK-20986][SQL] Reset table's statistics after PruneFileSourcePartitions rule. ## What changes were proposed in this pull request? After PruneFileSourcePartitions rule, It needs reset table's statistics because PruneFileSourcePartitions can filter some unnecessary partitions. So the statistics need to be changed. ## How was this patch tested? add unit test. Author: lianhuiwang <[email protected]> Closes #18205 from lianhuiwang/SPARK-20986. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b5b2e27 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b5b2e27 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b5b2e27 Branch: refs/heads/master Commit: 8b5b2e272f48f7ddf8aeece0205cb4a5853c364e Parents: 9eb0952 Author: lianhuiwang <[email protected]> Authored: Wed Jun 14 09:57:56 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Wed Jun 14 09:57:56 2017 +0800 ---------------------------------------------------------------------- .../datasources/PruneFileSourcePartitions.scala | 8 +++++-- .../PruneFileSourcePartitionsSuite.scala | 25 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8b5b2e27/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 905b868..f5df184 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -59,8 +60,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession) - val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) - + // Change table stats based on the sizeInBytes of pruned files + val withStats = logicalRelation.catalogTable.map(_.copy( + stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) + val prunedLogicalRelation = logicalRelation.copy( + relation = prunedFsRelation, catalogTable = withStats) // Keep partition-pruning predicates so that they are visible in physical planning val filterExpression = filters.reduceLeft(And) val filter = Filter(filterExpression, prunedLogicalRelation) http://git-wip-us.apache.org/repos/asf/spark/blob/8b5b2e27/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index f818e29..d91f25a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -66,4 +67,28 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te } } } + + test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") { + withTable("tbl") { + spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl") + sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS") + val tableStats = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl")).stats + assert(tableStats.isDefined && tableStats.get.sizeInBytes > 0, "tableStats is lost") + + val df = sql("SELECT * FROM tbl WHERE p = 1") + val sizes1 = df.queryExecution.analyzed.collect { + case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes + } + assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}") + assert(sizes1(0) == tableStats.get.sizeInBytes) + + val relations = df.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => relation + } + assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}") + val size2 = relations(0).computeStats(conf).sizeInBytes + assert(size2 == relations(0).catalogTable.get.stats.get.sizeInBytes) + assert(size2 < tableStats.get.sizeInBytes) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
