This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8300178a4c4b [SPARK-55347][SQL] Pass Generated Column as Expression to 
DSV2
8300178a4c4b is described below

commit 8300178a4c4b909dc4e60574c08b58c2a6444d2a
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Jun 4 10:54:17 2026 +0800

    [SPARK-55347][SQL] Pass Generated Column as Expression to DSV2
    
    ### What changes were proposed in this pull request?
    This change passes a DSV2 Expression (from an analyzed generated column 
Expression) to the DSV2 API.
    
    It also cleans up the code.  Previously, the generation expression was 
analyzed by an independent analyzer.  Now, it is analyzed inline (similar to 
how constraint expressions are analyzed).  This makes it easier to run optimizer 
rules to constant-fold the expression before it is passed into DSV2.
    
    ### Why are the changes needed?
    Code cleanup, and to enable DSV2 data sources to work with the DSV2 
Expression API for generated columns.
    
    ### Does this PR introduce _any_ user-facing change?
    Some error messages may change, but overall should be the same.
    
    ### How was this patch tested?
    Added new tests in DataSourceV2DataFrameSuite, and changed error case 
expectations in existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Partially generated with Claude 4.5 Opus, then hand-tuned.
    
    Closes #54126 from szehon-ho/analyze_generated_column.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/connector/catalog/Column.java |  58 ++-
 ...DefaultValue.java => ColumnExpressionBase.java} |  40 +-
 .../spark/sql/connector/catalog/DefaultValue.java  |  61 +--
 .../connector/catalog/GenerationExpression.java    |  56 +++
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  53 ++-
 .../expressions/AnalysisAwareExpression.scala      |   8 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  10 +-
 .../catalyst/plans/logical/ColumnDefinition.scala  |  31 +-
 .../plans/logical/GeneratedColumnExpression.scala  | 125 ++++++
 .../spark/sql/catalyst/util/GeneratedColumn.scala  | 150 +------
 .../sql/connector/catalog/CatalogV2Util.scala      |   3 +-
 .../spark/sql/internal/connector/ColumnImpl.scala  |   8 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |   5 +-
 .../sql/connector/catalog/V2TableUtilSuite.scala   |   2 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |   6 +-
 .../sql/connector/DataSourceV2DataFrameSuite.scala | 470 ++++++++++++++++++++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 368 +++++++++++++---
 17 files changed, 1149 insertions(+), 305 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
index 537c2edd1128..03c0409f39b4 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
@@ -84,6 +84,14 @@ public interface Column {
         /* id = */ null);
   }
 
