This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 59e6b5b7d350 [SPARK-52475][TESTS] Remove deprecated table.schema method from tests
59e6b5b7d350 is described below

commit 59e6b5b7d350a1603502bc92e3c117311ab2cbb6
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Jun 16 07:36:13 2025 -0700

    [SPARK-52475][TESTS] Remove deprecated table.schema method from tests
    
    ### What changes were proposed in this pull request?
    Fix many compile warnings like
    ```
    [warn] /Users/szehon.ho/repos/apache-spark/spark/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala:282:18: method schema in trait Table is deprecated (since 3.4.0)
    [warn] Applicable -Wconf / nowarn filters for this warning: msg=<part of the message>, cat=deprecation, site=org.apache.spark.sql.connector.catalog.CatalogSuite, origin=org.apache.spark.sql.connector.catalog.Table.schema, version=3.4.0
    ```
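
    The pattern applied throughout is sketched below: read Table.columns() instead of the deprecated Table.schema(), compare the resulting arrays with sameElements, and convert back with CatalogV2Util.v2ColumnsToStructType where a StructType is still needed. The checkIdColumn helper is illustrative only, not part of this change.
    ```
    import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Table}
    import org.apache.spark.sql.types.{LongType, StructType}

    // Illustrative helper; `table` is any v2 Table with a single long `id` column.
    def checkIdColumn(table: Table): Unit = {
      // Deprecated since 3.4.0:
      //   assert(table.schema == new StructType().add("id", LongType))
      // Replacement: compare v2 columns; Array equality needs sameElements.
      assert(table.columns sameElements Array(Column.create("id", LongType)))

      // Where a StructType is still required, convert explicitly:
      val schema: StructType = CatalogV2Util.v2ColumnsToStructType(table.columns())
      assert(schema == new StructType().add("id", LongType))
    }
    ```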
    
    ### Why are the changes needed?
    Reduce compiler warnings
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Test only
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51173 from szehon-ho/fix_warnings_other_tests.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/connector/catalog/CatalogSuite.scala |  42 +-
 .../catalog/InMemoryAtomicPartitionTable.scala     |  11 +-
 .../connector/catalog/InMemoryPartitionTable.scala |  11 +-
 .../SupportsAtomicPartitionManagementSuite.scala   |  12 +-
 .../catalog/SupportsPartitionManagementSuite.scala |  14 +-
 .../connect/planner/SparkConnectProtoSuite.scala   |   6 +-
 .../apache/spark/sql/CharVarcharTestSuite.scala    |   4 +-
 .../spark/sql/collation/CollationSuite.scala       |  25 +-
 .../spark/sql/connector/AlterTableTests.scala      | 463 +++++++++++----------
 .../connector/SupportsCatalogOptionsSuite.scala    |  13 +-
 .../spark/sql/connector/V1ReadFallbackSuite.scala  |   2 +-
 .../datasources/v2/V2SessionCatalogSuite.scala     |  25 +-
 .../apache/spark/sql/internal/CatalogSuite.scala   |   6 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |   2 +-
 14 files changed, 342 insertions(+), 294 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index 178c43258816..47bf1c5bdc17 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -279,7 +279,7 @@ class CatalogSuite extends SparkFunSuite {
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
+    assert(table.columns == loaded.columns)
     assert(table.properties == loaded.properties)
   }
 
@@ -307,7 +307,7 @@ class CatalogSuite extends SparkFunSuite {
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
+    assert(table.columns == loaded.columns)
     assert(table.properties == loaded.properties)
   }
 
@@ -544,8 +544,8 @@ class CatalogSuite extends SparkFunSuite {
 
     val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
 
-    val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(Column.create("id", LongType), Column.create("data", StringType))
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: update column nullability") {
@@ -566,8 +566,9 @@ class CatalogSuite extends SparkFunSuite {
     val updated = catalog.alterTable(testIdent,
       TableChange.updateColumnNullability(Array("id"), true))
 
-    val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(
+      Column.create("id", IntegerType, true), Column.create("data", StringType))
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: update missing column fails") {
@@ -606,10 +607,11 @@ class CatalogSuite extends SparkFunSuite {
     val updated = catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "comment text"))
 
-    val expectedSchema = new StructType()
-        .add("id", IntegerType, nullable = true, "comment text")
-        .add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(
+      Column.create("id", IntegerType, true, "comment text", null),
+      Column.create("data", StringType)
+    )
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: replace comment") {
@@ -626,14 +628,14 @@ class CatalogSuite extends SparkFunSuite {
 
     catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))
 
-    val expectedSchema = new StructType()
-        .add("id", IntegerType, nullable = true, "replacement comment")
-        .add("data", StringType)
-
+    val expectedColumns = Array(
+      Column.create("id", IntegerType, true, "replacement comment", null),
+      Column.create("data", StringType)
+    )
     val updated = catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "replacement comment"))
 
-    assert(updated.schema == expectedSchema)
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: add comment to missing column fails") {
@@ -671,9 +673,9 @@ class CatalogSuite extends SparkFunSuite {
 
     val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
 
-    val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
-
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(
+      Column.create("some_id", IntegerType), Column.create("data", StringType))
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: rename nested column") {
@@ -785,8 +787,8 @@ class CatalogSuite extends SparkFunSuite {
     val updated = catalog.alterTable(testIdent,
       TableChange.deleteColumn(Array("id"), false))
 
-    val expectedSchema = new StructType().add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(Column.create("data", StringType))
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: delete nested column") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala
index 8c9039c723eb..a4a70fff5e9f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala
@@ -30,12 +30,19 @@ import org.apache.spark.util.ArrayImplicits._
  */
 class InMemoryAtomicPartitionTable (
     name: String,
-    schema: StructType,
+    columns: Array[Column],
     partitioning: Array[Transform],
     properties: util.Map[String, String])
-  extends InMemoryPartitionTable(name, schema, partitioning, properties)
+  extends InMemoryPartitionTable(name, columns, partitioning, properties)
   with SupportsAtomicPartitionManagement {
 
+  def this(
+      name: String,
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]) =
+    this(name, CatalogV2Util.structTypeToV2Columns(schema), partitioning, properties)
+
   override def createPartition(
       ident: InternalRow,
       properties: util.Map[String, String]): Unit = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala
index ad55af81a364..aaea6a2b11cd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala
@@ -33,12 +33,19 @@ import org.apache.spark.sql.types.StructType
  */
 class InMemoryPartitionTable(
     name: String,
-    schema: StructType,
+    columns: Array[Column],
     partitioning: Array[Transform],
     properties: util.Map[String, String])
-  extends InMemoryTable(name, schema, partitioning, properties) with SupportsPartitionManagement {
+  extends InMemoryTable(name, columns, partitioning, properties) with SupportsPartitionManagement {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
+  def this(
+    name: String,
+    schema: StructType,
+    partitioning: Array[Transform],
+    properties: util.Map[String, String]
+  ) = this(name, CatalogV2Util.structTypeToV2Columns(schema), partitioning, properties)
+
   protected val memoryTablePartitions: util.Map[InternalRow, util.Map[String, String]] =
     new ConcurrentHashMap[InternalRow, util.Map[String, String]]()
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
index a9d8a69128ae..ce6a72c41491 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
@@ -53,7 +53,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("createPartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -72,7 +72,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("createPartitions failed if partition already exists") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("4")
@@ -94,7 +94,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("dropPartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -112,7 +112,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("purgePartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
     partTable.createPartitions(
       partIdents,
@@ -129,7 +129,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("dropPartitions failed if partition not exists") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("4")
@@ -147,7 +147,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("truncatePartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     partTable.createPartitions(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
index 8581d4dec1fb..5a62fe282c93 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
@@ -55,7 +55,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("createPartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -70,7 +70,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("dropPartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -88,7 +88,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("purgePartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     checkError(
       exception = intercept[SparkUnsupportedOperationException] {
         partTable.purgePartition(InternalRow.apply("3"))
@@ -101,7 +101,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("replacePartitionMetadata") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -123,7 +123,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("loadPartitionMetadata") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -140,7 +140,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("listPartitionIdentifiers") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -248,7 +248,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("truncatePartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(), table.columns(), table.partitioning(), table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 5c43715d2dd1..7b734f93e595 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.connect.dsl.MockRemoteSession
 import org.apache.spark.sql.connect.dsl.commands._
 import org.apache.spark.sql.connect.dsl.expressions._
 import org.apache.spark.sql.connect.dsl.plans._
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, Identifier, InMemoryTableCatalog, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.functions._
@@ -822,7 +822,9 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
         .asTableCatalog
         .loadTable(Identifier.of(Array(), "table_name"))
       assert(table.name === "testcat.table_name")
-      assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
+      assert(
+        table.columns sameElements
+          Array(ColumnV2.create("id", LongType), ColumnV2.create("data", StringType)))
       assert(table.partitioning.isEmpty)
       assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index eceadc338bfc..6139d0e98767 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Project}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.SchemaRequiredDataSource
-import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, InMemoryPartitionTableCatalog}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -923,7 +923,7 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
       def checkSchema(df: DataFrame): Unit = {
         val schemas = df.queryExecution.analyzed.collect {
           case l: LogicalRelation => l.relation.schema
-          case d: DataSourceV2Relation => d.table.schema()
+          case d: DataSourceV2Relation => CatalogV2Util.v2ColumnsToStructType(d.table.columns())
         }
         assert(schemas.length == 1)
         assert(schemas.head.map(_.dataType) == Seq(StringType))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
index c278233b557b..4044e5674191 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.CollationFactory
 import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema}
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryTable}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
@@ -610,16 +610,16 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       sql(s"ALTER TABLE $tableName ALTER COLUMN c3.value TYPE STRING COLLATE 
UTF8_BINARY")
       sql(s"ALTER TABLE $tableName ALTER COLUMN c4.t TYPE STRING COLLATE 
UNICODE")
       val testCatalog = catalog("testcat").asTableCatalog
-      val tableSchema = testCatalog.loadTable(Identifier.of(Array(), "alter_column_tbl")).schema()
-      val c1Metadata = tableSchema.find(_.name == "c1").get.metadata
-      assert(c1Metadata === createMetadata("c1"))
-      val c2Metadata = tableSchema.find(_.name == "c2").get.metadata
-      assert(c2Metadata === createMetadata("c2"))
-      val c3Metadata = tableSchema.find(_.name == "c3").get.metadata
-      assert(c3Metadata === createMetadata("c3"))
-      val c4Metadata = tableSchema.find(_.name == "c4").get.metadata
-      assert(c4Metadata === createMetadata("c4"))
-      val c4tMetadata = tableSchema.find(_.name == "c4").get.dataType
+      val columns = testCatalog.loadTable(Identifier.of(Array(), "alter_column_tbl")).columns()
+      val c1Metadata = columns.find(_.name() == "c1").get.metadataInJSON()
+      assert(c1Metadata === createMetadata("c1").json)
+      val c2Metadata = columns.find(_.name() == "c2").get.metadataInJSON()
+      assert(c2Metadata === createMetadata("c2").json)
+      val c3Metadata = columns.find(_.name() == "c3").get.metadataInJSON()
+      assert(c3Metadata === createMetadata("c3").json)
+      val c4Metadata = columns.find(_.name() == "c4").get.metadataInJSON()
+      assert(c4Metadata === createMetadata("c4").json)
+      val c4tMetadata = columns.find(_.name() == "c4").get.dataType()
         .asInstanceOf[StructType].find(_.name == "t").get.metadata
       assert(c4tMetadata === createMetadata("c4t"))
     }
@@ -864,7 +864,8 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       assert(table.columns().head.dataType() == StringType(collationId))
 
       val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
-      checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
+      checkAnswer(spark.internalCreateDataFrame(rdd,
+        CatalogV2Util.v2ColumnsToStructType(table.columns)), Seq.empty)
 
       sql(s"INSERT INTO $tableName VALUES ('a'), ('A')")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 53528bf2eae2..9b01cd9f75bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.connector.catalog.{Column, Table}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
+import org.apache.spark.sql.connector.expressions.LiteralValue
 import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -86,7 +87,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType().add("id", IntegerType))
+      assert(table.columns sameElements Array(Column.create("id", IntegerType)))
     }
   }
 
@@ -99,7 +100,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType().add("id", IntegerType).add("data", StringType))
+      assert(table.columns sameElements
+        Array(Column.create("id", IntegerType),
+          Column.create("data", StringType)))
     }
   }
 
@@ -112,9 +115,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === StructType(Seq(
-        StructField("id", IntegerType),
-        StructField("data", StringType, nullable = false))))
+      assert(table.columns sameElements
+        Array(Column.create("id", IntegerType),
+          Column.create("data", StringType, false)))
     }
   }
 
@@ -127,9 +130,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === StructType(Seq(
-        StructField("id", IntegerType),
-        StructField("data", StringType).withComment("doc"))))
+      assert(table.columns sameElements
+        Array(Column.create("id", IntegerType),
+          Column.create("data", StringType, true, "doc", null)))
     }
   }
 
@@ -152,15 +155,17 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       sql(s"CREATE TABLE $t (point struct<x: int>) USING $v2Format")
 
       sql(s"ALTER TABLE $t ADD COLUMN a string FIRST")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("a", StringType)
-        .add("point", new StructType().add("x", IntegerType)))
+      assert(getTableMetadata(t).columns sameElements
+        Array(
+          Column.create("a", StringType),
+          Column.create("point", new StructType().add("x", IntegerType))))
 
       sql(s"ALTER TABLE $t ADD COLUMN b string AFTER point")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("a", StringType)
-        .add("point", new StructType().add("x", IntegerType))
-        .add("b", StringType))
+      assert(getTableMetadata(t).columns sameElements
+        Array(
+          Column.create("a", StringType),
+          Column.create("point", new StructType().add("x", IntegerType)),
+          Column.create("b", StringType)))
 
       val e1 = intercept[AnalysisException](
         sql(s"ALTER TABLE $t ADD COLUMN c string AFTER non_exist"))
@@ -171,21 +176,22 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       )
 
       sql(s"ALTER TABLE $t ADD COLUMN point.y int FIRST")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("a", StringType)
-        .add("point", new StructType()
-          .add("y", IntegerType)
-          .add("x", IntegerType))
-        .add("b", StringType))
+      assert(getTableMetadata(t).columns sameElements
+        Array(Column.create("a", StringType),
+          Column.create("point", new StructType()
+            .add("y", IntegerType)
+            .add("x", IntegerType)),
+          Column.create("b", StringType)))
 
       sql(s"ALTER TABLE $t ADD COLUMN point.z int AFTER x")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("a", StringType)
-        .add("point", new StructType()
-          .add("y", IntegerType)
-          .add("x", IntegerType)
-          .add("z", IntegerType))
-        .add("b", StringType))
+      assert(getTableMetadata(t).columns sameElements
+        Array(
+          Column.create("a", StringType),
+          Column.create("point", new StructType()
+            .add("y", IntegerType)
+            .add("x", IntegerType)
+            .add("z", IntegerType)),
+          Column.create("b", StringType)))
 
       val e2 = intercept[AnalysisException](
         sql(s"ALTER TABLE $t ADD COLUMN point.x2 int AFTER non_exist"))
@@ -203,28 +209,30 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       sql(s"CREATE TABLE $t (a string, b int, point struct<x: double, y: 
double>) USING $v2Format")
       sql(s"ALTER TABLE $t ADD COLUMNS (x int AFTER a, y int AFTER x, z int 
AFTER y)")
 
-      assert(getTableMetadata(t).schema === new StructType()
-        .add("a", StringType)
-        .add("x", IntegerType)
-        .add("y", IntegerType)
-        .add("z", IntegerType)
-        .add("b", IntegerType)
-        .add("point", new StructType()
-          .add("x", DoubleType)
-          .add("y", DoubleType)))
+      assert(getTableMetadata(t).columns sameElements
+        Array(
+          Column.create("a", StringType),
+          Column.create("x", IntegerType),
+          Column.create("y", IntegerType),
+          Column.create("z", IntegerType),
+          Column.create("b", IntegerType),
+          Column.create("point", new StructType()
+            .add("x", DoubleType)
+            .add("y", DoubleType))))
 
       sql(s"ALTER TABLE $t ADD COLUMNS (point.z double AFTER x, point.zz 
double AFTER z)")
-      assert(getTableMetadata(t).schema === new StructType()
-        .add("a", StringType)
-        .add("x", IntegerType)
-        .add("y", IntegerType)
-        .add("z", IntegerType)
-        .add("b", IntegerType)
-        .add("point", new StructType()
-          .add("x", DoubleType)
-          .add("z", DoubleType)
-          .add("zz", DoubleType)
-          .add("y", DoubleType)))
+      assert(getTableMetadata(t).columns sameElements
+        Array(
+          Column.create("a", StringType),
+          Column.create("x", IntegerType),
+          Column.create("y", IntegerType),
+          Column.create("z", IntegerType),
+          Column.create("b", IntegerType),
+          Column.create("point", new StructType()
+            .add("x", DoubleType)
+            .add("z", DoubleType)
+            .add("zz", DoubleType)
+            .add("y", DoubleType))))
 
       // The new column being referenced should come before being referenced.
       val e = intercept[AnalysisException](
@@ -246,10 +254,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === StructType(Seq(
-        StructField("id", IntegerType),
-        StructField("data", StringType).withComment("doc"),
-        StructField("ts", TimestampType))))
+      assert(table.columns sameElements Array(
+          Column.create("id", IntegerType),
+          Column.create("data", StringType, true, "doc", null),
+          Column.create("ts", TimestampType)))
     }
   }
 
@@ -262,12 +270,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType),
-          StructField("z", DoubleType)))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", new StructType()
+          .add("x", DoubleType)
+          .add("y", DoubleType)
+          .add("z", DoubleType))))
     }
   }
 
@@ -281,12 +289,14 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType),
-          StructField("z", DoubleType))), LongType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points",
+          MapType(StructType(Seq(
+            StructField("x", DoubleType),
+            StructField("y", DoubleType),
+            StructField("z", DoubleType))),
+            LongType))))
     }
   }
 
@@ -300,12 +310,13 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StringType, StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType),
-          StructField("z", DoubleType))))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points",
+          MapType(StringType, StructType(Seq(
+            StructField("x", DoubleType),
+            StructField("y", DoubleType),
+            StructField("z", DoubleType)))))))
     }
   }
 
@@ -318,12 +329,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(StructType(Seq(
           StructField("x", DoubleType),
           StructField("y", DoubleType),
-          StructField("z", DoubleType))))))
+          StructField("z", DoubleType)))))))
     }
   }
 
@@ -337,28 +348,25 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
         val table = getTableMetadata(t)
 
         assert(table.name === t)
-        assert(table.schema === new StructType()
-          .add("a", StringType)
-          .add(StructField("b", IntegerType)
-            .withCurrentDefaultValue("2 + 3")
-            .withExistenceDefaultValue("5")))
+        assert(table.columns sameElements Array(
+          Column.create("a", StringType),
+          Column.create("b", IntegerType, true, null,
+            new ColumnDefaultValue("2 + 3", LiteralValue(5, IntegerType)), null)))
 
         sql(s"alter table $t alter column b set default 2 + 3")
 
         assert(
-          getTableMetadata(t).schema === new StructType()
-            .add("a", StringType)
-            .add(StructField("b", IntegerType)
-              .withCurrentDefaultValue("2 + 3")
-              .withExistenceDefaultValue("5")))
+          getTableMetadata(t).columns sameElements Array(
+            Column.create("a", StringType),
+            Column.create("b", IntegerType, true, null,
+              new ColumnDefaultValue("2 + 3", LiteralValue(5, IntegerType)), null)))
 
         sql(s"alter table $t alter column b drop default")
 
         assert(
-          getTableMetadata(t).schema === new StructType()
-            .add("a", StringType)
-            .add(StructField("b", IntegerType)
-              .withExistenceDefaultValue("5")))
+          getTableMetadata(t).columns sameElements Array(
+            Column.create("a", StringType),
+            Column.create("b", IntegerType, true, null, 
"""{"EXISTS_DEFAULT":"5"}""")))
       }
     }
   }
@@ -401,11 +409,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(StructType(Seq(
           StructField("x", DoubleType),
           StructField("y", DoubleType))))))
+      )
     }
   }
 
@@ -418,12 +427,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(StructType(Seq(
           StructField("x", DoubleType),
           StructField("y", DoubleType),
-          StructField("z", DoubleType).withComment("doc"))))))
+          StructField("z", DoubleType).withComment("doc")))))))
     }
   }
 
@@ -521,7 +530,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType().add("id", LongType))
+      assert(table.columns sameElements Array(Column.create("id", LongType)))
     }
   }
 
@@ -531,7 +540,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       sql(s"CREATE TABLE $t (id int) USING $v2Format")
       (DataTypeTestUtils.dayTimeIntervalTypes ++ DataTypeTestUtils.yearMonthIntervalTypes)
         .foreach {
-          case d: DataType => d.typeName
+          d: DataType => d.typeName
             val sqlText = s"ALTER TABLE $t ALTER COLUMN id TYPE ${d.typeName}"
 
             checkError(
@@ -559,12 +568,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
 
       val table = getTableMetadata(t)
       assert(table.name === t)
-      assert(table.schema === new StructType().add("id", LongType, nullable = false))
+      assert(table.columns sameElements Array(Column.create("id", LongType, false)))
 
       sql(s"ALTER TABLE $t ALTER COLUMN id DROP NOT NULL")
       val table2 = getTableMetadata(t)
       assert(table2.name === t)
-      assert(table2.schema === new StructType().add("id", LongType))
+      assert(table2.columns sameElements Array(Column.create("id", LongType, true)))
 
       val e = intercept[AnalysisException] {
         sql(s"ALTER TABLE $t ALTER COLUMN id SET NOT NULL")
@@ -581,11 +590,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
 
       val table = getTableMetadata(t)
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType)))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", new StructType()
+          .add("x", DoubleType)
+          .add("y", DoubleType))))
     }
   }
 
@@ -613,11 +622,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType)))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", new StructType()
+          .add("x", DoubleType)
+          .add("y", DoubleType))))
     }
   }
 
@@ -644,9 +653,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(IntegerType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(IntegerType))))
     }
   }
 
@@ -659,9 +668,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(LongType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(LongType))))
     }
   }
 
@@ -688,9 +697,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("m", MapType(StringType, IntegerType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("m", MapType(StringType, IntegerType))))
     }
   }
 
@@ -703,9 +712,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("m", MapType(StringType, LongType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("m", MapType(StringType, LongType))))
     }
   }
 
@@ -719,11 +728,13 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType))), LongType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points",
+          MapType(StructType(Seq(
+            StructField("x", DoubleType),
+            StructField("y", DoubleType))),
+            LongType))))
     }
   }
 
@@ -737,11 +748,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StringType, StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType))))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points",
+          MapType(StringType, StructType(Seq(
+            StructField("x", DoubleType),
+            StructField("y", DoubleType)))))))
     }
   }
 
@@ -754,11 +766,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType))))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points",
+          ArrayType(StructType(Seq(
+            StructField("x", DoubleType),
+            StructField("y", DoubleType)))))))
     }
   }
 
@@ -834,7 +847,8 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === StructType(Seq(StructField("id", IntegerType).withComment("doc"))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType, true, "doc", null)))
     }
   }
 
@@ -844,22 +858,22 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       sql(s"CREATE TABLE $t (a int, b int, point struct<x: int, y: int, z: 
int>) USING $v2Format")
 
       sql(s"ALTER TABLE $t ALTER COLUMN b FIRST")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("b", IntegerType)
-        .add("a", IntegerType)
-        .add("point", new StructType()
+      assert(getTableMetadata(t).columns sameElements Array(
+        Column.create("b", IntegerType),
+        Column.create("a", IntegerType),
+        Column.create("point", new StructType()
           .add("x", IntegerType)
           .add("y", IntegerType)
-          .add("z", IntegerType)))
+          .add("z", IntegerType))))
 
       sql(s"ALTER TABLE $t ALTER COLUMN b AFTER point")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("a", IntegerType)
-        .add("point", new StructType()
+      assert(getTableMetadata(t).columns sameElements Array(
+        Column.create("a", IntegerType),
+        Column.create("point", new StructType()
           .add("x", IntegerType)
           .add("y", IntegerType)
-          .add("z", IntegerType))
-        .add("b", IntegerType))
+          .add("z", IntegerType)),
+        Column.create("b", IntegerType)))
 
       val sqlText1 = s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist"
       checkError(
@@ -874,22 +888,23 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
         context = ExpectedContext(fragment = sqlText1, start = 0, stop = sqlText1.length - 1))
 
       sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("a", IntegerType)
-        .add("point", new StructType()
+      assert(getTableMetadata(t).columns sameElements Array(
+        Column.create("a", IntegerType),
+        Column.create("point", new StructType()
           .add("y", IntegerType)
           .add("x", IntegerType)
-          .add("z", IntegerType))
-        .add("b", IntegerType))
+          .add("z", IntegerType)),
+        Column.create("b", IntegerType)
+      ))
 
       sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER z")
-      assert(getTableMetadata(t).schema == new StructType()
-        .add("a", IntegerType)
-        .add("point", new StructType()
+      assert(getTableMetadata(t).columns sameElements Array(
+        Column.create("a", IntegerType),
+        Column.create("point", new StructType()
           .add("x", IntegerType)
           .add("z", IntegerType)
-          .add("y", IntegerType))
-        .add("b", IntegerType))
+          .add("y", IntegerType)),
+        Column.create("b", IntegerType)))
 
       val sqlText2 = s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist"
       checkError(
@@ -918,11 +933,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType).withComment("doc")))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", new StructType()
+          .add("x", DoubleType)
+          .add("y", DoubleType, nullable = true, "doc"))))
     }
   }
 
@@ -936,11 +951,12 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType).withComment("doc"))), LongType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points",
+          MapType(StructType(Seq(
+            StructField("x", DoubleType),
+            StructField("y", DoubleType).withComment("doc"))), LongType))))
     }
   }
 
@@ -954,11 +970,15 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StringType, StructType(Seq(
-          StructField("x", DoubleType),
-          StructField("y", DoubleType).withComment("doc"))))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points",
+          MapType(StringType, StructType(Seq(
+            StructField("x", DoubleType),
+            StructField("y", DoubleType, nullable = true,
+              new MetadataBuilder()
+                .putString("comment", "doc")
+                .build())))))))
     }
   }
 
@@ -971,11 +991,14 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(StructType(Seq(
           StructField("x", DoubleType),
-          StructField("y", DoubleType).withComment("doc"))))))
+          StructField("y", DoubleType, nullable = true,
+            new MetadataBuilder()
+              .putString("comment", "doc")
+              .build())))))))
     }
   }
 
@@ -1037,7 +1060,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
 
         assert(table.name === t)
         assert(
-          table.columns() === Array(
+          table.columns sameElements Array(
             Column.create("data", StringType),
             Column.create("id", LongType),
             Column.create("ts", TimestampType, true),
@@ -1104,7 +1127,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType().add("user_id", IntegerType))
+      assert(table.columns sameElements Array(Column.create("user_id", IntegerType)))
     }
   }
 
@@ -1117,11 +1140,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", StructType(Seq(
           StructField("x", DoubleType),
-          StructField("t", DoubleType)))))
+          StructField("t", DoubleType))))))
     }
   }
 
@@ -1135,11 +1158,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", MapType(StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", MapType(StructType(Seq(
           StructField("x", DoubleType),
-          StructField("t", DoubleType))), LongType)))
+          StructField("t", DoubleType))), LongType))))
     }
   }
 
@@ -1153,11 +1176,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StringType, StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", MapType(StringType, StructType(Seq(
           StructField("x", DoubleType),
-          StructField("t", DoubleType))))))
+          StructField("t", DoubleType)))))))
     }
   }
 
@@ -1170,11 +1193,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(StructType(Seq(
           StructField("x", DoubleType),
-          StructField("t", DoubleType))))))
+          StructField("t", DoubleType)))))))
     }
   }
 
@@ -1278,7 +1301,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType().add("id", IntegerType))
+      assert(table.columns sameElements Array(Column.create("id", IntegerType)))
     }
   }
 
@@ -1292,11 +1315,11 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", StructType(Seq(
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", StructType(Seq(
           StructField("x", DoubleType),
-          StructField("y", DoubleType)))))
+          StructField("y", DoubleType))))))
     }
   }
 
@@ -1310,10 +1333,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("point", MapType(StructType(Seq(
-          StructField("x", DoubleType))), LongType)))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("point", MapType(StructType(Seq(
+          StructField("x", DoubleType))), LongType))))
     }
   }
 
@@ -1327,10 +1350,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", MapType(StringType, StructType(Seq(
-          StructField("x", DoubleType))))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", MapType(StringType, StructType(Seq(
+          StructField("x", DoubleType)))))))
     }
   }
 
@@ -1343,10 +1366,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === new StructType()
-        .add("id", IntegerType)
-        .add("points", ArrayType(StructType(Seq(
-          StructField("x", DoubleType))))))
+      assert(table.columns sameElements Array(
+        Column.create("id", IntegerType),
+        Column.create("points", ArrayType(StructType(Seq(
+          StructField("x", DoubleType)))))))
     }
   }
 
@@ -1370,7 +1393,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       // with if exists it should pass
       sql(s"ALTER TABLE $t DROP COLUMN IF EXISTS data")
       val table = getTableMetadata(t)
-      assert(table.schema == new StructType().add("id", IntegerType))
+      assert(table.columns sameElements Array(Column.create("id", IntegerType)))
     }
   }
 
@@ -1394,7 +1417,7 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       // with if exists it should pass
       sql(s"ALTER TABLE $t DROP COLUMN IF EXISTS point.x")
       val table = getTableMetadata(t)
-      assert(table.schema == new StructType().add("id", IntegerType))
+      assert(table.columns sameElements Array(Column.create("id", IntegerType)))
     }
   }
 
@@ -1408,8 +1431,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       sql(s"ALTER TABLE $t DROP COLUMNS IF EXISTS " +
         s"names, name, points.element.z, id, points.element.x")
       val table = getTableMetadata(t)
-      assert(table.schema == new StructType()
-        .add("points", ArrayType(StructType(Seq(StructField("y", 
DoubleType))))))
+      assert(table.columns sameElements Array(
+        Column.create("points", ArrayType(
+          StructType(Seq(
+            StructField("y", DoubleType)))))))
     }
   }
 
@@ -1456,9 +1481,9 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
       val table = getTableMetadata(t)
 
       assert(table.name === t)
-      assert(table.schema === StructType(Seq(
-        StructField("col2", StringType),
-        StructField("col3", IntegerType).withComment("c3"))))
+      assert(table.columns sameElements Array(
+        Column.create("col2", StringType),
+        Column.create("col3", IntegerType, true, "c3", null)))
     }
   }
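
[Note: the hunks above replace `StructType` equality with `sameElements` because `Table.columns()` returns an `Array[Column]`, and Scala arrays compare by reference under `==`/`===`. A minimal standalone sketch of the difference; the values are illustrative, not taken from the patch:

```scala
import org.apache.spark.sql.connector.catalog.Column
import org.apache.spark.sql.types.IntegerType

val a = Array(Column.create("id", IntegerType))
val b = Array(Column.create("id", IntegerType))

// Arrays are Java arrays: `==` is reference equality, so distinct
// instances with equal contents are not `==`.
assert(!(a == b))
// `sameElements` compares element-wise, which is what the tests need.
assert(a sameElements b)
```
]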
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index b952270fc786..5e17c1a64f44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -30,14 +30,14 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{LongType, StructType}
+import org.apache.spark.sql.types.LongType
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, QueryExecutionListener}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -100,7 +100,8 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     }
     assert(table.partitioning().map(_.references().head.fieldNames().head) === partitionBy,
       "Partitioning was incorrect")
-    assert(table.schema() === df.schema.asNullable, "Schema did not match")
+    assert(table.columns() === CatalogV2Util.structTypeToV2Columns(df.schema.asNullable),
+      "Column did not match")
 
     checkAnswer(load("t1", withCatalogOption), df.toDF())
   }
@@ -147,7 +148,8 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
 
     val table = catalog(SESSION_CATALOG_NAME).loadTable(Identifier.of(Array("default"), "t1"))
     assert(table.partitioning().isEmpty, "Partitioning should be empty")
-    assert(table.schema() === new StructType().add("id", LongType), "Schema did not match")
+    assert(table.columns() sameElements
+      Array(Column.create("id", LongType)), "Schema did not match")
     assert(load("t1", None).count() === 0)
   }
 
@@ -159,7 +161,8 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
 
     val table = catalog(catalogName).loadTable("t1")
     assert(table.partitioning().isEmpty, "Partitioning should be empty")
-    assert(table.schema() === new StructType().add("id", LongType), "Schema did not match")
+    assert(table.columns() sameElements
+      Array(Column.create("id", LongType)), "Schema did not match")
     assert(load("t1", Some(catalogName)).count() === 0)
   }
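
[Note: where a test already holds an expected `StructType`, the hunks above compare against `CatalogV2Util.structTypeToV2Columns` rather than hand-building columns. A rough sketch of that conversion, assuming it yields the same structural equality the tests rely on:

```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column}
import org.apache.spark.sql.types.{LongType, StructType}

val schema = new StructType().add("id", LongType)
// Convert the v1-style schema into v2 columns for an element-wise check.
val converted: Array[Column] = CatalogV2Util.structTypeToV2Columns(schema)
assert(converted sameElements Array(Column.create("id", LongType)))
```
]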
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
index bfde1984f1fb..747d16434534 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
@@ -144,7 +144,7 @@ class TableWithV1ReadFallback(override val name: String) extends Table with Supp
   private class V1ReadFallbackScanBuilder extends ScanBuilder
     with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
 
-    private var requiredSchema: StructType = schema()
+    private var requiredSchema: StructType = CatalogV2Util.v2ColumnsToStructType(columns())
     override def pruneColumns(requiredSchema: StructType): Unit = {
       this.requiredSchema = requiredSchema
     }
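
[Note: when code still needs a `StructType`, as in the scan builder above, the non-deprecated path is to derive it from `columns()`. A minimal sketch; `initialSchema` is an illustrative name, not from the patch:

```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table}
import org.apache.spark.sql.types.StructType

// Derive a required schema from a v2 table without calling the
// deprecated Table.schema(); `table` stands in for any Table instance.
def initialSchema(table: Table): StructType =
  CatalogV2Util.v2ColumnsToStructType(table.columns())
```
]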
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 63e54812922a..c7e8dfeb5914 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -288,7 +288,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
+    assert(table.columns sameElements loaded.columns())
     assert(table.properties == loaded.properties)
   }
 
@@ -490,8 +490,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))
     val updated = catalog.loadTable(testIdent)
 
-    val expectedSchema = new StructType().add("id", LongType).add("data", 
StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(
+      Column.create("id", LongType),
+      Column.create("data", StringType))
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: update column nullability") {
@@ -509,7 +511,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       TableChange.updateColumnNullability(Array("id"), true))
     val updated = catalog.loadTable(testIdent)
 
-    val expectedSchema = new StructType().add("id", IntegerType).add("data", 
StringType)
     val expectedColumns: Array[Column] = Array(
       Column.create("id", IntegerType),
       Column.create("data", StringType)
@@ -546,10 +547,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       TableChange.updateColumnComment(Array("id"), "comment text"))
     val updated = catalog.loadTable(testIdent)
 
-    val expectedSchema = new StructType()
-        .add("id", IntegerType, nullable = true, "comment text")
-        .add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(
+      Column.create("id", IntegerType, true, "comment text", null),
+      Column.create("data", StringType))
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: replace comment") {
@@ -562,15 +563,15 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))
 
-    val expectedSchema = new StructType()
-        .add("id", IntegerType, nullable = true, "replacement comment")
-        .add("data", StringType)
+    val expectedColumns = Array(
+      Column.create("id", IntegerType, true, "replacement comment", null),
+      Column.create("data", StringType))
 
     catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "replacement comment"))
     val updated = catalog.loadTable(testIdent)
 
-    assert(updated.schema == expectedSchema)
+    assert(updated.columns sameElements expectedColumns)
   }
 
   test("alterTable: add comment to missing column fails") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 1fe7530044e5..4b1c892277b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.classic.Catalog
 import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, InMemoryCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -811,7 +811,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     val testCatalog =
       spark.sessionState.catalogManager.catalog(catalogName).asTableCatalog
     val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
-    assert(table.schema().equals(tableSchema))
+    assert(table.columns sameElements CatalogV2Util.structTypeToV2Columns(tableSchema))
     assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
     assert(table.properties().get("comment").equals(description))
   }
@@ -831,7 +831,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
       val testCatalog =
         spark.sessionState.catalogManager.catalog("testcat").asTableCatalog
       val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
-      assert(table.schema().equals(tableSchema))
+      assert(table.columns sameElements CatalogV2Util.structTypeToV2Columns(tableSchema))
       assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
       assert(table.properties().get("comment").equals(description))
       assert(table.properties().get("path").equals(dir.getAbsolutePath))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index aff3d045b52e..1c1575b06e5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -681,7 +681,7 @@ class NonStreamV2Table(override val name: String)
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat.empty,
       owner = null,
-      schema = schema(),
+      schema = StructType(Nil),
       provider = Some("parquet"))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
