Repository: spark
Updated Branches:
refs/heads/master 3d010c837 -> ce3b98bae
[SPARK-16034][SQL] Checks the partition columns when calling dataFrame.write.mode("append").saveAsTable
## What changes were proposed in this pull request?
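`DataFrameWriter` can be used to append data to existing data source tables.
Problems arise when the partition columns passed to
`DataFrameWriter.partitionBy(columns)` do not match the actual partition columns
of the underlying table. This pull request adds a check that the requested
partition columns match the table's existing partition columns. The comparison
is case-insensitive, but the columns must appear in the same order. If they do
not match, an `AnalysisException` is thrown. The check is skipped when the
existing table's partition columns cannot be resolved, or when the table has no
partition columns.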
## How was this patch tested?
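Added a unit test in `DDLSuite`. It covers three cases: missing partition columns, wrong column order, and omitted `partitionBy`. It also covers a successful append.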
Author: Sean Zhong <[email protected]>
Closes #13749 from clockfly/SPARK-16034.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce3b98ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce3b98ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce3b98ba
Branch: refs/heads/master
Commit: ce3b98bae28af72299722f56e4e4ef831f471ec0
Parents: 3d010c8
Author: Sean Zhong <[email protected]>
Authored: Sat Jun 18 10:41:33 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Sat Jun 18 10:41:33 2016 -0700
----------------------------------------------------------------------
.../command/createDataSourceTables.scala | 9 ++++-
.../sql/execution/datasources/DataSource.scala | 39 ++++++++++----------
.../spark/sql/execution/command/DDLSuite.scala | 24 ++++++++++++
3 files changed, 50 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ce3b98ba/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4918780..c38eca5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
bucketSpec = bucketSpec,
options = optionsWithPath)
- val result = dataSource.write(mode, df)
-
+ val result = try {
+ dataSource.write(mode, df)
+ } catch {
+ case ex: AnalysisException =>
+ logError(s"Failed to write to table ${tableIdent.identifier} in $mode
mode", ex)
+ throw ex
+ }
if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the
table (instead of
// the schema of df). It is important since the nullability may be
changed by the relation
http://git-wip-us.apache.org/repos/asf/spark/blob/ce3b98ba/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 7f3683f..f274fc7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -435,26 +435,25 @@ case class DataSource(
// If we are appending to a table that already exists, make sure the
partitioning matches
// up. If we fail to load the table for whatever reason, ignore the
check.
if (mode == SaveMode.Append) {
- val existingPartitionColumnSet = try {
- Some(
- resolveRelation()
- .asInstanceOf[HadoopFsRelation]
- .location
- .partitionSpec()
- .partitionColumns
- .fieldNames
- .toSet)
- } catch {
- case e: Exception =>
- None
- }
-
- existingPartitionColumnSet.foreach { ex =>
- if (ex.map(_.toLowerCase) !=
partitionColumns.map(_.toLowerCase()).toSet) {
- throw new AnalysisException(
- s"Requested partitioning does not equal existing partitioning:
" +
- s"$ex != ${partitionColumns.toSet}.")
- }
+ val existingColumns = Try {
+ resolveRelation()
+ .asInstanceOf[HadoopFsRelation]
+ .location
+ .partitionSpec()
+ .partitionColumns
+ .fieldNames
+ .toSeq
+ }.getOrElse(Seq.empty[String])
+ val sameColumns =
+ existingColumns.map(_.toLowerCase) ==
partitionColumns.map(_.toLowerCase)
+ if (existingColumns.size > 0 && !sameColumns) {
+ throw new AnalysisException(
+ s"""Requested partitioning does not match existing partitioning.
+ |Existing partitioning columns:
+ | ${existingColumns.mkString(", ")}
+ |Requested partitioning columns:
+ | ${partitionColumns.mkString(", ")}
+ |""".stripMargin)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ce3b98ba/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7eb2fff..8827649 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext
with BeforeAndAfterEach {
assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
}
+ test("SPARK-16034 Partition columns should match when appending to existing
data source tables") {
+ import testImplicits._
+ val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+ withTable("partitionedTable") {
+ df.write.mode("overwrite").partitionBy("a",
"b").saveAsTable("partitionedTable")
+ // Misses some partition columns
+ intercept[AnalysisException] {
+
df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+ }
+ // Wrong order
+ intercept[AnalysisException] {
+ df.write.mode("append").partitionBy("b",
"a").saveAsTable("partitionedTable")
+ }
+ // Partition columns not specified
+ intercept[AnalysisException] {
+ df.write.mode("append").saveAsTable("partitionedTable")
+ }
+ assert(sql("select * from partitionedTable").collect().size == 1)
+ // Inserts new data successfully when partition columns are correctly
specified in
+ // partitionBy(...).
+ df.write.mode("append").partitionBy("a",
"b").saveAsTable("partitionedTable")
+ assert(sql("select * from partitionedTable").collect().size == 2)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]