This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b404e02  [SPARK-27476][SQL] Refactoring SchemaPruning rule to remove duplicate code
b404e02 is described below

commit b404e02574084c5ab550ce8716d4177464e7ce8c
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Tue Apr 16 14:50:37 2019 -0700

    [SPARK-27476][SQL] Refactoring SchemaPruning rule to remove duplicate code

    ## What changes were proposed in this pull request?

    In the SchemaPruning rule, there is duplicate code for data source v1 and v2.
    Their logic is the same, so we can refactor the rule to remove the duplication.

    ## How was this patch tested?

    Existing tests.

    Closes #24383 from viirya/SPARK-27476.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   2 +-
 .../sql/execution/datasources/SchemaPruning.scala  | 100 ++++++++++-----------
 2 files changed, 47 insertions(+), 55 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f33cc86..3f59fa1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1551,7 +1551,7 @@ object SQLConf {
       .internal()
       .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
         "satisfying a query. This optimization allows columnar file format readers to avoid " +
-        "reading unnecessary nested column data. Currently Parquet and ORC v1 are the " +
+        "reading unnecessary nested column data. Currently Parquet and ORC are the " +
         "data sources that implement this optimization.")
      .booleanConf
      .createWithDefault(false)
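As context for the doc change above: this SQLConf entry is the internal nested-schema-pruning flag (its key, assuming the standard definition in SQLConf, is spark.sql.optimizer.nestedSchemaPruning.enabled, default false). A minimal sketch of a query that exercises it follows; the demo object, case classes, and output path are hypothetical, not part of this patch:

import org.apache.spark.sql.SparkSession

// Hypothetical nested schema, used only for this demo.
case class Contact(name: String, phone: String)
case class Person(id: Int, contact: Contact)

object NestedPruningDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("nested-schema-pruning-demo")
      .master("local[*]")
      // The internal flag documented in the hunk above; it defaults to false.
      .config("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Write a small nested Parquet dataset to a throwaway path.
    val path = "/tmp/nested-pruning-demo"
    Seq(Person(1, Contact("alice", "555-0100")), Person(2, Contact("bob", "555-0101")))
      .toDF()
      .write.mode("overwrite").parquet(path)

    // Only contact.name is requested, so with the flag on the optimizer can
    // hand the Parquet reader a pruned schema containing just that leaf field.
    spark.read.parquet(path).select("contact.name").explain()

    spark.stop()
  }
}

With pruning enabled, the FileScan line in the explain output should show a reduced read schema, something like struct<contact:struct<name:string>> rather than the full struct.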
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
index 15fdf65..463ee9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
@@ -50,73 +50,65 @@ object SchemaPruning extends Rule[LogicalPlan] {
       case op @ PhysicalOperation(projects, filters,
           l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _))
         if canPruneRelation(hadoopFsRelation) =>
-        val (normalizedProjects, normalizedFilters) =
-          normalizeAttributeRefNames(l.output, projects, filters)
-        val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
-
-        // If requestedRootFields includes a nested field, continue. Otherwise,
-        // return op
-        if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
-          val dataSchema = hadoopFsRelation.dataSchema
-          val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
-
-          // If the data schema is different from the pruned data schema, continue. Otherwise,
-          // return op. We effect this comparison by counting the number of "leaf" fields in
-          // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
-          // in dataSchema.
-          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+
+        prunePhysicalColumns(l.output, projects, filters, hadoopFsRelation.dataSchema,
+          prunedDataSchema => {
             val prunedHadoopRelation =
               hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
-
-            val prunedRelation = buildPrunedRelation(l, prunedHadoopRelation)
-            val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
-
-            buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
-              projectionOverSchema)
-          } else {
-            op
-          }
-        } else {
-          op
-        }
+            buildPrunedRelation(l, prunedHadoopRelation)
+          }).getOrElse(op)

       case op @ PhysicalOperation(projects, filters,
           d @ DataSourceV2Relation(table: FileTable, output, _)) if canPruneTable(table) =>
-        val (normalizedProjects, normalizedFilters) =
-          normalizeAttributeRefNames(output, projects, filters)
-        val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
-
-        // If requestedRootFields includes a nested field, continue. Otherwise,
-        // return op
-        if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
-          val dataSchema = table.dataSchema
-          val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
-
-          // If the data schema is different from the pruned data schema, continue. Otherwise,
-          // return op. We effect this comparison by counting the number of "leaf" fields in
-          // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
-          // in dataSchema.
-          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+
+        prunePhysicalColumns(output, projects, filters, table.dataSchema,
+          prunedDataSchema => {
             val prunedFileTable = table match {
               case o: OrcTable => o.copy(userSpecifiedSchema = Some(prunedDataSchema))
               case _ =>
                 val message = s"${table.formatName} data source doesn't support schema pruning."
                 throw new AnalysisException(message)
             }
+            buildPrunedRelationV2(d, prunedFileTable)
+          }).getOrElse(op)
+    }
-
-            val prunedRelationV2 = buildPrunedRelationV2(d, prunedFileTable)
-            val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
-
-            buildNewProjection(normalizedProjects, normalizedFilters, prunedRelationV2,
-              projectionOverSchema)
-          } else {
-            op
-          }
-        } else {
-          op
-        }
+
+  /**
+   * This method returns optional logical plan. `None` is returned if no nested field is required or
+   * all nested fields are required.
+   */
+  private def prunePhysicalColumns(
+      output: Seq[AttributeReference],
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression],
+      dataSchema: StructType,
+      leafNodeBuilder: StructType => LeafNode): Option[LogicalPlan] = {
+    val (normalizedProjects, normalizedFilters) =
+      normalizeAttributeRefNames(output, projects, filters)
+    val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
+
+    // If requestedRootFields includes a nested field, continue. Otherwise,
+    // return op
+    if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
+      val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
+
+      // If the data schema is different from the pruned data schema, continue. Otherwise,
+      // return op. We effect this comparison by counting the number of "leaf" fields in
+      // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
+      // in dataSchema.
+      if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+        val prunedRelation = leafNodeBuilder(prunedDataSchema)
+        val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+
+        Some(buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
+          projectionOverSchema))
+      } else {
+        None
+      }
+    } else {
+      None
     }
+  }

   /**
    * Checks to see if the given relation can be pruned. Currently we support Parquet and ORC v1.
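The shape of the change itself is a classic extract-method refactoring: the v1 and v2 branches were identical except for how they rebuild the pruned leaf relation, so that one step becomes a function parameter. Below is a self-contained sketch of the same pattern with made-up stand-in types (Schema, Plan, V1Relation, and V2Relation are illustrative, not Spark's Catalyst classes):

// Stand-in types; only the shape of the refactoring is shown.
case class Schema(leafCount: Int)
sealed trait Plan
case class V1Relation(schema: Schema) extends Plan
case class V2Relation(schema: Schema) extends Plan
case class Project(child: Plan) extends Plan

object PruningSketch {
  // Stand-in for pruneDataSchema: pretend one leaf can be dropped
  // whenever the schema has more than one.
  private def prune(schema: Schema): Schema =
    Schema(math.max(1, schema.leafCount - 1))

  // The extracted skeleton: prune the schema and, only if leaves were
  // actually removed, let the caller-supplied builder construct the new
  // leaf node. Returning Option keeps the fallback decision at the
  // call sites, as in the real rule.
  private def prunePhysicalColumns(
      dataSchema: Schema,
      leafNodeBuilder: Schema => Plan): Option[Plan] = {
    val prunedSchema = prune(dataSchema)
    if (prunedSchema.leafCount < dataSchema.leafCount) {
      Some(Project(leafNodeBuilder(prunedSchema)))
    } else {
      None
    }
  }

  // The two call sites now differ only in their builder lambdas, mirroring
  // buildPrunedRelation vs. buildPrunedRelationV2 in the patch.
  def applyV1(op: Plan, schema: Schema): Plan =
    prunePhysicalColumns(schema, pruned => V1Relation(pruned)).getOrElse(op)

  def applyV2(op: Plan, schema: Schema): Plan =
    prunePhysicalColumns(schema, pruned => V2Relation(pruned)).getOrElse(op)

  def main(args: Array[String]): Unit = {
    println(applyV1(V1Relation(Schema(3)), Schema(3))) // Project(V1Relation(Schema(2)))
    println(applyV2(V2Relation(Schema(1)), Schema(1))) // nothing pruned: V2Relation(Schema(1))
  }
}

Handing the only varying step to the skeleton as a StructType => LeafNode function is what lets both pattern-match arms in the rule end in the same .getOrElse(op).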