Repository: spark Updated Branches: refs/heads/branch-2.0 619a11426 -> 8b6ec9b91
Revert "[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter" This reverts commit 55a83724632aa54e49aedbab8ddd21d010eca26d. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b6ec9b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b6ec9b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b6ec9b9 Branch: refs/heads/branch-2.0 Commit: 8b6ec9b91e8f81729923ae0bcd0a9157f8894e4e Parents: 619a114 Author: Wenchen Fan <[email protected]> Authored: Sun Jun 12 16:53:39 2016 -0700 Committer: Wenchen Fan <[email protected]> Committed: Sun Jun 12 16:53:39 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 17 +++-------------- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +++++++++++- .../spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 3 files changed, 16 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8b6ec9b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 07c8bf0..4cbedbd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -452,17 +452,6 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => - // A partitioned relation's schema can be different from the input logicalPlan, since - // partition columns are all moved after data columns. 
We Project to adjust the ordering. - val input = if (parts.nonEmpty) { - val (inputPartCols, inputDataCols) = child.output.partition { attr => - parts.contains(attr.name) - } - Project(inputDataCols ++ inputPartCols, child) - } else { - child - } - val table = lookupTableFromCatalog(u) // adding the table's partitions or validate the query's partition info table match { @@ -478,8 +467,8 @@ class Analyzer( |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // Partition columns are already correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table), child = input) + // Assume partition columns are correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table)) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order. @@ -497,7 +486,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => - i.copy(table = EliminateSubqueryAliases(table), child = input) + i.copy(table = EliminateSubqueryAliases(table)) } case u: UnresolvedRelation => val table = u.tableIdentifier http://git-wip-us.apache.org/repos/asf/spark/blob/8b6ec9b9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8c05a7f..afae078 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -506,11 +506,21 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap) val overwrite = mode == SaveMode.Overwrite + // A 
partitioned relation's schema can be different from the input logicalPlan, since + // partition columns are all moved after data columns. We Project to adjust the ordering. + // TODO: this belongs to the analyzer. + val input = normalizedParCols.map { parCols => + val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => + parCols.contains(attr.name) + } + Project(inputDataCols ++ inputPartCols, df.logicalPlan) + }.getOrElse(df.logicalPlan) + df.sparkSession.sessionState.executePlan( InsertIntoTable( UnresolvedRelation(tableIdent), partitions.getOrElse(Map.empty[String, Option[String]]), - df.logicalPlan, + input, overwrite, ifNotExists = false)).toRdd } http://git-wip-us.apache.org/repos/asf/spark/blob/8b6ec9b9/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 0a2bab4..e0f6ccf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1042,7 +1042,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { .queryExecution.analyzed } - assertResult(2, "Duplicated project detected\n" + analyzedPlan) { + assertResult(1, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { case _: Project => () }.size @@ -1061,7 +1061,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { .queryExecution.analyzed } - assertResult(2, "Duplicated project detected\n" + analyzedPlan) { + assertResult(1, "Duplicated project detected\n" + analyzedPlan) { analyzedPlan.collect { case _: Project => () }.size --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