+  /**
+   * Creates a column with a generation expression in SQL string form.
+   *
+   * @since 4.3.0
+   * @deprecated Use
+   *   {@link #create(String, DataType, boolean, String, GenerationExpression, 
String)} instead.
+   */
+  @Deprecated
   static Column create(
       String name,
       DataType dataType,
@@ -91,6 +99,32 @@ public interface Column {
       String comment,
       String generationExpression,
       String metadataInJSON) {
+    GenerationExpression genExpr = generationExpression != null
+        ? new GenerationExpression(generationExpression) : null;
+    return new ColumnImpl(
+        name,
+        dataType,
+        nullable,
+        comment,
+        /* defaultValue = */ null,
+        genExpr,
+        /* identityColumnSpec = */ null,
+        metadataInJSON,
+        /* id = */ null);
+  }
+
+  /**
+   * Creates a column with a generation expression object.
+   *
+   * @since 4.3.0
+   */
+  static Column create(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      String comment,
+      GenerationExpression generationExpression,
+      String metadataInJSON) {
     return new ColumnImpl(
         name,
         dataType,
@@ -150,13 +184,29 @@ public interface Column {
   ColumnDefaultValue defaultValue();
 
   /**
-   * Returns the generation expression of this table column. Null means no 
generation expression.
+   * Returns the generation expression of this table column as a SQL string. 
Null means no
+   * generation expression.
    * <p>
-   * The generation expression is stored as spark SQL dialect. It is up to the 
data source to verify
-   * expression compatibility and reject writes as necessary.
+   * This returns only the SQL string form. Prefer {@link 
#columnGenerationExpression()}, which can
+   * also carry a connector {@link 
org.apache.spark.sql.connector.expressions.Expression} and
+   * captures the semantics unambiguously. It is up to the data source to 
verify expression
+   * compatibility and reject writes as necessary.
    */
   @Nullable
-  String generationExpression();
+  default String generationExpression() {
+    return columnGenerationExpression() != null ? 
columnGenerationExpression().getSql() : null;
+  }
+
+  /**
+   * Returns the generation expression of this table column as a {@link 
GenerationExpression}.
+   * Null means no generation expression.
+   *
+   * @since 4.3.0
+   */
+  @Nullable
+  default GenerationExpression columnGenerationExpression() {
+    return null;
+  }
 
   /**
    * Returns the identity column specification of this table column. Null 
means no identity column.
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DefaultValue.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnExpressionBase.java
similarity index 58%
copy from 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DefaultValue.java
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnExpressionBase.java
index 6e487ce326a4..b4181bb74dd2 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DefaultValue.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnExpressionBase.java
@@ -22,34 +22,17 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 
 import org.apache.spark.SparkIllegalArgumentException;
-import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.expressions.Expression;
 
 /**
- * A class that represents default values.
- * <p>
- * Connectors can define default values using either a SQL string (Spark SQL 
dialect) or an
- * {@link Expression expression} if the default value can be expressed as a 
supported connector
- * expression. If both the SQL string and the expression are provided, Spark 
first attempts to
- * convert the given expression to its internal representation. If the 
expression cannot be
- * converted, and a SQL string is provided, Spark will fall back to parsing 
the SQL string.
- *
- * @since 4.1.0
+ * Base type for connector expressions that can be represented as a SQL string 
and/or a
+ * connector {@link Expression}.
  */
-@Evolving
-public class DefaultValue {
+abstract class ColumnExpressionBase {
   private final String sql;
   private final Expression expr;
 
-  public DefaultValue(String sql) {
-    this(sql, null /* no expression */);
-  }
-
-  public DefaultValue(Expression expr) {
-    this(null /* no sql */, expr);
-  }
-
-  public DefaultValue(String sql, Expression expr) {
+  ColumnExpressionBase(String sql, Expression expr) {
     if (sql == null && expr == null) {
       throw new SparkIllegalArgumentException(
           "INTERNAL_ERROR",
@@ -60,18 +43,18 @@ public class DefaultValue {
   }
 
   /**
-   * Returns the SQL representation of the default value (Spark SQL dialect), 
if provided.
+   * Returns the SQL representation, if provided.
    */
   @Nullable
-  public String getSql() {
+  public final String getSql() {
     return sql;
   }
 
   /**
-   * Returns the expression representing the default value, if provided.
+   * Returns the connector expression, if provided.
    */
   @Nullable
-  public Expression getExpression() {
+  public final Expression getExpression() {
     return expr;
   }
 
@@ -79,7 +62,7 @@ public class DefaultValue {
   public boolean equals(Object other) {
     if (this == other) return true;
     if (other == null || getClass() != other.getClass()) return false;
-    DefaultValue that = (DefaultValue) other;
+    ColumnExpressionBase that = (ColumnExpressionBase) other;
     return Objects.equals(sql, that.sql) && Objects.equals(expr, that.expr);
   }
 
@@ -87,9 +70,4 @@ public class DefaultValue {
   public int hashCode() {
     return Objects.hash(sql, expr);
   }
-
-  @Override
-  public String toString() {
-    return String.format("DefaultValue{sql=%s, expression=%s}", sql, expr);
-  }
 }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DefaultValue.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DefaultValue.java
index 6e487ce326a4..9eb19fa387c2 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DefaultValue.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DefaultValue.java
@@ -17,29 +17,23 @@
 
 package org.apache.spark.sql.connector.catalog;
 
-import java.util.Map;
-import java.util.Objects;
-import javax.annotation.Nullable;
-
-import org.apache.spark.SparkIllegalArgumentException;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.expressions.Expression;
 
 /**
- * A class that represents default values.
+ * A class that represents default value expressions.
+ * <p>
+ * Connectors can define default values using either a SQL string (Spark SQL 
dialect) or a more
+ * portable connector {@link Expression expression}, whose semantics follow 
the ANSI SQL standard.
  * <p>
- * Connectors can define default values using either a SQL string (Spark SQL 
dialect) or an
- * {@link Expression expression} if the default value can be expressed as a 
supported connector
- * expression. If both the SQL string and the expression are provided, Spark 
first attempts to
- * convert the given expression to its internal representation. If the 
expression cannot be
- * converted, and a SQL string is provided, Spark will fall back to parsing 
the SQL string.
+ * If both the SQL string and the expression are provided, Spark first 
attempts to convert the
+ * given expression to its internal representation. If the expression cannot 
be converted, and a
+ * SQL string is provided, Spark will fall back to parsing the SQL string.
  *
  * @since 4.1.0
  */
 @Evolving
-public class DefaultValue {
-  private final String sql;
-  private final Expression expr;
+public class DefaultValue extends ColumnExpressionBase {
 
   public DefaultValue(String sql) {
     this(sql, null /* no expression */);
@@ -50,46 +44,11 @@ public class DefaultValue {
   }
 
   public DefaultValue(String sql, Expression expr) {
-    if (sql == null && expr == null) {
-      throw new SparkIllegalArgumentException(
-          "INTERNAL_ERROR",
-          Map.of("message", "SQL and expression can't be both null"));
-    }
-    this.sql = sql;
-    this.expr = expr;
-  }
-
-  /**
-   * Returns the SQL representation of the default value (Spark SQL dialect), 
if provided.
-   */
-  @Nullable
-  public String getSql() {
-    return sql;
-  }
-
-  /**
-   * Returns the expression representing the default value, if provided.
-   */
-  @Nullable
-  public Expression getExpression() {
-    return expr;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) return true;
-    if (other == null || getClass() != other.getClass()) return false;
-    DefaultValue that = (DefaultValue) other;
-    return Objects.equals(sql, that.sql) && Objects.equals(expr, that.expr);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(sql, expr);
+    super(sql, expr);
   }
 
   @Override
   public String toString() {
-    return String.format("DefaultValue{sql=%s, expression=%s}", sql, expr);
+    return String.format("DefaultValue{sql=%s, expression=%s}", getSql(), 
getExpression());
   }
 }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/GenerationExpression.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/GenerationExpression.java
new file mode 100644
index 000000000000..8e64eba9c516
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/GenerationExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Expression;
+
+/**
+ * A class that represents generated column expressions.
+ * <p>
+ * Connectors can define generated columns using either a SQL string (Spark 
SQL dialect) or a more
+ * portable connector {@link Expression expression}, whose semantics follow 
the ANSI SQL standard.
+ * <p>
+ * If both the SQL string and the expression are provided, Spark first 
attempts to convert the
+ * given expression to its internal representation. If the expression cannot 
be converted, and a
+ * SQL string is provided, Spark will fall back to parsing the SQL string.
+ *
+ * @since 4.3.0
+ */
+@Evolving
+public class GenerationExpression extends ColumnExpressionBase {
+
+  public GenerationExpression(String sql) {
+    this(sql, null /* no expression */);
+  }
+
+  public GenerationExpression(Expression expr) {
+    this(null /* no sql */, expr);
+  }
+
+  public GenerationExpression(String sql, Expression expr) {
+    super(sql, expr);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "GenerationExpression{sql=%s, expression=%s}", getSql(), 
getExpression());
+  }
+}
+
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c63fa6398cbd..323dc4942612 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -582,7 +582,29 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
             WindowResolution.validateResolvedWindowExpression(w)
 
           case s: SubqueryExpression =>
-            checkSubqueryExpression(operator, s)
+            // A subquery inside a generation expression is unsupported. 
Surface a
+            // generated-column-specific error here, before the generic 
subquery validation below
+            // reports a less helpful message.
+            operator match {
+              case create: V2CreateTablePlan =>
+                create.columns.find { col =>
+                  col.generationExpression.exists(_.child.exists(_ eq s))
+                } match {
+                  case Some(col) =>
+                    throw new AnalysisException(
+                      errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+                      messageParameters = Map(
+                        "fieldName" -> col.name,
+                        "expressionStr" -> 
col.generationExpression.get.originalSQL,
+                        "reason" -> "subquery expressions are not allowed for 
generated columns"
+                      )
+                    )
+                  case None =>
+                    checkSubqueryExpression(operator, s)
+                }
+              case _ =>
+                checkSubqueryExpression(operator, s)
+            }
 
           case e: ExpressionWithRandomSeed if !e.seedExpression.foldable =>
             e.failAnalysis(
@@ -887,6 +909,35 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
             TypeUtils.failUnsupportedDataType(create.tableSchema, SQLConf.get)
             SchemaUtils.checkIndeterminateCollationInSchema(create.tableSchema)
 
+            // Validate generated column expressions
+            create.columns.foreach { col =>
+              col.generationExpression.foreach { genExpr =>
+                def unsupportedExpressionError(reason: String): 
AnalysisException = {
+                  new AnalysisException(
+                    errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+                    messageParameters = Map(
+                      "fieldName" -> col.name,
+                      "expressionStr" -> genExpr.originalSQL,
+                      "reason" -> reason
+                    )
+                  )
+                }
+                // Only built-in functions are allowed in generated columns. 
Functions that are
+                // not built-in already fail earlier in analysis: unknown 
functions with
+                // UNRESOLVED_ROUTINE and persistent SQL functions with 
UNSUPPORTED_SQL_UDF_USAGE.
+                // Here we only need to reject user-defined functions 
(Scala/Python/Java/Hive UDFs
+                // and v2 catalog functions, all of which are 
`UserDefinedExpression`), which
+                // resolve successfully.
+                genExpr.child.foreach {
+                  case udf: UserDefinedExpression =>
+                    throw unsupportedExpressionError(
+                      s"failed to resolve `${udf.name}` to a built-in 
function")
+                  case _ =>
+                }
+                genExpr.validate(col.name, col.dataType, create.columns)
+              }
+            }
+
           case write: V2WriteCommand if write.resolved =>
             write.query.schema.foreach(f => 
TypeUtils.failWithIntervalType(f.dataType))
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnalysisAwareExpression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnalysisAwareExpression.scala
index 274f0b7ff582..9da65cb0e778 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnalysisAwareExpression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AnalysisAwareExpression.scala
@@ -17,7 +17,13 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-/** An expression that must be notified when the analysis phase is finished. */
+/**
+ * An expression that is notified when the analysis phase finishes, via 
[[markAsAnalyzed]].
+ * Implementations typically use this to capture the analyzed (but not yet 
optimized) form of the
+ * expression. This is useful for use-cases like DSV2 Expression conversion 
that want the original
+ * form of the expression, without the optimizer folding context-dependent 
functions (e.g.
+ * `CURRENT_TIMESTAMP`) into definition-time literals.
+ */
 trait AnalysisAwareExpression[E <: Expression] {
   def markAsAnalyzed(): E
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index ccca53f351e8..eec60d9478fd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4821,11 +4821,15 @@ class AstBuilder extends DataTypeAstBuilder
     }
 
   /**
-   * Create a generation expression string.
+   * Create a generation expression.
    */
-  override def visitGeneratedColumn(ctx: GeneratedColumnContext): String =
+  override def visitGeneratedColumn(ctx: GeneratedColumnContext): 
GeneratedColumnExpression =
     withOrigin(ctx) {
-      getDefaultExpression(ctx.expression(), "GENERATED").originalSQL
+      val expr = expression(ctx.expression())
+      if (expr.containsPattern(PARAMETER)) {
+        throw QueryParsingErrors.parameterMarkerNotAllowed("GENERATED", 
expr.origin)
+      }
+      GeneratedColumnExpression(expr, getOriginalText(ctx.expression()))
     }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
index 74c36f4099d9..8f3cd274e93d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
@@ -46,18 +46,30 @@ case class ColumnDefinition(
     nullable: Boolean = true,
     comment: Option[String] = None,
     defaultValue: Option[DefaultValueExpression] = None,
-    generationExpression: Option[String] = None,
+    generationExpression: Option[GeneratedColumnExpression] = None,
     identityColumnSpec: Option[IdentityColumnSpec] = None,
     metadata: Metadata = Metadata.empty) extends Expression with Unevaluable {
   assert(
     generationExpression.isEmpty || identityColumnSpec.isEmpty,
     "A ColumnDefinition cannot contain both a generation expression and an 
identity column spec.")
 
-  override def children: Seq[Expression] = defaultValue.toSeq
+  override def children: Seq[Expression] = defaultValue.toSeq ++ 
generationExpression.toSeq
 
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[Expression]): Expression = {
-    copy(defaultValue = 
newChildren.headOption.map(_.asInstanceOf[DefaultValueExpression]))
+    val hasDefault = defaultValue.isDefined
+    val hasGenExpr = generationExpression.isDefined
+    val newDefault = if (hasDefault) {
+      Some(newChildren.head.asInstanceOf[DefaultValueExpression])
+    } else {
+      None
+    }
+    val newGenExpr = if (hasGenExpr) {
+      Some(newChildren.last.asInstanceOf[GeneratedColumnExpression])
+    } else {
+      None
+    }
+    copy(defaultValue = newDefault, generationExpression = newGenExpr)
   }
 
   def toV2Column(statement: String): V2Column = {
@@ -67,7 +79,7 @@ case class ColumnDefinition(
       nullable,
       comment.orNull,
       defaultValue.map(_.toV2(statement, name)).orNull,
-      generationExpression.orNull,
+      generationExpression.map(_.toV2).orNull,
       identityColumnSpec.orNull,
       if (metadata == Metadata.empty) null else metadata.json)
   }
@@ -86,8 +98,9 @@ case class ColumnDefinition(
       }
       metadataBuilder.putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, existsSQL)
     }
-    generationExpression.foreach { generationExpr =>
-      
metadataBuilder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, 
generationExpr)
+    generationExpression.foreach { genExpr =>
+      
metadataBuilder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY,
+        genExpr.originalSQL)
     }
     encodeIdentityColumnSpec(metadataBuilder)
     StructField(name, dataType, nullable, metadataBuilder.build())
@@ -146,7 +159,9 @@ object ColumnDefinition {
     } else {
       None
     }
-    val generationExpr = GeneratedColumn.getGenerationExpression(col)
+    val generationExpr = GeneratedColumn.getGenerationExpression(col).map { 
sql =>
+      GeneratedColumnExpression(parser.parseExpression(sql), sql)
+    }
     val identityColumnSpec = if 
(col.metadata.contains(IdentityColumn.IDENTITY_INFO_START)) {
       Some(new IdentityColumnSpec(
         col.metadata.getLong(IdentityColumn.IDENTITY_INFO_START),
@@ -217,7 +232,7 @@ object ColumnDefinition {
         messageParameters = Map(
           "colName" -> col.name,
           "defaultValue" -> col.defaultValue.get.originalSQL,
-          "genExpr" -> col.generationExpression.get
+          "genExpr" -> col.generationExpression.get.originalSQL
         )
       )
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/GeneratedColumnExpression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/GeneratedColumnExpression.scala
new file mode 100644
index 000000000000..28548dacdfbd
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/GeneratedColumnExpression.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{AnalysisAwareExpression, 
AttributeReference, Cast, Expression, UnaryExpression, Unevaluable}
+import 
org.apache.spark.sql.catalyst.trees.TreePattern.{ANALYSIS_AWARE_EXPRESSION, 
PLAN_EXPRESSION, TreePattern}
+import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
+import org.apache.spark.sql.connector.catalog.GenerationExpression
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.util.SchemaUtils
+
+/**
+ * A wrapper expression to hold the generation expression and its original SQL 
text.
+ */
+case class GeneratedColumnExpression(
+    child: Expression,
+    originalSQL: String,
+    analyzedChild: Option[Expression] = None)
+  extends UnaryExpression
+  with Unevaluable
+  with AnalysisAwareExpression[GeneratedColumnExpression] {
+
+  final override val nodePatterns: Seq[TreePattern] = 
Seq(ANALYSIS_AWARE_EXPRESSION)
+
+  override def dataType: DataType = child.dataType
+
+  override def stringArgs: Iterator[Any] = Iterator(child, originalSQL)
+
+  override def markAsAnalyzed(): GeneratedColumnExpression =
+    copy(analyzedChild = Some(child))
+
+  override protected def withNewChildInternal(newChild: Expression): 
Expression =
+    copy(child = newChild)
+
+  /**
+   * Validate the generation expression and throw an AnalysisException if 
invalid.
+   * Validations include:
+   * - The expression cannot reference itself
+   * - The expression cannot reference other generated columns
+   * - The expression must be deterministic
+   * - The expression data type can be safely up-cast to the destination 
column data type
+   * - No subquery expressions
+   * - No non-UTF8 binary collation
+   */
+  def validate(
+      fieldName: String,
+      targetDataType: DataType,
+      allColumns: Seq[ColumnDefinition]): Unit = {
+    def unsupportedExpressionError(reason: String): AnalysisException = {
+      new AnalysisException(
+        errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+        messageParameters = Map(
+          "fieldName" -> fieldName,
+          "expressionStr" -> originalSQL,
+          "reason" -> reason))
+    }
+
+    // Don't allow subquery expressions
+    if (child.containsPattern(PLAN_EXPRESSION)) {
+      throw unsupportedExpressionError("subquery expressions are not allowed 
for generated columns")
+    }
+
+    // Use the resolver to respect case sensitivity settings
+    val resolver = SQLConf.get.resolver
+
+    // Check for self-reference - the expression cannot reference itself
+    val referencedColumns = child.collect {
+      case a: AttributeReference => a.name
+    }
+    if (referencedColumns.exists(resolver(_, fieldName))) {
+      throw unsupportedExpressionError("generation expression cannot reference 
itself")
+    }
+
+    // Check for references to other generated columns
+    val generatedColumnNames = allColumns
+      .filter(col => col.generationExpression.isDefined && !resolver(col.name, 
fieldName))
+      .map(_.name)
+    if (referencedColumns.exists(ref => 
generatedColumnNames.exists(resolver(ref, _)))) {
+      throw unsupportedExpressionError(
+        "generation expression cannot reference another generated column")
+    }
+
+    if (!child.deterministic) {
+      throw unsupportedExpressionError("generation expression is not 
deterministic")
+    }
+
+    if (!Cast.canUpCast(child.dataType, targetDataType)) {
+      throw unsupportedExpressionError(
+        s"generation expression data type ${child.dataType.simpleString} " +
+          s"is incompatible with column data type 
${targetDataType.simpleString}")
+    }
+
+    if (child.exists(e => SchemaUtils.hasNonUTF8BinaryCollation(e.dataType))) {
+      throw unsupportedExpressionError(
+        "generation expression cannot contain non utf8 binary collated string 
type")
+    }
+  }
+
+  // Convert the generation expression to V2 GenerationExpression. The V2 
expression is built from
+  // the analyzed (pre-optimization) child so that context-dependent functions 
such as
+  // CURRENT_TIMESTAMP are not frozen into definition-time literals. When the 
analyzed child cannot
+  // be represented as a V2 expression (e.g. CURRENT_TIMESTAMP), the V2 
expression is null and the
+  // connector falls back to the SQL-string form.
+  def toV2: GenerationExpression = {
+    val v2Expr = analyzedChild.flatMap(new 
V2ExpressionBuilder(_).build()).orNull
+    new GenerationExpression(originalSQL, v2Expr)
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
index 793c994fdd43..2f92428cc53e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -17,19 +17,10 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
-import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
-import org.apache.spark.sql.connector.catalog.{DefaultCatalogManager, 
Identifier, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnDefinition
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, 
TableCatalogCapability}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.sql.util.SchemaUtils
+import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
  * This object contains utility methods and values for Generated Columns
@@ -42,9 +33,6 @@ object GeneratedColumn {
    */
   val GENERATION_EXPRESSION_METADATA_KEY = "GENERATION_EXPRESSION"
 
-  /** Parser for parsing generation expression SQL strings */
-  private lazy val parser = new CatalystSqlParser()
-
   /**
    * Whether the given `field` is a generated column
    */
@@ -71,139 +59,19 @@ object GeneratedColumn {
   }
 
   /**
-   * Parse and analyze `expressionStr` and perform verification. This means:
-   * - The expression cannot reference itself
-   * - The expression cannot reference other generated columns
-   * - No user-defined expressions
-   * - The expression must be deterministic
-   * - The expression data type can be safely up-cast to the destination column data type
-   * - No subquery expressions
-   *
-   * Throws an [[AnalysisException]] if the expression cannot be converted or is an invalid
-   * generation expression according to the above rules.
+   * Check if the table catalog supports generated columns.
+   * This is called from DataSourceV2Strategy for CREATE/REPLACE TABLE commands.
    */
-  private def analyzeAndVerifyExpression(
-      expressionStr: String,
-      fieldName: String,
-      dataType: DataType,
-      schema: StructType,
-      statementType: String): Unit = {
-    def unsupportedExpressionError(reason: String): AnalysisException = {
-      new AnalysisException(
-        errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
-        messageParameters = Map(
-          "fieldName" -> fieldName,
-          "expressionStr" -> expressionStr,
-          "reason" -> reason))
-    }
-
-    // Parse the expression string
-    val parsed: Expression = try {
-      parser.parseExpression(expressionStr)
-    } catch {
-      case ex: ParseException =>
-        // Shouldn't be possible since we check that the expression is a valid catalyst expression
-        // during parsing
-        throw SparkException.internalError(
-          s"Failed to execute $statementType command because the column 
$fieldName has " +
-            s"generation expression $expressionStr which fails to parse as a 
valid expression:" +
-            s"\n${ex.getMessage}")
-    }
-    // Don't allow subquery expressions
-    if (parsed.containsPattern(PLAN_EXPRESSION)) {
-      throw unsupportedExpressionError("subquery expressions are not allowed 
for generated columns")
-    }
-    // Analyze the parsed result
-    val allowedBaseColumns = schema
-      .filterNot(_.name == fieldName) // Can't reference itself
-      .filterNot(isGeneratedColumn) // Can't reference other generated columns
-    val relation = LocalRelation(
-      
CharVarcharUtils.replaceCharVarcharWithStringInSchema(StructType(allowedBaseColumns)))
-    val plan = try {
-      val analyzer: Analyzer = GeneratedColumnAnalyzer
-      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), 
relation))
-      analyzer.checkAnalysis(analyzed)
-      analyzed
-    } catch {
-      case ex: AnalysisException =>
-        // Improve error message if possible
-        if (ex.getCondition == "UNRESOLVED_COLUMN.WITH_SUGGESTION") {
-          ex.messageParameters.get("objectName").foreach { unresolvedCol =>
-            val resolver = SQLConf.get.resolver
-            // Whether `col` = `unresolvedCol` taking into account case-sensitivity
-            def isUnresolvedCol(col: String) =
-              resolver(unresolvedCol, QueryCompilationErrors.toSQLId(col))
-            // Check whether the unresolved column is this column
-            if (isUnresolvedCol(fieldName)) {
-              throw unsupportedExpressionError("generation expression cannot 
reference itself")
-            }
-            // Check whether the unresolved column is another generated column in the schema
-            if (schema.exists(col => isGeneratedColumn(col) && 
isUnresolvedCol(col.name))) {
-              throw unsupportedExpressionError(
-                "generation expression cannot reference another generated 
column")
-            }
-          }
-        }
-        if (ex.getCondition == "UNRESOLVED_ROUTINE") {
-          // Cannot resolve function using built-in catalog
-          ex.messageParameters.get("routineName").foreach { fnName =>
-            throw unsupportedExpressionError(s"failed to resolve $fnName to a 
built-in function")
-          }
-        }
-        throw ex
-    }
-    val analyzed = plan.collectFirst {
-      case Project(Seq(a: Alias), _: LocalRelation) => a.child
-    }.get
-    if (!analyzed.deterministic) {
-      throw unsupportedExpressionError("generation expression is not 
deterministic")
-    }
-    if (!Cast.canUpCast(analyzed.dataType, dataType)) {
-      throw unsupportedExpressionError(
-        s"generation expression data type ${analyzed.dataType.simpleString} " +
-        s"is incompatible with column data type ${dataType.simpleString}")
-    }
-    if (analyzed.exists(e => 
SchemaUtils.hasNonUTF8BinaryCollation(e.dataType))) {
-      throw unsupportedExpressionError(
-        "generation expression cannot contain non utf8 binary collated string 
type")
-    }
-  }
-
-  /**
-   * For any generated columns in `schema`, parse, analyze and verify the generation expression.
-   */
-  private def verifyGeneratedColumns(schema: StructType, statementType: 
String): Unit = {
-    schema.foreach { field =>
-      getGenerationExpression(field).foreach { expressionStr =>
-        analyzeAndVerifyExpression(expressionStr, field.name, field.dataType, 
schema, statementType)
-      }
-    }
-  }
-
-  /**
-   * If `schema` contains any generated columns:
-   * 1) Check whether the table catalog supports generated columns. Otherwise throw an error.
-   * 2) Parse, analyze and verify the generation expressions for any generated columns.
-   */
-  def validateGeneratedColumns(
-      schema: StructType,
+  def validateCatalogForGeneratedColumn(
+      columns: Seq[ColumnDefinition],
       catalog: TableCatalog,
-      ident: Identifier,
-      statementType: String): Unit = {
-    if (hasGeneratedColumns(schema)) {
+      ident: Identifier): Unit = {
+    if (columns.exists(_.generationExpression.isDefined)) {
       if (!catalog.capabilities().contains(
         TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS)) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           catalog, ident, "generated columns")
       }
-      GeneratedColumn.verifyGeneratedColumns(schema, statementType)
     }
   }
 }
-
-/**
- * Analyzer for processing generated column expressions using built-in functions only.
- */
-object GeneratedColumnAnalyzer extends Analyzer(
-  new DefaultCatalogManager(BuiltInFunctionCatalog, 
BuiltInFunctionCatalog.v1Catalog)) {
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e42d5f3a8445..e34a21eeec76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -749,7 +749,8 @@ private[sql] object CatalogV2Util {
       val cleanedMetadata = metadataWithKeysRemoved(
         Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY))
       Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
-        GeneratedColumn.getGenerationExpression(f).get, 
metadataAsJson(cleanedMetadata))
+        new 
GenerationExpression(GeneratedColumn.getGenerationExpression(f).get),
+        metadataAsJson(cleanedMetadata))
     } else if (isIdentityColumn) {
       val cleanedMetadata = metadataWithKeysRemoved(
         Seq("comment",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
index f97f90b7eb59..2e5cd4b8b0a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal.connector
 
 import java.util.Objects
 
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, 
IdentityColumnSpec}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, 
GenerationExpression, IdentityColumnSpec}
 import org.apache.spark.sql.types.DataType
 
 // The standard concrete implementation of data source V2 column.
@@ -29,7 +29,7 @@ case class ColumnImpl(
     nullable: Boolean,
     comment: String,
     defaultValue: ColumnDefaultValue,
-    generationExpression: String,
+    override val columnGenerationExpression: GenerationExpression,
     identityColumnSpec: IdentityColumnSpec,
     metadataInJSON: String,
     override val id: String = null) extends Column {
@@ -47,7 +47,7 @@ case class ColumnImpl(
         nullable == that.nullable &&
         comment == that.comment &&
         defaultValue == that.defaultValue &&
-        generationExpression == that.generationExpression &&
+        columnGenerationExpression == that.columnGenerationExpression &&
         identityColumnSpec == that.identityColumnSpec &&
         metadataInJSON == that.metadataInJSON
     case _ => false
@@ -60,7 +60,7 @@ case class ColumnImpl(
       Boolean.box(nullable),
       comment,
       defaultValue,
-      generationExpression,
+      columnGenerationExpression,
       identityColumnSpec,
       metadataInJSON)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 1db22037d31f..7907d80e8b0c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, GreaterThan, 
Hex, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Add, And, EqualTo, 
GreaterThan, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.IdentityColumnSpec
 import 
org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
@@ -3236,7 +3236,8 @@ class DDLParserSuite extends AnalysisTest {
         "b",
         IntegerType,
         nullable = false,
-        generationExpression = Some("a+1")
+        generationExpression = Some(GeneratedColumnExpression(
+          Add(UnresolvedAttribute(Seq("a")), Literal(1)), "a+1"))
       )
     )
     comparePlans(parsePlan(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala
index c02c517ff546..f4db1a94c4d5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/V2TableUtilSuite.scala
@@ -706,7 +706,7 @@ class V2TableUtilSuite extends SparkFunSuite {
       nullable = nullable,
       comment = null,
       defaultValue = null,
-      generationExpression = null,
+      columnGenerationExpression = null,
       identityColumnSpec = null,
       metadataInJSON = null,
       id = id)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 4fd7d993cc3d..8e40f01601d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -241,8 +241,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       ResolveTableConstraints.validateCatalogForTableConstraint(
         tableSpec.constraints, tableCatalog, ident)
       val statementType = "CREATE TABLE"
-      GeneratedColumn.validateGeneratedColumns(
-        c.tableSchema, tableCatalog, ident, statementType)
+      GeneratedColumn.validateCatalogForGeneratedColumn(columns, tableCatalog, 
ident)
       IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
 
       CreateTableExec(
@@ -299,8 +298,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       ResolveTableConstraints.validateCatalogForTableConstraint(
         tableSpec.constraints, tableCatalog, ident)
       val statementType = "REPLACE TABLE"
-      GeneratedColumn.validateGeneratedColumns(
-        c.tableSchema, tableCatalog, ident, statementType)
+      GeneratedColumn.validateCatalogForGeneratedColumn(columns, tableCatalog, 
ident)
       IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
 
       val v2Columns = columns.map(_.toV2Column(statementType)).toArray
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index f272f28a5f92..71632e07c78b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -28,14 +28,14 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkS
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, 
Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, 
NullColumnIdInMemoryTableCatalog, 
NullTableIdAndNullColumnIdInMemoryTableCatalog, 
NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, 
TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, 
Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, 
GenerationExpression, Identifier, InMemoryTableCatalog, 
MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, 
NullTableIdAndNullColumnIdInMemoryTableCatalog, 
NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, 
TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege
 import org.apache.spark.sql.connector.catalog.TruncatableTable
-import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
GeneralScalarExpression, LiteralValue, Transform}
-import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, 
AlwaysTrue}
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, Cast => 
V2Cast, Extract, FieldReference, GeneralScalarExpression, LiteralValue, 
Transform}
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, 
AlwaysTrue, Predicate => V2Predicate}
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan
 import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, 
CreateTableExec, DataSourceV2Relation, ReplaceTableExec}
@@ -80,6 +80,7 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.composedidcat",
       classOf[ComposedColumnIdTableCatalog].getName)
     .set("spark.sql.catalog.composedidcat.copyOnLoad", "true")
+    .set(SQLConf.TIME_TYPE_ENABLED.key, "true")
 
   after {
     
catalog("cachingcat").asInstanceOf[CachingInMemoryTableCatalog].clearCache()
@@ -1155,6 +1156,445 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("create/replace table with generated columns should have V2 
Expression") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b INT GENERATED ALWAYS AS (a + 1),
+             |  c INT GENERATED ALWAYS AS (a * 2)
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // a has no generation expression
+          new GenerationExpression(
+            "a + 1",
+            new GeneralScalarExpression(
+              "+",
+              Array(FieldReference("a"), LiteralValue(1, IntegerType)))),
+          new GenerationExpression(
+            "a * 2",
+            new GeneralScalarExpression(
+              "*",
+              Array(FieldReference("a"), LiteralValue(2, IntegerType))))))
+
+      val replaceExec = executeAndKeepPhysicalPlan[ReplaceTableExec] {
+        sql(
+          s"""REPLACE TABLE $tableName (
+             |  x INT,
+             |  y INT GENERATED ALWAYS AS (x - 10)
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        replaceExec.columns,
+        Array(
+          null, // x has no generation expression
+          new GenerationExpression(
+            "x - 10",
+            new GeneralScalarExpression(
+              "-",
+              Array(FieldReference("x"), LiteralValue(10, IntegerType))))))
+    }
+  }
+
+  test("create table with generated columns using functions should have V2 
Expression") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b INT,
+             |  c INT GENERATED ALWAYS AS (a + b)
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // a has no generation expression
+          null, // b has no generation expression
+          new GenerationExpression(
+            "a + b",
+            new GeneralScalarExpression(
+              "+",
+              Array(FieldReference("a"), FieldReference("b"))))))
+    }
+  }
+
+  test("generated column with CONCAT referencing multiple columns should have 
FieldReference") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  first_name STRING,
+             |  last_name STRING,
+             |  full_name STRING GENERATED ALWAYS AS (CONCAT(first_name, ' ', 
last_name))
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // first_name has no generation expression
+          null, // last_name has no generation expression
+          new GenerationExpression(
+            "CONCAT(first_name, ' ', last_name)",
+            new GeneralScalarExpression(
+              "CONCAT",
+              Array(
+                FieldReference("first_name"),
+                LiteralValue(UTF8String.fromString(" "), StringType),
+                FieldReference("last_name"))))))
+    }
+  }
+
+  test("generated column with UPPER referencing another column should have 
FieldReference") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  name STRING,
+             |  upper_name STRING GENERATED ALWAYS AS (UPPER(name))
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // name has no generation expression
+          new GenerationExpression(
+            "UPPER(name)",
+            new GeneralScalarExpression(
+              "UPPER",
+              Array(FieldReference("name"))))))
+    }
+  }
+
+  test("generated column with YEAR extraction should have FieldReference") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  event_date DATE,
+             |  event_year INT GENERATED ALWAYS AS (YEAR(event_date))
+             |) USING foo""".stripMargin)
+      }
+
+      // Verify structure directly since Extract class doesn't implement equals()
+      assert(createExec.columns.length == 2)
+      assert(createExec.columns(0).columnGenerationExpression() == null)
+
+      val genExpr = createExec.columns(1).columnGenerationExpression()
+      assert(genExpr != null)
+      assert(genExpr.getSql == "YEAR(event_date)")
+
+      val v2Expr = genExpr.getExpression
+      assert(v2Expr.isInstanceOf[Extract], s"Expected Extract but got 
${v2Expr.getClass}")
+
+      val extractExpr = v2Expr.asInstanceOf[Extract]
+      assert(extractExpr.field() == "YEAR")
+
+      val sourceExpr = extractExpr.source()
+      assert(sourceExpr.isInstanceOf[FieldReference],
+        s"Expected FieldReference but got ${sourceExpr.getClass}")
+
+      val fieldRef = sourceExpr.asInstanceOf[FieldReference]
+      assert(fieldRef.fieldNames.toSeq == Seq("event_date"),
+        s"Expected field 'event_date' but got 
${fieldRef.fieldNames.mkString(".")}")
+    }
+  }
+
+  test("generated column with nested expression referencing multiple columns") 
{
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      // Expression: (a + b) * c should produce nested GeneralScalarExpression with FieldReferences
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b INT,
+             |  c INT,
+             |  result INT GENERATED ALWAYS AS ((a + b) * c)
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // a has no generation expression
+          null, // b has no generation expression
+          null, // c has no generation expression
+          new GenerationExpression(
+            "(a + b) * c",
+            new GeneralScalarExpression(
+              "*",
+              Array(
+                new GeneralScalarExpression(
+                  "+",
+                  Array(FieldReference("a"), FieldReference("b"))),
+                FieldReference("c"))))))
+    }
+  }
+
+  test("generated column with COALESCE referencing multiple columns") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  primary_value INT,
+             |  fallback_value INT,
+             |  effective_value INT GENERATED ALWAYS AS 
(COALESCE(primary_value, fallback_value, 0))
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // primary_value has no generation expression
+          null, // fallback_value has no generation expression
+          new GenerationExpression(
+            "COALESCE(primary_value, fallback_value, 0)",
+            new GeneralScalarExpression(
+              "COALESCE",
+              Array(
+                FieldReference("primary_value"),
+                FieldReference("fallback_value"),
+                LiteralValue(0, IntegerType))))))
+    }
+  }
+
+  test("generated column with CAST should have V2 Cast expression") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b BIGINT GENERATED ALWAYS AS (CAST(a AS BIGINT))
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // a has no generation expression
+          new GenerationExpression(
+            "CAST(a AS BIGINT)",
+            new V2Cast(FieldReference("a"), IntegerType, LongType))))
+    }
+  }
+
+  test("generated column with comparison should have V2 Predicate expression") 
{
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b BOOLEAN GENERATED ALWAYS AS (a > 5)
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // a has no generation expression
+          new GenerationExpression(
+            "a > 5",
+            new V2Predicate(
+              ">",
+              Array(FieldReference("a"), LiteralValue(5, IntegerType))))))
+    }
+  }
+
+  test("generated column with CASE WHEN should have V2 expression") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b INT GENERATED ALWAYS AS (CASE WHEN a > 0 THEN 1 ELSE 0 END)
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // a has no generation expression
+          new GenerationExpression(
+            "CASE WHEN a > 0 THEN 1 ELSE 0 END",
+            new GeneralScalarExpression(
+              "CASE_WHEN",
+              Array(
+                new V2Predicate(">", Array(FieldReference("a"), 
LiteralValue(0, IntegerType))),
+                LiteralValue(1, IntegerType),
+                LiteralValue(0, IntegerType))))))
+    }
+  }
+
+  test("generated column with a literal should have V2 Literal expression") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b INT GENERATED ALWAYS AS (1)
+             |) USING foo""".stripMargin)
+      }
+
+      checkGenerationExpressions(
+        createExec.columns,
+        Array(
+          null, // a has no generation expression
+          new GenerationExpression("1", LiteralValue(1, IntegerType))))
+    }
+  }
+
+  test("generated column with a non-translatable expression preserves SQL but 
has null V2 expr") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      // `factorial` is a built-in, deterministic function, so the generated column is valid, but
+      // V2ExpressionBuilder has no mapping for it. The V2 expression is therefore null while the
+      // original SQL text is still preserved.
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  a INT,
+             |  b BIGINT GENERATED ALWAYS AS (factorial(a))
+             |) USING foo""".stripMargin)
+      }
+
+      assert(createExec.columns.length == 2)
+      assert(createExec.columns(0).columnGenerationExpression() == null)
+
+      val genExpr = createExec.columns(1).columnGenerationExpression()
+      assert(genExpr != null)
+      assert(genExpr.getSql == "factorial(a)")
+      assert(genExpr.getExpression == null,
+        s"Expected null V2 expression but got ${genExpr.getExpression}")
+    }
+  }
+
+  test("generation expression with special column name should fail") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      // When the column name matches a special SQL keyword like CURRENT_DATE,
+      // the generation expression CURRENT_DATE is resolved as a reference to itself
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(
+            s"""CREATE TABLE $tableName (
+               |  current_date DATE GENERATED ALWAYS AS (CURRENT_DATE)
+               |) USING foo""".stripMargin)
+        },
+        condition = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+        parameters = Map(
+          "fieldName" -> "current_date",
+          "expressionStr" -> "CURRENT_DATE",
+          "reason" -> "generation expression cannot reference itself"))
+    }
+  }
+
+  test("generation expression with current_timestamp and different column name 
should work") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withTable(tableName) {
+      // When the column name is different, CURRENT_TIMESTAMP is resolved as the function.
+      val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+        sql(
+          s"""CREATE TABLE $tableName (
+             |  c1 TIMESTAMP GENERATED ALWAYS AS (CURRENT_TIMESTAMP)
+             |) USING foo""".stripMargin)
+      }
+
+      assert(createExec.columns.length == 1)
+      val genExpr = createExec.columns(0).columnGenerationExpression()
+      assert(genExpr != null)
+      // The SQL form is preserved so the connector can re-evaluate it.
+      assert(genExpr.getSql == "CURRENT_TIMESTAMP")
+      // The V2 expression is built from the analyzed (pre-optimization) child, so CURRENT_TIMESTAMP
+      // is not frozen into a definition-time literal. Since CURRENT_TIMESTAMP has no V2
+      // representation yet, the V2 expression is null and the connector falls back to the SQL form.
+      assert(genExpr.getExpression == null)
+    }
+  }
+
+  // enforceReservedKeywords is only effective when ANSI is enabled, but with the conf off it is
+  // disabled in both ANSI modes, so the column-priority behavior should hold regardless of ANSI.
+  Seq("true", "false").foreach { ansiEnabled =>
+    test("generation expression resolves a reserved keyword to the column when 
" +
+      s"enforceReservedKeywords is disabled (ansi=$ansiEnabled)") {
+      val tableName = "testcat.ns1.ns2.tbl"
+      withSQLConf(
+          SQLConf.ANSI_ENABLED.key -> ansiEnabled,
+          SQLConf.ENFORCE_RESERVED_KEYWORDS.key -> "false") {
+        withTable(tableName) {
+          // With reserved keyword enforcement off (the default), the unquoted `current_date` in the
+          // generation expression binds to the same-named column rather than the CURRENT_DATE
+          // function, so it is passed to the connector as a field reference.
+          val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+            sql(
+              s"""CREATE TABLE $tableName (
+                 |  `current_date` DATE,
+                 |  c2 DATE GENERATED ALWAYS AS (current_date)
+                 |) USING foo""".stripMargin)
+          }
+          val genExpr = createExec.columns.find(_.name == 
"c2").get.columnGenerationExpression()
+          assert(genExpr != null)
+          assert(genExpr.getSql == "current_date")
+          // The column reference is representable in V2 as a field reference, proving the column
+          // takes priority over the CURRENT_DATE function.
+          genExpr.getExpression match {
+            case ref: FieldReference => assert(ref.fieldNames().toSeq == 
Seq("current_date"))
+            case other => fail(s"Expected a FieldReference to current_date but 
got: $other")
+          }
+        }
+      }
+    }
+  }
+
+  // Unlike the disabled case above, this behavior is not parameterized over ANSI: reserved keyword
+  // enforcement (enforceReservedKeywords = ansiEnabled && ENFORCE_RESERVED_KEYWORDS) can only be on
+  // when ANSI is enabled. With ANSI disabled the conf is a no-op.
+  test("generation expression resolves a reserved keyword to the function when 
" +
+    "enforceReservedKeywords is enabled") {
+    val tableName = "testcat.ns1.ns2.tbl"
+    withSQLConf(
+        SQLConf.ANSI_ENABLED.key -> "true",
+        SQLConf.ENFORCE_RESERVED_KEYWORDS.key -> "true") {
+      withTable(tableName) {
+        // With reserved keyword enforcement on, the unquoted `current_date` in the generation
+        // expression binds to the built-in CURRENT_DATE function rather than the same-named
+        // column. This matches the SQL standard.
+        val createExec = executeAndKeepPhysicalPlan[CreateTableExec] {
+          sql(
+            s"""CREATE TABLE $tableName (
+               |  `current_date` DATE,
+               |  c2 DATE GENERATED ALWAYS AS (current_date)
+               |) USING foo""".stripMargin)
+        }
+        val genExpr = createExec.columns.find(_.name == 
"c2").get.columnGenerationExpression()
+        assert(genExpr != null)
+        assert(genExpr.getSql == "current_date")
+        // CURRENT_DATE has no V2 representation, so the connector falls back to the SQL form. A
+        // non-null field reference here would mean the column incorrectly took priority.
+        assert(genExpr.getExpression == null)
+      }
+    }
+  }
+
   private def executeAndKeepPhysicalPlan[T <: SparkPlan](func: => Unit): T = {
     val qe = withQueryExecutionsCaptured(spark) {
       func
@@ -3590,4 +4030,28 @@ class DataSourceV2DataFrameSuite
         parameters = Map.empty)
     }
   }
