This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fbdbd016cd71 [SPARK-54872][SQL] Unify column default value handling 
between v1 and v2 commands
fbdbd016cd71 is described below

commit fbdbd016cd718b7475fcb40c2c7af110fb2dfa51
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jan 7 00:37:09 2026 +0800

    [SPARK-54872][SQL] Unify column default value handling between v1 and v2 
commands
    
    ### What changes were proposed in this pull request?
    
    This PR refactors the column default value resolution code for the `CREATE
    TABLE`, `ADD COLUMN`, and `ALTER COLUMN` commands. The resolution code is
    now mostly unified between v1 and v2 commands:
    - The analyzer resolves the default value expression.
    - For v2 commands, we validate the default value in `CheckAnalysis`, and the
      optimizer naturally constant-folds the default value expression, which is
      then used as the EXISTS DEFAULT.
    - For v1 commands, we convert v2 commands to v1 commands in
      `ResolveSessionCatalog`. V1 commands use `StructType`, so the default value
      expression is invisible to the optimizer. We therefore have to validate and
      constant-fold the default value expression before converting to v1
      commands, as sketched below.
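    
    As a rough illustration, here is a simplified sketch adapted from the new
    `constantFoldDefaultValue` helper this PR adds to `ResolveSessionCatalog`
    (see the diff below); validation and the foldability error check are omitted
    here, and `DefaultValueExpression`, `DefaultColumnOptimizer`, and
    `trimAliases` (from the `AliasHelper` trait) are assumed to be in scope:
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.Alias
    import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
    import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
    
    private def constantFoldDefaultValue(
        defaultValue: DefaultValueExpression,
        colName: String): String = {
      // Wrap the resolved default expression in a throwaway single-row plan so
      // the optimizer can fold it down to a literal.
      val fakePlan = Project(Seq(Alias(defaultValue.child, colName)()), OneRowRelation())
      val optimized = ConstantFolding(DefaultColumnOptimizer.FinishAnalysis(fakePlan))
      // Strip the alias to recover the folded expression itself.
      val folded = optimized.collectFirst {
        case Project(Seq(expr), OneRowRelation()) => trimAliases(expr)
      }.get
      // The SQL string of the folded expression is stored as the column's
      // EXISTS_DEFAULT metadata on the v1 side.
      folded.sql
    }
    ```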
    
    ### Why are the changes needed?
    
    Code cleanup. The `CREATE TABLE`, `ADD COLUMN`, and `ALTER COLUMN` commands
    no longer need a fake analyzer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    cursor 2.2.44
    
    Closes #53631 from cloud-fan/default.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/types/Metadata.scala      |   1 +
 .../catalyst/plans/logical/ColumnDefinition.scala  |  37 +++++--
 .../sql/catalyst/plans/logical/statements.scala    |   7 ++
 .../plans/logical/v2AlterTableCommands.scala       |   8 +-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |  80 +++------------
 .../sql/connector/catalog/CatalogV2Util.scala      |   2 +-
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  10 --
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 110 ++++++++++++++++-----
 .../apache/spark/sql/execution/command/ddl.scala   |  22 +++--
 .../spark/sql/execution/command/tables.scala       |  31 +-----
 .../execution/datasources/DataSourceStrategy.scala |  10 +-
 .../spark/sql/connector/AlterTableTests.scala      |   6 +-
 .../sql/connector/DataSourceV2DataFrameSuite.scala |   2 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |   9 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala |  20 ++--
 15 files changed, 187 insertions(+), 168 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 003fa7646df2..e2d4f6ea2322 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -339,6 +339,7 @@ class MetadataBuilder {
 
   private def put(key: String, value: Any): this.type = {
     map.put(key, value)
+    runtimeMap.remove(key)
     this
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
index d0ac51ad116d..1e03b4e12ace 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{AnalysisAwareExpression, 
Expression, Literal, UnaryExpression, Unevaluable}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import 
org.apache.spark.sql.catalyst.trees.TreePattern.{ANALYSIS_AWARE_EXPRESSION, 
TreePattern}
-import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, 
V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, GeneratedColumn, 
IdentityColumn, V2ExpressionBuilder}
 import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.validateDefaultValueExpr
 import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY,
 EXISTS_DEFAULT_COLUMN_METADATA_KEY}
 import org.apache.spark.sql.connector.catalog.{Column => V2Column, 
ColumnDefaultValue, DefaultValue, IdentityColumnSpec}
@@ -33,8 +33,12 @@ import org.apache.spark.sql.internal.connector.ColumnImpl
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructField}
 
 /**
- * Column definition for tables. This is an expression so that analyzer can 
resolve the default
- * value expression in DDL commands automatically.
+ * User-specified column definition for CREATE/REPLACE TABLE commands. This is 
an expression so that
+ * analyzer can resolve the default value expression automatically.
+ *
+ * For CREATE/REPLACE TABLE commands, columns are created from scratch, so we 
store the
+ * user-specified default value as both the current default and exists 
default, in methods
+ * `toV1Column` and `toV2Column`.
  */
 case class ColumnDefinition(
     name: String,
@@ -74,9 +78,8 @@ case class ColumnDefinition(
       metadataBuilder.putString("comment", c)
     }
     defaultValue.foreach { default =>
-      // For v1 CREATE TABLE command, we will resolve and execute the default 
value expression later
-      // in the rule `DataSourceAnalysis`. We just need to put the default 
value SQL string here.
-      metadataBuilder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
default.originalSQL)
+      metadataBuilder.putExpression(
+        CURRENT_DEFAULT_COLUMN_METADATA_KEY, default.originalSQL, 
Some(default.child))
       val existsSQL = default.child match {
         case l: Literal => l.sql
         case _ => default.originalSQL
@@ -99,10 +102,29 @@ case class ColumnDefinition(
         spec.isAllowExplicitInsert)
     }
   }
+
+  /**
+   * Returns true if the default value's type has been coerced to match this 
column's dataType.
+   * After type coercion, the default value expression's dataType should match 
the column's
+   * dataType (with CHAR/VARCHAR replaced by STRING).
+   */
+  def isDefaultValueTypeCoerced: Boolean = defaultValue.forall { d =>
+    ColumnDefinition.isDefaultValueTypeMatched(d.child.dataType, dataType)
+  }
 }
 
 object ColumnDefinition {
 
+  /**
+   * Returns true if the default value's type matches the target column type.
+   * CHAR/VARCHAR types are replaced with STRING before comparison since type 
coercion
+   * converts them to STRING.
+   */
+  def isDefaultValueTypeMatched(defaultValueType: DataType, targetType: 
DataType): Boolean = {
+    val expectedType = 
CharVarcharUtils.replaceCharVarcharWithString(targetType)
+    defaultValueType == expectedType
+  }
+
   def fromV1Column(col: StructField, parser: ParserInterface): 
ColumnDefinition = {
     val metadataBuilder = new MetadataBuilder().withMetadata(col.metadata)
     metadataBuilder.remove("comment")
@@ -116,6 +138,9 @@ object ColumnDefinition {
     val hasDefaultValue = col.getCurrentDefaultValue().isDefined &&
       col.getExistenceDefaultValue().isDefined
     val defaultValue = if (hasDefaultValue) {
+      // `ColumnDefinition` is for CREATE/REPLACE TABLE commands, and it only 
needs one
+      // default value. Here we assume user wants the current default of the 
v1 column to be
+      // the default value of this column definition.
       val defaultValueSQL = col.getCurrentDefaultValue().get
       Some(DefaultValueExpression(parser.parseExpression(defaultValueSQL), 
defaultValueSQL))
     } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 0afc2227cb67..a9e0650010d4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -141,6 +141,13 @@ case class QualifiedColType(
   def getV2Default(statement: String): ColumnDefaultValue =
     default.map(_.toV2(statement, colName)).orNull
 
+  /**
+   * Returns true if the default value's type has been coerced to match this 
column's dataType.
+   */
+  def isDefaultValueTypeCoerced: Boolean = default.forall { d =>
+    ColumnDefinition.isDefaultValueTypeMatched(d.child.dataType, dataType)
+  }
+
   override def children: Seq[Expression] = default.toSeq
 
   override protected def withNewChildrenInternal(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 843ce22061d8..68b3573ce5ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -248,7 +248,13 @@ case class AlterColumnSpec(
     copy(column = newColumn, newPosition = newPos, newDefaultExpression = 
newDefault)
   }
 
-
+  /**
+   * Returns true if the default value's type has been coerced to match the 
column's dataType.
+   * When newDataType is None, we skip this check as the type will be resolved 
later.
+   */
+  def isDefaultValueTypeCoerced: Boolean = newDefaultExpression.forall { d =>
+    newDataType.forall(dt => 
ColumnDefinition.isDefaultValueTypeMatched(d.child.dataType, dt))
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index f741690e908b..9c077630f33d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
 
 /**
@@ -54,52 +53,6 @@ object ResolveDefaultColumns extends QueryErrorsBase
   // CURRENT_DEFAULT_COLUMN_METADATA.
   val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
 
-  /**
-   * Finds "current default" expressions in CREATE/REPLACE TABLE columns and 
constant-folds them.
-   *
-   * The results are stored in the "exists default" metadata of the same 
columns. For example, in
-   * the event of this statement:
-   *
-   * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
-   *
-   * This method constant-folds the "current default" value, stored in the 
CURRENT_DEFAULT metadata
-   * of the "b" column, to "10", storing the result in the "exists default" 
value within the
-   * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current 
default" metadata of this
-   * "b" column retains its original value of "5 + 5".
-   *
-   * The reason for constant-folding the EXISTS_DEFAULT is to make the 
end-user visible behavior the
-   * same, after executing an ALTER TABLE ADD COLUMNS command with DEFAULT 
value, as if the system
-   * had performed an exhaustive backfill of the provided value to all 
previously existing rows in
-   * the table instead. We choose to avoid doing such a backfill because it 
would be a
-   * time-consuming and costly operation. Instead, we elect to store the 
EXISTS_DEFAULT in the
-   * column metadata for future reference when querying data out of the data 
source. In turn, each
-   * data source then takes responsibility to provide the constant-folded 
value in the
-   * EXISTS_DEFAULT metadata for such columns where the value is not present 
in storage.
-   *
-   * @param tableSchema   represents the names and types of the columns of the 
statement to process.
-   * @param statementType name of the statement being processed, such as 
INSERT; useful for errors.
-   * @return a copy of `tableSchema` with field metadata updated with the 
constant-folded values.
-   */
-  def constantFoldCurrentDefaultsToExistDefaults(
-      tableSchema: StructType,
-      statementType: String): StructType = {
-    if (SQLConf.get.enableDefaultColumns) {
-      val newFields: Seq[StructField] = tableSchema.fields.map { field =>
-        if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
-          val analyzed: Expression = analyze(field, statementType)
-          val newMetadata: Metadata = new 
MetadataBuilder().withMetadata(field.metadata)
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, 
analyzed.sql).build()
-          field.copy(metadata = newMetadata)
-        } else {
-          field
-        }
-      }.toImmutableArraySeq
-      StructType(newFields)
-    } else {
-      tableSchema
-    }
-  }
-
   // Fails if the given catalog does not support column default value.
   def validateCatalogForDefaultValue(
       columns: Seq[ColumnDefinition],
@@ -607,25 +560,8 @@ object ResolveDefaultColumns extends QueryErrorsBase
     if (default.containsPattern(PLAN_EXPRESSION)) {
       throw 
QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
         statement, colName, default.originalSQL)
-    } else if (default.resolved) {
-      targetTypeOption match {
-        case Some(targetType) =>
-          CharVarcharUtils.replaceCharVarcharWithString(targetType)
-          if (!Cast.canUpCast(default.child.dataType, targetType) &&
-            defaultValueFromWiderTypeLiteral(default.child, targetType, 
colName).isEmpty) {
-            throw QueryCompilationErrors.defaultValuesDataTypeError(
-              statement, colName, default.originalSQL, targetType, 
default.child.dataType)
-          }
-        case _ => ()
-      }
-    } else {
-      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
-        statement, colName, default.originalSQL, null)
     }
 
-    // Our analysis check passes here. We do not further inspect whether the
-    // expression is `foldable` here, as the plan is not optimized yet.
-
     if (default.references.nonEmpty || 
default.exists(_.isInstanceOf[VariableReference])) {
       // Ideally we should let the rest of `CheckAnalysis` report errors about 
why the default
       // expression is unresolved. But we should report a better error here if 
the default
@@ -635,6 +571,22 @@ object ResolveDefaultColumns extends QueryErrorsBase
         statement, colName, default.originalSQL)
     }
 
+    if (default.resolved) {
+      targetTypeOption.foreach { targetType =>
+        // Type coercion should have already run before this check, so types 
should match.
+        if 
(!ColumnDefinition.isDefaultValueTypeMatched(default.child.dataType, 
targetType)) {
+          throw QueryCompilationErrors.defaultValuesDataTypeError(
+            statement, colName, default.originalSQL, targetType, 
default.child.dataType)
+        }
+      }
+    } else {
+      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
+        statement, colName, default.originalSQL, null)
+    }
+
+    // Our analysis check passes here. We do not further inspect whether the
+    // expression is `foldable` here, as the plan is not optimized yet.
+
     if (!default.deterministic) {
       throw QueryCompilationErrors.defaultValueNonDeterministicError(
         statement, colName, default.originalSQL)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 2d4ef5cd9e07..e0b523d18afc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -365,7 +365,7 @@ private[sql] object CatalogV2Util {
     }
     validateTableProviderForDefaultValue(
       newSchema, tableProvider, statementType, addNewColumnToExistingTable)
-    constantFoldCurrentDefaultsToExistDefaults(newSchema, statementType)
+    newSchema
   }
 
   private def replace(
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 7a0a37f38099..2841e5adb2ad 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -197,16 +197,6 @@ abstract class SessionCatalogSuite extends AnalysisTest 
with Eventually {
           "expectedType" -> "\"BOOLEAN\"",
           "defaultValue" -> "41 + 1",
           "actualType" -> "\"INT\""))
-
-      // Make sure that constant-folding default values does not take place 
when the feature is
-      // disabled.
-      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
-        val result: StructType = 
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          db1tbl3.schema, "CREATE TABLE")
-        val columnEWithFeatureDisabled: StructField = findField("e", result)
-        // No constant-folding has taken place to the EXISTS_DEFAULT metadata.
-        assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
-      }
     }
     withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> 
"csv,hive,json,orc,parquet") {
       test
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7caadb0631f5..eff95bf4f523 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -22,7 +22,8 @@ import org.apache.spark.internal.LogKeys.CONFIG
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, HiveTableRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, 
Attribute}
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, 
CharVarcharUtils, ResolveDefaultColumns => DefaultCols}
@@ -35,7 +36,7 @@ import 
org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1,
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.connector.V1Function
-import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, 
StructType}
+import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, 
StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.SparkStringUtils
 
@@ -46,13 +47,14 @@ import org.apache.spark.util.SparkStringUtils
  * again, which may lead to inconsistent behavior if the current database is 
changed in the middle.
  */
 class ResolveSessionCatalog(val catalogManager: CatalogManager)
-  extends Rule[LogicalPlan] with LookupCatalog {
+  extends Rule[LogicalPlan] with LookupCatalog with AliasHelper {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
   import org.apache.spark.sql.connector.catalog.CatalogV2Util._
   import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp 
{
-    case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
+    case a @ AddColumns(ResolvedV1TableIdentifier(ident), cols)
+        if a.resolved && cols.forall(_.isDefaultValueTypeCoerced) =>
       cols.foreach { c =>
         if (c.name.length > 1) {
           throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -62,13 +64,25 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
           throw 
QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError()
         }
       }
-      AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))
+      // For V1 ALTER TABLE ADD COLUMNS command, the default value expression 
is hidden in the
+      // StructField metadata, and won't be constant folded by the optimizer. 
Here we
+      // manually constant fold it, as exist default needs to be a constant.
+      val newCols = cols.map { col =>
+        val builder = new MetadataBuilder
+        col.comment.foreach(builder.putString("comment", _))
+        col.default.foreach { defaultValue =>
+          addDefaultValueMetadata(builder, defaultValue, col.colName, 
col.dataType,
+            "ALTER TABLE ADD COLUMNS")
+        }
+        StructField(col.name.head, col.dataType, nullable = true, 
builder.build())
+      }
+      AlterTableAddColumnsCommand(ident, newCols)
 
     case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
       throw QueryCompilationErrors.unsupportedTableOperationError(ident, 
"REPLACE COLUMNS")
 
-    case AlterColumns(ResolvedTable(catalog, ident, table: V1Table, _), specs)
-        if supportsV1Command(catalog) =>
+    case a @ AlterColumns(ResolvedTable(catalog, ident, table: V1Table, _), 
specs)
+        if supportsV1Command(catalog) && a.resolved && 
specs.forall(_.isDefaultValueTypeCoerced) =>
       if (specs.size > 1) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           catalog, ident, "ALTER COLUMN in bulk")
@@ -88,7 +102,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
       }
       val builder = new MetadataBuilder
       // Add comment to metadata
-      s.newComment.map(c => builder.putString("comment", c))
+      s.newComment.foreach(c => builder.putString("comment", c))
       val colName = s.column.name.head
       val dataType = s.newDataType.getOrElse {
         table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
@@ -102,9 +116,16 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
               toSQLId(s.column.name), table.schema.fieldNames)
           }
       }
-      // Add the current default column value string (if any) to the column 
metadata.
-      s.newDefaultExpression.map { c => 
builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY,
-        c.originalSQL) }
+      // For V1 ALTER TABLE ALTER COLUMN command, we only set the 
CURRENT_DEFAULT metadata.
+      // The EXISTS_DEFAULT should be preserved from the original column, as 
it represents
+      // the default value that was in effect when the column was added (used 
for backfilling).
+      s.newDefaultExpression.foreach { defaultValue =>
+        // Validate the default value expression
+        constantFoldDefaultValue(defaultValue, colName, dataType, "ALTER TABLE 
ALTER COLUMN")
+        builder.putExpression(
+          DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
defaultValue.originalSQL,
+          Some(defaultValue.child))
+      }
       if (s.dropDefault) {
         // for legacy reasons, "" means clearing default value
         builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "")
@@ -201,7 +222,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the 
catalog is resolved to the
     // session catalog and the table provider is not v2.
     case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: 
TableSpec, _)
-        if c.resolved =>
+        if c.resolved && c.columns.forall(_.isDefaultValueTypeCoerced) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider, tableSpec.options, c.tableSpec.location, 
c.tableSpec.serde,
         ctas = false)
@@ -210,7 +231,21 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
           throw QueryCompilationErrors.unsupportedTableOperationError(
             ident, "CONSTRAINT")
         }
-        constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, 
c.partitioning,
+        // For V1 CREATE TABLE command, the default value expression is hidden 
in the
+        // StructField metadata, and won't be constant folded by the 
optimizer. Here we
+        // manually constant fold it, as exist default needs to be a constant.
+        val fields = c.columns.map { col =>
+          col.defaultValue.map { defaultValue =>
+            val existsDefaultSQL = constantFoldDefaultValue(
+              defaultValue, col.name, col.dataType, "CREATE TABLE")
+            // toV1Column already sets CURRENT_DEFAULT, so just update 
EXISTS_DEFAULT
+            val field = col.toV1Column
+            val newMetadata = new 
MetadataBuilder().withMetadata(field.metadata)
+              .putString(DefaultCols.EXISTS_DEFAULT_COLUMN_METADATA_KEY, 
existsDefaultSQL).build()
+            field.copy(metadata = newMetadata)
+          }.getOrElse(col.toV1Column)
+        }
+        constructV1TableCmd(None, c.tableSpec, ident, StructType(fields), 
c.partitioning,
           c.ignoreIfExists, storageFormat, provider)
       } else {
         c
@@ -725,20 +760,49 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
     }
   }
 
-  private def convertToStructField(col: QualifiedColType): StructField = {
-    val builder = new MetadataBuilder
-    col.comment.foreach(builder.putString("comment", _))
-    col.default.map {
-      value: DefaultValueExpression => builder.putString(
-        DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY, value.originalSQL)
-    }
-    StructField(col.name.head, col.dataType, nullable = true, builder.build())
-  }
-
   private def isV2Provider(provider: String): Boolean = {
     DataSourceV2Utils.getTableProvider(provider, conf).isDefined
   }
 
+  /**
+   * Validates and constant-folds a column default value expression for V1 
tables.
+   * Populates both CURRENT_DEFAULT and EXISTS_DEFAULT in the metadata builder.
+   */
+  private def addDefaultValueMetadata(
+      builder: MetadataBuilder,
+      defaultValue: DefaultValueExpression,
+      colName: String,
+      colType: DataType,
+      statementType: String): Unit = {
+    val existsDefaultSQL = constantFoldDefaultValue(defaultValue, colName, 
colType, statementType)
+    builder.putExpression(
+      DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
defaultValue.originalSQL,
+      Some(defaultValue.child))
+    builder.putString(DefaultCols.EXISTS_DEFAULT_COLUMN_METADATA_KEY, 
existsDefaultSQL)
+  }
+
+  /**
+   * Validates and constant-folds a column default value expression for V1 
tables.
+   * Returns the SQL string of the constant-folded expression for 
EXISTS_DEFAULT metadata.
+   */
+  private def constantFoldDefaultValue(
+      defaultValue: DefaultValueExpression,
+      colName: String,
+      colType: DataType,
+      statementType: String): String = {
+    validateDefaultValueExpr(defaultValue, statementType, colName, 
Some(colType))
+    val fakePlan = Project(Seq(Alias(defaultValue.child, colName)()), 
OneRowRelation())
+    val optimizedPlan = 
ConstantFolding(DefaultColumnOptimizer.FinishAnalysis(fakePlan))
+    val constantFolded = optimizedPlan.collectFirst {
+      case Project(Seq(expr), OneRowRelation()) => trimAliases(expr)
+    }.get
+    if (!constantFolded.foldable) {
+      throw QueryCompilationErrors.defaultValueNotConstantError(statementType, 
colName,
+        defaultValue.originalSQL)
+    }
+    constantFolded.sql
+  }
+
   private object ResolvedV1Database {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
       case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) => 
None
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8a4f586edfe0..3face404378c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -35,10 +35,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, 
TableCatalog}
@@ -408,15 +408,19 @@ case class AlterTableChangeColumnCommand(
         val withNewTypeAndComment: StructField =
           addComment(withNewType(field, newColumn.dataType), 
newColumn.getComment())
         // Create a new column from the origin column with the new current 
default value.
+        // The default value is already validated by ResolveSessionCatalog, so 
we just need
+        // to copy the CURRENT_DEFAULT metadata. Note: we preserve the 
original EXISTS_DEFAULT
+        // (even if it's absent) from withNewTypeAndComment, as it represents 
the default value
+        // that was in effect when the column was added (used for backfilling 
old rows).
         if (newColumn.getCurrentDefaultValue().isDefined) {
           if (newColumn.getCurrentDefaultValue().get.nonEmpty) {
-            val result: StructField =
-              addCurrentDefaultValue(withNewTypeAndComment, 
newColumn.getCurrentDefaultValue())
-            // Check that the proposed default value parses and analyzes 
correctly, and that the
-            // type of the resulting expression is equivalent or coercible to 
the destination column
-            // type.
-            ResolveDefaultColumns.analyze(result, "ALTER TABLE ALTER COLUMN")
-            result
+            val (sql, expr) = newColumn.metadata.getExpression[Expression](
+              CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+            val newMetadata = new MetadataBuilder()
+              .withMetadata(withNewTypeAndComment.metadata)
+              .putExpression(CURRENT_DEFAULT_COLUMN_METADATA_KEY, sql, expr)
+              .build()
+            withNewTypeAndComment.copy(metadata = newMetadata)
           } else {
             withNewTypeAndComment.clearCurrentDefaultValue()
           }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 5c5ee1d6beaf..abb434b838a5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -234,23 +234,23 @@ case class AlterTableAddColumnsCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val catalogTable = 
verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)
-    val colsWithProcessedDefaults =
-      constantFoldCurrentDefaultsToExistDefaults(sparkSession, 
catalogTable.provider)
+    ResolveDefaultColumns.validateTableProviderForDefaultValue(
+      StructType(colsToAdd), catalogTable.provider, "ALTER TABLE ADD COLUMNS", 
true)
 
     CommandUtils.uncacheTableOrView(sparkSession, table)
     catalog.refreshTable(table)
 
     SchemaUtils.checkColumnNameDuplication(
-      (colsWithProcessedDefaults ++ catalogTable.schema).map(_.name),
+      (colsToAdd ++ catalogTable.schema).map(_.name),
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
     if (!conf.allowCollationsInMapKeys) {
       colsToAdd.foreach(col => 
SchemaUtils.checkNoCollationsInMapKeys(col.dataType))
     }
-    DDLUtils.checkTableColumns(catalogTable, 
StructType(colsWithProcessedDefaults))
+    DDLUtils.checkTableColumns(catalogTable, StructType(colsToAdd))
 
     val existingDataSchema = 
CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
     catalog.alterTableSchema(table,
-      StructType(existingDataSchema ++ colsWithProcessedDefaults ++ 
catalogTable.partitionSchema))
+      StructType(existingDataSchema ++ colsToAdd ++ 
catalogTable.partitionSchema))
     Seq.empty[Row]
   }
 
@@ -286,27 +286,6 @@ case class AlterTableAddColumnsCommand(
     }
     catalogTable
   }
-
-  /**
-   * ALTER TABLE ADD COLUMNS commands may optionally specify a DEFAULT 
expression for any column.
-   * In that case, this method evaluates its originally specified value and 
then stores the result
-   * in a separate column metadata entry, then returns the updated column 
definitions.
-   */
-  private def constantFoldCurrentDefaultsToExistDefaults(
-      sparkSession: SparkSession, tableProvider: Option[String]): 
Seq[StructField] = {
-    colsToAdd.map { col: StructField =>
-      if (col.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
-        val schema = StructType(Array(col))
-        ResolveDefaultColumns.validateTableProviderForDefaultValue(
-          schema, tableProvider, "ALTER TABLE ADD COLUMNS", true)
-        val foldedStructType = 
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, "ALTER TABLE ADD COLUMNS")
-        foldedStructType.fields(0)
-      } else {
-        col
-      }
-    }
-  }
 }
 
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b1fcf6f4b3a7..539bcb6c29e8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -144,22 +144,18 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
     case CreateTable(tableDesc, mode, None) if 
DDLUtils.isDatasourceTable(tableDesc) =>
       ResolveDefaultColumns.validateTableProviderForDefaultValue(
         tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
-      val newSchema: StructType =
-        ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          tableDesc.schema, "CREATE TABLE")
 
-      if (GeneratedColumn.hasGeneratedColumns(newSchema)) {
+      if (GeneratedColumn.hasGeneratedColumns(tableDesc.schema)) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           tableDesc.identifier, "generated columns")
       }
 
-      if (IdentityColumn.hasIdentityColumns(newSchema)) {
+      if (IdentityColumn.hasIdentityColumns(tableDesc.schema)) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           tableDesc.identifier, "identity columns")
       }
 
-      val newTableDesc = tableDesc.copy(schema = newSchema)
-      CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == 
SaveMode.Ignore)
+      CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == 
SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 9c1cd1cbb105..f48258ea6b85 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -394,12 +394,12 @@ trait AlterTableTests extends SharedSparkSession with 
QueryErrorsBase {
     withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, 
") {
       withTable("t") {
         sql(s"create table t(i boolean) using $v2Format")
-        // The default value fails to analyze.
+        // The default value references a non-existing column, which is not a 
constant.
         checkError(
           exception = intercept[AnalysisException] {
             sql("alter table t add column s bigint default badvalue")
           },
-          condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+          condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
           parameters = Map(
             "statement" -> "ALTER TABLE ADD COLUMNS",
             "colName" -> "`s`",
@@ -410,7 +410,7 @@ trait AlterTableTests extends SharedSparkSession with 
QueryErrorsBase {
           exception = intercept[AnalysisException] {
             sql("alter table t alter column s set default badvalue")
           },
-          condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+          condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
           parameters = Map(
             "statement" -> "ALTER TABLE ALTER COLUMN",
             "colName" -> "`s`",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 3a327fcf9863..e4f32701201a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -1071,7 +1071,7 @@ class DataSourceV2DataFrameSuite
            c1 timestamp,
            current_timestamp TIMESTAMP DEFAULT c1)""")
         },
-        condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
         parameters = Map(
           "statement" -> "CREATE TABLE",
           "colName" -> "`current_timestamp`",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index d1255da0590c..9560fa28012a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3782,12 +3782,12 @@ class DataSourceV2SQLSuiteV1Filter
   test("CREATE TABLE with invalid default value") {
     withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, 
") {
       withTable("t") {
-        // The default value fails to analyze.
+        // The default value references a non-existing column, which is not a 
constant.
         checkError(
           exception = intercept[AnalysisException] {
             sql(s"create table t(s int default badvalue) using $v2Format")
           },
-          condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+          condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
           parameters = Map(
             "statement" -> "CREATE TABLE",
             "colName" -> "`s`",
@@ -3800,13 +3800,12 @@ class DataSourceV2SQLSuiteV1Filter
     withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, 
") {
       withTable("t") {
         sql(s"create table t(i boolean) using $v2Format")
-
-        // The default value fails to analyze.
+        // The default value references a non-existing column, which is not a 
constant.
         checkError(
           exception = intercept[AnalysisException] {
             sql(s"replace table t(s int default badvalue) using $v2Format")
           },
-          condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+          condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
           parameters = Map(
             "statement" -> "REPLACE TABLE",
             "colName" -> "`s`",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 977370ddc43e..3cd289f7c6d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1202,29 +1202,27 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
   }
 
   test("SPARK-38336 INSERT INTO statements with tables with default columns: 
negative tests") {
-    // The default value references columns.
+    // The default value references a non-existing column, which is not a 
constant.
     withTable("t") {
       checkError(
         exception = intercept[AnalysisException] {
           sql("create table t(i boolean, s bigint default badvalue) using 
parquet")
         },
-        condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
         parameters = Map(
           "statement" -> "CREATE TABLE",
           "colName" -> "`s`",
           "defaultValue" -> "badvalue"))
     }
     try {
-      // The default value references session variables.
+      // The default value references a session variable, which is not a 
constant.
       sql("DECLARE test_var INT")
       withTable("t") {
         checkError(
           exception = intercept[AnalysisException] {
             sql("create table t(i boolean, s int default test_var) using 
parquet")
           },
-          // V1 command still use the fake Analyzer which can't resolve 
session variables and we
-          // can only report UNRESOLVED_EXPRESSION error.
-          condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+          condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
           parameters = Map(
             "statement" -> "CREATE TABLE",
             "colName" -> "`s`",
@@ -1235,8 +1233,6 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql(s"create table t(i int, j int default test_var) using 
$v2Source")
           },
-          // V2 command uses the actual analyzer and can resolve session 
variables. We can report
-          // a more accurate NOT_CONSTANT error.
           condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
           parameters = Map(
             "statement" -> "CREATE TABLE",
@@ -1671,14 +1667,14 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
   }
 
   test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: 
Negative tests") {
-    // The default value fails to analyze.
+    // The default value references a non-existing column, which is not a 
constant.
     withTable("t") {
       sql("create table t(i boolean) using parquet")
       checkError(
         exception = intercept[AnalysisException] {
           sql("alter table t add column s bigint default badvalue")
         },
-        condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
         parameters = Map(
           "statement" -> "ALTER TABLE ADD COLUMNS",
           "colName" -> "`s`",
@@ -1773,12 +1769,12 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
     val createTable = "create table t(i boolean, s bigint) using parquet"
     withTable("t") {
       sql(createTable)
-      // The default value fails to analyze.
+      // The default value references a non-existing column, which is not a 
constant.
       checkError(
         exception = intercept[AnalysisException] {
           sql("alter table t alter column s set default badvalue")
         },
-        condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
         parameters = Map(
           "statement" -> "ALTER TABLE ALTER COLUMN",
           "colName" -> "`s`",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
