Repository: spark
Updated Branches:
refs/heads/master 09fcf96b8 -> cc465fd92
[SPARK-8138] [SQL] Improves error message when conflicting partition columns are found
This PR improves the error message shown when conflicting partition column
names are detected. Such conflicts can be particularly annoying and confusing
when a table has a large number of partitions and only a handful of them
happen to contain unexpected temporary file(s). Now all suspicious
directories are listed, as shown below:
```
java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
Partition column name list #0: b, c, d
Partition column name list #1: b, c
Partition column name list #2: b
For partitioned table directories, data files should only live in leaf directories.
Please check the following directories for unexpected files:
file:/tmp/foo/b=0
file:/tmp/foo/b=1
file:/tmp/foo/b=1/c=1
file:/tmp/foo/b=0/c=0
```
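
To make the failure mode concrete, here is a small self-contained sketch of how
such a layout produces the conflicting column name lists shown above. It is not
part of this patch; the object name, the `partitionColumnNames` helper, and its
naive path parsing are invented purely for illustration:

```scala
// Hypothetical illustration only -- NOT Spark's partition-discovery code.
// Derives partition column name lists from directories that contain data files.
object ConflictingPartitionsDemo extends App {
  // Extract column names from a "col=value"-style directory path (naive parsing).
  def partitionColumnNames(dir: String): Seq[String] =
    dir.stripPrefix("file:/tmp/foo/")
      .split("/").toSeq
      .filter(_.contains("="))
      .map(_.takeWhile(_ != '='))

  // Directories holding files; only the first one is a proper leaf directory,
  // the rest contain stray files (e.g. leftover temporary output).
  val dirsWithFiles = Seq(
    "file:/tmp/foo/b=0/c=0/d=1",
    "file:/tmp/foo/b=1/c=1",
    "file:/tmp/foo/b=0/c=0",
    "file:/tmp/foo/b=0",
    "file:/tmp/foo/b=1")

  // More than one distinct column name list is what triggers the assertion.
  dirsWithFiles.map(partitionColumnNames).distinct.zipWithIndex.foreach {
    case (names, index) =>
      println(s"Partition column name list #$index: ${names.mkString(", ")}")
  }
}
```

Running this prints the same three lists as the message above (`b, c, d`, then
`b, c`, then `b`), which is why also listing the offending directories, rather
than just the lists, makes the problem much easier to track down.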
Author: Cheng Lian <[email protected]>
Closes #6610 from liancheng/part-errmsg and squashes the following commits:
7d05f2c [Cheng Lian] Fixes Scala style issue
a149250 [Cheng Lian] Adds test case for the error message
6b74dd8 [Cheng Lian] Also lists suspicious non-leaf partition directories
a935eb8 [Cheng Lian] Improves error message when conflicting partition columns are found
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc465fd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc465fd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc465fd9
Branch: refs/heads/master
Commit: cc465fd92482737c21971d82e30d4cf247acf932
Parents: 09fcf96
Author: Cheng Lian <[email protected]>
Authored: Wed Jun 24 02:17:12 2015 -0700
Committer: Cheng Lian <[email protected]>
Committed: Wed Jun 24 02:17:12 2015 -0700
----------------------------------------------------------------------
.../spark/sql/sources/PartitioningUtils.scala | 47 +++++++++++++++-----
.../ParquetPartitionDiscoverySuite.scala | 45 +++++++++++++++++++
2 files changed, 82 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cc465fd9/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c6f535d..8b2a45d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
     } else {
       // This dataset is partitioned. We need to check whether all partitions have the same
       // partition columns and resolve potential type conflicts.
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -181,19 +181,18 @@ private[sql] object PartitioningUtils {
    * StringType
    * }}}
    */
-  private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
-    // Column names of all partitions must match
-    val distinctPartitionsColNames = values.map(_.columnNames).distinct
-
-    if (distinctPartitionsColNames.isEmpty) {
+  private[sql] def resolvePartitions(
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+    if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
-      assert(distinctPartitionsColNames.size == 1, {
-        val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
-        s"Conflicting partition column names detected:\n$list"
-      })
+      val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
+      assert(
+        distinctPartColNames.size == 1,
+        listConflictingPartitionColumns(pathsWithPartitionValues))
 
       // Resolves possible type conflicts for each column
+      val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
         resolveTypeConflicts(values.map(_.literals(i)))
@@ -206,6 +205,34 @@ private[sql] object PartitioningUtils {
     }
   }
 
+  private[sql] def listConflictingPartitionColumns(
+      pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+    val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
+
+    def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
+      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
+
+    val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
+      case (path, partValues) => partValues.columnNames -> path
+    })
+
+    val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
+      case (names, index) =>
+        s"Partition column name list #$index: $names"
+    }
+
+    // Lists out those non-leaf partition directories that also contain files
+    val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
+
+    s"Conflicting partition column names detected:\n" +
+      distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
+      "For partitioned table directories, data files should only live in leaf directories.\n" +
+      "And directories at the same level should have the same partition column name.\n" +
+      "Please check the following directories for unexpected files or " +
+      "inconsistent partition column names:\n" +
+      suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+  }
+
   /**
    * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
    * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and
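
A note on the ordering in the new `listConflictingPartitionColumns` helper above:
paths are grouped by their partition column name list, and the distinct lists are
sorted by length before being flattened back into paths, so directories with
shorter lists (stray files higher up the directory tree) are reported first. The
sketch below reproduces that grouping and ordering with plain strings standing in
for Hadoop `Path` and Spark's `PartitionValues`; the object and value names are
invented for illustration:

```scala
// Minimal sketch of the grouping/ordering used by the new error message.
// Plain Strings stand in for org.apache.hadoop.fs.Path and Spark's
// PartitionValues; everything here is illustrative, not Spark code.
object SuspiciousPathsSketch extends App {
  val pathsWithColNames: Seq[(String, Seq[String])] = Seq(
    "file:/tmp/foo/b=0"         -> Seq("b"),
    "file:/tmp/foo/b=1"         -> Seq("b"),
    "file:/tmp/foo/b=1/c=1"     -> Seq("b", "c"),
    "file:/tmp/foo/b=0/c=0"     -> Seq("b", "c"),
    "file:/tmp/foo/b=0/c=0/d=1" -> Seq("b", "c", "d"))

  // Group paths by their partition column name list.
  val colNamesToPaths: Map[Seq[String], Seq[String]] =
    pathsWithColNames
      .groupBy { case (_, names) => names }
      .map { case (names, pairs) => names -> pairs.map { case (path, _) => path } }

  // More than one distinct column name list means the layout is inconsistent.
  val distinctColNames = pathsWithColNames.map { case (_, names) => names }.distinct
  require(distinctColNames.size > 1, "this example is meant to show a conflict")

  // Sorting by list length reports directories with stray files higher up the
  // tree (shorter column name lists) before the proper leaf directories.
  val suspiciousPaths = distinctColNames.sortBy(_.length).flatMap(colNamesToPaths)
  suspiciousPaths.foreach(path => println("\t" + path))
}
```

For the layout from the commit description, this prints the two `b=…` directories
first, then the two `b=…/c=…` directories, and the proper leaf directory last.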
http://git-wip-us.apache.org/repos/asf/spark/blob/cc465fd9/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 01df189..d0ebb11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
     }
   }
+
+  test("listConflictingPartitionColumns") {
+    def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
+      val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
+        s"\tPartition column name list #$index: $list"
+      }.mkString("\n", "\n", "\n")
+
+      // scalastyle:off
+      s"""Conflicting partition column names detected:
+         |$conflictingColNameLists
+         |For partitioned table directories, data files should only live in leaf directories.
+         |And directories at the same level should have the same partition column name.
+         |Please check the following directories for unexpected files or inconsistent partition column names:
+         |${paths.map("\t" + _).mkString("\n", "\n", "")}
+       """.stripMargin.trim
+      // scalastyle:on
+    }
+
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim ===
+        makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))
+
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim ===
+        makeExpectedMessage(
+          Seq("a"),
+          Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1"),
+            PartitionValues(Seq("a"), Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/a=1/b=foo"),
+            PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim ===
+        makeExpectedMessage(
+          Seq("a", "a, b"),
+          Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+  }
 }