+
+  /**
+   * Asserts that `columns` and `expectedGenerationExprs` have the same length and that, index by
+   * index, each column's generation expression matches the expected one according to
+   * `compareGenerationExpression`.
+   */
+  private def checkGenerationExpressions(
+      columns: Array[Column],
+      expectedGenerationExprs: Array[GenerationExpression]): Unit = {
+    assert(columns.length == expectedGenerationExprs.length)
+
+    for ((column, expectedGenExpr) <- columns.zip(expectedGenerationExprs)) {
+      val actualGenExpr = column.columnGenerationExpression()
+      assert(
+        compareGenerationExpression(actualGenExpr, expectedGenExpr),
+        s"Generation expression mismatch for column '${column.toString}': " +
+          s"expected $expectedGenExpr but found $actualGenExpr")
+    }
+  }
+
+  /**
+   * Null-tolerant comparison of two generation expressions: two nulls are considered equal, a
+   * null never equals a non-null value, and otherwise both the SQL text and the V2 expression
+   * must be equal.
+   */
+  private def compareGenerationExpression(
+      left: GenerationExpression,
+      right: GenerationExpression): Boolean = {
+    if (left == null || right == null) {
+      // At least one side is missing: equal only when both are.
+      left == null && right == null
+    } else {
+      val sameSql = left.getSql == right.getSql
+      sameSql && left.getExpression == right.getExpression
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index cb7531a0dbaf..3a729d6cf4b9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1676,49 +1676,61 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
-  test("SPARK-41290: Generated column expression must be valid generation 
expression") {
+  test("SPARK-41290: generation expression with an unresolved function fails") 
{
     val tblName = "my_tab"
-    def checkUnsupportedGenerationExpression(
-        expr: String,
-        expectedReason: String,
-        genColType: String = "INT",
-        customTableDef: Option[String] = None): Unit = {
-      val tableDef =
-        s"CREATE TABLE testcat.$tblName(a INT, b $genColType GENERATED ALWAYS 
AS ($expr)) USING foo"
-      withTable(s"testcat.$tblName") {
-        checkError(
-          exception = analysisException(customTableDef.getOrElse(tableDef)),
-          condition = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
-          parameters = Map(
-            "fieldName" -> "b",
-            "expressionStr" -> expr,
-            "reason" -> expectedReason)
-        )
-      }
-    }
-
-    // Expression cannot be resolved since it doesn't exist
-    checkUnsupportedGenerationExpression(
+    checkUnresolvedRoutineExceptionForCreate(
+      tblName,
       "not_a_function(a)",
-      "failed to resolve `not_a_function` to a built-in function"
+      "`not_a_function`"
     )
+  }
 
-    // Expression cannot be resolved since it's not a built-in function
+  test("SPARK-41290: generation expression with a non-built-in function 
fails") {
+    val tblName = "my_tab"
     spark.udf.register("timesTwo", (x: Int) => x * 2)
-    checkUnsupportedGenerationExpression(
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "timesTwo(a)",
       "failed to resolve `timesTwo` to a built-in function"
     )
+  }
 
-    // Generated column can't reference itself
-    checkUnsupportedGenerationExpression(
+  test("SPARK-41290: generation expression with a persistent SQL function fails") {
+    val tblName = "my_tab"
+    val expr = "timesThree(a)"
+    val tableDef =
+      s"CREATE TABLE testcat.$tblName(a INT, b INT GENERATED ALWAYS AS ($expr)) USING foo"
+    // The error's query context is expected to cover exactly the generation expression text.
+    val exprStart = tableDef.indexOf(expr)
+    val exprStop = exprStart + expr.length - 1
+    // A persistent SQL function is not a built-in function either. It is rejected earlier in
+    // analysis with UNSUPPORTED_SQL_UDF_USAGE, since SQL functions are not allowed in a
+    // CreateTable plan node.
+    withUserDefinedFunction("timesThree" -> false) {
+      sql("CREATE FUNCTION timesThree(x INT) RETURNS INT RETURN x * 3")
+      withTable(s"testcat.$tblName") {
+        checkError(
+          exception = analysisException(tableDef),
+          condition = "UNSUPPORTED_SQL_UDF_USAGE",
+          parameters = Map(
+            "functionName" -> "`spark_catalog`.`default`.`timesthree`",
+            "nodeName" -> "CreateTable"),
+          context = ExpectedContext(fragment = expr, start = exprStart, stop = exprStop))
+      }
+    }
+  }
+
+  test("SPARK-41290: generation expression cannot reference itself") {
+    val tblName = "my_tab"
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "b + 1",
       "generation expression cannot reference itself"
     )
     // Obeys case sensitivity when intercepting the error message
     // Intercepts when case-insensitive
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
-      checkUnsupportedGenerationExpression(
+      checkUnsupportedGenerationExpressionForCreate(
+        tblName,
         "B + 1",
         "generation expression cannot reference itself"
       )
@@ -1726,12 +1738,15 @@ class DataSourceV2SQLSuiteV1Filter
     // Doesn't intercept when case-sensitive
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
       withTable(s"testcat.$tblName") {
+        val sql = s"CREATE TABLE testcat.$tblName(a INT, " +
+          "b INT GENERATED ALWAYS AS (B + 1)) USING foo"
+        // "B" appears at position 62 in the SQL string
+        val bPosition = sql.indexOf("(B + 1)") + 1
         checkError(
-          exception = analysisException(s"CREATE TABLE testcat.$tblName(a INT, 
" +
-              "b INT GENERATED ALWAYS AS (B + 1)) USING foo"),
+          exception = analysisException(sql),
           condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-          parameters = Map("objectName" -> "`B`", "proposal" -> "`a`"),
-          context = ExpectedContext(fragment = "B", start = 0, stop = 0)
+          parameters = Map("objectName" -> "`B`", "proposal" -> "`a`, `b`"),
+          context = ExpectedContext(fragment = "B", start = bPosition, stop = 
bPosition)
         )
       }
     }
@@ -1743,9 +1758,12 @@ class DataSourceV2SQLSuiteV1Filter
         
assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), 
tblName)))
       }
     }
