This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 22355bab824c  [SPARK-53054][CONNECT][3.5] Fix the connect.DataFrameReader default format behavior
22355bab824c is described below

commit 22355bab824c49286acfeb5bf3db16c032cb44a2
Author: Robert Dillitz <r.dill...@gmail.com>
AuthorDate: Fri Aug 1 23:30:42 2025 +0800

    [SPARK-53054][CONNECT][3.5] Fix the connect.DataFrameReader default format behavior

    ### What changes were proposed in this pull request?
    See title.

    ### Why are the changes needed?
    Scala Spark Connect does not adhere to the [documented](https://spark.apache.org/docs/3.5.6/sql-data-sources-load-save-functions.html) behavior.

    ### Does this PR introduce _any_ user-facing change?
    As documented in [Generic Load/Save Functions - Spark 3.5.6 Documentation](https://spark.apache.org/docs/3.5.6/sql-data-sources-load-save-functions.html), and similar to Spark Classic and the Python Spark Connect, Scala Spark Connect's `DataFrameReader` will now also default to the format set via the `spark.sql.sources.default` SQL configuration.

    **Before**: `spark.read.load("...")` throws
    ```
    java.lang.IllegalArgumentException: The source format must be specified.
    ```

    **Now**: `spark.read.load("...")` uses the format specified via `spark.sql.sources.default`.

    ### How was this patch tested?
    Test case added to ClientE2ETestSuite.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #51759 from dillitz/fix-default-format-3.5.

    Lead-authored-by: Robert Dillitz <r.dill...@gmail.com>
    Co-authored-by: Robert Dillitz <robert.dill...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 15 ++++-----------
 .../scala/org/apache/spark/sql/ClientE2ETestSuite.scala   | 12 ++++++++++++
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 10d2af094a08..f138ca93760c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -45,7 +45,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
    * @since 3.4.0
    */
   def format(source: String): DataFrameReader = {
-    this.source = source
+    this.source = Some(source)
     this
   }
 
@@ -179,8 +179,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   def load(paths: String*): DataFrame = {
     sparkSession.newDataFrame { builder =>
       val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
-      assertSourceFormatSpecified()
-      dataSourceBuilder.setFormat(source)
+      source.foreach(dataSourceBuilder.setFormat)
       userSpecifiedSchema.foreach(schema => dataSourceBuilder.setSchema(schema.toDDL))
       extraOptions.foreach { case (k, v) =>
         dataSourceBuilder.putOptions(k, v)
@@ -285,7 +284,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     sparkSession.newDataFrame { builder =>
       val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
       format("jdbc")
-      dataSourceBuilder.setFormat(source)
+      source.foreach(dataSourceBuilder.setFormat)
       predicates.foreach(predicate => dataSourceBuilder.addPredicates(predicate))
       this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
       val params = extraOptions ++ connectionProperties.asScala
@@ -539,12 +538,6 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     text(paths: _*).select("value").as(StringEncoder)
   }
 
-  private def assertSourceFormatSpecified(): Unit = {
-    if (source == null) {
-      throw new IllegalArgumentException("The source format must be specified.")
-    }
-  }
-
   private def parse(ds: Dataset[String], format: ParseFormat): DataFrame = {
     sparkSession.newDataFrame { builder =>
       val parseBuilder = builder.getParseBuilder
@@ -571,7 +564,7 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
 
-  private var source: String = _
+  private var source: Option[String] = None
 
   private var userSpecifiedSchema: Option[StructType] = None
 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index feefd19000d1..d53a472723b7 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -1325,6 +1325,18 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
         .dropDuplicatesWithinWatermark("newcol")
       testAndVerify(result2)
   }
+
+  test("SPARK-53054: DataFrameReader defaults to spark.sql.sources.default") {
+    withTempPath { file =>
+      val path = file.getAbsoluteFile.toURI.toString
+      spark.range(100).write.parquet(file.toPath.toAbsolutePath.toString)
+
+      spark.conf.set("spark.sql.sources.default", "parquet")
+
+      val df = spark.read.load(path)
+      assert(df.count == 100)
+    }
+  }
 }
 
 private[sql] case class ClassData(a: String, b: Int)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
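
To make the behavior change concrete outside the test suite, here is a minimal usage sketch in the spirit of the new ClientE2ETestSuite case; it is not part of the commit, the output path is an arbitrary example, and `spark` is assumed to be an already-created SparkSession (Connect or Classic):

```scala
// Hedged sketch, not part of the commit: `spark` is assumed to be an existing
// SparkSession, and the output path below is made up for illustration.
val path = "/tmp/spark-53054-demo"

// Write some data in Parquet so there is something to load back.
spark.range(100).write.mode("overwrite").parquet(path)

// The fallback format comes from this SQL configuration ("parquet" is also its default value).
spark.conf.set("spark.sql.sources.default", "parquet")

// No .format(...) call: the 3.5 Scala Spark Connect client previously threw
// "The source format must be specified." here; with this fix it falls back to
// spark.sql.sources.default, matching Spark Classic and the Python client.
val df = spark.read.load(path)
assert(df.count() == 100)
```

This is the same pattern the added test case exercises end to end.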