This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5a4932b3675d  [SPARK-52683][SQL] Support ExternalCatalog alterTableSchema
5a4932b3675d is described below

commit 5a4932b3675d0eb310539ef4727a8eea98300d69
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Mon Jul 7 10:38:27 2025 +0800

    [SPARK-52683][SQL] Support ExternalCatalog alterTableSchema

    ### What changes were proposed in this pull request?
    Add a new ExternalCatalog and SessionCatalog API, alterTableSchema, that will supersede alterTableDataSchema.

    ### Why are the changes needed?
    Because ExternalCatalog::alterTableDataSchema takes only the data schema (without partition columns), the context of the partition columns is lost. This makes it impossible to support column orders where the partition columns are not at the end. See https://github.com/apache/spark/pull/51342 for context.

    More generally, this is a more intuitive API than alterTableDataSchema, because the caller no longer needs to strip out partition columns, and it is not immediately obvious that "data schema" means the schema without the partition columns.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing tests; the tests for alterTableDataSchema are moved to the new API.

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #51373 from szehon-ho/alter_table_schema.

    Authored-by: Szehon Ho <szehon.apa...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/catalog/ExternalCatalog.scala     | 12 ++++++
 .../catalog/ExternalCatalogWithListener.scala      |  6 +++
 .../sql/catalyst/catalog/InMemoryCatalog.scala     | 15 ++++++++
 .../sql/catalyst/catalog/SessionCatalog.scala      | 20 ++++++++++
 .../apache/spark/sql/catalyst/catalog/events.scala |  1 +
 .../catalog/ExternalCatalogEventSuite.scala        |  6 +--
 .../catalyst/catalog/ExternalCatalogSuite.scala    | 37 +++++++++++++++++--
 .../sql/catalyst/catalog/SessionCatalogSuite.scala | 18 ++++++++-
 .../spark/sql/execution/command/tables.scala       |  5 ++-
 .../datasources/v2/V2SessionCatalog.scala          |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       | 43 ++++++++++++++++++++++
 .../spark/sql/hive/HiveExternalCatalogSuite.scala  |  4 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  4 +-
 .../HiveIncompatibleColTypeChangeSuite.scala       |  2 +-
 14 files changed, 159 insertions(+), 16 deletions(-)
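A minimal caller-side sketch of the change described above (not part of the commit; it assumes a SessionCatalog in scope as `sessionCatalog` and a hypothetical partitioned table `db1.t`):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.types.{IntegerType, StructType}

    val ident = TableIdentifier("t", Some("db1"))
    val tableMeta = sessionCatalog.getTableMetadata(ident)

    // Old API: the caller must strip out partition columns and pass only the data schema.
    val newDataSchema = tableMeta.dataSchema.add("new_col", IntegerType)
    sessionCatalog.alterTableDataSchema(ident, newDataSchema)

    // New API: the caller passes the full schema, partition columns included
    // (they must currently stay at the end).
    val newSchema = StructType(newDataSchema ++ tableMeta.partitionSchema)
    sessionCatalog.alterTableSchema(ident, newSchema)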
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 979613ae1126..d1f37020f211 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -120,9 +120,21 @@ trait ExternalCatalog {
    * @param db Database that table to alter schema for exists in
    * @param table Name of table to alter schema for
    * @param newDataSchema Updated data schema to be used for the table.
+   * @deprecated since 4.1.0 use `alterTableSchema` instead.
    */
   def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name.
+   *
+   * All partition columns must be preserved.
+   *
+   * @param db Database that table to alter schema for exists in
+   * @param table Name of table to alter schema for
+   * @param newSchema Updated schema to be used for the table.
+   */
+  def alterTableSchema(db: String, table: String, newSchema: StructType): Unit
+
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
   def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
index c2613ff74da4..33f088079caa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala
@@ -125,6 +125,12 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
     postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
   }
 
+  override def alterTableSchema(db: String, table: String, newSchema: StructType): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.SCHEMA))
+    delegate.alterTableSchema(db, table, newSchema)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.SCHEMA))
+  }
+
   override def alterTableStats(
       db: String,
       table: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 284ca63d820f..5d0184579faa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -331,6 +331,21 @@ class InMemoryCatalog(
     catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      newSchema: StructType): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+
+    val partCols = origTable.partitionColumnNames
+    assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
+      s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
+      s"the new schema ${newSchema.catalogString} for now.")
+
+    catalog(db).tables(table).table = origTable.copy(schema = newSchema)
+  }
+
   override def alterTableStats(
       db: String,
       table: String,
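The assertion above captures the current limitation in both built-in catalogs: the new schema may reorder or retype data columns, but the partition columns must stay at the tail. A small sketch of what passes and what trips the assert, assuming a hypothetical table `db1.t` partitioned by `c`:

    import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}

    // Accepted: data columns reordered and retyped, partition column "c" still last.
    val ok = new StructType().add("b", LongType).add("a", IntegerType).add("c", StringType)
    catalog.alterTableSchema("db1", "t", ok)

    // Rejected: moving the partition column away from the end fails the assertion.
    val bad = new StructType().add("c", StringType).add("a", IntegerType).add("b", LongType)
    // catalog.alterTableSchema("db1", "t", bad)  // AssertionError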
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 531b453213ed..c0d04d9d06fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -484,6 +484,7 @@ class SessionCatalog(
    *
    * @param identifier TableIdentifier
    * @param newDataSchema Updated data schema to be used for the table
+   * @deprecated since 4.1.0 use `alterTableSchema` instead.
    */
   def alterTableDataSchema(
       identifier: TableIdentifier,
@@ -507,6 +508,25 @@ class SessionCatalog(
     externalCatalog.alterTableDataSchema(db, table, newDataSchema)
   }
 
+  /**
+   * Alter the schema of a table identified by the provided table identifier. All partition columns
+   * must be preserved.
+   *
+   * @param identifier TableIdentifier
+   * @param newSchema Updated schema to be used for the table
+   */
+  def alterTableSchema(
+      identifier: TableIdentifier,
+      newSchema: StructType): Unit = {
+    val qualifiedIdent = qualifyIdentifier(identifier)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
+    requireDbExists(db)
+    requireTableExists(qualifiedIdent)
+
+    externalCatalog.alterTableSchema(db, table, newSchema)
+  }
+
   private def columnNameResolved(
       resolver: Resolver,
       schema: StructType,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
index e7d41644392d..974c225afbae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -126,6 +126,7 @@ case class RenameTableEvent(
 object AlterTableKind extends Enumeration {
   val TABLE = "table"
   val DATASCHEMA = "dataSchema"
+  val SCHEMA = "schema"
   val STATS = "stats"
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 366188c3327b..15858bf2cc69 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -128,9 +128,9 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
 
     // ALTER schema
     val newSchema = new StructType().add("id", "long", nullable = false)
-    catalog.alterTableDataSchema("db5", "tbl1", newSchema)
-    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
-      AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
+    catalog.alterTableSchema("db5", "tbl1", newSchema)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.SCHEMA) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.SCHEMA) :: Nil)
 
     // ALTER stats
     catalog.alterTableStats("db5", "tbl1", None)
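The suite above exercises the new pre/post events tagged AlterTableKind.SCHEMA. A sketch of observing them through ExternalCatalogWithListener, mirroring the suite's recorder pattern (`catalog` and `newSchema` as in the test):

    import scala.collection.mutable
    import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogEvent, ExternalCatalogEventListener}

    val recorder = mutable.Buffer.empty[ExternalCatalogEvent]
    catalog.addListener(new ExternalCatalogEventListener {
      override def onEvent(event: ExternalCatalogEvent): Unit = recorder += event
    })

    catalog.alterTableSchema("db5", "tbl1", newSchema)
    // recorder now holds AlterTablePreEvent("db5", "tbl1", AlterTableKind.SCHEMA)
    // followed by AlterTableEvent("db5", "tbl1", AlterTableKind.SCHEMA).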
test("SPARK-52683: support alterTableSchema partitioned columns") { + val catalog = newBasicCatalog() + + val schema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType) + .add("c", StringType) + val table = CatalogTable( + identifier = TableIdentifier("t", Some("db1")), + tableType = CatalogTableType.MANAGED, + storage = storageFormat, + schema = schema, + partitionColumnNames = Seq("c"), + provider = Some("hive")) + catalog.createTable(table, ignoreIfExists = false) + + val newSchema = new StructType() + .add("b", LongType) + .add("a", IntegerType) + .add("c", StringType) + catalog.alterTableSchema("db1", "t", newSchema) + + assert(catalog.getTable("db1", "t").schema == newSchema) + assert(catalog.getTable("db1", "t").partitionColumnNames == Seq("c")) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index e532ebecc442..f319a97d1133 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -564,7 +564,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { } } - test("alter table add columns") { + test("alter data schema add columns") { withBasicCatalog { sessionCatalog => sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1") @@ -580,6 +580,22 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { } } + test("alter schema add columns") { + withBasicCatalog { sessionCatalog => + sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) + val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1") + val newSchema = StructType(oldTab.dataSchema.fields ++ + Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema) + + sessionCatalog.alterTableSchema( + TableIdentifier("t1", Some("default")), + newSchema) + + val newTab = sessionCatalog.externalCatalog.getTable("default", "t1") + assert(newTab.schema == newSchema) + } + } + test("alter table drop columns") { withBasicCatalog { sessionCatalog => sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 092e6669338e..d5dd934af2be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -247,8 +247,9 @@ case class AlterTableAddColumnsCommand( } DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults)) - val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema) - catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults)) + val existingDataSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema) + catalog.alterTableSchema(table, + StructType(existingDataSchema ++ colsWithProcessedDefaults ++ catalogTable.partitionSchema)) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 1088e3f7a720..891728e6f7e1 100644 --- 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 1088e3f7a720..891728e6f7e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -309,7 +309,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
           collation = collation, storage = storage))
       }
       if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
-        catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
+        catalog.alterTableSchema(ident.asTableIdentifier, schema)
       }
     } catch {
       case _: NoSuchTableException =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5c7a60151c49..5d4cdd7c78e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -735,6 +735,49 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name.
+   */
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      newSchema: StructType): Unit = withClient {
+    requireTableExists(db, table)
+    val oldTable = getTable(db, table)
+    val schemaProps = {
+      tableMetaToTableProps(oldTable, StructType(newSchema)).toMap
+    }
+
+    val partCols = oldTable.partitionColumnNames
+    assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
+      s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
+      s"the new schema ${newSchema.catalogString} for now.")
+
+    val newDataSchema = StructType(newSchema.filter(
+      f => !oldTable.partitionColumnNames.contains(f.name)))
+    val hiveSchema = removeCollation(newDataSchema)
+
+    if (isDatasourceTable(oldTable)) {
+      // For data source tables, first try to write it with the schema set; if that does not work,
+      // try again with updated properties and the partition schema. This is a simplified version of
+      // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
+      // (for example, the schema does not match the data source schema, or does not match the
+      // storage descriptor).
+      try {
+        client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
+      } catch {
+        case NonFatal(e) =>
+          val warningMessage = log"Could not alter schema of table " +
+            log"${MDC(TABLE_NAME, oldTable.identifier.quotedString)} in a Hive compatible way. " +
+            log"Updating Hive metastore in Spark SQL specific format."
+          logWarning(warningMessage, e)
+          client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
+      }
+    } else {
+      client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
+    }
+  }
+
   private def removeCollation(schema: StructType): StructType = {
     // Since collated strings do not exist in Hive as a type we need to replace them with
     // the the regular string type. However, as we save the original schema in the table
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 655491d24535..db522b72e4cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -103,7 +103,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       """.stripMargin)
 
     val newSchema = new StructType().add("a", "string").add("b", "string").add("c", "string")
-    catalog.alterTableDataSchema("db1", "t", newSchema)
+    catalog.alterTableSchema("db1", "t", newSchema)
 
     assert(catalog.getTable("db1", "t").schema == newSchema)
     val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED db1.t")
@@ -234,7 +234,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     val newSchema = StructType(Seq(
       StructField("col1", StringType("UTF8_LCASE"))
     ))
-    catalog.alterTableDataSchema("db1", tableName, newSchema)
+    catalog.alterTableSchema("db1", tableName, newSchema)
 
     val alteredRawTable = externalCatalog.getRawTable("db1", tableName)
     assert(DataTypeUtils.sameType(alteredRawTable.schema, noCollationsSchema))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 13e8d3721d81..e43d1cdb19d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -3418,7 +3418,7 @@ class HiveDDLSuite
       assert(loaded.properties().get("foo") == "bar")
 
       verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
-      verify(spyCatalog, times(0)).alterTableDataSchema(
+      verify(spyCatalog, times(0)).alterTableSchema(
         any[String], any[String], any[StructType])
 
       v2SessionCatalog.alterTable(identifier,
@@ -3428,7 +3428,7 @@ class HiveDDLSuite
       assert(loaded2.columns.head.comment() == "comment2")
 
       verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
-      verify(spyCatalog, times(1)).alterTableDataSchema(
+      verify(spyCatalog, times(1)).alterTableSchema(
         any[String], any[String], any[StructType])
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala
index a362db1ba4ef..bfbbc3bd4ff9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveIncompatibleColTypeChangeSuite.scala
@@ -99,7 +99,7 @@ class HiveIncompatibleColTypeChangeSuite extends SparkFunSuite with TestHiveSing
     spark.sql(createTableStmt)
     val oldTable = catalog.getTable("default", tableName)
     catalog.createTable(oldTable, true)
-    catalog.alterTableDataSchema("default", tableName, updatedSchema)
+    catalog.alterTableSchema("default", tableName, updatedSchema)
 
     val updatedTable = catalog.getTable("default", tableName)
     assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)