+  }
 
-    // Generated column can't reference other generated columns
-    checkUnsupportedGenerationExpression(
+  test("SPARK-41290: generation expression cannot reference other generated 
columns") {
+    val tblName = "my_tab"
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "c + 1",
       "generation expression cannot reference another generated column",
       customTableDef = Some(
@@ -1755,7 +1773,8 @@ class DataSourceV2SQLSuiteV1Filter
     )
     // Respects case-insensitivity
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
-      checkUnsupportedGenerationExpression(
+      checkUnsupportedGenerationExpressionForCreate(
+        tblName,
         "C + 1",
         "generation expression cannot reference another generated column",
         customTableDef = Some(
@@ -1763,7 +1782,8 @@ class DataSourceV2SQLSuiteV1Filter
             "b INT GENERATED ALWAYS AS (C + 1), c INT GENERATED ALWAYS AS (a + 
1)) USING foo"
         )
       )
-      checkUnsupportedGenerationExpression(
+      checkUnsupportedGenerationExpressionForCreate(
+        tblName,
         "c + 1",
         "generation expression cannot reference another generated column",
         customTableDef = Some(
@@ -1780,26 +1800,37 @@ class DataSourceV2SQLSuiteV1Filter
         
assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), 
tblName)))
       }
     }
+  }
 
