This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fd3069ab113c [SPARK-49163][SQL] Attempt to create table based on
broken parquet partition data results should return user-facing error
fd3069ab113c is described below
commit fd3069ab113c12a6dfa338abfd8c99acd707dfbd
Author: Nikola Mandic <[email protected]>
AuthorDate: Fri Aug 9 21:50:48 2024 +0800
[SPARK-49163][SQL] Attempt to create table based on broken parquet
partition data results should return user-facing error
### What changes were proposed in this pull request?
Create an example parquet table with partitions and insert data in Spark:
```
create table t(col1 string, col2 string, col3 string) using parquet
location 'some/path/parquet-test' partitioned by (col1, col2);
insert into t (col1, col2, col3) values ('a', 'b', 'c');
```
Go into the `parquet-test` path in the filesystem and try to copy parquet
data file from path `col1=a/col2=b` directory into `col1=a`. After that, try to
create new table based on parquet data in Spark:
```
create table broken_table using parquet location 'some/path/parquet-test';
```
This query errors with internal error. Stack trace excerpts:
```
org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command
failed. You hit a bug in Spark or the Spark plugins you use. Please, report
this bug to the corresponding communities or vendors, and provide the full
stack trace. SQLSTATE: XX000
...
Caused by: java.lang.AssertionError: assertion failed: Conflicting
partition column names detected: Partition column name list #0: col1
Partition column name list #1: col1, col2For partitioned table
directories, data files should only live in leaf directories.
And directories at the same level should have the same partition column
name.
Please check the following directories for unexpected files or inconsistent
partition column names: file:some/path/parquet-test/col1=a
file:some/path/parquet-test/col1=a/col2=b
at scala.Predef$.assert(Predef.scala:279)
at
org.apache.spark.sql.execution.datasources.PartitioningUtils$.resolvePartitions(PartitioningUtils.scala:391)
...
```
Fix this by changing internal error to user-facing error.
### Why are the changes needed?
Replace internal error with user-facing one for valid sequence of Spark SQL
operations.
### Does this PR introduce _any_ user-facing change?
Yes, it presents the user with regular error instead of internal error.
### How was this patch tested?
Added checks to `ParquetPartitionDiscoverySuite` which simulate the
described scenario by manually breaking parquet table in the filesystem.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47668 from nikolamand-db/SPARK-49163.
Authored-by: Nikola Mandic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 11 +++
.../spark/sql/errors/QueryExecutionErrors.scala | 12 +++
.../execution/datasources/PartitioningUtils.scala | 20 ++--
.../sql/execution/datasources/FileIndexSuite.scala | 2 +-
.../parquet/ParquetPartitionDiscoverySuite.scala | 110 ++++++++++++++-------
5 files changed, 108 insertions(+), 47 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 4766c7790915..3512fe34e92a 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -625,6 +625,17 @@
],
"sqlState" : "40000"
},
+ "CONFLICTING_PARTITION_COLUMN_NAMES" : {
+ "message" : [
+ "Conflicting partition column names detected:",
+ "<distinctPartColLists>",
+ "For partitioned table directories, data files should only live in leaf
directories.",
+ "And directories at the same level should have the same partition column
name.",
+ "Please check the following directories for unexpected files or
inconsistent partition column names:",
+ "<suspiciousPaths>"
+ ],
+ "sqlState" : "KD009"
+ },
"CONNECT" : {
"message" : [
"Generic Spark Connect error."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 9bfb81ad821b..eb25387af5a7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2837,4 +2837,16 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
"parameter" -> toSQLId("unit"),
"invalidValue" -> s"'$invalidValue'"))
}
+
+ def conflictingPartitionColumnNamesError(
+ distinctPartColLists: Seq[String],
+ suspiciousPaths: Seq[Path]): SparkRuntimeException = {
+ new SparkRuntimeException(
+ errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+ messageParameters = Map(
+ "distinctPartColLists" -> distinctPartColLists.mkString("\n\t",
"\n\t", "\n"),
+ "suspiciousPaths" -> suspiciousPaths.map("\t" + _).mkString("\n",
"\n", "")
+ )
+ )
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 3b2d601b81fb..676a2ab64d0a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -386,9 +387,9 @@ object PartitioningUtils extends SQLConfHelper {
} else {
pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
}
- assert(
- partColNames.distinct.size == 1,
- listConflictingPartitionColumns(pathsWithPartitionValues))
+ if (partColNames.distinct.size != 1) {
+ throw conflictingPartitionColumnsError(pathsWithPartitionValues)
+ }
// Resolves possible type conflicts for each column
val values = pathsWithPartitionValues.map(_._2)
@@ -404,8 +405,8 @@ object PartitioningUtils extends SQLConfHelper {
}
}
- private[datasources] def listConflictingPartitionColumns(
- pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+ private[datasources] def conflictingPartitionColumnsError(
+ pathWithPartitionValues: Seq[(Path, PartitionValues)]):
SparkRuntimeException = {
val distinctPartColNames =
pathWithPartitionValues.map(_._2.columnNames).distinct
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
@@ -423,13 +424,8 @@ object PartitioningUtils extends SQLConfHelper {
// Lists out those non-leaf partition directories that also contain files
val suspiciousPaths =
distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
- s"Conflicting partition column names detected:\n" +
- distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
- "For partitioned table directories, data files should only live in leaf
directories.\n" +
- "And directories at the same level should have the same partition column
name.\n" +
- "Please check the following directories for unexpected files or " +
- "inconsistent partition column names:\n" +
- suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+ QueryExecutionErrors.conflictingPartitionColumnNamesError(
+ distinctPartColLists, suspiciousPaths)
}
// scalastyle:off line.size.limit
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 6399eb6da049..21623f94c8ba 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -112,7 +112,7 @@ class FileIndexSuite extends SharedSparkSession {
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val msg = intercept[AssertionError] {
+ val msg = intercept[SparkRuntimeException] {
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty,
None)
fileIndex.partitionSpec()
}.getMessage
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index a6ad147c865d..1484511a98b6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -27,7 +27,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkRuntimeException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
@@ -958,54 +958,58 @@ abstract class ParquetPartitionDiscoverySuite
}
}
- test("listConflictingPartitionColumns") {
- def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]):
String = {
- val conflictingColNameLists = colNameLists.zipWithIndex.map { case
(list, index) =>
- s"\tPartition column name list #$index: $list"
- }.mkString("\n", "\n", "\n")
-
- // scalastyle:off
- s"""Conflicting partition column names detected:
- |$conflictingColNameLists
- |For partitioned table directories, data files should only live in
leaf directories.
- |And directories at the same level should have the same partition
column name.
- |Please check the following directories for unexpected files or
inconsistent partition column names:
- |${paths.map("\t" + _).mkString("\n", "\n", "")}
- """.stripMargin.trim
- // scalastyle:on
- }
-
- assert(
- listConflictingPartitionColumns(
+ test("conflictingPartitionColumnsError") {
+ checkError(
+ exception = conflictingPartitionColumnsError(
Seq(
(new Path("file:/tmp/foo/a=1"),
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
(new Path("file:/tmp/foo/b=1"),
- PartitionValues(Seq("b"), Seq(TypedPartValue("1",
IntegerType)))))).trim ===
- makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1",
"file:/tmp/foo/b=1")))
+ PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType))))
+ )
+ ),
+ errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+ parameters = Map(
+ "distinctPartColLists" ->
+ "\n\tPartition column name list #0: a\n\tPartition column name list
#1: b\n",
+ "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/b=1"
+ )
+ )
- assert(
- listConflictingPartitionColumns(
+ checkError(
+ exception = conflictingPartitionColumnsError(
Seq(
(new Path("file:/tmp/foo/a=1/_temporary"),
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
(new Path("file:/tmp/foo/a=1"),
- PartitionValues(Seq("a"), Seq(TypedPartValue("1",
IntegerType)))))).trim ===
- makeExpectedMessage(
- Seq("a"),
- Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+ PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType))))
+ )
+ ),
+ errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+ parameters = Map(
+ "distinctPartColLists" ->
+ "\n\tPartition column name list #0: a\n",
+ "suspiciousPaths" ->
"\n\tfile:/tmp/foo/a=1/_temporary\n\tfile:/tmp/foo/a=1"
+ )
+ )
- assert(
- listConflictingPartitionColumns(
+ checkError(
+ exception = conflictingPartitionColumnsError(
Seq(
(new Path("file:/tmp/foo/a=1"),
PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
(new Path("file:/tmp/foo/a=1/b=foo"),
PartitionValues(Seq("a", "b"),
- Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo",
StringType)))))).trim ===
- makeExpectedMessage(
- Seq("a", "a, b"),
- Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+ Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo",
StringType))))
+ )
+ ),
+ errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+ parameters = Map(
+ "distinctPartColLists" ->
+ "\n\tPartition column name list #0: a\n\tPartition column name list
#1: a, b\n",
+ "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/a=1/b=foo"
+ )
+ )
}
test("Parallel partition discovery") {
@@ -1145,6 +1149,44 @@ abstract class ParquetPartitionDiscoverySuite
checkAnswer(res, Seq(Row(1, 2, 3, 4.0f)))
}
}
+
+ test("SPARK-49163: Attempt to create table based on broken parquet partition
data") {
+ withTempDir { dir =>
+ val data = Seq[(String, String, String)](("a", "b", "c"))
+ data
+ .toDF("col1", "col2", "col3")
+ .write
+ .mode("overwrite")
+ .partitionBy("col1", "col2")
+ .parquet(dir.getCanonicalPath)
+
+ // Structure of parquet table in filesystem:
+ // <base>
+ // +- col1=a
+ // +- col2=b
+ // |- part-00000.parquet
+
+ val partition = new File(dir, "col1=a")
+ val dummyData = new File(partition, "dummy")
+ dummyData.createNewFile()
+
+ // Structure of parquet table in filesystem is now corrupt:
+ // <base>
+ // +- col1=a
+ // |- dummy
+ // +- col2=b
+ // |- part-00000.parquet
+
+ val exception = intercept[SparkRuntimeException] {
+ spark.read.parquet(dir.toString)
+ }
+ val msg = exception.getMessage
+ assert(exception.getErrorClass === "CONFLICTING_PARTITION_COLUMN_NAMES")
+ // Partitions inside the error message can be presented in any order
+ assert("Partition column name list #[0-1]:
col1".r.findFirstIn(msg).isDefined)
+ assert("Partition column name list #[0-1]: col1,
col2".r.findFirstIn(msg).isDefined)
+ }
+ }
}
class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]