This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 59e6b5b7d350 [SPARK-52475][TESTS] Remove deprecated table.schema method from tests
59e6b5b7d350 is described below
commit 59e6b5b7d350a1603502bc92e3c117311ab2cbb6
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Jun 16 07:36:13 2025 -0700
[SPARK-52475][TESTS] Remove deprecated table.schema method from tests
### What changes were proposed in this pull request?
Fix many compile warnings like:
```
[warn] /Users/szehon.ho/repos/apache-spark/spark/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala:282:18: method schema in trait Table is deprecated (since 3.4.0)
[warn] Applicable -Wconf / nowarn filters for this warning: msg=<part of the message>, cat=deprecation, site=org.apache.spark.sql.connector.catalog.CatalogSuite, origin=org.apache.spark.sql.connector.catalog.Table.schema, version=3.4.0
```
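For reference, a minimal sketch of the assertion pattern this change applies across the suites. `Table`, `Column.create`, and `CatalogV2Util.v2ColumnsToStructType` are the actual DSv2 APIs used in the diff below; the `checkLoadedTable` helper is hypothetical, for illustration only:
```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Table}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical helper showing the before/after of a typical assertion.
def checkLoadedTable(table: Table): Unit = {
  // Before (deprecated since 3.4.0): compare StructType values.
  //   assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
  // After: compare the Array[Column]; arrays need `sameElements` rather than
  // `==`, since `==` on Scala arrays is reference equality.
  val expected = Array(Column.create("id", LongType), Column.create("data", StringType))
  assert(table.columns sameElements expected)

  // Where a StructType is still required (e.g. spark.internalCreateDataFrame),
  // convert explicitly instead of calling the deprecated accessor.
  val asStruct: StructType = CatalogV2Util.v2ColumnsToStructType(table.columns)
  assert(asStruct.fieldNames sameElements expected.map(_.name))
}
```
The in-memory test tables keep compiling for StructType-based callers by adding a secondary constructor, along these lines (`ExampleTable` is hypothetical; `structTypeToV2Columns` is the real conversion added in the diff):
```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column}
import org.apache.spark.sql.types.StructType

// Sketch of the compatibility-constructor pattern used by
// InMemoryPartitionTable and InMemoryAtomicPartitionTable below.
class ExampleTable(val name: String, val columns: Array[Column]) {
  def this(name: String, schema: StructType) =
    this(name, CatalogV2Util.structTypeToV2Columns(schema))
}
```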
### Why are the changes needed?
Reduce compiler warnings
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Test only
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51173 from szehon-ho/fix_warnings_other_tests.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/connector/catalog/CatalogSuite.scala | 42 +-
.../catalog/InMemoryAtomicPartitionTable.scala | 11 +-
.../connector/catalog/InMemoryPartitionTable.scala | 11 +-
.../SupportsAtomicPartitionManagementSuite.scala | 12 +-
.../catalog/SupportsPartitionManagementSuite.scala | 14 +-
.../connect/planner/SparkConnectProtoSuite.scala | 6 +-
.../apache/spark/sql/CharVarcharTestSuite.scala | 4 +-
.../spark/sql/collation/CollationSuite.scala | 25 +-
.../spark/sql/connector/AlterTableTests.scala | 463 +++++++++++----------
.../connector/SupportsCatalogOptionsSuite.scala | 13 +-
.../spark/sql/connector/V1ReadFallbackSuite.scala | 2 +-
.../datasources/v2/V2SessionCatalogSuite.scala | 25 +-
.../apache/spark/sql/internal/CatalogSuite.scala | 6 +-
.../streaming/test/DataStreamTableAPISuite.scala | 2 +-
14 files changed, 342 insertions(+), 294 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index 178c43258816..47bf1c5bdc17 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -279,7 +279,7 @@ class CatalogSuite extends SparkFunSuite {
val loaded = catalog.loadTable(testIdent)
assert(table.name == loaded.name)
- assert(table.schema == loaded.schema)
+ assert(table.columns == loaded.columns)
assert(table.properties == loaded.properties)
}
@@ -307,7 +307,7 @@ class CatalogSuite extends SparkFunSuite {
val loaded = catalog.loadTable(testIdent)
assert(table.name == loaded.name)
- assert(table.schema == loaded.schema)
+ assert(table.columns == loaded.columns)
assert(table.properties == loaded.properties)
}
@@ -544,8 +544,8 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.updateColumnType(Array("id"), LongType))
- val expectedSchema = new StructType().add("id", LongType).add("data",
StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(Column.create("id", LongType), Column.create("data", StringType))
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: update column nullability") {
@@ -566,8 +566,9 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.updateColumnNullability(Array("id"), true))
- val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(
+ Column.create("id", IntegerType, true), Column.create("data",
StringType))
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: update missing column fails") {
@@ -606,10 +607,11 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.updateColumnComment(Array("id"), "comment text"))
- val expectedSchema = new StructType()
- .add("id", IntegerType, nullable = true, "comment text")
- .add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(
+ Column.create("id", IntegerType, true, "comment text", null),
+ Column.create("data", StringType)
+ )
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: replace comment") {
@@ -626,14 +628,14 @@ class CatalogSuite extends SparkFunSuite {
catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))
- val expectedSchema = new StructType()
- .add("id", IntegerType, nullable = true, "replacement comment")
- .add("data", StringType)
-
+ val expectedColumns = Array(
+ Column.create("id", IntegerType, true, "replacement comment", null),
+ Column.create("data", StringType)
+ )
val updated = catalog.alterTable(testIdent,
TableChange.updateColumnComment(Array("id"), "replacement comment"))
- assert(updated.schema == expectedSchema)
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: add comment to missing column fails") {
@@ -671,9 +673,9 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.renameColumn(Array("id"), "some_id"))
- val expectedSchema = new StructType().add("some_id",
IntegerType).add("data", StringType)
-
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(
+ Column.create("some_id", IntegerType), Column.create("data", StringType))
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: rename nested column") {
@@ -785,8 +787,8 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.deleteColumn(Array("id"), false))
- val expectedSchema = new StructType().add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(Column.create("data", StringType))
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: delete nested column") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala
index 8c9039c723eb..a4a70fff5e9f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala
@@ -30,12 +30,19 @@ import org.apache.spark.util.ArrayImplicits._
*/
class InMemoryAtomicPartitionTable (
name: String,
- schema: StructType,
+ columns: Array[Column],
partitioning: Array[Transform],
properties: util.Map[String, String])
- extends InMemoryPartitionTable(name, schema, partitioning, properties)
+ extends InMemoryPartitionTable(name, columns, partitioning, properties)
with SupportsAtomicPartitionManagement {
+ def this(
+ name: String,
+ schema: StructType,
+ partitioning: Array[Transform],
+ properties: util.Map[String, String]) =
+ this(name, CatalogV2Util.structTypeToV2Columns(schema), partitioning, properties)
+
override def createPartition(
ident: InternalRow,
properties: util.Map[String, String]): Unit = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala
index ad55af81a364..aaea6a2b11cd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala
@@ -33,12 +33,19 @@ import org.apache.spark.sql.types.StructType
*/
class InMemoryPartitionTable(
name: String,
- schema: StructType,
+ columns: Array[Column],
partitioning: Array[Transform],
properties: util.Map[String, String])
- extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitionManagement {
+ extends InMemoryTable(name, columns, partitioning, properties) with SupportsPartitionManagement {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+ def this(
+ name: String,
+ schema: StructType,
+ partitioning: Array[Transform],
+ properties: util.Map[String, String]
+ ) = this(name, CatalogV2Util.structTypeToV2Columns(schema), partitioning, properties)
+
protected val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] =
new ConcurrentHashMap[InternalRow, util.Map[String, String]]()
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
index a9d8a69128ae..ce6a72c41491 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
@@ -53,7 +53,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("createPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -72,7 +72,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("createPartitions failed if partition already exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("4")
@@ -94,7 +94,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("dropPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -112,7 +112,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("purgePartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
partTable.createPartitions(
partIdents,
@@ -129,7 +129,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("dropPartitions failed if partition not exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("4")
@@ -147,7 +147,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("truncatePartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
partTable.createPartitions(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
index 8581d4dec1fb..5a62fe282c93 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
@@ -55,7 +55,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("createPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("3")
@@ -70,7 +70,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("dropPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("3")
@@ -88,7 +88,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("purgePartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
checkError(
exception = intercept[SparkUnsupportedOperationException] {
partTable.purgePartition(InternalRow.apply("3"))
@@ -101,7 +101,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("replacePartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("3")
@@ -123,7 +123,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("loadPartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("3")
@@ -140,7 +140,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("listPartitionIdentifiers") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("3")
@@ -248,7 +248,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("truncatePartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(), table.columns(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))
val partIdent = InternalRow.apply("3")
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 5c43715d2dd1..7b734f93e595 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.connect.dsl.MockRemoteSession
import org.apache.spark.sql.connect.dsl.commands._
import org.apache.spark.sql.connect.dsl.expressions._
import org.apache.spark.sql.connect.dsl.plans._
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, Identifier, InMemoryTableCatalog, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.functions._
@@ -822,7 +822,9 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
.asTableCatalog
.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name === "testcat.table_name")
- assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
+ assert(
+ table.columns sameElements
+ Array(ColumnV2.create("id", LongType), ColumnV2.create("data", StringType)))
assert(table.partitioning.isEmpty)
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index eceadc338bfc..6139d0e98767 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Project}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.SchemaRequiredDataSource
-import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, InMemoryPartitionTableCatalog}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -923,7 +923,7 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
def checkSchema(df: DataFrame): Unit = {
val schemas = df.queryExecution.analyzed.collect {
case l: LogicalRelation => l.relation.schema
- case d: DataSourceV2Relation => d.table.schema()
+ case d: DataSourceV2Relation => CatalogV2Util.v2ColumnsToStructType(d.table.columns())
}
assert(schemas.length == 1)
assert(schemas.head.map(_.dataType) == Seq(StringType))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
index c278233b557b..4044e5674191 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema}
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryTable}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
@@ -610,16 +610,16 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
sql(s"ALTER TABLE $tableName ALTER COLUMN c3.value TYPE STRING COLLATE UTF8_BINARY")
sql(s"ALTER TABLE $tableName ALTER COLUMN c4.t TYPE STRING COLLATE UNICODE")
val testCatalog = catalog("testcat").asTableCatalog
- val tableSchema = testCatalog.loadTable(Identifier.of(Array(), "alter_column_tbl")).schema()
- val c1Metadata = tableSchema.find(_.name == "c1").get.metadata
- assert(c1Metadata === createMetadata("c1"))
- val c2Metadata = tableSchema.find(_.name == "c2").get.metadata
- assert(c2Metadata === createMetadata("c2"))
- val c3Metadata = tableSchema.find(_.name == "c3").get.metadata
- assert(c3Metadata === createMetadata("c3"))
- val c4Metadata = tableSchema.find(_.name == "c4").get.metadata
- assert(c4Metadata === createMetadata("c4"))
- val c4tMetadata = tableSchema.find(_.name == "c4").get.dataType
+ val columns = testCatalog.loadTable(Identifier.of(Array(), "alter_column_tbl")).columns()
+ val c1Metadata = columns.find(_.name() == "c1").get.metadataInJSON()
+ assert(c1Metadata === createMetadata("c1").json)
+ val c2Metadata = columns.find(_.name() == "c2").get.metadataInJSON()
+ assert(c2Metadata === createMetadata("c2").json)
+ val c3Metadata = columns.find(_.name() == "c3").get.metadataInJSON()
+ assert(c3Metadata === createMetadata("c3").json)
+ val c4Metadata = columns.find(_.name() == "c4").get.metadataInJSON()
+ assert(c4Metadata === createMetadata("c4").json)
+ val c4tMetadata = columns.find(_.name() == "c4").get.dataType()
.asInstanceOf[StructType].find(_.name == "t").get.metadata
assert(c4tMetadata === createMetadata("c4t"))
}
@@ -864,7 +864,8 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
assert(table.columns().head.dataType() == StringType(collationId))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
- checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
+ checkAnswer(spark.internalCreateDataFrame(rdd,
+ CatalogV2Util.v2ColumnsToStructType(table.columns)), Seq.empty)
sql(s"INSERT INTO $tableName VALUES ('a'), ('A')")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 53528bf2eae2..9b01cd9f75bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.connector.catalog.{Column, Table}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
+import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.errors.QueryErrorsBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -86,7 +87,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType().add("id", IntegerType))
+ assert(table.columns sameElements Array(Column.create("id",
IntegerType)))
}
}
@@ -99,7 +100,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType().add("id",
IntegerType).add("data", StringType))
+ assert(table.columns sameElements
+ Array(Column.create("id", IntegerType),
+ Column.create("data", StringType)))
}
}
@@ -112,9 +115,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === StructType(Seq(
- StructField("id", IntegerType),
- StructField("data", StringType, nullable = false))))
+ assert(table.columns sameElements
+ Array(Column.create("id", IntegerType),
+ Column.create("data", StringType, false)))
}
}
@@ -127,9 +130,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === StructType(Seq(
- StructField("id", IntegerType),
- StructField("data", StringType).withComment("doc"))))
+ assert(table.columns sameElements
+ Array(Column.create("id", IntegerType),
+ Column.create("data", StringType, true, "doc", null)))
}
}
@@ -152,15 +155,17 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
sql(s"CREATE TABLE $t (point struct<x: int>) USING $v2Format")
sql(s"ALTER TABLE $t ADD COLUMN a string FIRST")
- assert(getTableMetadata(t).schema == new StructType()
- .add("a", StringType)
- .add("point", new StructType().add("x", IntegerType)))
+ assert(getTableMetadata(t).columns sameElements
+ Array(
+ Column.create("a", StringType),
+ Column.create("point", new StructType().add("x", IntegerType))))
sql(s"ALTER TABLE $t ADD COLUMN b string AFTER point")
- assert(getTableMetadata(t).schema == new StructType()
- .add("a", StringType)
- .add("point", new StructType().add("x", IntegerType))
- .add("b", StringType))
+ assert(getTableMetadata(t).columns sameElements
+ Array(
+ Column.create("a", StringType),
+ Column.create("point", new StructType().add("x", IntegerType)),
+ Column.create("b", StringType)))
val e1 = intercept[AnalysisException](
sql(s"ALTER TABLE $t ADD COLUMN c string AFTER non_exist"))
@@ -171,21 +176,22 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
)
sql(s"ALTER TABLE $t ADD COLUMN point.y int FIRST")
- assert(getTableMetadata(t).schema == new StructType()
- .add("a", StringType)
- .add("point", new StructType()
- .add("y", IntegerType)
- .add("x", IntegerType))
- .add("b", StringType))
+ assert(getTableMetadata(t).columns sameElements
+ Array(Column.create("a", StringType),
+ Column.create("point", new StructType()
+ .add("y", IntegerType)
+ .add("x", IntegerType)),
+ Column.create("b", StringType)))
sql(s"ALTER TABLE $t ADD COLUMN point.z int AFTER x")
- assert(getTableMetadata(t).schema == new StructType()
- .add("a", StringType)
- .add("point", new StructType()
- .add("y", IntegerType)
- .add("x", IntegerType)
- .add("z", IntegerType))
- .add("b", StringType))
+ assert(getTableMetadata(t).columns sameElements
+ Array(
+ Column.create("a", StringType),
+ Column.create("point", new StructType()
+ .add("y", IntegerType)
+ .add("x", IntegerType)
+ .add("z", IntegerType)),
+ Column.create("b", StringType)))
val e2 = intercept[AnalysisException](
sql(s"ALTER TABLE $t ADD COLUMN point.x2 int AFTER non_exist"))
@@ -203,28 +209,30 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
sql(s"CREATE TABLE $t (a string, b int, point struct<x: double, y: double>) USING $v2Format")
sql(s"ALTER TABLE $t ADD COLUMNS (x int AFTER a, y int AFTER x, z int AFTER y)")
- assert(getTableMetadata(t).schema === new StructType()
- .add("a", StringType)
- .add("x", IntegerType)
- .add("y", IntegerType)
- .add("z", IntegerType)
- .add("b", IntegerType)
- .add("point", new StructType()
- .add("x", DoubleType)
- .add("y", DoubleType)))
+ assert(getTableMetadata(t).columns sameElements
+ Array(
+ Column.create("a", StringType),
+ Column.create("x", IntegerType),
+ Column.create("y", IntegerType),
+ Column.create("z", IntegerType),
+ Column.create("b", IntegerType),
+ Column.create("point", new StructType()
+ .add("x", DoubleType)
+ .add("y", DoubleType))))
sql(s"ALTER TABLE $t ADD COLUMNS (point.z double AFTER x, point.zz
double AFTER z)")
- assert(getTableMetadata(t).schema === new StructType()
- .add("a", StringType)
- .add("x", IntegerType)
- .add("y", IntegerType)
- .add("z", IntegerType)
- .add("b", IntegerType)
- .add("point", new StructType()
- .add("x", DoubleType)
- .add("z", DoubleType)
- .add("zz", DoubleType)
- .add("y", DoubleType)))
+ assert(getTableMetadata(t).columns sameElements
+ Array(
+ Column.create("a", StringType),
+ Column.create("x", IntegerType),
+ Column.create("y", IntegerType),
+ Column.create("z", IntegerType),
+ Column.create("b", IntegerType),
+ Column.create("point", new StructType()
+ .add("x", DoubleType)
+ .add("z", DoubleType)
+ .add("zz", DoubleType)
+ .add("y", DoubleType))))
// The new column being referenced should come before being referenced.
val e = intercept[AnalysisException](
@@ -246,10 +254,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === StructType(Seq(
- StructField("id", IntegerType),
- StructField("data", StringType).withComment("doc"),
- StructField("ts", TimestampType))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("data", StringType, true, "doc", null),
+ Column.create("ts", TimestampType)))
}
}
@@ -262,12 +270,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType),
- StructField("z", DoubleType)))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", new StructType()
+ .add("x", DoubleType)
+ .add("y", DoubleType)
+ .add("z", DoubleType))))
}
}
@@ -281,12 +289,14 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType),
- StructField("z", DoubleType))), LongType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points",
+ MapType(StructType(Seq(
+ StructField("x", DoubleType),
+ StructField("y", DoubleType),
+ StructField("z", DoubleType))),
+ LongType))))
}
}
@@ -300,12 +310,13 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StringType, StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType),
- StructField("z", DoubleType))))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points",
+ MapType(StringType, StructType(Seq(
+ StructField("x", DoubleType),
+ StructField("y", DoubleType),
+ StructField("z", DoubleType)))))))
}
}
@@ -318,12 +329,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(StructType(Seq(
StructField("x", DoubleType),
StructField("y", DoubleType),
- StructField("z", DoubleType))))))
+ StructField("z", DoubleType)))))))
}
}
@@ -337,28 +348,25 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("a", StringType)
- .add(StructField("b", IntegerType)
- .withCurrentDefaultValue("2 + 3")
- .withExistenceDefaultValue("5")))
+ assert(table.columns sameElements Array(
+ Column.create("a", StringType),
+ Column.create("b", IntegerType, true, null,
+ new ColumnDefaultValue("2 + 3", LiteralValue(5, IntegerType)), null)))
sql(s"alter table $t alter column b set default 2 + 3")
assert(
- getTableMetadata(t).schema === new StructType()
- .add("a", StringType)
- .add(StructField("b", IntegerType)
- .withCurrentDefaultValue("2 + 3")
- .withExistenceDefaultValue("5")))
+ getTableMetadata(t).columns sameElements Array(
+ Column.create("a", StringType),
+ Column.create("b", IntegerType, true, null,
+ new ColumnDefaultValue("2 + 3", LiteralValue(5, IntegerType)), null)))
sql(s"alter table $t alter column b drop default")
assert(
- getTableMetadata(t).schema === new StructType()
- .add("a", StringType)
- .add(StructField("b", IntegerType)
- .withExistenceDefaultValue("5")))
+ getTableMetadata(t).columns sameElements Array(
+ Column.create("a", StringType),
+ Column.create("b", IntegerType, true, null,
"""{"EXISTS_DEFAULT":"5"}""")))
}
}
}
@@ -401,11 +409,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(StructType(Seq(
StructField("x", DoubleType),
StructField("y", DoubleType))))))
+ )
}
}
@@ -418,12 +427,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(StructType(Seq(
StructField("x", DoubleType),
StructField("y", DoubleType),
- StructField("z", DoubleType).withComment("doc"))))))
+ StructField("z", DoubleType).withComment("doc")))))))
}
}
@@ -521,7 +530,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType().add("id", LongType))
+ assert(table.columns sameElements Array(Column.create("id", LongType)))
}
}
@@ -531,7 +540,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
sql(s"CREATE TABLE $t (id int) USING $v2Format")
(DataTypeTestUtils.dayTimeIntervalTypes ++ DataTypeTestUtils.yearMonthIntervalTypes)
.foreach {
- case d: DataType => d.typeName
+ d: DataType => d.typeName
val sqlText = s"ALTER TABLE $t ALTER COLUMN id TYPE ${d.typeName}"
checkError(
@@ -559,12 +568,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType().add("id", LongType, nullable =
false))
+ assert(table.columns sameElements Array(Column.create("id", LongType,
false)))
sql(s"ALTER TABLE $t ALTER COLUMN id DROP NOT NULL")
val table2 = getTableMetadata(t)
assert(table2.name === t)
- assert(table2.schema === new StructType().add("id", LongType))
+ assert(table2.columns sameElements Array(Column.create("id", LongType, true)))
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t ALTER COLUMN id SET NOT NULL")
@@ -581,11 +590,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType)))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", new StructType()
+ .add("x", DoubleType)
+ .add("y", DoubleType))))
}
}
@@ -613,11 +622,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType)))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", new StructType()
+ .add("x", DoubleType)
+ .add("y", DoubleType))))
}
}
@@ -644,9 +653,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(IntegerType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(IntegerType))))
}
}
@@ -659,9 +668,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(LongType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(LongType))))
}
}
@@ -688,9 +697,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("m", MapType(StringType, IntegerType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("m", MapType(StringType, IntegerType))))
}
}
@@ -703,9 +712,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("m", MapType(StringType, LongType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("m", MapType(StringType, LongType))))
}
}
@@ -719,11 +728,13 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType))), LongType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points",
+ MapType(StructType(Seq(
+ StructField("x", DoubleType),
+ StructField("y", DoubleType))),
+ LongType))))
}
}
@@ -737,11 +748,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StringType, StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType))))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points",
+ MapType(StringType, StructType(Seq(
+ StructField("x", DoubleType),
+ StructField("y", DoubleType)))))))
}
}
@@ -754,11 +766,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType))))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points",
+ ArrayType(StructType(Seq(
+ StructField("x", DoubleType),
+ StructField("y", DoubleType)))))))
}
}
@@ -834,7 +847,8 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === StructType(Seq(StructField("id", IntegerType).withComment("doc"))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType, true, "doc", null)))
}
}
@@ -844,22 +858,22 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
sql(s"CREATE TABLE $t (a int, b int, point struct<x: int, y: int, z: int>) USING $v2Format")
sql(s"ALTER TABLE $t ALTER COLUMN b FIRST")
- assert(getTableMetadata(t).schema == new StructType()
- .add("b", IntegerType)
- .add("a", IntegerType)
- .add("point", new StructType()
+ assert(getTableMetadata(t).columns sameElements Array(
+ Column.create("b", IntegerType),
+ Column.create("a", IntegerType),
+ Column.create("point", new StructType()
.add("x", IntegerType)
.add("y", IntegerType)
- .add("z", IntegerType)))
+ .add("z", IntegerType))))
sql(s"ALTER TABLE $t ALTER COLUMN b AFTER point")
- assert(getTableMetadata(t).schema == new StructType()
- .add("a", IntegerType)
- .add("point", new StructType()
+ assert(getTableMetadata(t).columns sameElements Array(
+ Column.create("a", IntegerType),
+ Column.create("point", new StructType()
.add("x", IntegerType)
.add("y", IntegerType)
- .add("z", IntegerType))
- .add("b", IntegerType))
+ .add("z", IntegerType)),
+ Column.create("b", IntegerType)))
val sqlText1 = s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist"
checkError(
@@ -874,22 +888,23 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
context = ExpectedContext(fragment = sqlText1, start = 0, stop = sqlText1.length - 1))
sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST")
- assert(getTableMetadata(t).schema == new StructType()
- .add("a", IntegerType)
- .add("point", new StructType()
+ assert(getTableMetadata(t).columns sameElements Array(
+ Column.create("a", IntegerType),
+ Column.create("point", new StructType()
.add("y", IntegerType)
.add("x", IntegerType)
- .add("z", IntegerType))
- .add("b", IntegerType))
+ .add("z", IntegerType)),
+ Column.create("b", IntegerType)
+ ))
sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER z")
- assert(getTableMetadata(t).schema == new StructType()
- .add("a", IntegerType)
- .add("point", new StructType()
+ assert(getTableMetadata(t).columns sameElements Array(
+ Column.create("a", IntegerType),
+ Column.create("point", new StructType()
.add("x", IntegerType)
.add("z", IntegerType)
- .add("y", IntegerType))
- .add("b", IntegerType))
+ .add("y", IntegerType)),
+ Column.create("b", IntegerType)))
val sqlText2 = s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist"
checkError(
@@ -918,11 +933,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType).withComment("doc")))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", new StructType()
+ .add("x", DoubleType)
+ .add("y", DoubleType, nullable = true, "doc"))))
}
}
@@ -936,11 +951,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType).withComment("doc"))), LongType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points",
+ MapType(StructType(Seq(
+ StructField("x", DoubleType),
+ StructField("y", DoubleType).withComment("doc"))), LongType))))
}
}
@@ -954,11 +970,15 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StringType, StructType(Seq(
- StructField("x", DoubleType),
- StructField("y", DoubleType).withComment("doc"))))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points",
+ MapType(StringType, StructType(Seq(
+ StructField("x", DoubleType),
+ StructField("y", DoubleType, nullable = true,
+ new MetadataBuilder()
+ .putString("comment", "doc")
+ .build())))))))
}
}
@@ -971,11 +991,14 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(StructType(Seq(
StructField("x", DoubleType),
- StructField("y", DoubleType).withComment("doc"))))))
+ StructField("y", DoubleType, nullable = true,
+ new MetadataBuilder()
+ .putString("comment", "doc")
+ .build())))))))
}
}
@@ -1037,7 +1060,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
assert(table.name === t)
assert(
- table.columns() === Array(
+ table.columns sameElements Array(
Column.create("data", StringType),
Column.create("id", LongType),
Column.create("ts", TimestampType, true),
@@ -1104,7 +1127,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType().add("user_id", IntegerType))
+ assert(table.columns sameElements Array(Column.create("user_id",
IntegerType)))
}
}
@@ -1117,11 +1140,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", StructType(Seq(
StructField("x", DoubleType),
- StructField("t", DoubleType)))))
+ StructField("t", DoubleType))))))
}
}
@@ -1135,11 +1158,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", MapType(StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", MapType(StructType(Seq(
StructField("x", DoubleType),
- StructField("t", DoubleType))), LongType)))
+ StructField("t", DoubleType))), LongType))))
}
}
@@ -1153,11 +1176,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StringType, StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", MapType(StringType, StructType(Seq(
StructField("x", DoubleType),
- StructField("t", DoubleType))))))
+ StructField("t", DoubleType)))))))
}
}
@@ -1170,11 +1193,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(StructType(Seq(
StructField("x", DoubleType),
- StructField("t", DoubleType))))))
+ StructField("t", DoubleType)))))))
}
}
@@ -1278,7 +1301,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType().add("id", IntegerType))
+ assert(table.columns sameElements Array(Column.create("id", IntegerType)))
}
}
@@ -1292,11 +1315,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", StructType(Seq(
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", StructType(Seq(
StructField("x", DoubleType),
- StructField("y", DoubleType)))))
+ StructField("y", DoubleType))))))
}
}
@@ -1310,10 +1333,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("point", MapType(StructType(Seq(
- StructField("x", DoubleType))), LongType)))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("point", MapType(StructType(Seq(
+ StructField("x", DoubleType))), LongType))))
}
}
@@ -1327,10 +1350,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", MapType(StringType, StructType(Seq(
- StructField("x", DoubleType))))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", MapType(StringType, StructType(Seq(
+ StructField("x", DoubleType)))))))
}
}
@@ -1343,10 +1366,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === new StructType()
- .add("id", IntegerType)
- .add("points", ArrayType(StructType(Seq(
- StructField("x", DoubleType))))))
+ assert(table.columns sameElements Array(
+ Column.create("id", IntegerType),
+ Column.create("points", ArrayType(StructType(Seq(
+ StructField("x", DoubleType)))))))
}
}
@@ -1370,7 +1393,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
// with if exists it should pass
sql(s"ALTER TABLE $t DROP COLUMN IF EXISTS data")
val table = getTableMetadata(t)
- assert(table.schema == new StructType().add("id", IntegerType))
+ assert(table.columns sameElements Array(Column.create("id", IntegerType)))
}
}
@@ -1394,7 +1417,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
// with if exists it should pass
sql(s"ALTER TABLE $t DROP COLUMN IF EXISTS point.x")
val table = getTableMetadata(t)
- assert(table.schema == new StructType().add("id", IntegerType))
+ assert(table.columns sameElements Array(Column.create("id", IntegerType)))
}
}
@@ -1408,8 +1431,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
sql(s"ALTER TABLE $t DROP COLUMNS IF EXISTS " +
s"names, name, points.element.z, id, points.element.x")
val table = getTableMetadata(t)
- assert(table.schema == new StructType()
- .add("points", ArrayType(StructType(Seq(StructField("y",
DoubleType))))))
+ assert(table.columns sameElements Array(
+ Column.create("points", ArrayType(
+ StructType(Seq(
+ StructField("y", DoubleType)))))))
}
}
@@ -1456,9 +1481,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
val table = getTableMetadata(t)
assert(table.name === t)
- assert(table.schema === StructType(Seq(
- StructField("col2", StringType),
- StructField("col3", IntegerType).withComment("c3"))))
+ assert(table.columns sameElements Array(
+ Column.create("col2", StringType),
+ Column.create("col3", IntegerType, true, "c3", null)))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index b952270fc786..5e17c1a64f44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -30,14 +30,14 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{LongType, StructType}
+import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, QueryExecutionListener}
import org.apache.spark.unsafe.types.UTF8String
@@ -100,7 +100,8 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
}
assert(table.partitioning().map(_.references().head.fieldNames().head) === partitionBy,
"Partitioning was incorrect")
- assert(table.schema() === df.schema.asNullable, "Schema did not match")
+ assert(table.columns() === CatalogV2Util.structTypeToV2Columns(df.schema.asNullable),
+ "Column did not match")
checkAnswer(load("t1", withCatalogOption), df.toDF())
}
@@ -147,7 +148,8 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
val table = catalog(SESSION_CATALOG_NAME).loadTable(Identifier.of(Array("default"), "t1"))
assert(table.partitioning().isEmpty, "Partitioning should be empty")
- assert(table.schema() === new StructType().add("id", LongType), "Schema did not match")
+ assert(table.columns() sameElements
+ Array(Column.create("id", LongType)), "Schema did not match")
assert(load("t1", None).count() === 0)
}
@@ -159,7 +161,8 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
val table = catalog(catalogName).loadTable("t1")
assert(table.partitioning().isEmpty, "Partitioning should be empty")
- assert(table.schema() === new StructType().add("id", LongType), "Schema did not match")
+ assert(table.columns() sameElements
+ Array(Column.create("id", LongType)), "Schema did not match")
assert(load("t1", Some(catalogName)).count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
index bfde1984f1fb..747d16434534 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
@@ -144,7 +144,7 @@ class TableWithV1ReadFallback(override val name: String) extends Table with Supp
private class V1ReadFallbackScanBuilder extends ScanBuilder
with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
- private var requiredSchema: StructType = schema()
+ private var requiredSchema: StructType = CatalogV2Util.v2ColumnsToStructType(columns())
override def pruneColumns(requiredSchema: StructType): Unit = {
this.requiredSchema = requiredSchema
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 63e54812922a..c7e8dfeb5914 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -288,7 +288,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val loaded = catalog.loadTable(testIdent)
assert(table.name == loaded.name)
- assert(table.schema == loaded.schema)
+ assert(table.columns sameElements loaded.columns())
assert(table.properties == loaded.properties)
}
@@ -490,8 +490,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
val updated = catalog.loadTable(testIdent)
- val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(
+ Column.create("id", LongType),
+ Column.create("data", StringType))
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: update column nullability") {
@@ -509,7 +511,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
TableChange.updateColumnNullability(Array("id"), true))
val updated = catalog.loadTable(testIdent)
- val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
val expectedColumns: Array[Column] = Array(
Column.create("id", IntegerType),
Column.create("data", StringType)
@@ -546,10 +547,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
TableChange.updateColumnComment(Array("id"), "comment text"))
val updated = catalog.loadTable(testIdent)
- val expectedSchema = new StructType()
- .add("id", IntegerType, nullable = true, "comment text")
- .add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(
+ Column.create("id", IntegerType, true, "comment text", null),
+ Column.create("data", StringType))
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: replace comment") {
@@ -562,15 +563,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))
- val expectedSchema = new StructType()
- .add("id", IntegerType, nullable = true, "replacement comment")
- .add("data", StringType)
+ val expectedColumns = Array(
+ Column.create("id", IntegerType, true, "replacement comment", null),
+ Column.create("data", StringType))
catalog.alterTable(testIdent,
TableChange.updateColumnComment(Array("id"), "replacement comment"))
val updated = catalog.loadTable(testIdent)
- assert(updated.schema == expectedSchema)
+ assert(updated.columns sameElements expectedColumns)
}
test("alterTable: add comment to missing column fails") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 1fe7530044e5..4b1c892277b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.classic.Catalog
import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.test.SharedSparkSession
@@ -811,7 +811,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
val testCatalog = spark.sessionState.catalogManager.catalog(catalogName).asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
- assert(table.schema().equals(tableSchema))
+ assert(table.columns sameElements CatalogV2Util.structTypeToV2Columns(tableSchema))
assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
assert(table.properties().get("comment").equals(description))
}
@@ -831,7 +831,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
val testCatalog = spark.sessionState.catalogManager.catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
- assert(table.schema().equals(tableSchema))
+ assert(table.columns sameElements CatalogV2Util.structTypeToV2Columns(tableSchema))
assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
assert(table.properties().get("comment").equals(description))
assert(table.properties().get("path").equals(dir.getAbsolutePath))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index aff3d045b52e..1c1575b06e5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -681,7 +681,7 @@ class NonStreamV2Table(override val name: String)
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
owner = null,
- schema = schema(),
+ schema = StructType(Nil),
provider = Some("parquet"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]