This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 97e0a88b1d04 [SPARK-49359][SQL] Allow StagedTableCatalog 
implementations to fall back to non-atomic write
97e0a88b1d04 is described below

commit 97e0a88b1d04a8e33206e5cb8384878ca4cdfc5a
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Aug 23 10:49:39 2024 +0900

    [SPARK-49359][SQL] Allow StagedTableCatalog implementations to fall back to 
non-atomic write
    
    ### What changes were proposed in this pull request?
    
    This PR allows `StagingTableCatalog#stageCreate/stageReplace/stageCreateOrReplace` 
to return null, in which case Spark falls back to the normal non-atomic write: 
it loads the table via `loadTable` and writes to it.
    
    ### Why are the changes needed?
    
    Whether a catalog implements an interface is fixed statically, but sometimes 
an implementation needs to decide at runtime. For example, a catalog may only 
support atomic CTAS for certain table formats, and we shouldn't force it to 
implement atomic writes for all other formats.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    A new test, "StagingTableCatalog without atomic support", in DataSourceV2SQLSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #47848 from cloud-fan/stage.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/connector/catalog/StagingTableCatalog.java |  9 ++-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  7 ++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 72 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
index a8e1757a492d..50643b9cbe32 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
@@ -80,7 +80,8 @@ public interface StagingTableCatalog extends TableCatalog {
    * @param columns the column of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
-   * @return metadata for the new table
+   * @return metadata for the new table. This can be null if the catalog does 
not support atomic
+   *         creation for this table. Spark will call {@link 
#loadTable(Identifier)} later.
    * @throws TableAlreadyExistsException If a table or view already exists for 
the identifier
    * @throws UnsupportedOperationException If a requested partition transform 
is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
@@ -128,7 +129,8 @@ public interface StagingTableCatalog extends TableCatalog {
    * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
-   * @return metadata for the new table
+   * @return metadata for the new table. This can be null if the catalog does 
not support atomic
+   *         creation for this table. Spark will call {@link 
#loadTable(Identifier)} later.
    * @throws UnsupportedOperationException If a requested partition transform 
is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
    * @throws NoSuchTableException If the table does not exist
@@ -176,7 +178,8 @@ public interface StagingTableCatalog extends TableCatalog {
    * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
-   * @return metadata for the new table
+   * @return metadata for the new table. This can be null if the catalog does 
not support atomic
+   *         creation for this table. Spark will call {@link 
#loadTable(Identifier)} later.
    * @throws UnsupportedOperationException If a requested partition transform 
is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
    */
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 89372017257d..5885ec0afadc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -117,9 +117,10 @@ case class AtomicCreateTableAsSelectExec(
       }
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val stagedTable = catalog.stageCreate(
+    val stagedTable = Option(catalog.stageCreate(
       ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
       partitioning.toArray, properties.asJava)
+    ).getOrElse(catalog.loadTable(ident, 
Set(TableWritePrivilege.INSERT).asJava))
     writeToTable(catalog, stagedTable, writeOptions, ident, query)
   }
 }
@@ -216,7 +217,9 @@ case class AtomicReplaceTableAsSelectExec(
     } else {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    writeToTable(catalog, staged, writeOptions, ident, query)
+    val table = Option(staged).getOrElse(
+      catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
+    writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index a63eaddc2206..de398c49a0eb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3770,6 +3770,19 @@ class DataSourceV2SQLSuiteV1Filter
     checkWriteOperations("read_only_cat")
   }
 
+  test("StagingTableCatalog without atomic support") {
+    withSQLConf("spark.sql.catalog.fakeStagedCat" -> 
classOf[FakeStagedTableCatalog].getName) {
+      withTable("fakeStagedCat.t") {
+        sql("CREATE TABLE fakeStagedCat.t AS SELECT 1 col")
+        checkAnswer(spark.table("fakeStagedCat.t"), Row(1))
+        sql("REPLACE TABLE fakeStagedCat.t AS SELECT 2 col")
+        checkAnswer(spark.table("fakeStagedCat.t"), Row(2))
+        sql("CREATE OR REPLACE TABLE fakeStagedCat.t AS SELECT 1 c1, 2 c2")
+        checkAnswer(spark.table("fakeStagedCat.t"), Row(1, 2))
+      }
+    }
+  }
+
   private def testNotSupportedV2Command(
       sqlCommand: String,
       sqlParams: String,
@@ -3855,3 +3868,62 @@ class ReadOnlyCatalog extends InMemoryCatalog {
       writePrivileges.asScala.toSeq.map(_.toString).sorted.mkString(","))
   }
 }
+
+class FakeStagedTableCatalog extends InMemoryCatalog with StagingTableCatalog {
+  override def stageCreate(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    throw new RuntimeException("shouldn't be called")
+  }
+
+  override def stageCreate(
+      ident: Identifier,
+      columns: Array[ColumnV2],
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    super.createTable(ident, columns, partitions, properties)
+    null
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    throw new RuntimeException("shouldn't be called")
+  }
+
+  override def stageReplace(
+      ident: Identifier,
+      columns: Array[ColumnV2],
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    super.dropTable(ident)
+    super.createTable(ident, columns, partitions, properties)
+    null
+  }
+
+  override def stageCreateOrReplace(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    throw new RuntimeException("shouldn't be called")
+  }
+
+  override def stageCreateOrReplace(
+      ident: Identifier,
+      columns: Array[ColumnV2],
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): StagedTable = {
+    try {
+      super.dropTable(ident)
+    } catch {
+      case _: Throwable =>
+    }
+    super.createTable(ident, columns, partitions, properties)
+    null
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to