This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e400045959 [SPARK-51496][SQL][FOLLOW-UP] Preserve the case of 
DataSourceV2Relation option keys
24e400045959 is described below

commit 24e4000459599e4f9e7e33242e7a4f742396f872
Author: Evan Wu <evan12...@gmail.com>
AuthorDate: Mon Apr 7 21:53:41 2025 +0800

    [SPARK-51496][SQL][FOLLOW-UP] Preserve the case of DataSourceV2Relation 
option keys
    
    ### What changes were proposed in this pull request?
    Preserve the case of the `DataSourceV2Relation` option keys when merging 
options.
    
    ### Why are the changes needed?
    To be consistent with the command options for `mergeOptions`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50487 from drexler-sky/followup.
    
    Authored-by: Evan Wu <evan12...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/datasources/v2/V2Writes.scala  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
index 6f30ade1c7d3..d8e3a7eaf5ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
@@ -63,7 +63,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
       }.toArray
 
       val table = r.table
-      val writeOptions = mergeOptions(options, r.options.asScala.toMap)
+      val writeOptions = mergeOptions(options, r.options.asCaseSensitiveMap.asScala.toMap)
       val writeBuilder = newWriteBuilder(table, writeOptions, query.schema)
       val write = writeBuilder match {
         case builder: SupportsTruncate if isTruncate(predicates) =>
@@ -79,7 +79,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
 
     case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None) =>
       val table = r.table
-      val writeOptions = mergeOptions(options, r.options.asScala.toMap)
+      val writeOptions = mergeOptions(options, r.options.asCaseSensitiveMap.asScala.toMap)
       val writeBuilder = newWriteBuilder(table, writeOptions, query.schema)
       val write = writeBuilder match {
         case builder: SupportsDynamicOverwrite =>
@@ -93,7 +93,8 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
     case WriteToMicroBatchDataSource(
         relationOpt, table, query, queryId, options, outputMode, Some(batchId)) =>
       val writeOptions = mergeOptions(
-        options, relationOpt.map(r => r.options.asScala.toMap).getOrElse(Map.empty))
+        options,
+        relationOpt.map(r => r.options.asCaseSensitiveMap.asScala.toMap).getOrElse(Map.empty))
       val writeBuilder = newWriteBuilder(table, writeOptions, query.schema, queryId = queryId)
       val write = buildWriteForMicroBatch(table, writeBuilder, outputMode)
       val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming)
@@ -105,14 +106,14 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
     case rd @ ReplaceData(r: DataSourceV2Relation, _, query, _, projections, _, None) =>
       val rowSchema = projections.rowProjection.schema
       val metadataSchema = projections.metadataProjection.map(_.schema)
-      val writeOptions = mergeOptions(Map.empty, r.options.asScala.toMap)
+      val writeOptions = mergeOptions(Map.empty, r.options.asCaseSensitiveMap.asScala.toMap)
       val writeBuilder = newWriteBuilder(r.table, writeOptions, rowSchema, metadataSchema)
       val write = writeBuilder.build()
       val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
       rd.copy(write = Some(write), query = newQuery)
 
     case wd @ WriteDelta(r: DataSourceV2Relation, _, query, _, projections, None) =>
-      val writeOptions = mergeOptions(Map.empty, r.options.asScala.toMap)
+      val writeOptions = mergeOptions(Map.empty, r.options.asCaseSensitiveMap.asScala.toMap)
       val deltaWriteBuilder = newDeltaWriteBuilder(r.table, writeOptions, projections)
       val deltaWrite = deltaWriteBuilder.build()
       val newQuery = DistributionAndOrderingUtils.prepareQuery(deltaWrite, query, r.funCatalog)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to