This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5d6758164ff [SPARK-38651][SQL] Add `spark.sql.legacy.allowEmptySchemaWrite`
5d6758164ff is described below
commit 5d6758164ff341b7c9055000f89fa91c68622af0
Author: Thejdeep Gudivada <[email protected]>
AuthorDate: Sat Jan 14 01:58:25 2023 -0800
[SPARK-38651][SQL] Add `spark.sql.legacy.allowEmptySchemaWrite`
### What changes were proposed in this pull request?
Add a SQL configuration, `spark.sql.legacy.allowEmptySchemaWrite`, that allows writing out empty schemas to the file-based data sources that support them.
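As a usage illustration (a minimal sketch, not part of this patch; the output path and the choice of ORC are only examples), enabling the flag lets an empty DataFrame be written out:

```scala
// Hypothetical session usage; assumes an active SparkSession named `spark`.
spark.conf.set("spark.sql.legacy.allowEmptySchemaWrite", "true")

// With the legacy flag enabled, a DataFrame with an empty schema can be
// written to a format that supports it, e.g. ORC.
spark.emptyDataFrame.write.format("orc").save("/tmp/empty-schema-output")
```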
### Why are the changes needed?
Without this change, applications migrating past Spark 2.3 hit a backward incompatibility, because Spark 2.4 introduced a breaking change that disallows writing empty schemas. Since some file formats, such as ORC, support empty schemas, we should honor that by allowing the empty-schema validation to be skipped.
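For contrast, a hedged sketch of the default behavior (flag left at `false`): the validation still rejects the write. The error handling below is illustrative, not quoted from Spark.

```scala
// Default behavior (flag false): empty-schema writes are rejected during validation.
spark.conf.set("spark.sql.legacy.allowEmptySchemaWrite", "false")
try {
  spark.emptyDataFrame.write.format("orc").save("/tmp/empty-schema-output")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println(s"Write rejected: ${e.getMessage}")
}
```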
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Added a unit test covering this behavior.
Closes #35969 from thejdeep/SPARK-38651.
Lead-authored-by: Thejdeep Gudivada <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++++
.../apache/spark/sql/execution/datasources/DataSource.scala | 12 +++++++-----
.../spark/sql/execution/datasources/v2/FileWrite.scala | 7 ++++---
.../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 13 +++++++++++++
4 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3abd2578d4a..f1c633962a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2912,6 +2912,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("avro,csv,json,kafka,orc,parquet,text")
 
+  val ALLOW_EMPTY_SCHEMAS_FOR_WRITES = buildConf("spark.sql.legacy.allowEmptySchemaWrite")
+    .internal()
+    .doc("When this option is set to true, validation of empty or empty nested schemas that " +
+      "occurs when writing into a FileFormat based data source does not happen.")
+    .version("3.4.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
       " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9bb5191dc01..ad26ee21c2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -525,7 +525,7 @@ case class DataSource(
         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>
         disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
-        DataSource.validateSchema(data.schema)
+        DataSource.validateSchema(data.schema, sparkSession.sessionState.conf)
         planForWritingFileFormat(format, mode, data)
       case _ => throw new IllegalStateException(
         s"${providingClass.getCanonicalName} does not allow create table as select.")
@@ -791,11 +791,13 @@ object DataSource extends Logging {
   }
 
   /**
-   * Called before writing into a FileFormat based data source to make sure the
-   * supplied schema is not empty.
+   * Called before writing into a FileFormat based data source to validate whether
+   * the supplied schema is not empty.
    * @param schema
+   * @param conf
    */
-  def validateSchema(schema: StructType): Unit = {
+  def validateSchema(schema: StructType, conf: SQLConf): Unit = {
+    val shouldAllowEmptySchema = conf.getConf(SQLConf.ALLOW_EMPTY_SCHEMAS_FOR_WRITES)
     def hasEmptySchema(schema: StructType): Boolean = {
       schema.size == 0 || schema.exists {
         case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
@@ -804,7 +806,7 @@ object DataSource extends Logging {
     }
 
-    if (hasEmptySchema(schema)) {
+    if (!shouldAllowEmptySchema && hasEmptySchema(schema)) {
       throw QueryCompilationErrors.writeEmptySchemasUnsupportedByDataSourceError()
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
index e65ff13ba22..b54f05bec12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala
@@ -52,7 +52,7 @@ trait FileWrite extends Write {
   override def toBatch: BatchWrite = {
     val sparkSession = SparkSession.active
-    validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    validateInputs(sparkSession.sessionState.conf)
     val path = new Path(paths.head)
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
@@ -80,7 +80,8 @@ trait FileWrite extends Write {
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory
 
-  private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
+  private def validateInputs(sqlConf: SQLConf): Unit = {
+    val caseSensitiveAnalysis = sqlConf.caseSensitiveAnalysis
     assert(schema != null, "Missing input data schema")
     assert(queryId != null, "Missing query ID")
@@ -90,7 +91,7 @@ trait FileWrite extends Write {
     }
     val pathName = paths.head
     SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name), caseSensitiveAnalysis)
-    DataSource.validateSchema(schema)
+    DataSource.validateSchema(schema, sqlConf)
 
     // TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert.
     schema.foreach { field =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index a30be5e4cbe..3b81d215c7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -154,6 +154,19 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  val emptySchemaSupportedDataSources = Seq("orc", "csv", "json")
+  emptySchemaSupportedDataSources.foreach { format =>
+    val emptySchemaValidationConf = SQLConf.ALLOW_EMPTY_SCHEMAS_FOR_WRITES.key
+    test("SPARK-38651 allow writing empty schema files " +
+      s"using $format when ${emptySchemaValidationConf} is enabled") {
+      withSQLConf(emptySchemaValidationConf -> "true") {
+        withTempPath { outputPath =>
+          spark.emptyDataFrame.write.format(format).save(outputPath.toString)
+        }
+      }
+    }
+  }
+
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using $format") {
       withTempDir { dir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]