-    // Generated column can't reference non-existent column
+  test("SPARK-41290: generation expression cannot reference a non-existent 
column") {
+    val tblName = "my_tab"
     withTable(s"testcat.$tblName") {
+      val sql = s"CREATE TABLE testcat.$tblName(a INT, " +
+        "b INT GENERATED ALWAYS AS (c + 1)) USING foo"
+      val cPosition = sql.indexOf("(c + 1)") + 1
       checkError(
-        exception = analysisException(s"CREATE TABLE testcat.$tblName(a INT, b 
INT GENERATED " +
-          s"ALWAYS AS (c + 1)) USING foo"),
+        exception = analysisException(sql),
         condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-        parameters = Map("objectName" -> "`c`", "proposal" -> "`a`"),
-        context = ExpectedContext(fragment = "c", start = 0, stop = 0)
+        parameters = Map("objectName" -> "`c`", "proposal" -> "`a`, `b`"),
+        context = ExpectedContext(fragment = "c", start = cPosition, stop = 
cPosition)
       )
     }
+  }
 
-    // Expression must be deterministic
-    checkUnsupportedGenerationExpression(
+  test("SPARK-41290: generation expression must be deterministic") {
+    val tblName = "my_tab"
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "rand()",
       "generation expression is not deterministic"
     )
+  }
 
