This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e28e9532978 [SPARK-48649][SQL] Add "ignoreInvalidPartitionPaths" and "spark.sql.files.ignoreInvalidPartitionPaths" configs to allow ignoring invalid partition paths
5e28e9532978 is described below

commit 5e28e9532978fb80527605ea5cb31fa9125d7949
Author: Ivan Sadikov <[email protected]>
AuthorDate: Wed Jun 19 09:46:38 2024 +0800

    [SPARK-48649][SQL] Add "ignoreInvalidPartitionPaths" and "spark.sql.files.ignoreInvalidPartitionPaths" configs to allow ignoring invalid partition paths
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new data source option `ignoreInvalidPartitionPaths` and a SQL session configuration flag `spark.sql.files.ignoreInvalidPartitionPaths` to control whether invalid partition paths (base paths) are skipped.
    
    When the config is enabled, invalid paths such as the following are skipped:
    ```
    table/
      invalid/...
      part=1/...
      part=2/...
      part=3/...
    ```
    In this case, the `table/invalid` path will be ignored.
    
    The data source option takes precedence over the SQL config, so with the following code:
    ```scala
    spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "false")
    
    spark.read.format("parquet").option("ignoreInvalidPartitionPaths", "true").load(...)
    ```
    
    the query will ignore invalid partition paths, i.e. the flag is effectively enabled.
    
    The config is disabled by default.
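    
    For illustration, here is a minimal end-to-end sketch (the table path is hypothetical; the directory layout matches the example above):
    ```scala
    val path = "/tmp/partitioned_table"  // hypothetical path containing an extra "invalid" directory
    
    // With the defaults (flag disabled), partition discovery fails with an
    // AssertionError: "Conflicting directory structures detected ...".
    // spark.read.format("parquet").load(path).collect()
    
    // With the session config enabled, the invalid directory is skipped and
    // only the valid partitions are loaded.
    spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "true")
    spark.read.format("parquet").load(path).collect()
    ```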
    
    ### Why are the changes needed?
    
    Allows ignoring invalid partition paths that cannot be parsed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The added configs are disabled by default, so the behaviour is exactly the same as before.
    
    ### How was this patch tested?
    
    I added a unit test for this.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47006 from sadikovi/SPARK-48649.
    
    Authored-by: Ivan Sadikov <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 ++++
 .../execution/datasources/FileIndexOptions.scala   |  1 +
 .../datasources/PartitioningAwareFileIndex.scala   | 10 +++-
 .../execution/datasources/PartitioningUtils.scala  | 10 ++--
 .../sql/execution/datasources/FileIndexSuite.scala | 65 +++++++++++++++++++++-
 .../parquet/ParquetPartitionDiscoverySuite.scala   | 23 +++++---
 6 files changed, 104 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 25a2441e05fe..fd804bc0e986 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1977,6 +1977,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val IGNORE_INVALID_PARTITION_PATHS = buildConf("spark.sql.files.ignoreInvalidPartitionPaths")
+    .doc("Whether to ignore invalid partition paths that do not match <column>=<value>. When " +
+      "the option is enabled, table with two partition directories 'table/invalid' and " +
+      "'table/col=1' will only load the latter directory and ignore the invalid partition")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
     .doc("Maximum number of records to write out to a single file. " +
       "If this value is zero or negative, there is no limit.")
@@ -5275,6 +5283,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
 
+  def ignoreInvalidPartitionPaths: Boolean = getConf(IGNORE_INVALID_PARTITION_PATHS)
+
   def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala
index 1c352e3748f2..5a300dae4daa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 object FileIndexOptions extends DataSourceOptions {
   val IGNORE_MISSING_FILES = newOption(FileSourceOptions.IGNORE_MISSING_FILES)
+  val IGNORE_INVALID_PARTITION_PATHS = newOption("ignoreInvalidPartitionPaths")
   val TIME_ZONE = newOption(DateTimeUtils.TIMEZONE_OPTION)
   val RECURSIVE_FILE_LOOKUP = newOption("recursiveFileLookup")
   val BASE_PATH_PARAM = newOption("basePath")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index cc9f0d23bcb6..07be3f89872c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -70,6 +70,13 @@ abstract class PartitioningAwareFileIndex(
     caseInsensitiveMap.getOrElse(FileIndexOptions.RECURSIVE_FILE_LOOKUP, "false").toBoolean
   }
 
+  protected lazy val ignoreInvalidPartitionPaths: Boolean = {
+    caseInsensitiveMap
+      .get(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS)
+      .map(_.toBoolean)
+      .getOrElse(sparkSession.sessionState.conf.ignoreInvalidPartitionPaths)
+  }
+
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -162,7 +169,8 @@ abstract class PartitioningAwareFileIndex(
         userSpecifiedSchema = userSpecifiedSchema,
         caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis,
         validatePartitionColumns = sparkSession.sessionState.conf.validatePartitionColumns,
-        timeZoneId = timeZoneId)
+        timeZoneId = timeZoneId,
+        ignoreInvalidPartitionPaths = ignoreInvalidPartitionPaths)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 56cba0e0561d..3b2d601b81fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -106,9 +106,10 @@ object PartitioningUtils extends SQLConfHelper {
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
       validatePartitionColumns: Boolean,
-      timeZoneId: String): PartitionSpec = {
+      timeZoneId: String,
+      ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
     parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive,
-      validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId))
+      validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId), ignoreInvalidPartitionPaths)
   }
 
   private[datasources] def parsePartitions(
@@ -118,7 +119,8 @@ object PartitioningUtils extends SQLConfHelper {
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
       validatePartitionColumns: Boolean,
-      zoneId: ZoneId): PartitionSpec = {
+      zoneId: ZoneId,
+      ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
     val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
       val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap
       if (!caseSensitive) {
@@ -171,7 +173,7 @@ object PartitioningUtils extends SQLConfHelper {
       // TODO: Selective case sensitivity.
       val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
       assert(
-        discoveredBasePaths.distinct.size == 1,
+        ignoreInvalidPartitionPaths || discoveredBasePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\b" +
           discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
           "If provided paths are partition directories, please set " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 110c330f1695..6399eb6da049 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -31,7 +31,7 @@ import org.mockito.Mockito.{mock, when}
 
 import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -547,6 +547,66 @@ class FileIndexSuite extends SharedSparkSession {
     assert(fileIndex.leafFileStatuses.toSeq == statuses)
   }
 
+  test("SPARK-48649: Ignore invalid partitions") {
+    // Table:
+    // id   part_col
+    //  1          1
+    //  2          2
+    val df = spark.range(1, 3, 1, 2).toDF("id")
+      .withColumn("part_col", col("id"))
+
+    withTempPath { directoryPath =>
+      df.write
+        .mode("overwrite")
+        .format("parquet")
+        .partitionBy("part_col")
+        .save(directoryPath.getCanonicalPath)
+
+      // Rename one of the folders.
+      new File(directoryPath, "part_col=1").renameTo(new File(directoryPath, "undefined"))
+
+      // By default, we expect the invalid path assertion to trigger.
+      val ex = intercept[AssertionError] {
+        spark.read
+          .format("parquet")
+          .load(directoryPath.getCanonicalPath)
+          .collect()
+      }
+      assert(ex.getMessage.contains("Conflicting directory structures detected"))
+
+      // With the config enabled, we should only read the valid partition.
+      withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
+        assert(
+          spark.read
+            .format("parquet")
+            .load(directoryPath.getCanonicalPath)
+            .collect() === Seq(Row(2, 2)))
+      }
+
+      // Data source option override takes precedence.
+      withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
+        val ex = intercept[AssertionError] {
+          spark.read
+            .format("parquet")
+            .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "false")
+            .load(directoryPath.getCanonicalPath)
+            .collect()
+        }
+        assert(ex.getMessage.contains("Conflicting directory structures detected"))
+      }
+
+      // Data source option override takes precedence.
+      withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "false") {
+        assert(
+          spark.read
+            .format("parquet")
+            .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "true")
+            .load(directoryPath.getCanonicalPath)
+            .collect() === Seq(Row(2, 2)))
+      }
+    }
+  }
+
   test("expire FileStatusCache if TTL is configured") {
     val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
     try {
@@ -585,9 +645,10 @@ class FileIndexSuite extends SharedSparkSession {
   }
 
   test("SPARK-40667: validate FileIndex Options") {
-    assert(FileIndexOptions.getAllOptions.size == 7)
+    assert(FileIndexOptions.getAllOptions.size == 8)
     // Please add validation on any new FileIndex options here
     assert(FileIndexOptions.isValidOption("ignoreMissingFiles"))
+    assert(FileIndexOptions.isValidOption("ignoreInvalidPartitionPaths"))
     assert(FileIndexOptions.isValidOption("timeZone"))
     assert(FileIndexOptions.isValidOption("recursiveFileLookup"))
     assert(FileIndexOptions.isValidOption("basePath"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 183c4f71df6c..a6ad147c865d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -112,7 +112,8 @@ abstract class ParquetPartitionDiscoverySuite
       "hdfs://host:9000/path/a=10.5/b=hello")
 
     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId)
+      parsePartitions(
+        paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId, false)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
 
@@ -129,7 +130,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
 
     // Valid
     paths = Seq(
@@ -145,7 +147,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
 
     // Valid
     paths = Seq(
@@ -161,7 +164,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
 
     // Invalid
     paths = Seq(
@@ -177,7 +181,8 @@ abstract class ParquetPartitionDiscoverySuite
         None,
         true,
         true,
-        timeZoneId)
+        timeZoneId,
+        false)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
 
@@ -200,7 +205,8 @@ abstract class ParquetPartitionDiscoverySuite
         None,
         true,
         true,
-        timeZoneId)
+        timeZoneId,
+        false)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
   }
@@ -296,7 +302,8 @@ abstract class ParquetPartitionDiscoverySuite
           None,
           true,
           true,
-          timeZoneId)
+          timeZoneId,
+          false)
       assert(actualSpec.partitionColumns === spec.partitionColumns)
       assert(actualSpec.partitions.length === spec.partitions.length)
       actualSpec.partitions.zip(spec.partitions).foreach { case (actual, expected) =>
@@ -427,7 +434,7 @@ abstract class ParquetPartitionDiscoverySuite
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       val actualSpec =
         parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None,
-          true, true, timeZoneId)
+          true, true, timeZoneId, false)
       assert(actualSpec === spec)
     }
 

