This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d4eea4c32d25 [SPARK-51723][SQL] Add DSv2 APIs for create/get table constraints
d4eea4c32d25 is described below

commit d4eea4c32d25da7ee29c2f99c4e1bbf96ed1555c
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Thu Apr 10 15:16:04 2025 -0700

    [SPARK-51723][SQL] Add DSv2 APIs for create/get table constraints
    
    ### What changes were proposed in this pull request?
    
    This PR adds the following DSv2 APIs as per the SPIP [doc](https://docs.google.com/document/d/1EHjB4W1LjiXxsK_G7067j9pPX0y15LUF1Z5DlUPoPIo/):
    * Get the constraints of a DSv2 table
    * Create a table with constraints
    
    ### Why are the changes needed?
    
    To support table constraints in Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50519 from gengliangwang/dsv2Changes.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/sql/connector/catalog/Table.java  |  6 +++
 .../connector/catalog/TableCatalogCapability.java  | 15 +++++++
 .../spark/sql/connector/catalog/TableInfo.java     | 15 ++++++-
 .../catalyst/util/ResolveTableConstraints.scala    | 35 +++++++++++++++
 .../spark/sql/connector/catalog/CatalogSuite.scala | 33 +++++++++++++-
 .../catalog/InMemoryPartitionTableCatalog.scala    |  4 ++
 .../InMemoryRowLevelOperationTableCatalog.scala    |  4 ++
 .../sql/connector/catalog/InMemoryTable.scala      |  2 +
 .../connector/catalog/InMemoryTableCatalog.scala   | 24 ++++++++---
 .../catalog/InMemoryTableWithV2FilterCatalog.scala |  4 ++
 .../datasources/v2/DataSourceV2Strategy.scala      | 22 ++++++----
 .../spark/sql/RuntimeNullChecksV2Writes.scala      | 50 +++++++++++-----------
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 17 +++-----
 .../spark/sql/connector/LocalScanSuite.scala       |  9 +---
 .../spark/sql/connector/V1ReadFallbackSuite.scala  |  6 ++-
 .../WriteDistributionAndOrderingSuite.scala        |  4 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |  6 ++-
 17 files changed, 191 insertions(+), 65 deletions(-)
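
For orientation, here is a hedged Scala sketch of how the new surface fits together, written against the in-memory test catalog that this patch updates; the catalog name, identifier, columns, and constraint names below are illustrative only and not part of the patch:

```scala
import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, TableInfo}
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val catalog = new InMemoryTableCatalog()
catalog.initialize("demo", CaseInsensitiveStringMap.empty())

val ident = Identifier.of(Array("ns"), "people")
val columns = Array(
  Column.create("id", IntegerType, false),
  Column.create("name", StringType))

// Constraints are built with the existing Constraint builders and attached via TableInfo.
val constraints: Array[Constraint] = Array(
  Constraint.primaryKey("pk", Array(FieldReference.column("id"))).build(),
  Constraint.check("chk").predicateSql("id > 0").build())

val tableInfo = new TableInfo.Builder()
  .withColumns(columns)
  .withConstraints(constraints)
  .build()

// Create path: TableCatalog#createTable(Identifier, TableInfo) carries the constraints.
val table = catalog.createTable(ident, tableInfo)

// Get path: Table#constraints(), which defaults to an empty array for connectors
// that do not report constraints.
assert(table.constraints().sameElements(constraints))
```
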

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
index d5eb03dcf94d..166554b0b4ca 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.catalog.constraints.Constraint;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
@@ -83,4 +84,9 @@ public interface Table {
    * Returns the set of capabilities for this table.
    */
   Set<TableCapability> capabilities();
+
+  /**
+   * Returns the constraints for this table.
+   */
+  default Constraint[] constraints() { return new Constraint[0]; }
 }
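
On the connector side, a table reports its constraints by overriding the new default method. A minimal hedged sketch; the class name, schema, and stubbed members are made up for illustration:

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.types.{IntegerType, StructType}

// Illustrative only: a minimal Table that surfaces constraints to Spark.
class ConstrainedDemoTable(tableConstraints: Array[Constraint]) extends Table {
  override def name(): String = "demo"
  override def schema(): StructType = new StructType().add("id", IntegerType)
  override def capabilities(): util.Set[TableCapability] = util.Collections.emptySet[TableCapability]()

  // New in this patch; the default returns an empty array, so existing
  // connectors keep compiling without changes.
  override def constraints(): Array[Constraint] = tableConstraints
}
```
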
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
index dceac1b484cf..a60c827d5ace 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -61,6 +61,21 @@ public enum TableCatalogCapability {
    */
   SUPPORT_COLUMN_DEFAULT_VALUE,
 
+  /**
+   * Signals that the TableCatalog supports defining table constraints in
+   * CREATE/REPLACE/ALTER TABLE.
+   * <p>
+   * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with table constraints
+   * defined in the table schema will throw an exception during analysis.
+   * <p>
+   * Table constraints include CHECK, PRIMARY KEY, UNIQUE and FOREIGN KEY constraints.
+   * <p>
+   * Table constraints are included in the table schema for APIs like
+   * {@link TableCatalog#createTable}.
+   * See {@link Table#constraints()}.
+   */
+  SUPPORT_TABLE_CONSTRAINT,
+
   /**
   * Signals that the TableCatalog supports defining identity columns upon table creation in SQL.
    * <p>
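
A catalog opts in to constraint DDL by returning the new capability from capabilities(); a hedged sketch that mirrors the InMemoryTableCatalog change further down (the class name is illustrative):

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, TableCatalogCapability}

// Illustrative only: without SUPPORT_TABLE_CONSTRAINT, CREATE/REPLACE/ALTER TABLE
// statements that declare constraints fail during analysis.
class ConstraintFriendlyCatalog extends InMemoryTableCatalog {
  override def capabilities: java.util.Set[TableCatalogCapability] = {
    Set(
      TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
      TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT).asJava
  }
}
```
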
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
index b931f8231f6e..c1f75c075374 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connector.catalog;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.constraints.Constraint;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
@@ -29,6 +30,7 @@ public class TableInfo {
   private final Column[] columns;
   private final Map<String, String> properties;
   private final Transform[] partitions;
+  private final Constraint[] constraints;
 
   /**
    * Constructor for TableInfo used by the builder.
@@ -38,6 +40,7 @@ public class TableInfo {
     this.columns = builder.columns;
     this.properties = Collections.unmodifiableMap(builder.properties);
     this.partitions = builder.partitions;
+    this.constraints = builder.constraints;
   }
 
   public Column[] columns() {
@@ -56,10 +59,13 @@ public class TableInfo {
     return partitions;
   }
 
+  public Constraint[] constraints() { return constraints; }
+
   public static class Builder {
     private Column[] columns;
-    private Map<String, String> properties;
-    private Transform[] partitions;
+    private Map<String, String> properties = Maps.newHashMap();
+    private Transform[] partitions = new Transform[0];
+    private Constraint[] constraints = new Constraint[0];
 
     public Builder withColumns(Column[] columns) {
       this.columns = columns;
@@ -77,6 +83,11 @@ public class TableInfo {
       return this;
     }
 
+    public Builder withConstraints(Constraint[] constraints) {
+      this.constraints = constraints;
+      return this;
+    }
+
     public TableInfo build() {
       checkNotNull(columns, "columns should not be null");
       return new TableInfo(this);
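
Note that the Builder now initializes partitions, properties, and constraints to empty defaults, so only columns remain mandatory. A small hedged sketch (variable names are illustrative):

```scala
import org.apache.spark.sql.connector.catalog.{Column, TableInfo}
import org.apache.spark.sql.types.IntegerType

// With the new defaults, withPartitions/withProperties/withConstraints may be omitted.
val minimalInfo = new TableInfo.Builder()
  .withColumns(Array(Column.create("id", IntegerType)))
  .build()

assert(minimalInfo.partitions().isEmpty)   // defaults to an empty Transform array
assert(minimalInfo.properties().isEmpty)   // defaults to an empty, unmodifiable map
assert(minimalInfo.constraints().isEmpty)  // defaults to an empty Constraint array
```
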
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveTableConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveTableConstraints.scala
new file mode 100644
index 000000000000..9f186f9903f6
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveTableConstraints.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+object ResolveTableConstraints {
+  // Fails if the given catalog does not support table constraint.
+  def validateCatalogForTableConstraint(
+      constraints: Seq[Constraint],
+      catalog: TableCatalog,
+      ident: Identifier): Unit = {
+    if (constraints.nonEmpty &&
+      !catalog.capabilities().contains(TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT)) {
+      throw QueryCompilationErrors.unsupportedTableOperationError(
+        catalog, ident, "table constraint")
+    }
+  }
+}
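
This helper is a pure precondition check that DataSourceV2Strategy invokes for CREATE TABLE and REPLACE TABLE (see the hunks further down). A hedged usage sketch; the catalog and identifier below are placeholders:

```scala
import org.apache.spark.sql.catalyst.util.ResolveTableConstraints
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.constraints.Constraint

val catalog = new InMemoryTableCatalog()  // advertises SUPPORT_TABLE_CONSTRAINT after this patch
val ident = Identifier.of(Array("ns"), "t")

// With no constraints the check is a no-op, regardless of catalog capabilities;
// with constraints it throws unless the catalog reports SUPPORT_TABLE_CONSTRAINT.
ResolveTableConstraints.validateCatalogForTableConstraint(Seq.empty[Constraint], catalog, ident)
```
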
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index e7e829ab32d6..f09a8aa14d30 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
-import org.apache.spark.sql.connector.expressions.{Expressions, LogicalExpressions, Transform}
+import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, LogicalExpressions, Transform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -167,6 +168,36 @@ class CatalogSuite extends SparkFunSuite {
     assert(catalog.tableExists(testIdent))
   }
 
+  test("createTable: with constraints") {
+    val catalog = newCatalog()
+
+    val columns = Array(
+      Column.create("id", IntegerType, false),
+      Column.create("data", StringType))
+    val constraints: Array[Constraint] = Array(
+      Constraint.primaryKey("pk", Array(FieldReference.column("id"))).build(),
+      Constraint.check("chk").predicateSql("id > 0").build(),
+      Constraint.unique("uk", Array(FieldReference.column("data"))).build(),
+      Constraint.foreignKey("fk", Array(FieldReference.column("data")), testIdentNew,
+        Array(FieldReference.column("id"))).build()
+    )
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(columns)
+      .withPartitions(emptyTrans)
+      .withProperties(emptyProps)
+      .withConstraints(constraints)
+      .build()
+    val table = catalog.createTable(testIdent, tableInfo)
+
+    val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+    assert(parsed == Seq("test", "`", ".", "test_table"))
+    assert(table.columns === columns)
+    assert(table.constraints === constraints)
+    assert(table.properties.asScala == Map())
+
+    assert(catalog.tableExists(testIdent))
+  }
+
   test("createTable: table already exists") {
     val catalog = newCatalog()
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala
index 3b8020003aa4..17f908370f76 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala
@@ -43,4 +43,8 @@ class InMemoryPartitionTableCatalog extends InMemoryTableCatalog {
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index f5a69cd96d96..fcf5070bd51d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -44,6 +44,10 @@ class InMemoryRowLevelOperationTableCatalog extends InMemoryTableCatalog {
     table
   }
 
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
+
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
     val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index f8eb32f6e924..50e2449623e5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector.catalog
 
 import java.util
 
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsOverwrite, WriteBuilder, WriterCommitMessage}
@@ -35,6 +36,7 @@ class InMemoryTable(
     schema: StructType,
     override val partitioning: Array[Transform],
     override val properties: util.Map[String, String],
+    override val constraints: Array[Constraint] = Array.empty,
     distribution: Distribution = Distributions.unspecified(),
     ordering: Array[SortOrder] = Array.empty,
     numPartitions: Option[Int] = None,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 1b223eae9839..20c7dc0a7a34 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
@@ -90,14 +91,20 @@ class BasicInMemoryTableCatalog extends TableCatalog {
   }
 
   override def createTable(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
+    ident: Identifier,
+    columns: Array[Column],
+    partitions: Array[Transform],
+    properties: util.Map[String, String]): Table = {
     createTable(ident, columns, partitions, properties, Distributions.unspecified(),
       Array.empty, None, None)
   }
 
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+      Distributions.unspecified(), Array.empty, None, None, tableInfo.constraints())
+  }
+
+  // scalastyle:off argcount
   def createTable(
       ident: Identifier,
       columns: Array[Column],
@@ -107,8 +114,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       ordering: Array[SortOrder],
       requiredNumPartitions: Option[Int],
       advisoryPartitionSize: Option[Long],
+      constraints: Array[Constraint] = Array.empty,
       distributionStrictlyRequired: Boolean = true,
       numRowsPerSplit: Int = Int.MaxValue): Table = {
+    // scalastyle:on argcount
     val schema = CatalogV2Util.v2ColumnsToStructType(columns)
     if (tables.containsKey(ident)) {
       throw new TableAlreadyExistsException(ident.asMultipartIdentifier)
@@ -117,9 +126,9 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
 
     val tableName = s"$name.${ident.quoted}"
-    val table = new InMemoryTable(tableName, schema, partitions, properties, distribution,
-      ordering, requiredNumPartitions, advisoryPartitionSize, distributionStrictlyRequired,
-      numRowsPerSplit)
+    val table = new InMemoryTable(tableName, schema, partitions, properties, constraints,
+      distribution, ordering, requiredNumPartitions, advisoryPartitionSize,
+      distributionStrictlyRequired, numRowsPerSplit)
     tables.put(ident, table)
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
@@ -178,6 +187,7 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
   override def capabilities: java.util.Set[TableCatalogCapability] = {
     Set(
       TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
+      TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT,
       TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
       TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS
     ).asJava
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala
index 7ec1cab304ad..861badd39079 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala
@@ -43,4 +43,8 @@ class InMemoryTableWithV2FilterCatalog extends InMemoryTableCatalog {
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 232cb0935a26..511e66a272ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder}
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
@@ -185,11 +185,14 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case c @ CreateTable(ResolvedIdentifier(catalog, ident), columns, partitioning,
         tableSpec: TableSpec, ifNotExists) =>
-      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog.asTableCatalog, ident)
+      val tableCatalog = catalog.asTableCatalog
+      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
+      ResolveTableConstraints.validateCatalogForTableConstraint(
+        tableSpec.constraints, tableCatalog, ident)
       val statementType = "CREATE TABLE"
       GeneratedColumn.validateGeneratedColumns(
-        c.tableSchema, catalog.asTableCatalog, ident, statementType)
-      IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident)
+        c.tableSchema, tableCatalog, ident, statementType)
+      IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
 
       CreateTableExec(
         catalog.asTableCatalog,
@@ -215,11 +218,14 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case c @ ReplaceTable(
         ResolvedIdentifier(catalog, ident), columns, parts, tableSpec: TableSpec, orCreate) =>
-      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog.asTableCatalog, ident)
+      val tableCatalog = catalog.asTableCatalog
+      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
+      ResolveTableConstraints.validateCatalogForTableConstraint(
+        tableSpec.constraints, tableCatalog, ident)
       val statementType = "REPLACE TABLE"
       GeneratedColumn.validateGeneratedColumns(
-        c.tableSchema, catalog.asTableCatalog, ident, statementType)
-      IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident)
+        c.tableSchema, tableCatalog, ident, statementType)
+      IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
 
       val v2Columns = columns.map(_.toV2Column(statementType)).toArray
       catalog match {
@@ -227,7 +233,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           AtomicReplaceTableExec(staging, ident, v2Columns, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
         case _ =>
-          ReplaceTableExec(catalog.asTableCatalog, ident, v2Columns, parts,
+          ReplaceTableExec(tableCatalog, ident, v2Columns, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala
index b48ff7121c76..ee4dfa929cf8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.util.Collections
-
 import org.apache.spark.{SparkConf, SparkRuntimeException}
-import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, Identifier, InMemoryTableCatalog}
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, Identifier, InMemoryTableCatalog, TableInfo}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StructType}
@@ -208,13 +205,15 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
   private def checkNullableArrayWithNotNullElement(byName: Boolean): Unit = {
     withTable("t") {
       val structType = new StructType().add("x", "int").add("y", "int")
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("arr", ArrayType(structType, containsNull = false))))
+        .build()
+
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("arr", ArrayType(structType, containsNull = false))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql("SELECT 1 AS i, null AS arr")
@@ -255,13 +254,14 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
   private def checkNotNullFieldsInsideNullableArray(byName: Boolean): Unit = {
     withTable("t") {
       val structType = new StructType().add("x", "int", nullable = false).add("y", "int")
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("arr", ArrayType(structType, containsNull = true))))
+        .build()
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("arr", ArrayType(structType, containsNull = true))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql(
@@ -309,13 +309,14 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
 
   private def checkNullableMapWithNonNullValues(byName: Boolean): Unit = {
     withTable("t") {
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("m", MapType(IntegerType, IntegerType, 
valueContainsNull = false))))
+        .build()
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("m", MapType(IntegerType, IntegerType, 
valueContainsNull = false))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql("SELECT 1 AS i, null AS m")
@@ -348,13 +349,14 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
   private def checkNotNullFieldsInsideNullableMap(byName: Boolean): Unit = {
     withTable("t") {
       val structType = new StructType().add("x", "int", nullable = false).add("y", "int")
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("m", MapType(structType, structType, 
valueContainsNull = true))))
+        .build()
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("m", MapType(structType, structType, 
valueContainsNull = true))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql("SELECT 1 AS i, map(named_struct('x', 1, 'y', 1), 
null) AS m")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b2b907c5cee2..747ecd7d9833 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3805,12 +3805,8 @@ class SimpleDelegatingCatalog extends DelegatingCatalogExtension {
 
 
 class V2CatalogSupportBuiltinDataSource extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      columns: Array[ColumnV2],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
-    super.createTable(ident, columns, partitions, properties)
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    super.createTable(ident, tableInfo)
     null
   }
 
@@ -3836,12 +3832,9 @@ class V2CatalogSupportBuiltinDataSource extends InMemoryCatalog {
 }
 
 class ReadOnlyCatalog extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      columns: Array[ColumnV2],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
-    super.createTable(ident, columns, partitions, properties)
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    super.createTable(ident, tableInfo)
     null
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
index fc808c835bb9..dc72fd855368 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Column, Identifier, SupportsRead, Table, TableCapability}
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier, SupportsRead, Table, TableCapability, TableInfo}
 import org.apache.spark.sql.connector.read.{LocalScan, Scan, ScanBuilder}
 import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.internal.SQLConf
@@ -55,11 +54,7 @@ class LocalScanSuite extends QueryTest with SharedSparkSession {
 }
 
 class TestLocalScanCatalog extends BasicInMemoryTableCatalog {
-  override def createTable(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: java.util.Map[String, String]): Table = {
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
     val table = new TestLocalScanTable(ident.toString)
     tables.put(ident, table)
     table
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
index 50272fac4a4a..bfde1984f1fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Util, Column, Identifier, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Util, Column, Identifier, SupportsRead, Table, TableCapability, TableInfo}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, V1Scan}
 import org.apache.spark.sql.execution.RowDataSourceScanExec
@@ -113,6 +113,10 @@ class V1ReadFallbackCatalog extends BasicInMemoryTableCatalog {
     tables.put(ident, table)
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }
 
 object V1ReadFallbackCatalog {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index 842ae63122cd..0b313eb64f26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -1219,7 +1219,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
     // scalastyle:on argcount
 
     catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution,
-      tableOrdering, tableNumPartitions, tablePartitionSize,
+      tableOrdering, tableNumPartitions, tablePartitionSize, Array.empty,
       distributionStrictlyRequired)
 
     val df = if (!dataSkewed) {
@@ -1322,7 +1322,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
       expectAnalysisException: Boolean = false): Unit = {
 
     catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution,
-      tableOrdering, tableNumPartitions, tablePartitionSize)
+      tableOrdering, tableNumPartitions, tablePartitionSize, Array.empty)
 
     withTempDir { checkpointDir =>
       val inputData = MemoryStream[(Long, String, Date)]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 86c4e49f6f66..aff3d045b52e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog}
-import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, TableInfo, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, Transform}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingQueryWrapper}
@@ -708,4 +708,8 @@ class InMemoryStreamTableCatalog extends InMemoryTableCatalog {
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

