This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5e28e9532978 [SPARK-48649][SQL] Add "ignoreInvalidPartitionPaths" and "spark.sql.files.ignoreInvalidPartitionPaths" configs to allow ignoring invalid partition paths
5e28e9532978 is described below
commit 5e28e9532978fb80527605ea5cb31fa9125d7949
Author: Ivan Sadikov <[email protected]>
AuthorDate: Wed Jun 19 09:46:38 2024 +0800
[SPARK-48649][SQL] Add "ignoreInvalidPartitionPaths" and "spark.sql.files.ignoreInvalidPartitionPaths" configs to allow ignoring invalid partition paths
### What changes were proposed in this pull request?
This PR adds a new data source option `ignoreInvalidPartitionPaths` and a SQL
session config `spark.sql.files.ignoreInvalidPartitionPaths` to control whether
invalid partition paths (base paths) are skipped during partition discovery.
When the config is enabled, invalid paths such as the following are skipped:
```
table/
invalid/...
part=1/...
part=2/...
part=3/...
```
In this case, the `table/invalid` path will be ignored.
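For illustration, a minimal end-to-end sketch in spark-shell (the scratch path is hypothetical; the schema and the renamed directory mirror the unit test added in this PR):
```scala
import java.io.File
import org.apache.spark.sql.functions.col

// Write a small partitioned Parquet table (hypothetical local path).
val dir = "/tmp/spark48649_table"
spark.range(1, 3).toDF("id")
  .withColumn("part_col", col("id"))
  .write.mode("overwrite").format("parquet")
  .partitionBy("part_col")
  .save(dir)

// Turn one valid partition directory into a path that does not parse as <column>=<value>.
new File(dir, "part_col=1").renameTo(new File(dir, "undefined"))

// With the session config enabled, the invalid directory is skipped during discovery.
spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "true")
spark.read.format("parquet").load(dir).show()  // only the rows under part_col=2 remain
```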
The data source option takes precedence over the SQL config, so with the following code:
```scala
spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "false")
spark.read.format("parquet").option("ignoreInvalidPartitionPaths",
"true").load(...)
```
the query ignores invalid partition paths, i.e. the option value wins and the flag is effectively enabled.
The config is disabled by default.
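For contrast, when neither the option nor the config is enabled, reading such a layout fails during partition discovery. A sketch of the failure mode, reusing the hypothetical path from the example above and explicitly resetting the config (the error message comes from the assertion in `PartitioningUtils`):
```scala
// Defaults: partition discovery asserts that all discovered base paths agree,
// so the stray "undefined" directory makes the read fail.
spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "false")
try {
  spark.read.format("parquet").load("/tmp/spark48649_table").collect()
} catch {
  case e: AssertionError =>
    assert(e.getMessage.contains("Conflicting directory structures detected"))
}
```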
### Why are the changes needed?
The new configs allow ignoring invalid partition paths that cannot be parsed as `<column>=<value>`.
### Does this PR introduce _any_ user-facing change?
No. The added configs are disabled by default, so the behaviour is exactly the same as before.
### How was this patch tested?
I added a unit test for this.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47006 from sadikovi/SPARK-48649.
Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 10 ++++
.../execution/datasources/FileIndexOptions.scala | 1 +
.../datasources/PartitioningAwareFileIndex.scala | 10 +++-
.../execution/datasources/PartitioningUtils.scala | 10 ++--
.../sql/execution/datasources/FileIndexSuite.scala | 65 +++++++++++++++++++++-
.../parquet/ParquetPartitionDiscoverySuite.scala | 23 +++++---
6 files changed, 104 insertions(+), 15 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 25a2441e05fe..fd804bc0e986 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1977,6 +1977,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+  val IGNORE_INVALID_PARTITION_PATHS = buildConf("spark.sql.files.ignoreInvalidPartitionPaths")
+    .doc("Whether to ignore invalid partition paths that do not match <column>=<value>. When " +
+      "the option is enabled, table with two partition directories 'table/invalid' and " +
+      "'table/col=1' will only load the latter directory and ignore the invalid partition")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
.doc("Maximum number of records to write out to a single file. " +
"If this value is zero or negative, there is no limit.")
@@ -5275,6 +5283,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
+  def ignoreInvalidPartitionPaths: Boolean = getConf(IGNORE_INVALID_PARTITION_PATHS)
+
def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
def useCompression: Boolean = getConf(COMPRESS_CACHED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala
index 1c352e3748f2..5a300dae4daa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
object FileIndexOptions extends DataSourceOptions {
val IGNORE_MISSING_FILES = newOption(FileSourceOptions.IGNORE_MISSING_FILES)
+ val IGNORE_INVALID_PARTITION_PATHS = newOption("ignoreInvalidPartitionPaths")
val TIME_ZONE = newOption(DateTimeUtils.TIMEZONE_OPTION)
val RECURSIVE_FILE_LOOKUP = newOption("recursiveFileLookup")
val BASE_PATH_PARAM = newOption("basePath")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index cc9f0d23bcb6..07be3f89872c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -70,6 +70,13 @@ abstract class PartitioningAwareFileIndex(
    caseInsensitiveMap.getOrElse(FileIndexOptions.RECURSIVE_FILE_LOOKUP, "false").toBoolean
  }
+  protected lazy val ignoreInvalidPartitionPaths: Boolean = {
+    caseInsensitiveMap
+      .get(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS)
+      .map(_.toBoolean)
+      .getOrElse(sparkSession.sessionState.conf.ignoreInvalidPartitionPaths)
+  }
+
override def listFiles(
      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -162,7 +169,8 @@ abstract class PartitioningAwareFileIndex(
userSpecifiedSchema = userSpecifiedSchema,
caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis,
      validatePartitionColumns = sparkSession.sessionState.conf.validatePartitionColumns,
- timeZoneId = timeZoneId)
+ timeZoneId = timeZoneId,
+ ignoreInvalidPartitionPaths = ignoreInvalidPartitionPaths)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 56cba0e0561d..3b2d601b81fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -106,9 +106,10 @@ object PartitioningUtils extends SQLConfHelper {
userSpecifiedSchema: Option[StructType],
caseSensitive: Boolean,
validatePartitionColumns: Boolean,
- timeZoneId: String): PartitionSpec = {
+ timeZoneId: String,
+ ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
    parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive,
-      validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId))
+      validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId), ignoreInvalidPartitionPaths)
}
private[datasources] def parsePartitions(
@@ -118,7 +119,8 @@ object PartitioningUtils extends SQLConfHelper {
userSpecifiedSchema: Option[StructType],
caseSensitive: Boolean,
validatePartitionColumns: Boolean,
- zoneId: ZoneId): PartitionSpec = {
+ zoneId: ZoneId,
+ ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
      val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap
if (!caseSensitive) {
@@ -171,7 +173,7 @@ object PartitioningUtils extends SQLConfHelper {
// TODO: Selective case sensitivity.
      val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
assert(
- discoveredBasePaths.distinct.size == 1,
+ ignoreInvalidPartitionPaths || discoveredBasePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
"If provided paths are partition directories, please set " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 110c330f1695..6399eb6da049 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -31,7 +31,7 @@ import org.mockito.Mockito.{mock, when}
import org.apache.spark.{SparkException, SparkRuntimeException}
import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -547,6 +547,66 @@ class FileIndexSuite extends SharedSparkSession {
assert(fileIndex.leafFileStatuses.toSeq == statuses)
}
+ test("SPARK-48649: Ignore invalid partitions") {
+ // Table:
+ // id part_col
+ // 1 1
+ // 2 2
+ val df = spark.range(1, 3, 1, 2).toDF("id")
+ .withColumn("part_col", col("id"))
+
+ withTempPath { directoryPath =>
+ df.write
+ .mode("overwrite")
+ .format("parquet")
+ .partitionBy("part_col")
+ .save(directoryPath.getCanonicalPath)
+
+ // Rename one of the folders.
+      new File(directoryPath, "part_col=1").renameTo(new File(directoryPath, "undefined"))
+
+ // By default, we expect the invalid path assertion to trigger.
+ val ex = intercept[AssertionError] {
+ spark.read
+ .format("parquet")
+ .load(directoryPath.getCanonicalPath)
+ .collect()
+ }
+      assert(ex.getMessage.contains("Conflicting directory structures detected"))
+
+ // With the config enabled, we should only read the valid partition.
+ withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
+ assert(
+ spark.read
+ .format("parquet")
+ .load(directoryPath.getCanonicalPath)
+ .collect() === Seq(Row(2, 2)))
+ }
+
+ // Data source option override takes precedence.
+ withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
+ val ex = intercept[AssertionError] {
+ spark.read
+ .format("parquet")
+ .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "false")
+ .load(directoryPath.getCanonicalPath)
+ .collect()
+ }
+        assert(ex.getMessage.contains("Conflicting directory structures detected"))
+ }
+
+ // Data source option override takes precedence.
+ withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "false") {
+ assert(
+ spark.read
+ .format("parquet")
+ .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "true")
+ .load(directoryPath.getCanonicalPath)
+ .collect() === Seq(Row(2, 2)))
+ }
+ }
+ }
+
test("expire FileStatusCache if TTL is configured") {
    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
try {
@@ -585,9 +645,10 @@ class FileIndexSuite extends SharedSparkSession {
}
test("SPARK-40667: validate FileIndex Options") {
- assert(FileIndexOptions.getAllOptions.size == 7)
+ assert(FileIndexOptions.getAllOptions.size == 8)
// Please add validation on any new FileIndex options here
assert(FileIndexOptions.isValidOption("ignoreMissingFiles"))
+ assert(FileIndexOptions.isValidOption("ignoreInvalidPartitionPaths"))
assert(FileIndexOptions.isValidOption("timeZone"))
assert(FileIndexOptions.isValidOption("recursiveFileLookup"))
assert(FileIndexOptions.isValidOption("basePath"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 183c4f71df6c..a6ad147c865d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -112,7 +112,8 @@ abstract class ParquetPartitionDiscoverySuite
"hdfs://host:9000/path/a=10.5/b=hello")
var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId)
+      parsePartitions(
+        paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId, false)
}
    assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -129,7 +130,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
- timeZoneId)
+ timeZoneId,
+ false)
// Valid
paths = Seq(
@@ -145,7 +147,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
- timeZoneId)
+ timeZoneId,
+ false)
// Valid
paths = Seq(
@@ -161,7 +164,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
- timeZoneId)
+ timeZoneId,
+ false)
// Invalid
paths = Seq(
@@ -177,7 +181,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
- timeZoneId)
+ timeZoneId,
+ false)
}
    assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -200,7 +205,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
- timeZoneId)
+ timeZoneId,
+ false)
}
    assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
@@ -296,7 +302,8 @@ abstract class ParquetPartitionDiscoverySuite
None,
true,
true,
- timeZoneId)
+ timeZoneId,
+ false)
assert(actualSpec.partitionColumns === spec.partitionColumns)
assert(actualSpec.partitions.length === spec.partitions.length)
    actualSpec.partitions.zip(spec.partitions).foreach { case (actual, expected) =>
@@ -427,7 +434,7 @@ abstract class ParquetPartitionDiscoverySuite
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
val actualSpec =
parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None,
- true, true, timeZoneId)
+ true, true, timeZoneId, false)
assert(actualSpec === spec)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]