This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 22355bab824c [SPARK-53054][CONNECT][3.5] Fix the 
connect.DataFrameReader default format behavior
22355bab824c is described below

commit 22355bab824c49286acfeb5bf3db16c032cb44a2
Author: Robert Dillitz <r.dill...@gmail.com>
AuthorDate: Fri Aug 1 23:30:42 2025 +0800

    [SPARK-53054][CONNECT][3.5] Fix the connect.DataFrameReader default format 
behavior
    
    ### What changes were proposed in this pull request?
    See title.
    
    ### Why are the changes needed?
    Scala Spark Connect does not adhere to the 
[documented](https://spark.apache.org/docs/3.5.6/sql-data-sources-load-save-functions.html)
 behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    As documented in [Generic Load/Save Functions - Spark 3.5.6 
Documentation](https://spark.apache.org/docs/3.5.6/sql-data-sources-load-save-functions.html),
 and similar to Spark Classic and the Python Spark Connect, Scala Spark 
Connect's `DataFrameReader` will now also default to the format set via the 
`spark.sql.sources.default` SQL configuration.
    
    **Before**: `spark.read.load("..."`) throws
    
    ```
    java.lang.IllegalArgumentException: The source format must be specified.
    ```
    **Now**: `spark.read.load("...")` uses the format specified via 
`spark.sql.sources.default`
    
    ### How was this patch tested?
    Test case added to ClientE2ETestSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51759 from dillitz/fix-default-format-3.5.
    
    Lead-authored-by: Robert Dillitz <r.dill...@gmail.com>
    Co-authored-by: Robert Dillitz <robert.dill...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 15 ++++-----------
 .../scala/org/apache/spark/sql/ClientE2ETestSuite.scala   | 12 ++++++++++++
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 10d2af094a08..f138ca93760c 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -45,7 +45,7 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
    * @since 3.4.0
    */
   def format(source: String): DataFrameReader = {
-    this.source = source
+    this.source = Some(source)
     this
   }
 
@@ -179,8 +179,7 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
   def load(paths: String*): DataFrame = {
     sparkSession.newDataFrame { builder =>
       val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
-      assertSourceFormatSpecified()
-      dataSourceBuilder.setFormat(source)
+      source.foreach(dataSourceBuilder.setFormat)
       userSpecifiedSchema.foreach(schema => 
dataSourceBuilder.setSchema(schema.toDDL))
       extraOptions.foreach { case (k, v) =>
         dataSourceBuilder.putOptions(k, v)
@@ -285,7 +284,7 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
     sparkSession.newDataFrame { builder =>
       val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
       format("jdbc")
-      dataSourceBuilder.setFormat(source)
+      source.foreach(dataSourceBuilder.setFormat)
       predicates.foreach(predicate => 
dataSourceBuilder.addPredicates(predicate))
       this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
       val params = extraOptions ++ connectionProperties.asScala
@@ -539,12 +538,6 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
     text(paths: _*).select("value").as(StringEncoder)
   }
 
-  private def assertSourceFormatSpecified(): Unit = {
-    if (source == null) {
-      throw new IllegalArgumentException("The source format must be 
specified.")
-    }
-  }
-
   private def parse(ds: Dataset[String], format: ParseFormat): DataFrame = {
     sparkSession.newDataFrame { builder =>
       val parseBuilder = builder.getParseBuilder
@@ -571,7 +564,7 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
   // Builder pattern config options
   
///////////////////////////////////////////////////////////////////////////////////////
 
-  private var source: String = _
+  private var source: Option[String] = None
 
   private var userSpecifiedSchema: Option[StructType] = None
 
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index feefd19000d1..d53a472723b7 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -1325,6 +1325,18 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper with PrivateM
       .dropDuplicatesWithinWatermark("newcol")
     testAndVerify(result2)
   }
+
+  test("SPARK-53054: DataFrameReader defaults to spark.sql.sources.default") {
+    withTempPath { file =>
+      val path = file.getAbsoluteFile.toURI.toString
+      spark.range(100).write.parquet(file.toPath.toAbsolutePath.toString)
+
+      spark.conf.set("spark.sql.sources.default", "parquet")
+
+      val df = spark.read.load(path)
+      assert(df.count == 100)
+    }
+  }
 }
 
 private[sql] case class ClassData(a: String, b: Int)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to