+  test("SPARK-41290: generation expression data type must be compatible with 
the column type") {
+    val tblName = "my_tab"
     // Data type is incompatible
-    checkUnsupportedGenerationExpression(
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "a + 1",
       "generation expression data type int is incompatible with column data 
type boolean",
       "BOOLEAN"
@@ -1809,33 +1840,186 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"CREATE TABLE testcat.$tblName(a INT, b LONG GENERATED ALWAYS AS (a 
+ 1)) USING foo")
       
assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), 
tblName)))
     }
+  }
 
-    // No subquery expressions
-    checkUnsupportedGenerationExpression(
+  test("SPARK-41290: generation expression cannot contain a subquery") {
+    val tblName = "my_tab"
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "(SELECT 1)",
       "subquery expressions are not allowed for generated columns"
     )
-    checkUnsupportedGenerationExpression(
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "(SELECT (SELECT 2) + 1)", // nested
       "subquery expressions are not allowed for generated columns"
     )
-    checkUnsupportedGenerationExpression(
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "(SELECT 1) + a", // refers to another column
       "subquery expressions are not allowed for generated columns"
     )
     withTable("other") {
       sql("create table other(x INT) using parquet")
-      checkUnsupportedGenerationExpression(
+      checkUnsupportedGenerationExpressionForCreate(
+        tblName,
         "(select min(x) from other)", // refers to another table
         "subquery expressions are not allowed for generated columns"
       )
     }
-    checkUnsupportedGenerationExpression(
+    checkUnsupportedGenerationExpressionForCreate(
+      tblName,
       "(select min(x) from faketable)", // refers to a non-existent table
       "subquery expressions are not allowed for generated columns"
     )
   }
 
+  test("SPARK-41290: REPLACE TABLE - generation expression with an unresolved function fails") {
+    // The helper expects an UNRESOLVED_ROUTINE error whose context is the whole expression.
+    checkUnresolvedRoutineExceptionForReplace(
+      tblName = "my_tab",
+      expr = "not_a_function(a)",
+      routineName = "`not_a_function`")
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression with a non-built-in function fails") {
+    val tblName = "my_tab"
+    // Scope the temporary UDF to this test: `withUserDefinedFunction` drops it afterwards (the
+    // `true` flag marks it as temporary), so the registration does not leak into other tests
+    // that share the session.
+    withUserDefinedFunction("timesTwo" -> true) {
+      spark.udf.register("timesTwo", (x: Int) => x * 2)
+      // A registered UDF resolves, but it is not a built-in function and must be rejected.
+      checkUnsupportedGenerationExpressionForReplace(
+        tblName,
+        "timesTwo(a)",
+        "failed to resolve `timesTwo` to a built-in function"
+      )
+    }
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression with a persistent SQL function fails") {
+    val tblName = "my_tab"
+    val qualifiedName = s"testcat.$tblName"
+    val expr = "timesThree(a)"
+    val tableDef =
+      s"REPLACE TABLE testcat.$tblName(a INT, b INT GENERATED ALWAYS AS ($expr)) USING foo"
+    // The error's query context is expected to cover exactly the generation expression text.
+    val exprStart = tableDef.indexOf(expr)
+    val exprStop = exprStart + expr.length - 1
+    // A persistent SQL function is not a built-in function either. It is rejected earlier in
+    // analysis with UNSUPPORTED_SQL_UDF_USAGE, since SQL functions are not allowed in a
+    // ReplaceTable plan node.
+    withUserDefinedFunction("timesThree" -> false) {
+      sql("CREATE FUNCTION timesThree(x INT) RETURNS INT RETURN x * 3")
+      withTable(qualifiedName) {
+        // Create the table first so that the REPLACE statement has something to replace.
+        sql(s"CREATE TABLE $qualifiedName(dummy INT) USING foo")
+        checkError(
+          exception = analysisException(tableDef),
+          condition = "UNSUPPORTED_SQL_UDF_USAGE",
+          parameters = Map(
+            "functionName" -> "`spark_catalog`.`default`.`timesthree`",
+            "nodeName" -> "ReplaceTable"),
+          context = ExpectedContext(fragment = expr, start = exprStart, stop = exprStop))
+      }
+    }
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression cannot reference itself") {
+    val tblName = "my_tab"
+    val selfReferenceReason = "generation expression cannot reference itself"
+    // The generated column is named `b`, so `b + 1` refers to the column being defined.
+    checkUnsupportedGenerationExpressionForReplace(tblName, "b + 1", selfReferenceReason)
+    // With case-insensitive resolution, `B` is expected to be caught as the same self-reference.
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      checkUnsupportedGenerationExpressionForReplace(tblName, "B + 1", selfReferenceReason)
+    }
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression cannot reference " +
+    "other generated columns") {
+    val tblName = "my_tab"
+    // `b` is generated from `c`, and `c` is itself a generated column.
+    val replaceSql = s"REPLACE TABLE testcat.$tblName(a INT, " +
+      "b INT GENERATED ALWAYS AS (c + 1), c INT GENERATED ALWAYS AS (a + 1)) USING foo"
+    checkUnsupportedGenerationExpressionForReplace(
+      tblName,
+      expr = "c + 1",
+      expectedReason = "generation expression cannot reference another generated column",
+      customTableDef = Some(replaceSql))
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression cannot reference a " +
+    "non-existent column") {
+    val tblName = "my_tab"
+    val qualifiedName = s"testcat.$tblName"
+    val replaceSql = s"REPLACE TABLE testcat.$tblName(a INT, " +
+      "b INT GENERATED ALWAYS AS (c + 1)) USING foo"
+    // `c` is the character right after the opening parenthesis of the generation clause.
+    val cPosition = replaceSql.indexOf("(c + 1)") + 1
+    val expectedContext = ExpectedContext(fragment = "c", start = cPosition, stop = cPosition)
+    withTable(qualifiedName) {
+      // Create the table first so that the REPLACE statement has something to replace.
+      sql(s"CREATE TABLE $qualifiedName(dummy INT) USING foo")
+      checkError(
+        exception = analysisException(replaceSql),
+        condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map("objectName" -> "`c`", "proposal" -> "`a`, `b`"),
+        context = expectedContext)
+    }
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression must be deterministic") {
+    // `rand()` is expected to be rejected as a non-deterministic generation expression.
+    checkUnsupportedGenerationExpressionForReplace(
+      tblName = "my_tab",
+      expr = "rand()",
+      expectedReason = "generation expression is not deterministic")
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression data type must be " +
+    "compatible with the column type") {
+    val tblName = "my_tab"
+    val qualifiedName = s"testcat.$tblName"
+    // An INT expression cannot back a BOOLEAN generated column.
+    checkUnsupportedGenerationExpressionForReplace(
+      tblName,
+      expr = "a + 1",
+      expectedReason =
+        "generation expression data type int is incompatible with column data type boolean",
+      genColType = "BOOLEAN")
+    // A valid up-cast (INT expression into a LONG column) is accepted.
+    withTable(qualifiedName) {
+      sql(s"CREATE TABLE $qualifiedName(dummy INT) USING foo")
+      sql(s"REPLACE TABLE $qualifiedName(a INT, b LONG GENERATED ALWAYS AS (a + 1)) USING foo")
+      val replacedTable = Identifier.of(Array(), tblName)
+      assert(catalog("testcat").asTableCatalog.tableExists(replacedTable))
+    }
+  }
+
+  test("SPARK-41290: REPLACE TABLE - generation expression cannot contain a subquery") {
+    val tblName = "my_tab"
+    val subqueryReason = "subquery expressions are not allowed for generated columns"
+    // Subqueries that do not read from any table.
+    val standaloneSubqueries = Seq(
+      "(SELECT 1)",
+      "(SELECT (SELECT 2) + 1)", // nested
+      "(SELECT 1) + a" // refers to another column
+    )
+    standaloneSubqueries.foreach { subqueryExpr =>
+      checkUnsupportedGenerationExpressionForReplace(tblName, subqueryExpr, subqueryReason)
+    }
+    withTable("other") {
+      sql("create table other(x INT) using parquet")
+      // Subquery that refers to another table.
+      checkUnsupportedGenerationExpressionForReplace(
+        tblName,
+        "(select min(x) from other)",
+        subqueryReason)
+    }
+  }
+
   test("SPARK-44313: generation expression validation passes when there is a 
char/varchar column") {
     val tblName = "my_tab"
     // InMemoryTableCatalog.capabilities() = 
{SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS}
@@ -4335,6 +4519,90 @@ class DataSourceV2SQLSuiteV1Filter
       sqlState = "0A000",
       parameters = Map("cmd" -> expectedArgument.getOrElse(sqlCommand)))
   }
