This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 24e400045959 [SPARK-51496][SQL][FOLLOW-UP] Preserve the case of DataSourceV2Relation option keys 24e400045959 is described below commit 24e4000459599e4f9e7e33242e7a4f742396f872 Author: Evan Wu <evan12...@gmail.com> AuthorDate: Mon Apr 7 21:53:41 2025 +0800 [SPARK-51496][SQL][FOLLOW-UP] Preserve the case of DataSourceV2Relation option keys ### What changes were proposed in this pull request? Preserve the case of the `DataSourceV2Relation` option keys when merging options. ### Why are the changes needed? To be consistent with the command options for `mergeOptions`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #50487 from drexler-sky/followup. Authored-by: Evan Wu <evan12...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/execution/datasources/v2/V2Writes.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala index 6f30ade1c7d3..d8e3a7eaf5ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala @@ -63,7 +63,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper { }.toArray val table = r.table - val writeOptions = mergeOptions(options, r.options.asScala.toMap) + val writeOptions = mergeOptions(options, r.options.asCaseSensitiveMap.asScala.toMap) val writeBuilder = newWriteBuilder(table, writeOptions, query.schema) val write = writeBuilder match { case builder: SupportsTruncate if isTruncate(predicates) => @@ -79,7 +79,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper { case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None) => val table = r.table - val writeOptions = mergeOptions(options, r.options.asScala.toMap) + val writeOptions = mergeOptions(options, r.options.asCaseSensitiveMap.asScala.toMap) val writeBuilder = newWriteBuilder(table, writeOptions, query.schema) val write = writeBuilder match { case builder: SupportsDynamicOverwrite => @@ -93,7 +93,8 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper { case WriteToMicroBatchDataSource( relationOpt, table, query, queryId, options, outputMode, Some(batchId)) => val writeOptions = mergeOptions( - options, relationOpt.map(r => r.options.asScala.toMap).getOrElse(Map.empty)) + options, + relationOpt.map(r => r.options.asCaseSensitiveMap.asScala.toMap).getOrElse(Map.empty)) val writeBuilder = newWriteBuilder(table, writeOptions, query.schema, queryId = queryId) val write = buildWriteForMicroBatch(table, writeBuilder, outputMode) val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming) @@ -105,14 +106,14 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper { case rd @ ReplaceData(r: DataSourceV2Relation, _, query, _, projections, _, None) => val rowSchema = projections.rowProjection.schema val metadataSchema = projections.metadataProjection.map(_.schema) - val writeOptions = mergeOptions(Map.empty, r.options.asScala.toMap) + val writeOptions = mergeOptions(Map.empty, r.options.asCaseSensitiveMap.asScala.toMap) val writeBuilder = newWriteBuilder(r.table, writeOptions, rowSchema, metadataSchema) val write = writeBuilder.build() val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog) rd.copy(write = Some(write), query = newQuery) case wd @ WriteDelta(r: DataSourceV2Relation, _, query, _, projections, None) => - val writeOptions = mergeOptions(Map.empty, r.options.asScala.toMap) + val writeOptions = mergeOptions(Map.empty, r.options.asCaseSensitiveMap.asScala.toMap) val deltaWriteBuilder = newDeltaWriteBuilder(r.table, writeOptions, projections) val deltaWrite = deltaWriteBuilder.build() val newQuery = DistributionAndOrderingUtils.prepareQuery(deltaWrite, query, r.funCatalog) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org