This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d4eea4c32d25 [SPARK-51723][SQL] Add DSv2 APIs for create/get table constraints
d4eea4c32d25 is described below

commit d4eea4c32d25da7ee29c2f99c4e1bbf96ed1555c
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Thu Apr 10 15:16:04 2025 -0700

    [SPARK-51723][SQL] Add DSv2 APIs for create/get table constraints

    ### What changes were proposed in this pull request?

    This PR adds the following DSv2 APIs as per the SPIP [doc](https://docs.google.com/document/d/1EHjB4W1LjiXxsK_G7067j9pPX0y15LUF1Z5DlUPoPIo/):
    * Getting constraint info from a DSv2 table
    * Creating a table with constraints

    ### Why are the changes needed?

    For constraints support in Spark.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    New UT

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50519 from gengliangwang/dsv2Changes.

    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/sql/connector/catalog/Table.java  |  6 +++
 .../connector/catalog/TableCatalogCapability.java  | 15 +++++++
 .../spark/sql/connector/catalog/TableInfo.java     | 15 ++++++-
 .../catalyst/util/ResolveTableConstraints.scala    | 35 +++++++++++++++
 .../spark/sql/connector/catalog/CatalogSuite.scala | 33 +++++++++++++-
 .../catalog/InMemoryPartitionTableCatalog.scala    |  4 ++
 .../InMemoryRowLevelOperationTableCatalog.scala    |  4 ++
 .../sql/connector/catalog/InMemoryTable.scala      |  2 +
 .../connector/catalog/InMemoryTableCatalog.scala   | 24 ++++++++---
 .../catalog/InMemoryTableWithV2FilterCatalog.scala |  4 ++
 .../datasources/v2/DataSourceV2Strategy.scala      | 22 ++++++----
 .../spark/sql/RuntimeNullChecksV2Writes.scala      | 50 +++++++++++-----------
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 17 +++-----
 .../spark/sql/connector/LocalScanSuite.scala       |  9 +---
 .../spark/sql/connector/V1ReadFallbackSuite.scala  |  6 ++-
 .../WriteDistributionAndOrderingSuite.scala        |  4 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |  6 ++-
 17 files changed, 191 insertions(+), 65 deletions(-)
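For orientation, an editorial sketch (not part of the patch) of how the new surface is exercised end to end, closely mirroring the CatalogSuite test added in this commit. The catalog name, namespace, and identifier are illustrative; any TableCatalog that advertises SUPPORT_TABLE_CONSTRAINT behaves the same way.

```scala
import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, TableInfo}
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object CreateTableWithConstraintsSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative test catalog; initialize it like any CatalogPlugin.
    val catalog = new InMemoryTableCatalog
    catalog.initialize("test", CaseInsensitiveStringMap.empty())
    val ident = Identifier.of(Array("ns"), "tbl")

    val columns = Array(
      Column.create("id", IntegerType, false),
      Column.create("data", StringType))

    // Constraints are built with the new builders and attached via TableInfo.
    val constraints: Array[Constraint] = Array(
      Constraint.primaryKey("pk", Array(FieldReference.column("id"))).build(),
      Constraint.check("chk").predicateSql("id > 0").build())

    val tableInfo = new TableInfo.Builder()
      .withColumns(columns)
      .withConstraints(constraints)
      .build()

    // Create the table with constraints, then read them back from the Table.
    val table = catalog.createTable(ident, tableInfo)
    assert(table.constraints().sameElements(constraints))
  }
}
```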
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
index d5eb03dcf94d..166554b0b4ca 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.catalog.constraints.Constraint;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
@@ -83,4 +84,9 @@ public interface Table {
    * Returns the set of capabilities for this table.
    */
   Set<TableCapability> capabilities();
+
+  /**
+   * Returns the constraints for this table.
+   */
+  default Constraint[] constraints() { return new Constraint[0]; }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
index dceac1b484cf..a60c827d5ace 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -61,6 +61,21 @@ public enum TableCatalogCapability {
    */
   SUPPORT_COLUMN_DEFAULT_VALUE,
 
+  /**
+   * Signals that the TableCatalog supports defining table constraints in
+   * CREATE/REPLACE/ALTER TABLE.
+   * <p>
+   * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with table constraints
+   * defined in the table schema will throw an exception during analysis.
+   * <p>
+   * Table constraints include CHECK, PRIMARY KEY, UNIQUE and FOREIGN KEY constraints.
+   * <p>
+   * Table constraints are included in the table schema for APIs like
+   * {@link TableCatalog#createTable}.
+   * See {@link Table#constraints()}.
+   */
+  SUPPORT_TABLE_CONSTRAINT,
+
   /**
    * Signals that the TableCatalog supports defining identity columns upon table creation in SQL.
    * <p>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
index b931f8231f6e..c1f75c075374 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableInfo.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.constraints.Constraint;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
@@ -29,6 +30,7 @@ public class TableInfo {
   private final Column[] columns;
   private final Map<String, String> properties;
   private final Transform[] partitions;
+  private final Constraint[] constraints;
 
   /**
    * Constructor for TableInfo used by the builder.
@@ -38,6 +40,7 @@ public class TableInfo {
     this.columns = builder.columns;
     this.properties = Collections.unmodifiableMap(builder.properties);
     this.partitions = builder.partitions;
+    this.constraints = builder.constraints;
   }
 
   public Column[] columns() {
@@ -56,10 +59,13 @@ public class TableInfo {
     return partitions;
   }
 
+  public Constraint[] constraints() { return constraints; }
+
   public static class Builder {
     private Column[] columns;
-    private Map<String, String> properties;
-    private Transform[] partitions;
+    private Map<String, String> properties = Maps.newHashMap();
+    private Transform[] partitions = new Transform[0];
+    private Constraint[] constraints = new Constraint[0];
 
     public Builder withColumns(Column[] columns) {
       this.columns = columns;
@@ -77,6 +83,11 @@ public class TableInfo {
       return this;
     }
 
+    public Builder withConstraints(Constraint[] constraints) {
+      this.constraints = constraints;
+      return this;
+    }
+
     public TableInfo build() {
       checkNotNull(columns, "columns should not be null");
       return new TableInfo(this);
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveTableConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveTableConstraints.scala
new file mode 100644
index 000000000000..9f186f9903f6
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveTableConstraints.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+object ResolveTableConstraints {
+  // Fails if the given catalog does not support table constraints.
+  def validateCatalogForTableConstraint(
+      constraints: Seq[Constraint],
+      catalog: TableCatalog,
+      ident: Identifier): Unit = {
+    if (constraints.nonEmpty &&
+      !catalog.capabilities().contains(TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT)) {
+      throw QueryCompilationErrors.unsupportedTableOperationError(
+        catalog, ident, "table constraint")
+    }
+  }
+}
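For illustration only (not part of this commit): a minimal sketch of a catalog that opts in to the new capability, so the check in ResolveTableConstraints above passes and the constraint-carrying TableInfo reaches the catalog. The class name and the choice to simply delegate to InMemoryTableCatalog are assumptions made for the example.

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, Table, TableCatalogCapability, TableInfo}

// Hypothetical catalog for the example: advertises table-constraint support so
// that CREATE/REPLACE TABLE statements with constraints pass analysis.
class ConstraintAwareCatalog extends InMemoryTableCatalog {

  override def capabilities: java.util.Set[TableCatalogCapability] = Set(
    TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
    TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT
  ).asJava

  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
    // tableInfo.constraints() carries the table constraints; a real connector
    // would persist them in its own metadata rather than just delegating.
    super.createTable(ident, tableInfo)
  }
}
```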
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index e7e829ab32d6..f09a8aa14d30 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
-import org.apache.spark.sql.connector.expressions.{Expressions, LogicalExpressions, Transform}
+import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, LogicalExpressions, Transform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -167,6 +168,36 @@ class CatalogSuite extends SparkFunSuite {
     assert(catalog.tableExists(testIdent))
   }
 
+  test("createTable: with constraints") {
+    val catalog = newCatalog()
+
+    val columns = Array(
+      Column.create("id", IntegerType, false),
+      Column.create("data", StringType))
+    val constraints: Array[Constraint] = Array(
+      Constraint.primaryKey("pk", Array(FieldReference.column("id"))).build(),
+      Constraint.check("chk").predicateSql("id > 0").build(),
+      Constraint.unique("uk", Array(FieldReference.column("data"))).build(),
+      Constraint.foreignKey("fk", Array(FieldReference.column("data")), testIdentNew,
+        Array(FieldReference.column("id"))).build()
+    )
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(columns)
+      .withPartitions(emptyTrans)
+      .withProperties(emptyProps)
+      .withConstraints(constraints)
+      .build()
+    val table = catalog.createTable(testIdent, tableInfo)
+
+    val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+    assert(parsed == Seq("test", "`", ".", "test_table"))
+    assert(table.columns === columns)
+    assert(table.constraints === constraints)
+    assert(table.properties.asScala == Map())
+
+    assert(catalog.tableExists(testIdent))
+  }
+
   test("createTable: table already exists") {
     val catalog = newCatalog()
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala
index 3b8020003aa4..17f908370f76 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala
@@ -43,4 +43,8 @@ class InMemoryPartitionTableCatalog extends InMemoryTableCatalog {
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
index f5a69cd96d96..fcf5070bd51d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala
@@ -44,6 +44,10 @@ class InMemoryRowLevelOperationTableCatalog extends InMemoryTableCatalog {
     table
   }
 
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
+
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
     val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index f8eb32f6e924..50e2449623e5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector.catalog
 
 import java.util
 
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsOverwrite, WriteBuilder, WriterCommitMessage}
@@ -35,6 +36,7 @@ class InMemoryTable(
     schema: StructType,
     override val partitioning: Array[Transform],
     override val properties: util.Map[String, String],
+    override val constraints: Array[Constraint] = Array.empty,
     distribution: Distribution = Distributions.unspecified(),
     ordering: Array[SortOrder] = Array.empty,
     numPartitions: Option[Int] = None,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 1b223eae9839..20c7dc0a7a34 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
@@ -90,14 +91,20 @@ class BasicInMemoryTableCatalog extends TableCatalog {
   }
 
   override def createTable(
-    ident: Identifier,
-    columns: Array[Column],
-    partitions: Array[Transform],
-    properties: util.Map[String, String]): Table = {
+      ident: Identifier,
+      columns: Array[Column],
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
     createTable(ident, columns, partitions, properties, Distributions.unspecified(),
       Array.empty, None, None)
   }
 
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+      Distributions.unspecified(), Array.empty, None, None, tableInfo.constraints())
+  }
+
+  // scalastyle:off argcount
   def createTable(
       ident: Identifier,
       columns: Array[Column],
@@ -107,8 +114,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       ordering: Array[SortOrder],
       requiredNumPartitions: Option[Int],
       advisoryPartitionSize: Option[Long],
+      constraints: Array[Constraint] = Array.empty,
       distributionStrictlyRequired: Boolean = true,
       numRowsPerSplit: Int = Int.MaxValue): Table = {
+    // scalastyle:on argcount
     val schema = CatalogV2Util.v2ColumnsToStructType(columns)
     if (tables.containsKey(ident)) {
       throw new TableAlreadyExistsException(ident.asMultipartIdentifier)
@@ -117,9 +126,9 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
 
     val tableName = s"$name.${ident.quoted}"
-    val table = new InMemoryTable(tableName, schema, partitions, properties, distribution,
-      ordering, requiredNumPartitions, advisoryPartitionSize, distributionStrictlyRequired,
-      numRowsPerSplit)
+    val table = new InMemoryTable(tableName, schema, partitions, properties, constraints,
+      distribution, ordering, requiredNumPartitions, advisoryPartitionSize,
+      distributionStrictlyRequired, numRowsPerSplit)
     tables.put(ident, table)
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
@@ -178,6 +187,7 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
   override def capabilities: java.util.Set[TableCatalogCapability] = {
     Set(
       TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
+      TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT,
       TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
       TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS
     ).asJava
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala
index 7ec1cab304ad..861badd39079 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala
@@ -43,4 +43,8 @@ class InMemoryTableWithV2FilterCatalog extends InMemoryTableCatalog {
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }
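Also for illustration (not part of this commit): the read side of the API, a minimal hypothetical Table implementation that reports a constraint through the new constraints() default method. The table name, schema, and CHECK predicate are made up for the example.

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.types.StructType

// Hypothetical connector table: without the override, the new default
// Table#constraints() would simply return an empty array.
class EventsTable extends Table {
  override def name(): String = "events"

  override def schema(): StructType =
    new StructType().add("id", "int", nullable = false).add("data", "string")

  override def capabilities(): util.Set[TableCapability] =
    util.Collections.emptySet[TableCapability]()

  override def constraints(): Array[Constraint] = Array(
    Constraint.check("id_positive").predicateSql("id > 0").build())
}
```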
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 232cb0935a26..511e66a272ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder}
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
@@ -185,11 +185,14 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case c @ CreateTable(ResolvedIdentifier(catalog, ident), columns, partitioning,
         tableSpec: TableSpec, ifNotExists) =>
-      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog.asTableCatalog, ident)
+      val tableCatalog = catalog.asTableCatalog
+      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
+      ResolveTableConstraints.validateCatalogForTableConstraint(
+        tableSpec.constraints, tableCatalog, ident)
       val statementType = "CREATE TABLE"
       GeneratedColumn.validateGeneratedColumns(
-        c.tableSchema, catalog.asTableCatalog, ident, statementType)
-      IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident)
+        c.tableSchema, tableCatalog, ident, statementType)
+      IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
 
       CreateTableExec(
         catalog.asTableCatalog,
@@ -215,11 +218,14 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case c @ ReplaceTable(
         ResolvedIdentifier(catalog, ident), columns, parts, tableSpec: TableSpec, orCreate) =>
-      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog.asTableCatalog, ident)
+      val tableCatalog = catalog.asTableCatalog
+      ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
+      ResolveTableConstraints.validateCatalogForTableConstraint(
+        tableSpec.constraints, tableCatalog, ident)
       val statementType = "REPLACE TABLE"
       GeneratedColumn.validateGeneratedColumns(
-        c.tableSchema, catalog.asTableCatalog, ident, statementType)
-      IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident)
+        c.tableSchema, tableCatalog, ident, statementType)
+      IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
 
       val v2Columns = columns.map(_.toV2Column(statementType)).toArray
       catalog match {
@@ -227,7 +233,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           AtomicReplaceTableExec(staging, ident, v2Columns, parts, qualifyLocInTableSpec(tableSpec),
             orCreate = orCreate, invalidateCache) :: Nil
         case _ =>
-          ReplaceTableExec(catalog.asTableCatalog, ident, v2Columns, parts,
+          ReplaceTableExec(tableCatalog, ident, v2Columns, parts,
            qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
       }
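A rough sketch of what the new validation amounts to at analysis/planning time, shown as a standalone call against a catalog that does not declare SUPPORT_TABLE_CONSTRAINT (BasicInMemoryTableCatalog above). This deliberately uses an internal helper and an illustrative identifier purely to show the failure mode; it is not how end users would normally hit the check.

```scala
import scala.util.Try

import org.apache.spark.sql.catalyst.util.ResolveTableConstraints
import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier}
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object ConstraintValidationSketch extends App {
  // BasicInMemoryTableCatalog does not advertise SUPPORT_TABLE_CONSTRAINT,
  // so any non-empty constraint list should be rejected during analysis.
  val catalog = new BasicInMemoryTableCatalog
  catalog.initialize("cat", CaseInsensitiveStringMap.empty())

  val constraints = Seq(
    Constraint.primaryKey("pk", Array(FieldReference.column("id"))).build())

  val result = Try {
    ResolveTableConstraints.validateCatalogForTableConstraint(
      constraints, catalog, Identifier.of(Array("ns"), "tbl"))
  }
  assert(result.isFailure) // fails with an unsupported "table constraint" operation error
}
```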
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala
index b48ff7121c76..ee4dfa929cf8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeNullChecksV2Writes.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.util.Collections
-
 import org.apache.spark.{SparkConf, SparkRuntimeException}
-import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, Identifier, InMemoryTableCatalog}
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, Identifier, InMemoryTableCatalog, TableInfo}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StructType}
@@ -208,13 +205,15 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
   private def checkNullableArrayWithNotNullElement(byName: Boolean): Unit = {
     withTable("t") {
       val structType = new StructType().add("x", "int").add("y", "int")
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("arr", ArrayType(structType, containsNull = false))))
+        .build()
+
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("arr", ArrayType(structType, containsNull = false))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql("SELECT 1 AS i, null AS arr")
@@ -255,13 +254,14 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
   private def checkNotNullFieldsInsideNullableArray(byName: Boolean): Unit = {
     withTable("t") {
       val structType = new StructType().add("x", "int", nullable = false).add("y", "int")
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("arr", ArrayType(structType, containsNull = true))))
+        .build()
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("arr", ArrayType(structType, containsNull = true))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql(
@@ -309,13 +309,14 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
 
   private def checkNullableMapWithNonNullValues(byName: Boolean): Unit = {
     withTable("t") {
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("m", MapType(IntegerType, IntegerType, valueContainsNull = false))))
+        .build()
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("m", MapType(IntegerType, IntegerType, valueContainsNull = false))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql("SELECT 1 AS i, null AS m")
@@ -348,13 +349,14 @@ class RuntimeNullChecksV2Writes extends QueryTest with SQLTestUtils with SharedS
   private def checkNotNullFieldsInsideNullableMap(byName: Boolean): Unit = {
     withTable("t") {
      val structType = new StructType().add("x", "int", nullable = false).add("y", "int")
+      val tableInfo = new TableInfo.Builder()
+        .withColumns(Array(
+          ColumnV2.create("i", IntegerType),
+          ColumnV2.create("m", MapType(structType, structType, valueContainsNull = true))))
+        .build()
       catalog.createTable(
         ident = Identifier.of(Array(), "t"),
-        columns = Array(
-          ColumnV2.create("i", IntegerType),
-          ColumnV2.create("m", MapType(structType, structType, valueContainsNull = true))),
-        partitions = Array.empty[Transform],
-        properties = Collections.emptyMap[String, String])
+        tableInfo = tableInfo)
 
       if (byName) {
         val inputDF = sql("SELECT 1 AS i, map(named_struct('x', 1, 'y', 1), null) AS m")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b2b907c5cee2..747ecd7d9833 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3805,12 +3805,8 @@ class SimpleDelegatingCatalog extends DelegatingCatalogExtension {
 
 class V2CatalogSupportBuiltinDataSource extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      columns: Array[ColumnV2],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
-    super.createTable(ident, columns, partitions, properties)
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    super.createTable(ident, tableInfo)
     null
   }
 
@@ -3836,12 +3832,9 @@ class V2CatalogSupportBuiltinDataSource extends InMemoryCatalog {
 }
 
 class ReadOnlyCatalog extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      columns: Array[ColumnV2],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
-    super.createTable(ident, columns, partitions, properties)
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    super.createTable(ident, tableInfo)
     null
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
index fc808c835bb9..dc72fd855368 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Column, Identifier, SupportsRead, Table, TableCapability}
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier, SupportsRead, Table, TableCapability, TableInfo}
 import org.apache.spark.sql.connector.read.{LocalScan, Scan, ScanBuilder}
 import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.internal.SQLConf
@@ -55,11 +54,7 @@ class LocalScanSuite extends QueryTest with SharedSparkSession {
 }
 
 class TestLocalScanCatalog extends BasicInMemoryTableCatalog {
-  override def createTable(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: java.util.Map[String, String]): Table = {
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
     val table = new TestLocalScanTable(ident.toString)
     tables.put(ident, table)
     table
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
index 50272fac4a4a..bfde1984f1fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Util, Column, Identifier, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Util, Column, Identifier, SupportsRead, Table, TableCapability, TableInfo}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, V1Scan}
 import org.apache.spark.sql.execution.RowDataSourceScanExec
@@ -113,6 +113,10 @@ class V1ReadFallbackCatalog extends BasicInMemoryTableCatalog {
     tables.put(ident, table)
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }
 
 object V1ReadFallbackCatalog {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index 842ae63122cd..0b313eb64f26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -1219,7 +1219,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
     // scalastyle:on argcount
 
     catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution,
-      tableOrdering, tableNumPartitions, tablePartitionSize,
+      tableOrdering, tableNumPartitions, tablePartitionSize, Array.empty,
       distributionStrictlyRequired)
 
     val df = if (!dataSkewed) {
@@ -1322,7 +1322,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
       expectAnalysisException: Boolean = false): Unit = {
 
     catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution,
-      tableOrdering, tableNumPartitions, tablePartitionSize)
+      tableOrdering, tableNumPartitions, tablePartitionSize, Array.empty)
 
     withTempDir { checkpointDir =>
       val inputData = MemoryStream[(Long, String, Date)]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 86c4e49f6f66..aff3d045b52e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog}
-import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, TableInfo, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, Transform}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingQueryWrapper}
@@ -708,4 +708,8 @@ class InMemoryStreamTableCatalog extends InMemoryTableCatalog {
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
+
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
+    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org