+
+  /**
+   * Runs `tableDef` and expects an UNSUPPORTED_EXPRESSION_GENERATED_COLUMN analysis error for
+   * field `b` with the given expression text and reason.
+   *
+   * @param tblName table name inside `testcat`; the table is cleaned up via `withTable`
+   * @param tableDef the DDL statement expected to fail
+   * @param expr expected value of the `expressionStr` error parameter
+   * @param expectedReason expected value of the `reason` error parameter
+   * @param createFirst whether to create a placeholder table before running `tableDef`
+   */
+  private def checkUnsupportedGenerationExpression(
+      tblName: String,
+      tableDef: String,
+      expr: String,
+      expectedReason: String,
+      createFirst: Boolean): Unit = {
+    val qualifiedName = s"testcat.$tblName"
+    val expectedParameters = Map(
+      "fieldName" -> "b",
+      "expressionStr" -> expr,
+      "reason" -> expectedReason)
+    withTable(qualifiedName) {
+      if (createFirst) {
+        sql(s"CREATE TABLE $qualifiedName(dummy INT) USING foo")
+      }
+      checkError(
+        exception = analysisException(tableDef),
+        condition = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+        parameters = expectedParameters)
+    }
+  }
+
+  /**
+   * CREATE TABLE flavor of `checkUnsupportedGenerationExpression`: no table is created
+   * beforehand. Unless `customTableDef` is given, the statement declares `a INT` plus a
+   * generated column `b` of type `genColType` defined by `expr`.
+   */
+  private def checkUnsupportedGenerationExpressionForCreate(
+      tblName: String,
+      expr: String,
+      expectedReason: String,
+      genColType: String = "INT",
+      customTableDef: Option[String] = None): Unit = {
+    val defaultTableDef =
+      s"CREATE TABLE testcat.$tblName(a INT, b $genColType GENERATED ALWAYS AS ($expr)) USING foo"
+    val tableDef = customTableDef match {
+      case Some(custom) => custom
+      case None => defaultTableDef
+    }
+    checkUnsupportedGenerationExpression(
+      tblName, tableDef, expr, expectedReason, createFirst = false)
+  }
+
+  /**
+   * REPLACE TABLE flavor of `checkUnsupportedGenerationExpression`: a placeholder table is
+   * created first. Unless `customTableDef` is given, the statement declares `a INT` plus a
+   * generated column `b` of type `genColType` defined by `expr`.
+   */
+  private def checkUnsupportedGenerationExpressionForReplace(
+      tblName: String,
+      expr: String,
+      expectedReason: String,
+      genColType: String = "INT",
+      customTableDef: Option[String] = None): Unit = {
+    val defaultTableDef =
+      s"REPLACE TABLE testcat.$tblName(a INT, b $genColType " +
+        s"GENERATED ALWAYS AS ($expr)) USING foo"
+    val tableDef = customTableDef match {
+      case Some(custom) => custom
+      case None => defaultTableDef
+    }
+    checkUnsupportedGenerationExpression(
+      tblName, tableDef, expr, expectedReason, createFirst = true)
+  }
+
+  /**
+   * Runs `tableDef` and expects an UNRESOLVED_ROUTINE analysis error for `routineName` whose
+   * query context is exactly the `expr` fragment inside `tableDef`.
+   *
+   * @param tblName table name inside `testcat`; the table is cleaned up via `withTable`
+   * @param tableDef the DDL statement expected to fail; it must contain `expr`
+   * @param expr the generation expression text, used to locate the expected error context
+   * @param routineName expected value of the `routineName` error parameter
+   * @param createFirst whether to create a placeholder table before running `tableDef`
+   */
+  private def checkUnresolvedRoutineException(
+      tblName: String,
+      tableDef: String,
+      expr: String,
+      routineName: String,
+      createFirst: Boolean): Unit = {
+    val start = tableDef.indexOf(expr)
+    // Fail fast with a clear message: without this check a missing fragment would yield
+    // start = -1 and a misleading context mismatch further down.
+    assert(start >= 0, s"generation expression '$expr' does not occur in statement: $tableDef")
+    val stop = start + expr.length - 1
+    withTable(s"testcat.$tblName") {
+      if (createFirst) {
+        sql(s"CREATE TABLE testcat.$tblName(dummy INT) USING foo")
+      }
+      checkError(
+        exception = analysisException(tableDef),
+        condition = "UNRESOLVED_ROUTINE",
+        parameters = Map(
+          "routineName" -> routineName,
+          "searchPath" -> "[`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]"),
+        context = ExpectedContext(fragment = expr, start = start, stop = stop))
+    }
+  }
+
+  /**
+   * CREATE TABLE flavor of `checkUnresolvedRoutineException`: builds a statement with a
+   * generated column `b` defined by `expr` and does not create any table beforehand.
+   */
+  private def checkUnresolvedRoutineExceptionForCreate(
+      tblName: String,
+      expr: String,
+      routineName: String): Unit = {
+    checkUnresolvedRoutineException(
+      tblName,
+      tableDef =
+        s"CREATE TABLE testcat.$tblName(a INT, b INT GENERATED ALWAYS AS ($expr)) USING foo",
+      expr = expr,
+      routineName = routineName,
+      createFirst = false)
+  }
+
+  /**
+   * REPLACE TABLE flavor of `checkUnresolvedRoutineException`: builds a statement with a
+   * generated column `b` defined by `expr` and creates a placeholder table beforehand.
+   */
+  private def checkUnresolvedRoutineExceptionForReplace(
+      tblName: String,
+      expr: String,
+      routineName: String): Unit = {
+    checkUnresolvedRoutineException(
+      tblName,
+      tableDef =
+        s"REPLACE TABLE testcat.$tblName(a INT, b INT GENERATED ALWAYS AS ($expr)) USING foo",
+      expr = expr,
+      routineName = routineName,
+      createFirst = true)
+  }
 }
 
 class DataSourceV2SQLSuiteV2Filter extends DataSourceV2SQLSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to