This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6758164ff [SPARK-38651][SQL] Add `spark.sql.legacy.allowEmptySchemaWrite`
5d6758164ff is described below

commit 5d6758164ff341b7c9055000f89fa91c68622af0
Author: Thejdeep Gudivada <[email protected]>
AuthorDate: Sat Jan 14 01:58:25 2023 -0800

    [SPARK-38651][SQL] Add `spark.sql.legacy.allowEmptySchemaWrite`
    
    ### What changes were proposed in this pull request?
    Add the SQL configuration `spark.sql.legacy.allowEmptySchemaWrite` to allow
    writing empty schemas to the file-based data sources that support them.
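
    A minimal usage sketch, mirroring the added unit test (the output path below is hypothetical):

    ```scala
    // Sketch only: assumes an active SparkSession `spark` and a writable local path.
    // With the legacy flag enabled, the empty-schema validation in
    // DataSource.validateSchema is skipped and the write is delegated to a format
    // that supports empty schemas, such as ORC.
    spark.conf.set("spark.sql.legacy.allowEmptySchemaWrite", "true")
    spark.emptyDataFrame.write.format("orc").save("/tmp/empty-schema-orc")
    ```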
    
    ### Why are the changes needed?
    Without this change, applications migrating past Spark 2.3 hit a backward
    incompatibility: Spark 2.4 introduced a breaking change that disallows writing
    empty schemas. Since some file formats, such as ORC, support empty schemas, we
    should honor that by not failing validation for them.
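
    For contrast, with the flag left at its default value (false) the same write is rejected during schema validation, roughly:

    ```scala
    // Sketch only: the default behavior keeps the Spark 2.4+ validation, so the write
    // fails in DataSource.validateSchema via
    // QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError()
    // before any files are produced.
    spark.emptyDataFrame.write.format("orc").save("/tmp/empty-schema-orc")
    ```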
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    Added a unit test covering this behavior.
    
    Closes #35969 from thejdeep/SPARK-38651.
    
    Lead-authored-by: Thejdeep Gudivada <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala  |  8 ++++++++
 .../apache/spark/sql/execution/datasources/DataSource.scala | 12 +++++++-----
 .../spark/sql/execution/datasources/v2/FileWrite.scala      |  7 ++++---
 .../org/apache/spark/sql/FileBasedDataSourceSuite.scala     | 13 +++++++++++++
 4 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3abd2578d4a..f1c633962a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2912,6 +2912,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("avro,csv,json,kafka,orc,parquet,text")
 
+  val ALLOW_EMPTY_SCHEMAS_FOR_WRITES = buildConf("spark.sql.legacy.allowEmptySchemaWrite")
+    .internal()
+    .doc("When this option is set to true, validation of empty or empty nested schemas that " +
+      "occurs when writing into a FileFormat based data source does not happen.")
+    .version("3.4.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
       " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9bb5191dc01..ad26ee21c2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -525,7 +525,7 @@ case class DataSource(
         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>
         disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
-        DataSource.validateSchema(data.schema)
+        DataSource.validateSchema(data.schema, sparkSession.sessionState.conf)
         planForWritingFileFormat(format, mode, data)
       case _ => throw new IllegalStateException(
         s"${providingClass.getCanonicalName} does not allow create table as select.")
@@ -791,11 +791,13 @@ object DataSource extends Logging {
   }
 
   /**
-   * Called before writing into a FileFormat based data source to make sure the
-   * supplied schema is not empty.
+   * Called before writing into a FileFormat based data source to validate whether
+   * the supplied schema is not empty.
    * @param schema
+   * @param conf
    */
-  def validateSchema(schema: StructType): Unit = {
+  def validateSchema(schema: StructType, conf: SQLConf): Unit = {
+    val shouldAllowEmptySchema = conf.getConf(SQLConf.ALLOW_EMPTY_SCHEMAS_FOR_WRITES)
     def hasEmptySchema(schema: StructType): Boolean = {
       schema.size == 0 || schema.exists {
         case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
@@ -804,7 +806,7 @@ object DataSource extends Logging {
     }
 
 
-    if (hasEmptySchema(schema)) {
+    if (!shouldAllowEmptySchema && hasEmptySchema(schema)) {
       throw QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError()
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
index e65ff13ba22..b54f05bec12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
@@ -52,7 +52,7 @@ trait FileWrite extends Write {
 
   override def toBatch: BatchWrite = {
     val sparkSession = SparkSession.active
-    validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    validateInputs(sparkSession.sessionState.conf)
     val path = new Path(paths.head)
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
@@ -80,7 +80,8 @@ trait FileWrite extends Write {
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory
 
-  private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
+  private def validateInputs(sqlConf: SQLConf): Unit = {
+    val caseSensitiveAnalysis = sqlConf.caseSensitiveAnalysis
     assert(schema != null, "Missing input data schema")
     assert(queryId != null, "Missing query ID")
 
@@ -90,7 +91,7 @@ trait FileWrite extends Write {
     }
     val pathName = paths.head
     SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name), caseSensitiveAnalysis)
-    DataSource.validateSchema(schema)
+    DataSource.validateSchema(schema, sqlConf)
 
     // TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert.
     schema.foreach { field =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index a30be5e4cbe..3b81d215c7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -154,6 +154,19 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  val emptySchemaSupportedDataSources = Seq("orc", "csv", "json")
+  emptySchemaSupportedDataSources.foreach { format =>
+    val emptySchemaValidationConf = SQLConf.ALLOW_EMPTY_SCHEMAS_FOR_WRITES.key
+    test("SPARK-38651 allow writing empty schema files " +
+      s"using $format when ${emptySchemaValidationConf} is enabled") {
+      withSQLConf(emptySchemaValidationConf -> "true") {
+        withTempPath { outputPath =>
+          spark.emptyDataFrame.write.format(format).save(outputPath.toString)
+        }
+      }
+    }
+  }
+
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using 
$format") {
       withTempDir { dir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
