This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fbdbd016cd71 [SPARK-54872][SQL] Unify column default value handling
between v1 and v2 commands
fbdbd016cd71 is described below
commit fbdbd016cd718b7475fcb40c2c7af110fb2dfa51
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jan 7 00:37:09 2026 +0800
[SPARK-54872][SQL] Unify column default value handling between v1 and v2
commands
### What changes were proposed in this pull request?
This PR refactors the column default value resolution code for the `CREATE
TABLE`, `ADD COLUMN`, and `ALTER COLUMN` commands. The resolution code is now
mostly unified between v1 and v2 commands:
- The analyzer resolves the default value expression.
- For v2 commands, we validate the default value in `CheckAnalysis`, and the
optimizer naturally constant-folds the default value expression, which is then
used as the EXISTS_DEFAULT.
- For v1 commands, we convert v2 commands to v1 commands in
`ResolveSessionCatalog`. V1 commands use `StructType`, so the default value
expression is invisible to the optimizer. We therefore have to validate and
constant-fold the default value expression before converting to v1 commands, as
shown in the sketch below.
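For example, with `CREATE TABLE t(b INT DEFAULT 5 + 5)`, the column keeps the
original SQL string `5 + 5` as its current default, while the exists default
stores the constant-folded result `10`, so rows already in storage without a
value for the column read back the folded constant instead of requiring a
backfill. Below is a minimal, self-contained sketch of the folding idea (not
the exact helper added by this patch, which also runs
`DefaultColumnOptimizer.FinishAnalysis` first and reports
`INVALID_DEFAULT_VALUE.NOT_CONSTANT` if the result is still not foldable); the
column name `b` and `defaultExpr` are illustrative:

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Expression, Literal}
import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}

// The resolved default value expression for "b INT DEFAULT 5 + 5".
val defaultExpr: Expression = Add(Literal(5), Literal(5))

// Wrap it in a dummy one-row plan so the optimizer rule can constant-fold it.
val fakePlan = Project(Seq(Alias(defaultExpr, "b")()), OneRowRelation())
val folded: Expression = ConstantFolding(fakePlan).collectFirst {
  case Project(Seq(expr), OneRowRelation()) =>
    expr match {
      case a: Alias => a.child  // strip the alias, keeping the folded child
      case other => other
    }
}.get

// folded is Literal(10); folded.sql ("10") is what gets stored as the
// EXISTS_DEFAULT column metadata, while CURRENT_DEFAULT keeps "5 + 5".
```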
### Why are the changes needed?
Code cleanup. The `CREATE TABLE`, `ADD COLUMN`, and `ALTER COLUMN` commands
no longer need a fake analyzer.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing test
### Was this patch authored or co-authored using generative AI tooling?
cursor 2.2.44
Closes #53631 from cloud-fan/default.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/types/Metadata.scala | 1 +
.../catalyst/plans/logical/ColumnDefinition.scala | 37 +++++--
.../sql/catalyst/plans/logical/statements.scala | 7 ++
.../plans/logical/v2AlterTableCommands.scala | 8 +-
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 80 +++------------
.../sql/connector/catalog/CatalogV2Util.scala | 2 +-
.../sql/catalyst/catalog/SessionCatalogSuite.scala | 10 --
.../catalyst/analysis/ResolveSessionCatalog.scala | 110 ++++++++++++++++-----
.../apache/spark/sql/execution/command/ddl.scala | 22 +++--
.../spark/sql/execution/command/tables.scala | 31 +-----
.../execution/datasources/DataSourceStrategy.scala | 10 +-
.../spark/sql/connector/AlterTableTests.scala | 6 +-
.../sql/connector/DataSourceV2DataFrameSuite.scala | 2 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 9 +-
.../org/apache/spark/sql/sources/InsertSuite.scala | 20 ++--
15 files changed, 187 insertions(+), 168 deletions(-)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 003fa7646df2..e2d4f6ea2322 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -339,6 +339,7 @@ class MetadataBuilder {
private def put(key: String, value: Any): this.type = {
map.put(key, value)
+ runtimeMap.remove(key)
this
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
index d0ac51ad116d..1e03b4e12ace 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{AnalysisAwareExpression,
Expression, Literal, UnaryExpression, Unevaluable}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import
org.apache.spark.sql.catalyst.trees.TreePattern.{ANALYSIS_AWARE_EXPRESSION,
TreePattern}
-import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn,
V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, GeneratedColumn,
IdentityColumn, V2ExpressionBuilder}
import
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.validateDefaultValueExpr
import
org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY,
EXISTS_DEFAULT_COLUMN_METADATA_KEY}
import org.apache.spark.sql.connector.catalog.{Column => V2Column,
ColumnDefaultValue, DefaultValue, IdentityColumnSpec}
@@ -33,8 +33,12 @@ import org.apache.spark.sql.internal.connector.ColumnImpl
import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder,
StructField}
/**
- * Column definition for tables. This is an expression so that analyzer can
resolve the default
- * value expression in DDL commands automatically.
+ * User-specified column definition for CREATE/REPLACE TABLE commands. This is
an expression so that
+ * analyzer can resolve the default value expression automatically.
+ *
+ * For CREATE/REPLACE TABLE commands, columns are created from scratch, so we
store the
+ * user-specified default value as both the current default and exists
default, in methods
+ * `toV1Column` and `toV2Column`.
*/
case class ColumnDefinition(
name: String,
@@ -74,9 +78,8 @@ case class ColumnDefinition(
metadataBuilder.putString("comment", c)
}
defaultValue.foreach { default =>
- // For v1 CREATE TABLE command, we will resolve and execute the default
value expression later
- // in the rule `DataSourceAnalysis`. We just need to put the default
value SQL string here.
- metadataBuilder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY,
default.originalSQL)
+ metadataBuilder.putExpression(
+ CURRENT_DEFAULT_COLUMN_METADATA_KEY, default.originalSQL,
Some(default.child))
val existsSQL = default.child match {
case l: Literal => l.sql
case _ => default.originalSQL
@@ -99,10 +102,29 @@ case class ColumnDefinition(
spec.isAllowExplicitInsert)
}
}
+
+ /**
+ * Returns true if the default value's type has been coerced to match this
column's dataType.
+ * After type coercion, the default value expression's dataType should match
the column's
+ * dataType (with CHAR/VARCHAR replaced by STRING).
+ */
+ def isDefaultValueTypeCoerced: Boolean = defaultValue.forall { d =>
+ ColumnDefinition.isDefaultValueTypeMatched(d.child.dataType, dataType)
+ }
}
object ColumnDefinition {
+ /**
+ * Returns true if the default value's type matches the target column type.
+ * CHAR/VARCHAR types are replaced with STRING before comparison since type
coercion
+ * converts them to STRING.
+ */
+ def isDefaultValueTypeMatched(defaultValueType: DataType, targetType:
DataType): Boolean = {
+ val expectedType =
CharVarcharUtils.replaceCharVarcharWithString(targetType)
+ defaultValueType == expectedType
+ }
+
def fromV1Column(col: StructField, parser: ParserInterface):
ColumnDefinition = {
val metadataBuilder = new MetadataBuilder().withMetadata(col.metadata)
metadataBuilder.remove("comment")
@@ -116,6 +138,9 @@ object ColumnDefinition {
val hasDefaultValue = col.getCurrentDefaultValue().isDefined &&
col.getExistenceDefaultValue().isDefined
val defaultValue = if (hasDefaultValue) {
+ // `ColumnDefinition` is for CREATE/REPLACE TABLE commands, and it only
needs one
+ // default value. Here we assume user wants the current default of the
v1 column to be
+ // the default value of this column definition.
val defaultValueSQL = col.getCurrentDefaultValue().get
Some(DefaultValueExpression(parser.parseExpression(defaultValueSQL),
defaultValueSQL))
} else {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 0afc2227cb67..a9e0650010d4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -141,6 +141,13 @@ case class QualifiedColType(
def getV2Default(statement: String): ColumnDefaultValue =
default.map(_.toV2(statement, colName)).orNull
+ /**
+ * Returns true if the default value's type has been coerced to match this
column's dataType.
+ */
+ def isDefaultValueTypeCoerced: Boolean = default.forall { d =>
+ ColumnDefinition.isDefaultValueTypeMatched(d.child.dataType, dataType)
+ }
+
override def children: Seq[Expression] = default.toSeq
override protected def withNewChildrenInternal(
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 843ce22061d8..68b3573ce5ce 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -248,7 +248,13 @@ case class AlterColumnSpec(
copy(column = newColumn, newPosition = newPos, newDefaultExpression =
newDefault)
}
-
+ /**
+ * Returns true if the default value's type has been coerced to match the
column's dataType.
+ * When newDataType is None, we skip this check as the type will be resolved
later.
+ */
+ def isDefaultValueTypeCoerced: Boolean = newDefaultExpression.forall { d =>
+ newDataType.forall(dt =>
ColumnDefinition.isDefaultValueTypeMatched(d.child.dataType, dt))
+ }
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index f741690e908b..9c077630f33d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
/**
@@ -54,52 +53,6 @@ object ResolveDefaultColumns extends QueryErrorsBase
// CURRENT_DEFAULT_COLUMN_METADATA.
val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
- /**
- * Finds "current default" expressions in CREATE/REPLACE TABLE columns and
constant-folds them.
- *
- * The results are stored in the "exists default" metadata of the same
columns. For example, in
- * the event of this statement:
- *
- * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
- *
- * This method constant-folds the "current default" value, stored in the
CURRENT_DEFAULT metadata
- * of the "b" column, to "10", storing the result in the "exists default"
value within the
- * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current
default" metadata of this
- * "b" column retains its original value of "5 + 5".
- *
- * The reason for constant-folding the EXISTS_DEFAULT is to make the
end-user visible behavior the
- * same, after executing an ALTER TABLE ADD COLUMNS command with DEFAULT
value, as if the system
- * had performed an exhaustive backfill of the provided value to all
previously existing rows in
- * the table instead. We choose to avoid doing such a backfill because it
would be a
- * time-consuming and costly operation. Instead, we elect to store the
EXISTS_DEFAULT in the
- * column metadata for future reference when querying data out of the data
source. In turn, each
- * data source then takes responsibility to provide the constant-folded
value in the
- * EXISTS_DEFAULT metadata for such columns where the value is not present
in storage.
- *
- * @param tableSchema represents the names and types of the columns of the
statement to process.
- * @param statementType name of the statement being processed, such as
INSERT; useful for errors.
- * @return a copy of `tableSchema` with field metadata updated with the
constant-folded values.
- */
- def constantFoldCurrentDefaultsToExistDefaults(
- tableSchema: StructType,
- statementType: String): StructType = {
- if (SQLConf.get.enableDefaultColumns) {
- val newFields: Seq[StructField] = tableSchema.fields.map { field =>
- if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
- val analyzed: Expression = analyze(field, statementType)
- val newMetadata: Metadata = new
MetadataBuilder().withMetadata(field.metadata)
- .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY,
analyzed.sql).build()
- field.copy(metadata = newMetadata)
- } else {
- field
- }
- }.toImmutableArraySeq
- StructType(newFields)
- } else {
- tableSchema
- }
- }
-
// Fails if the given catalog does not support column default value.
def validateCatalogForDefaultValue(
columns: Seq[ColumnDefinition],
@@ -607,25 +560,8 @@ object ResolveDefaultColumns extends QueryErrorsBase
if (default.containsPattern(PLAN_EXPRESSION)) {
throw
QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
statement, colName, default.originalSQL)
- } else if (default.resolved) {
- targetTypeOption match {
- case Some(targetType) =>
- CharVarcharUtils.replaceCharVarcharWithString(targetType)
- if (!Cast.canUpCast(default.child.dataType, targetType) &&
- defaultValueFromWiderTypeLiteral(default.child, targetType,
colName).isEmpty) {
- throw QueryCompilationErrors.defaultValuesDataTypeError(
- statement, colName, default.originalSQL, targetType,
default.child.dataType)
- }
- case _ => ()
- }
- } else {
- throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
- statement, colName, default.originalSQL, null)
}
- // Our analysis check passes here. We do not further inspect whether the
- // expression is `foldable` here, as the plan is not optimized yet.
-
if (default.references.nonEmpty ||
default.exists(_.isInstanceOf[VariableReference])) {
// Ideally we should let the rest of `CheckAnalysis` report errors about
why the default
// expression is unresolved. But we should report a better error here if
the default
@@ -635,6 +571,22 @@ object ResolveDefaultColumns extends QueryErrorsBase
statement, colName, default.originalSQL)
}
+ if (default.resolved) {
+ targetTypeOption.foreach { targetType =>
+ // Type coercion should have already run before this check, so types
should match.
+ if
(!ColumnDefinition.isDefaultValueTypeMatched(default.child.dataType,
targetType)) {
+ throw QueryCompilationErrors.defaultValuesDataTypeError(
+ statement, colName, default.originalSQL, targetType,
default.child.dataType)
+ }
+ }
+ } else {
+ throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
+ statement, colName, default.originalSQL, null)
+ }
+
+ // Our analysis check passes here. We do not further inspect whether the
+ // expression is `foldable` here, as the plan is not optimized yet.
+
if (!default.deterministic) {
throw QueryCompilationErrors.defaultValueNonDeterministicError(
statement, colName, default.originalSQL)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 2d4ef5cd9e07..e0b523d18afc 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -365,7 +365,7 @@ private[sql] object CatalogV2Util {
}
validateTableProviderForDefaultValue(
newSchema, tableProvider, statementType, addNewColumnToExistingTable)
- constantFoldCurrentDefaultsToExistDefaults(newSchema, statementType)
+ newSchema
}
private def replace(
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 7a0a37f38099..2841e5adb2ad 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -197,16 +197,6 @@ abstract class SessionCatalogSuite extends AnalysisTest
with Eventually {
"expectedType" -> "\"BOOLEAN\"",
"defaultValue" -> "41 + 1",
"actualType" -> "\"INT\""))
-
- // Make sure that constant-folding default values does not take place
when the feature is
- // disabled.
- withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
- val result: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- db1tbl3.schema, "CREATE TABLE")
- val columnEWithFeatureDisabled: StructField = findField("e", result)
- // No constant-folding has taken place to the EXISTS_DEFAULT metadata.
- assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
- }
}
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key ->
"csv,hive,json,orc,parquet") {
test
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7caadb0631f5..eff95bf4f523 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -22,7 +22,8 @@ import org.apache.spark.internal.LogKeys.CONFIG
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, HiveTableRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper,
Attribute}
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL,
CharVarcharUtils, ResolveDefaultColumns => DefaultCols}
@@ -35,7 +36,7 @@ import
org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1,
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.connector.V1Function
-import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField,
StructType}
+import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType,
StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.SparkStringUtils
@@ -46,13 +47,14 @@ import org.apache.spark.util.SparkStringUtils
* again, which may lead to inconsistent behavior if the current database is
changed in the middle.
*/
class ResolveSessionCatalog(val catalogManager: CatalogManager)
- extends Rule[LogicalPlan] with LookupCatalog {
+ extends Rule[LogicalPlan] with LookupCatalog with AliasHelper {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp
{
- case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
+ case a @ AddColumns(ResolvedV1TableIdentifier(ident), cols)
+ if a.resolved && cols.forall(_.isDefaultValueTypeCoerced) =>
cols.foreach { c =>
if (c.name.length > 1) {
throw QueryCompilationErrors.unsupportedTableOperationError(
@@ -62,13 +64,25 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
throw
QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError()
}
}
- AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))
+ // For V1 ALTER TABLE ADD COLUMNS command, the default value expression
is hidden in the
+ // StructField metadata, and won't be constant folded by the optimizer.
Here we
+ // manually constant fold it, as exist default needs to be a constant.
+ val newCols = cols.map { col =>
+ val builder = new MetadataBuilder
+ col.comment.foreach(builder.putString("comment", _))
+ col.default.foreach { defaultValue =>
+ addDefaultValueMetadata(builder, defaultValue, col.colName,
col.dataType,
+ "ALTER TABLE ADD COLUMNS")
+ }
+ StructField(col.name.head, col.dataType, nullable = true,
builder.build())
+ }
+ AlterTableAddColumnsCommand(ident, newCols)
case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
throw QueryCompilationErrors.unsupportedTableOperationError(ident,
"REPLACE COLUMNS")
- case AlterColumns(ResolvedTable(catalog, ident, table: V1Table, _), specs)
- if supportsV1Command(catalog) =>
+ case a @ AlterColumns(ResolvedTable(catalog, ident, table: V1Table, _),
specs)
+ if supportsV1Command(catalog) && a.resolved &&
specs.forall(_.isDefaultValueTypeCoerced) =>
if (specs.size > 1) {
throw QueryCompilationErrors.unsupportedTableOperationError(
catalog, ident, "ALTER COLUMN in bulk")
@@ -88,7 +102,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
}
val builder = new MetadataBuilder
// Add comment to metadata
- s.newComment.map(c => builder.putString("comment", c))
+ s.newComment.foreach(c => builder.putString("comment", c))
val colName = s.column.name.head
val dataType = s.newDataType.getOrElse {
table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
@@ -102,9 +116,16 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
toSQLId(s.column.name), table.schema.fieldNames)
}
}
- // Add the current default column value string (if any) to the column
metadata.
- s.newDefaultExpression.map { c =>
builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY,
- c.originalSQL) }
+ // For V1 ALTER TABLE ALTER COLUMN command, we only set the
CURRENT_DEFAULT metadata.
+ // The EXISTS_DEFAULT should be preserved from the original column, as
it represents
+ // the default value that was in effect when the column was added (used
for backfilling).
+ s.newDefaultExpression.foreach { defaultValue =>
+ // Validate the default value expression
+ constantFoldDefaultValue(defaultValue, colName, dataType, "ALTER TABLE
ALTER COLUMN")
+ builder.putExpression(
+ DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
defaultValue.originalSQL,
+ Some(defaultValue.child))
+ }
if (s.dropDefault) {
// for legacy reasons, "" means clearing default value
builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "")
@@ -201,7 +222,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
// For CREATE TABLE [AS SELECT], we should use the v1 command if the
catalog is resolved to the
// session catalog and the table provider is not v2.
case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec:
TableSpec, _)
- if c.resolved =>
+ if c.resolved && c.columns.forall(_.isDefaultValueTypeCoerced) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider, tableSpec.options, c.tableSpec.location,
c.tableSpec.serde,
ctas = false)
@@ -210,7 +231,21 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
throw QueryCompilationErrors.unsupportedTableOperationError(
ident, "CONSTRAINT")
}
- constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema,
c.partitioning,
+ // For V1 CREATE TABLE command, the default value expression is hidden
in the
+ // StructField metadata, and won't be constant folded by the
optimizer. Here we
+ // manually constant fold it, as exist default needs to be a constant.
+ val fields = c.columns.map { col =>
+ col.defaultValue.map { defaultValue =>
+ val existsDefaultSQL = constantFoldDefaultValue(
+ defaultValue, col.name, col.dataType, "CREATE TABLE")
+ // toV1Column already sets CURRENT_DEFAULT, so just update
EXISTS_DEFAULT
+ val field = col.toV1Column
+ val newMetadata = new
MetadataBuilder().withMetadata(field.metadata)
+ .putString(DefaultCols.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
existsDefaultSQL).build()
+ field.copy(metadata = newMetadata)
+ }.getOrElse(col.toV1Column)
+ }
+ constructV1TableCmd(None, c.tableSpec, ident, StructType(fields),
c.partitioning,
c.ignoreIfExists, storageFormat, provider)
} else {
c
@@ -725,20 +760,49 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
}
}
- private def convertToStructField(col: QualifiedColType): StructField = {
- val builder = new MetadataBuilder
- col.comment.foreach(builder.putString("comment", _))
- col.default.map {
- value: DefaultValueExpression => builder.putString(
- DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY, value.originalSQL)
- }
- StructField(col.name.head, col.dataType, nullable = true, builder.build())
- }
-
private def isV2Provider(provider: String): Boolean = {
DataSourceV2Utils.getTableProvider(provider, conf).isDefined
}
+ /**
+ * Validates and constant-folds a column default value expression for V1
tables.
+ * Populates both CURRENT_DEFAULT and EXISTS_DEFAULT in the metadata builder.
+ */
+ private def addDefaultValueMetadata(
+ builder: MetadataBuilder,
+ defaultValue: DefaultValueExpression,
+ colName: String,
+ colType: DataType,
+ statementType: String): Unit = {
+ val existsDefaultSQL = constantFoldDefaultValue(defaultValue, colName,
colType, statementType)
+ builder.putExpression(
+ DefaultCols.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
defaultValue.originalSQL,
+ Some(defaultValue.child))
+ builder.putString(DefaultCols.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
existsDefaultSQL)
+ }
+
+ /**
+ * Validates and constant-folds a column default value expression for V1
tables.
+ * Returns the SQL string of the constant-folded expression for
EXISTS_DEFAULT metadata.
+ */
+ private def constantFoldDefaultValue(
+ defaultValue: DefaultValueExpression,
+ colName: String,
+ colType: DataType,
+ statementType: String): String = {
+ validateDefaultValueExpr(defaultValue, statementType, colName,
Some(colType))
+ val fakePlan = Project(Seq(Alias(defaultValue.child, colName)()),
OneRowRelation())
+ val optimizedPlan =
ConstantFolding(DefaultColumnOptimizer.FinishAnalysis(fakePlan))
+ val constantFolded = optimizedPlan.collectFirst {
+ case Project(Seq(expr), OneRowRelation()) => trimAliases(expr)
+ }.get
+ if (!constantFolded.foldable) {
+ throw QueryCompilationErrors.defaultValueNotConstantError(statementType,
colName,
+ defaultValue.originalSQL)
+ }
+ constantFolded.sql
+ }
+
private object ResolvedV1Database {
def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
case ResolvedNamespace(catalog, _, _) if !supportsV1Command(catalog) =>
None
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8a4f586edfe0..3face404378c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -35,10 +35,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+import
org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
TableCatalog}
@@ -408,15 +408,19 @@ case class AlterTableChangeColumnCommand(
val withNewTypeAndComment: StructField =
addComment(withNewType(field, newColumn.dataType),
newColumn.getComment())
// Create a new column from the origin column with the new current
default value.
+ // The default value is already validated by ResolveSessionCatalog, so
we just need
+ // to copy the CURRENT_DEFAULT metadata. Note: we preserve the
original EXISTS_DEFAULT
+ // (even if it's absent) from withNewTypeAndComment, as it represents
the default value
+ // that was in effect when the column was added (used for backfilling
old rows).
if (newColumn.getCurrentDefaultValue().isDefined) {
if (newColumn.getCurrentDefaultValue().get.nonEmpty) {
- val result: StructField =
- addCurrentDefaultValue(withNewTypeAndComment,
newColumn.getCurrentDefaultValue())
- // Check that the proposed default value parses and analyzes
correctly, and that the
- // type of the resulting expression is equivalent or coercible to
the destination column
- // type.
- ResolveDefaultColumns.analyze(result, "ALTER TABLE ALTER COLUMN")
- result
+ val (sql, expr) = newColumn.metadata.getExpression[Expression](
+ CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+ val newMetadata = new MetadataBuilder()
+ .withMetadata(withNewTypeAndComment.metadata)
+ .putExpression(CURRENT_DEFAULT_COLUMN_METADATA_KEY, sql, expr)
+ .build()
+ withNewTypeAndComment.copy(metadata = newMetadata)
} else {
withNewTypeAndComment.clearCurrentDefaultValue()
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 5c5ee1d6beaf..abb434b838a5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -234,23 +234,23 @@ case class AlterTableAddColumnsCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val catalogTable =
verifyAlterTableAddColumn(sparkSession.sessionState.conf, catalog, table)
- val colsWithProcessedDefaults =
- constantFoldCurrentDefaultsToExistDefaults(sparkSession,
catalogTable.provider)
+ ResolveDefaultColumns.validateTableProviderForDefaultValue(
+ StructType(colsToAdd), catalogTable.provider, "ALTER TABLE ADD COLUMNS",
true)
CommandUtils.uncacheTableOrView(sparkSession, table)
catalog.refreshTable(table)
SchemaUtils.checkColumnNameDuplication(
- (colsWithProcessedDefaults ++ catalogTable.schema).map(_.name),
+ (colsToAdd ++ catalogTable.schema).map(_.name),
sparkSession.sessionState.conf.caseSensitiveAnalysis)
if (!conf.allowCollationsInMapKeys) {
colsToAdd.foreach(col =>
SchemaUtils.checkNoCollationsInMapKeys(col.dataType))
}
- DDLUtils.checkTableColumns(catalogTable,
StructType(colsWithProcessedDefaults))
+ DDLUtils.checkTableColumns(catalogTable, StructType(colsToAdd))
val existingDataSchema =
CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
catalog.alterTableSchema(table,
- StructType(existingDataSchema ++ colsWithProcessedDefaults ++
catalogTable.partitionSchema))
+ StructType(existingDataSchema ++ colsToAdd ++
catalogTable.partitionSchema))
Seq.empty[Row]
}
@@ -286,27 +286,6 @@ case class AlterTableAddColumnsCommand(
}
catalogTable
}
-
- /**
- * ALTER TABLE ADD COLUMNS commands may optionally specify a DEFAULT
expression for any column.
- * In that case, this method evaluates its originally specified value and
then stores the result
- * in a separate column metadata entry, then returns the updated column
definitions.
- */
- private def constantFoldCurrentDefaultsToExistDefaults(
- sparkSession: SparkSession, tableProvider: Option[String]):
Seq[StructField] = {
- colsToAdd.map { col: StructField =>
- if (col.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
- val schema = StructType(Array(col))
- ResolveDefaultColumns.validateTableProviderForDefaultValue(
- schema, tableProvider, "ALTER TABLE ADD COLUMNS", true)
- val foldedStructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- schema, "ALTER TABLE ADD COLUMNS")
- foldedStructType.fields(0)
- } else {
- col
- }
- }
- }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b1fcf6f4b3a7..539bcb6c29e8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -144,22 +144,18 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
case CreateTable(tableDesc, mode, None) if
DDLUtils.isDatasourceTable(tableDesc) =>
ResolveDefaultColumns.validateTableProviderForDefaultValue(
tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
- val newSchema: StructType =
- ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- tableDesc.schema, "CREATE TABLE")
- if (GeneratedColumn.hasGeneratedColumns(newSchema)) {
+ if (GeneratedColumn.hasGeneratedColumns(tableDesc.schema)) {
throw QueryCompilationErrors.unsupportedTableOperationError(
tableDesc.identifier, "generated columns")
}
- if (IdentityColumn.hasIdentityColumns(newSchema)) {
+ if (IdentityColumn.hasIdentityColumns(tableDesc.schema)) {
throw QueryCompilationErrors.unsupportedTableOperationError(
tableDesc.identifier, "identity columns")
}
- val newTableDesc = tableDesc.copy(schema = newSchema)
- CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode ==
SaveMode.Ignore)
+ CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode ==
SaveMode.Ignore)
case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 9c1cd1cbb105..f48258ea6b85 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -394,12 +394,12 @@ trait AlterTableTests extends SharedSparkSession with
QueryErrorsBase {
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format,
") {
withTable("t") {
sql(s"create table t(i boolean) using $v2Format")
- // The default value fails to analyze.
+ // The default value references a non-existing column, which is not a
constant.
checkError(
exception = intercept[AnalysisException] {
sql("alter table t add column s bigint default badvalue")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "ALTER TABLE ADD COLUMNS",
"colName" -> "`s`",
@@ -410,7 +410,7 @@ trait AlterTableTests extends SharedSparkSession with
QueryErrorsBase {
exception = intercept[AnalysisException] {
sql("alter table t alter column s set default badvalue")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "ALTER TABLE ALTER COLUMN",
"colName" -> "`s`",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 3a327fcf9863..e4f32701201a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -1071,7 +1071,7 @@ class DataSourceV2DataFrameSuite
c1 timestamp,
current_timestamp TIMESTAMP DEFAULT c1)""")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "CREATE TABLE",
"colName" -> "`current_timestamp`",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index d1255da0590c..9560fa28012a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3782,12 +3782,12 @@ class DataSourceV2SQLSuiteV1Filter
test("CREATE TABLE with invalid default value") {
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format,
") {
withTable("t") {
- // The default value fails to analyze.
+ // The default value references a non-existing column, which is not a
constant.
checkError(
exception = intercept[AnalysisException] {
sql(s"create table t(s int default badvalue) using $v2Format")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "CREATE TABLE",
"colName" -> "`s`",
@@ -3800,13 +3800,12 @@ class DataSourceV2SQLSuiteV1Filter
withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format,
") {
withTable("t") {
sql(s"create table t(i boolean) using $v2Format")
-
- // The default value fails to analyze.
+ // The default value references a non-existing column, which is not a
constant.
checkError(
exception = intercept[AnalysisException] {
sql(s"replace table t(s int default badvalue) using $v2Format")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "REPLACE TABLE",
"colName" -> "`s`",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 977370ddc43e..3cd289f7c6d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1202,29 +1202,27 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
test("SPARK-38336 INSERT INTO statements with tables with default columns:
negative tests") {
- // The default value references columns.
+ // The default value references a non-existing column, which is not a
constant.
withTable("t") {
checkError(
exception = intercept[AnalysisException] {
sql("create table t(i boolean, s bigint default badvalue) using
parquet")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "CREATE TABLE",
"colName" -> "`s`",
"defaultValue" -> "badvalue"))
}
try {
- // The default value references session variables.
+ // The default value references a session variable, which is not a
constant.
sql("DECLARE test_var INT")
withTable("t") {
checkError(
exception = intercept[AnalysisException] {
sql("create table t(i boolean, s int default test_var) using
parquet")
},
- // V1 command still use the fake Analyzer which can't resolve
session variables and we
- // can only report UNRESOLVED_EXPRESSION error.
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "CREATE TABLE",
"colName" -> "`s`",
@@ -1235,8 +1233,6 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
exception = intercept[AnalysisException] {
sql(s"create table t(i int, j int default test_var) using
$v2Source")
},
- // V2 command uses the actual analyzer and can resolve session
variables. We can report
- // a more accurate NOT_CONSTANT error.
condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "CREATE TABLE",
@@ -1671,14 +1667,14 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS:
Negative tests") {
- // The default value fails to analyze.
+ // The default value references a non-existing column, which is not a
constant.
withTable("t") {
sql("create table t(i boolean) using parquet")
checkError(
exception = intercept[AnalysisException] {
sql("alter table t add column s bigint default badvalue")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "ALTER TABLE ADD COLUMNS",
"colName" -> "`s`",
@@ -1773,12 +1769,12 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
val createTable = "create table t(i boolean, s bigint) using parquet"
withTable("t") {
sql(createTable)
- // The default value fails to analyze.
+ // The default value references a non-existing column, which is not a
constant.
checkError(
exception = intercept[AnalysisException] {
sql("alter table t alter column s set default badvalue")
},
- condition = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+ condition = "INVALID_DEFAULT_VALUE.NOT_CONSTANT",
parameters = Map(
"statement" -> "ALTER TABLE ALTER COLUMN",
"colName" -> "`s`",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]