This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 7c5cb0eb9948 [SPARK-52812][SQL] Make Spark Connect Catalog.createTable eager
7c5cb0eb9948 is described below

commit 7c5cb0eb9948849b16ce1cda6d88fba8f392bbfa
Author: rishav23 <[email protected]>
AuthorDate: Tue May 26 10:53:34 2026 -0400

    [SPARK-52812][SQL] Make Spark Connect Catalog.createTable eager
    
    ### What changes were proposed in this pull request?
    This PR makes Spark Connect Catalog.createTable eager. Previously,
    createTable() only constructed a lazy DataFrame, so users had to trigger an
    action such as .collect() before the table was actually created. The command
    is now executed eagerly inside createTable(), and the method still returns a
    DataFrame, so its signature is unchanged. A regression test verifies that
    tables are created immediately, without an explicit action.
    
    ### Why are the changes needed?
    Catalog.createTable() is a side-effecting operation, so it should execute
    eagerly, matching the expected semantics of the Catalog API.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Previously, spark.catalog.createTable(...) did not create the table in
    Spark Connect until an action was triggered on the returned DataFrame. Now
    the table is created eagerly.
    
    ### How was this patch tested?
    
    - Added a regression test in CatalogSuite
    - Ran build/sbt compile
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #56064 from rishav23/fix-spark-52812-createtable-eager-v2.
    
    Authored-by: rishav23 <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
    (cherry picked from commit 6dbe197334dccf0e339fa35532be187dee91a2b7)
    Signed-off-by: Herman van Hövell <[email protected]>
---
 .../apache/spark/sql/connect/CatalogSuite.scala    | 26 ++++++++++++++++------
 .../org/apache/spark/sql/connect/Catalog.scala     | 11 +++------
 2 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
index 61c4502b256d..e8ccc9f083c6 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala
@@ -99,12 +99,12 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
         import session.implicits._
         val df1 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").toDF("name")
         df1.write.parquet(table1Dir.getPath)
-        spark.catalog.createTable(parquetTableName, 
table1Dir.getPath).collect()
+        spark.catalog.createTable(parquetTableName, table1Dir.getPath)
         withTable(orcTableName, jsonTableName) {
           withTempPath { table2Dir =>
             val df2 = Seq("Bob", "Alice", "Nico", "Bob", 
"Alice").zipWithIndex.toDF("name", "id")
             df2.write.orc(table2Dir.getPath)
-            spark.catalog.createTable(orcTableName, table2Dir.getPath, 
"orc").collect()
+            spark.catalog.createTable(orcTableName, table2Dir.getPath, "orc")
             val orcTable = spark.catalog.getTable(orcTableName)
             assert(!orcTable.isTemporary)
             assert(orcTable.name == orcTableName)
@@ -117,7 +117,6 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
           val schema = new StructType().add("id", LongType).add("a", 
DoubleType)
           spark.catalog
             .createTable(jsonTableName, "json", schema, Map.empty[String, 
String])
-            .collect()
           val jsonTable = spark.catalog.getTable("default", jsonTableName)
           assert(!jsonTable.isTemporary)
           assert(jsonTable.name == jsonTableName)
@@ -151,6 +150,19 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
     assert(spark.catalog.listTables().collect().isEmpty)
   }
 
+  test("createTable should be eager") { // regression test for SPARK-52812
+    val tableName = "eager_table"
+    withTable(tableName) {
+      withTempPath { dir =>
+        val session = spark
+        import session.implicits._
+        Seq((1, "a")).toDF("id", "value").write.parquet(dir.getPath)
+        spark.catalog.createTable(tableName, dir.getPath) // returned DataFrame is never acted on
+        assert(spark.catalog.tableExists(tableName)) // fails if creation is deferred to an action
+      }
+    }
+  }
+
   test("Cache Table APIs") {
     val parquetTableName = "parquet_table"
     withTable(parquetTableName) {
@@ -159,7 +171,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
         import session.implicits._
         val df1 = Seq("Bob", "Alice", "Nico", "Bob", "Alice").toDF("name")
         df1.write.parquet(table1Dir.getPath)
-        spark.catalog.createTable(parquetTableName, 
table1Dir.getPath).collect()
+        spark.catalog.createTable(parquetTableName, table1Dir.getPath)
 
         // Test cache and uncacheTable
         spark.catalog.cacheTable(parquetTableName)
@@ -375,7 +387,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
         val session = spark
         import session.implicits._
         Seq(1).toDF("id").write.parquet(dir.getPath)
-        spark.catalog.createTable(tbl, dir.getPath).collect()
+        spark.catalog.createTable(tbl, dir.getPath)
         assert(spark.catalog.tableExists(tbl))
         spark.catalog.dropTable(tbl)
         assert(!spark.catalog.tableExists(tbl))
@@ -445,7 +457,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
         val session = spark
         import session.implicits._
         Seq(1).toDF("id").write.parquet(dir.getPath)
-        spark.catalog.createTable(t, dir.getPath).collect()
+        spark.catalog.createTable(t, dir.getPath)
         val ddl = spark.catalog.getCreateTableString(t)
         assert(ddl.nonEmpty && 
ddl.toLowerCase(java.util.Locale.ROOT).contains("create"))
       }
@@ -470,7 +482,7 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
         val session = spark
         import session.implicits._
         Seq(1).toDF("id").write.parquet(dir.getPath)
-        spark.catalog.createTable(t, dir.getPath).collect()
+        spark.catalog.createTable(t, dir.getPath)
         spark.catalog.analyzeTable(t, noScan = true)
       }
     }
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
index ea4bc3e7ad60..2324ca05d7b7 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/Catalog.scala
@@ -392,13 +392,7 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
    * @since 3.5.0
    */
   override def createTable(tableName: String, path: String): DataFrame = {
-    sparkSession.newDataFrame { builder =>
-      builder.getCatalogBuilder.getCreateTableBuilder
-        .setTableName(tableName)
-        .setSchema(DataTypeProtoConverter.toConnectProtoType(new StructType))
-        .setDescription("")
-        .putOptions("path", path)
-    }
+    createTable(tableName, path, sparkSession.conf.get("spark.sql.sources.default", "parquet"))
   }
 
   /**
@@ -484,7 +478,7 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
-    sparkSession.newDataFrame { builder =>
+    sparkSession.execute { builder =>
       val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder
         .setTableName(tableName)
         .setSource(source)
@@ -494,6 +488,7 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
         createTableBuilder.putOptions(k, v)
       }
     }
+    sparkSession.table(tableName)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to