This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b7fda7cb1128 [SPARK-54405][SQL][METRIC VIEW] CREATE command and SELECT 
query resolution
b7fda7cb1128 is described below

commit b7fda7cb1128c992e1b52b5c853225e4f2af0517
Author: Linhong Liu <[email protected]>
AuthorDate: Wed Dec 17 12:42:24 2025 +0800

    [SPARK-54405][SQL][METRIC VIEW] CREATE command and SELECT query resolution
    
    ### What changes were proposed in this pull request?
    This PR implements the command to create metric views and the analysis rule 
to resolve a metric view query:
    
    - CREATE Metric view
      - Add SQL grammar to support `WITH METRIC` when creating a view
      - Add dollar-quoted string support for YAML definitions
      - Implement CreateMetricViewCommand to analyze the view body
      - Use a table property to indicate that the View is a metric view since 
HIVE has no dedicated table type
    - SELECT Metric view
      - Update SessionCatalog to parse metric view definitions on read
      - Add MetricViewPlanner utility to parse the YAML definition and 
construct an unresolved plan
      - Add ResolveMetricView rule to substitute the dimensions and measures 
reference to actual expressions
    
    NOTE: This PR depends on https://github.com/apache/spark/pull/53146
    
    This PR also marks `org.apache.spark.sql.metricview` as an internal package
    
    ### Why are the changes needed?
    [SPIP: Metrics & semantic modeling in 
Spark](https://docs.google.com/document/d/1xVTLijvDTJ90lZ_ujwzf9HvBJgWg0mY6cYM44Fcghl0/edit?tab=t.0#heading=h.4iogryr5qznc)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    ```
    build/sbt "hive/testOnly  
org.apache.spark.sql.execution.SimpleMetricViewSuite"
    build/sbt "hive/testOnly  
org.apache.spark.sql.hive.execution.HiveMetricViewSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53158 from linhongliu-db/metric-view-create-and-select.
    
    Authored-by: Linhong Liu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 docs/sql-ref-ansi-compliance.md                    |   2 +
 project/SparkBuild.scala                           |   2 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |  30 ++
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  23 ++
 .../spark/sql/errors/QueryParsingErrors.scala      |  11 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../apache/spark/sql/catalyst/analysis/view.scala  |   2 +
 .../sql/catalyst/catalog/SessionCatalog.scala      |  26 +-
 .../spark/sql/catalyst/catalog/interface.scala     |  12 +
 .../catalyst/expressions/aggregate/Measure.scala   |  60 +++
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  10 +
 .../spark/sql/catalyst/parser/ParserUtils.scala    |  14 +-
 .../spark/sql/catalyst/trees/TreePatterns.scala    |   3 +
 .../sql/metricview/logical/metricViewNodes.scala   |  53 +++
 .../sql/metricview/util/MetricViewPlanner.scala    |  83 ++++
 .../jdbc/SparkConnectDatabaseMetaDataSuite.scala   |   2 +-
 .../sql/catalyst/analysis/ResolveMetricView.scala  | 344 ++++++++++++++++
 .../spark/sql/execution/SparkSqlParser.scala       |  50 +++
 .../sql/execution/command/metricViewCommands.scala |  96 +++++
 .../apache/spark/sql/execution/command/views.scala | 125 +++---
 .../sql/internal/BaseSessionStateBuilder.scala     |   3 +-
 .../sql-tests/results/keywords-enforced.sql.out    |   2 +
 .../resources/sql-tests/results/keywords.sql.out   |   2 +
 .../sql-tests/results/nonansi/keywords.sql.out     |   2 +
 .../spark/sql/execution/MetricViewSuite.scala      | 436 +++++++++++++++++++++
 sql/gen-sql-functions-docs.py                      |   2 +-
 .../ThriftServerWithSparkContextSuite.scala        |   2 +-
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   3 +-
 .../sql/hive/execution/HiveMetricViewSuite.scala   |  28 ++
 29 files changed, 1372 insertions(+), 57 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index dcbe772dbe5c..23973b26a265 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -615,7 +615,9 @@ Below is a list of all the keywords in Spark SQL.
 |MATCHED|non-reserved|non-reserved|non-reserved|
 |MATERIALIZED|non-reserved|non-reserved|non-reserved|
 |MAX|non-reserved|non-reserved|non-reserved|
+|MEASURE|non-reserved|non-reserved|non-reserved|
 |MERGE|non-reserved|non-reserved|non-reserved|
+|METRICS|non-reserved|non-reserved|non-reserved|
 |MICROSECOND|non-reserved|non-reserved|non-reserved|
 |MICROSECONDS|non-reserved|non-reserved|non-reserved|
 |MILLISECOND|non-reserved|non-reserved|non-reserved|
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6168ad4cf083..041fe78d5e54 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -367,6 +367,7 @@ object SparkBuild extends PomBuild {
         "org.apache.spark.kafka010",
         "org.apache.spark.network",
         "org.apache.spark.sql.avro",
+        "org.apache.spark.sql.metricview",
         "org.apache.spark.sql.pipelines",
         "org.apache.spark.sql.scripting",
         "org.apache.spark.types.variant",
@@ -1532,6 +1533,7 @@ object Unidoc {
       
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/classic/")))
       
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
       
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
+      
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/metricview")))
       
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/pipelines")))
       
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/scripting")))
       .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/ml")))
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 400461d2d497..f12e033e5c32 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -16,6 +16,11 @@
 
 lexer grammar SqlBaseLexer;
 
+@header {
+import java.util.ArrayDeque;
+import java.util.Deque;
+}
+
 @members {
   /**
    * When true, parser should throw ParseException for unclosed bracketed 
comment.
@@ -70,6 +75,11 @@ lexer grammar SqlBaseLexer;
     has_unclosed_bracketed_comment = true;
   }
 
+  /**
+   * This field stores the tags which are used to detect the end of a dollar 
quoted string literal.
+   */
+  private final Deque<String> tags = new ArrayDeque<String>();
+
   /**
    * When greater than zero, it's in the middle of parsing ARRAY/MAP/STRUCT 
type.
    */
@@ -324,7 +334,9 @@ MAP: 'MAP' {incComplexTypeLevelCounter();};
 MATCHED: 'MATCHED';
 MATERIALIZED: 'MATERIALIZED';
 MAX: 'MAX';
+MEASURE: 'MEASURE';
 MERGE: 'MERGE';
+METRICS: 'METRICS';
 MICROSECOND: 'MICROSECOND';
 MICROSECONDS: 'MICROSECONDS';
 MILLISECOND: 'MILLISECOND';
@@ -557,6 +569,10 @@ STRING_LITERAL
     | 'R"'(~'"')* '"'
     ;
 
+BEGIN_DOLLAR_QUOTED_STRING
+    : DOLLAR_QUOTED_TAG {tags.push(getText());} -> 
pushMode(DOLLAR_QUOTED_STRING_MODE)
+    ;
+
 DOUBLEQUOTED_STRING
     :'"' ( ~('"'|'\\') | '""' | ('\\' .) )* '"'
     ;
@@ -634,6 +650,10 @@ fragment LETTER
     : [A-Z]
     ;
 
+fragment DOLLAR_QUOTED_TAG
+    : '$' LETTER* '$'
+    ;
+
 fragment UNICODE_LETTER
     : [\p{L}]
     ;
@@ -656,3 +676,13 @@ WS
 UNRECOGNIZED
     : .
     ;
+
+mode DOLLAR_QUOTED_STRING_MODE;
+DOLLAR_QUOTED_STRING_BODY
+    : ~'$'+
+    | '$' ~'$'*
+    ;
+
+END_DOLLAR_QUOTED_STRING
+    : DOLLAR_QUOTED_TAG {getText().equals(tags.peek())}? {tags.pop();} -> 
popMode
+    ;
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 132ced820e9a..9b7eaece945b 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -323,6 +323,14 @@ statement
          (PARTITIONED ON identifierList) |
          (TBLPROPERTIES propertyList))*
         AS query                                                       
#createView
+    | CREATE (OR REPLACE)?
+        VIEW (IF errorCapturingNot EXISTS)? identifierReference
+        identifierCommentList?
+        ((WITH METRICS) |
+         routineLanguage |
+         commentSpec |
+         (TBLPROPERTIES propertyList))*
+        AS codeLiteral                                                 
#createMetricView
     | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
         tableIdentifier (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider
         (OPTIONS propertyList)?                                        
#createTempViewUsing
@@ -1523,6 +1531,17 @@ complexColType
     : errorCapturingIdentifier COLON? dataType (errorCapturingNot NULL)? 
commentSpec?
     ;
 
+// The code literal is defined as a dollar quoted string.
+// A dollar quoted string consists of
+// - a begin tag which contains a dollar sign, an optional tag, and another 
dollar sign,
+// - a string literal that is made up of arbitrary sequence of characters, and
+// - an end tag which has to be exact the same as the begin tag.
+// As the string literal can contain dollar signs, we add + to 
DOLLAR_QUOTED_STRING_BODY to avoid
+// the parser eagarly matching END_DOLLAR_QUOTED_STRING when seeing a dollar 
sign.
+codeLiteral
+    : BEGIN_DOLLAR_QUOTED_STRING DOLLAR_QUOTED_STRING_BODY+ 
END_DOLLAR_QUOTED_STRING
+    ;
+
 routineCharacteristics
     : (routineLanguage
     | specificName
@@ -1997,7 +2016,9 @@ ansiNonReserved
     | MATCHED
     | MATERIALIZED
     | MAX
+    | MEASURE
     | MERGE
+    | METRICS
     | MICROSECOND
     | MICROSECONDS
     | MILLISECOND
@@ -2387,7 +2408,9 @@ nonReserved
     | MATCHED
     | MATERIALIZED
     | MAX
+    | MEASURE
     | MERGE
+    | METRICS
     | MICROSECOND
     | MICROSECONDS
     | MILLISECOND
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 553161ea2db0..696ef78a1a97 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -618,7 +618,7 @@ private[sql] object QueryParsingErrors extends 
DataTypeErrorsBase {
       ctx)
   }
 
-  def createViewWithBothIfNotExistsAndReplaceError(ctx: CreateViewContext): 
Throwable = {
+  def createViewWithBothIfNotExistsAndReplaceError(ctx: ParserRuleContext): 
Throwable = {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx)
   }
 
@@ -774,6 +774,15 @@ private[sql] object QueryParsingErrors extends 
DataTypeErrorsBase {
       ctx)
   }
 
+  def missingClausesForOperation(
+      ctx: ParserRuleContext,
+      clauses: String,
+      operation: String): Throwable =
+    new ParseException(
+      errorClass = "MISSING_CLAUSES_FOR_OPERATION",
+      messageParameters = Map("clauses" -> clauses, "operation" -> operation),
+      ctx)
+
   def invalidDatetimeUnitError(
       ctx: ParserRuleContext,
       functionName: String,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 199306104d85..0d5f30bd2d78 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -825,6 +825,7 @@ object FunctionRegistry {
     expression[ThetaDifference]("theta_difference"),
     expression[ThetaIntersection]("theta_intersection"),
     expression[ApproxTopKEstimate]("approx_top_k_estimate"),
+    expression[Measure]("measure"),
 
     // grouping sets
     expression[Grouping]("grouping"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index ada47bd3a40c..25273c73eb7f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.metricview.logical.ResolvedMetricView
 
 /**
  * This file defines view types and analysis rules related to views.
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 object EliminateView extends Rule[LogicalPlan] with CastSupport {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case v: View => v.child
+    case rmv: ResolvedMetricView => rmv.child
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index be90c7ad3656..191e2091c40d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -49,6 +49,7 @@ import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.metricview.util.MetricViewPlanner
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
 import org.apache.spark.util.ArrayImplicits._
@@ -943,7 +944,9 @@ class SessionCatalog(
     val table = qualifiedIdent.table
     val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table)
 
-    if (metadata.tableType == CatalogTableType.VIEW) {
+    if (CatalogTable.isMetricView(metadata)) {
+      parseMetricViewDefinition(metadata)
+    } else if (metadata.tableType == CatalogTableType.VIEW) {
       // The relation is a view, so we wrap the relation by:
       // 1. Add a [[View]] operator over the relation to keep track of the 
view desc;
       // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name 
of the view.
@@ -953,6 +956,27 @@ class SessionCatalog(
     }
   }
 
+  private def parseMetricViewDefinition(metadata: CatalogTable): LogicalPlan = 
{
+    val viewDefinition = metadata.viewText.getOrElse {
+      throw SparkException.internalError("Invalid view without text.")
+    }
+    val viewConfigs = metadata.viewSQLConfigs
+    val origin = CurrentOrigin.get.copy(
+      objectType = Some("METRIC VIEW"),
+      objectName = Some(metadata.qualifiedName)
+    )
+    SQLConf.withExistingConf(
+      View.effectiveSQLConf(
+        configs = viewConfigs,
+        isTempView = false
+      )
+    ) {
+      CurrentOrigin.withOrigin(origin) {
+        MetricViewPlanner.planRead(metadata, viewDefinition, parser, 
metadata.schema)
+      }
+    }
+  }
+
   private def buildViewDDL(metadata: CatalogTable, isTempView: Boolean): 
Option[String] = {
     if (isTempView) {
       None
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index eab99a96f4c3..01153d516e5c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -722,6 +722,18 @@ object CatalogTable {
 
   val VIEW_CATALOG_AND_NAMESPACE = VIEW_PREFIX + "catalogAndNamespace.numParts"
   val VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX = VIEW_PREFIX + 
"catalogAndNamespace.part."
+
+  // Property to indicate that a VIEW is actually a METRIC VIEW
+  val VIEW_WITH_METRICS = VIEW_PREFIX + "viewWithMetrics"
+
+  /**
+   * Check if a CatalogTable is a metric view by looking at its properties.
+   */
+  def isMetricView(table: CatalogTable): Boolean = {
+    table.tableType == CatalogTableType.VIEW &&
+      table.properties.get(VIEW_WITH_METRICS).contains("true")
+  }
+
   // Convert the current catalog and namespace to properties.
   def catalogAndNamespaceToProps(
       currentCatalog: String,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
new file mode 100644
index 000000000000..e2d85cafa52f
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, ExpressionDescription, UnevaluableAggregateFunc}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{MEASURE, TreePattern}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
+
+// This function serves as an annotation to tell the analyzer to calculate
+// the measures defined in metric views. It cannot be evaluated in execution 
phase
+// and instead it'll be replaced to the actual aggregate functions defined by
+// the measure (as input argument).
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - this function is used and can only be used to 
calculate a measure defined in a metric view.",
+  examples = """
+    Examples:
+      > SELECT dimension_col, _FUNC_(measure_col)
+        FROM test_metric_view
+        GROUP BY dimension_col;
+      dim_1, 100
+      dim_2, 200
+  """,
+  group = "agg_funcs",
+  since = "4.2.0")
+// scalastyle:on line.size.limit
+case class Measure(child: Expression)
+  extends UnevaluableAggregateFunc with ExpectsInputTypes
+    with UnaryLike[Expression] {
+
+  override protected def withNewChildInternal(newChild: Expression): Measure =
+    copy(child = newChild)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+
+  override def dataType: DataType = child.dataType
+
+  override def prettyName: String = 
getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("measure")
+
+  override def nullable: Boolean = child.nullable
+
+  override val nodePatterns: Seq[TreePattern] = Seq(MEASURE)
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f918232c42ac..563eacab244d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3710,6 +3710,16 @@ class AstBuilder extends DataTypeAstBuilder
     Literal.create(createString(ctx), StringType)
   }
 
+  /**
+   * Create a String from a dollar-quoted string literal (e.g., $$text$$).
+   * This is used for code literals in features like metric views where the 
content
+   * may contain special characters that would be difficult to escape in 
regular strings.
+   */
+  override def visitCodeLiteral(ctx: CodeLiteralContext): String = {
+    assert(ctx != null)
+    dollarQuotedString(ctx.DOLLAR_QUOTED_STRING_BODY())
+  }
+
   /**
    * Create a String from a string literal context. This supports:
    *   - Consecutive string literals: `'hello' 'world'` becomes `'helloworld'`
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index 336db1382f89..0f7b6be765ee 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -24,7 +24,7 @@ import scala.util.matching.Regex
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.misc.Interval
-import org.antlr.v4.runtime.tree.{ParseTree, TerminalNodeImpl}
+import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode, TerminalNodeImpl}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
@@ -88,6 +88,18 @@ object ParserUtils extends SparkParserUtils {
     node.getText.slice(1, node.getText.length - 1)
   }
 
+  /**
+   * Obtain the string literal provided as a dollar quoted string.
+   * A dollar quoted string is defined as {{{$[tag]$<string literal>$[tag]$}}},
+   * where the string literal is parsed as a list of body sections.
+   * This helper method concatenates all body sections and restores the string 
literal back.
+   */
+  def dollarQuotedString(sections: util.List[TerminalNode]): String = {
+    val sb = new StringBuilder()
+    sections forEach (body => sb.append(body.getText))
+    sb.toString()
+  }
+
   /** Collect the entries if any. */
   def entry(key: String, value: Token): Seq[(String, String)] = {
     Option(value).toSeq.map(x => key -> string(x))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 5ea93e74c5d7..a650eb8ed536 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -70,6 +70,7 @@ object TreePattern extends Enumeration  {
   val MAP_FROM_ARRAYS: Value = Value
   val MAP_FROM_ENTRIES: Value = Value
   val MAP_OBJECTS: Value = Value
+  val MEASURE: Value = Value
   val MULTI_ALIAS: Value = Value
   val NEW_INSTANCE: Value = Value
   val NOT: Value = Value
@@ -149,6 +150,7 @@ object TreePattern extends Enumeration  {
   val LIMIT: Value = Value
   val LOCAL_RELATION: Value = Value
   val LOGICAL_QUERY_STAGE: Value = Value
+  val METRIC_VIEW_PLACEHOLDER: Value = Value
   val NATURAL_LIKE_JOIN: Value = Value
   val NO_GROUPING_AGGREGATE_REFERENCE: Value = Value
   val OFFSET: Value = Value
@@ -162,6 +164,7 @@ object TreePattern extends Enumeration  {
   val RELATION_TIME_TRAVEL: Value = Value
   val REPARTITION_OPERATION: Value = Value
   val REBALANCE_PARTITIONS: Value = Value
+  val RESOLVED_METRIC_VIEW: Value = Value
   val SERIALIZE_FROM_OBJECT: Value = Value
   val SORT: Value = Value
   val SQL_TABLE_FUNCTION: Value = Value
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
new file mode 100644
index 000000000000..a7fa037a4b33
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.logical
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import 
org.apache.spark.sql.catalyst.trees.TreePattern.{METRIC_VIEW_PLACEHOLDER, 
RESOLVED_METRIC_VIEW, TreePattern}
+import org.apache.spark.sql.metricview.serde.MetricView
+
+case class MetricViewPlaceholder(
+    metadata: CatalogTable,
+    desc: MetricView,
+    outputMetrics: Seq[Attribute],
+    child: LogicalPlan,
+    isCreate: Boolean = false) extends UnaryNode {
+  final override val nodePatterns: Seq[TreePattern] = 
Seq(METRIC_VIEW_PLACEHOLDER)
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = {
+    copy(child = newChild)
+  }
+  override def output: Seq[Attribute] = outputMetrics
+  override lazy val resolved: Boolean = child.resolved
+  override def simpleString(maxFields: Int): String =
+    s"$nodeName ${output.mkString("[", ", ", "]")}".trim
+
+  override def producedAttributes: AttributeSet = AttributeSet(outputMetrics)
+}
+
+case class ResolvedMetricView(
+    identifier: TableIdentifier,
+    child: LogicalPlan) extends UnaryNode {
+  override def output: scala.Seq[Attribute] = child.output
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan =
+    copy(child = newChild)
+  override lazy val resolved: Boolean = child.resolved
+  final override val nodePatterns: Seq[TreePattern] = Seq(RESOLVED_METRIC_VIEW)
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
new file mode 100644
index 000000000000..121d908eda90
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.metricview.logical.MetricViewPlaceholder
+import org.apache.spark.sql.metricview.serde.{AssetSource, MetricView, 
MetricViewFactory, MetricViewValidationException, 
MetricViewYAMLParsingException, SQLSource}
+import org.apache.spark.sql.types.StructType
+
+object MetricViewPlanner {
+
+  def planWrite(
+      metadata: CatalogTable,
+      yaml: String,
+      sqlParser: ParserInterface): MetricViewPlaceholder = {
+    val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+    MetricViewPlaceholder(
+      metadata,
+      metricView,
+      Seq.empty,
+      dataModelPlan,
+      isCreate = true
+    )
+  }
+
+  def planRead(
+      metadata: CatalogTable,
+      yaml: String,
+      sqlParser: ParserInterface,
+      expectedSchema: StructType): MetricViewPlaceholder = {
+    val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+    MetricViewPlaceholder(
+      metadata,
+      metricView,
+      DataTypeUtils.toAttributes(expectedSchema),
+      dataModelPlan
+    )
+  }
+
+  private def parseYAML(
+      yaml: String,
+      sqlParser: ParserInterface): (MetricView, LogicalPlan) = {
+    val metricView = try {
+      MetricViewFactory.fromYAML(yaml)
+    } catch {
+      case e: MetricViewValidationException =>
+        throw QueryCompilationErrors.invalidLiteralForWindowDurationError()
+      case e: MetricViewYAMLParsingException =>
+        throw QueryCompilationErrors.invalidLiteralForWindowDurationError()
+    }
+    val source = metricView.from match {
+      case asset: AssetSource => 
UnresolvedRelation(sqlParser.parseMultipartIdentifier(asset.name))
+      case sqlSource: SQLSource => sqlParser.parsePlan(sqlSource.sql)
+      case _ => throw SparkException.internalError("Either SQLSource or 
AssetSource")
+    }
+    // Compute filter here because all necessary information is available.
+    val parsedPlan = metricView.where.map { cond =>
+      Filter(sqlParser.parseExpression(cond), source)
+    }.getOrElse(source)
+    (metricView, parsedPlan)
+  }
+}
diff --git 
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
index 4d66392109e7..1a8ad1b622b8 100644
--- 
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
+++ 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
@@ -209,7 +209,7 @@ class SparkConnectDatabaseMetaDataSuite extends 
ConnectFunSuite with RemoteSpark
     withConnection { conn =>
       val metadata = conn.getMetaData
       // scalastyle:off line.size.limit
-      assert(metadata.getSQLKeywords === 
"ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,E
 [...]
+      assert(metadata.getSQLKeywords === 
"ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,E
 [...]
       // scalastyle:on line.size.limit
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
new file mode 100644
index 000000000000..a7763c2799a6
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Measure}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.METRIC_VIEW_PLACEHOLDER
+import org.apache.spark.sql.metricview.logical.{MetricViewPlaceholder, 
ResolvedMetricView}
+import org.apache.spark.sql.metricview.serde.{Column => CanonicalColumn, 
DimensionExpression, JsonUtils, MeasureExpression, MetricView => 
CanonicalMetricView}
+import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder}
+
+/**
+ * Analysis rule for resolving metric view operations (CREATE and SELECT).
+ *
+ * == Background ==
+ * A metric view is a special type of view that defines a semantic layer over 
raw data by
+ * declaring dimensions (grouping columns) and measures (pre-aggregated 
metrics). Users can
+ * query metric views using the MEASURE() function to access pre-defined 
aggregations without
+ * needing to know the underlying aggregation logic.
+ *
+ * == Metric View Definition (YAML) ==
+ * A metric view is defined using YAML syntax that specifies:
+ * - source: The underlying table or SQL query
+ * - where: Optional filter condition applied to the source
+ * - select: List of columns, each being either a dimension or measure
+ *   - Dimensions: Expressions used for grouping (e.g., "region", 
"upper(region)")
+ *   - Measures: Aggregate expressions (e.g., "sum(count)", "avg(price)")
+ *
+ * Example YAML definition:
+ * {{{
+ *   version: "0.1"
+ *   source:
+ *     asset: "sales_table"
+ *   where: "product = 'product_1'"
+ *   select:
+ *     - name: region
+ *       expression: dimension(region)
+ *     - name: region_upper
+ *       expression: dimension(upper(region))
+ *     - name: total_sales
+ *       expression: measure(sum(amount))
+ *     - name: avg_price
+ *       expression: measure(avg(price))
+ * }}}
+ *
+ * This rule handles two distinct workflows:
+ *
+ * == Workflow 1: CREATE METRIC VIEW ==
+ * Purpose: Analyze the metric view definition and derive the output schema 
for catalog storage.
+ *
+ * SQL Example:
+ * {{{
+ *   CREATE VIEW sales_metrics
+ *   WITH METRICS
+ *   LANGUAGE YAML
+ *   AS $$<yaml definition>$$
+ * }}}
+ *
+ * Processing steps:
+ * 1. Detect [[MetricViewPlaceholder]] nodes marked for creation (isCreate = 
true)
+ * 2. Parse the YAML definition to extract dimensions and measures
+ * 3. Build an [[Aggregate]] logical plan:
+ *    {{{
+ *      Aggregate(
+ *        groupingExpressions = [region, upper(region)],  // all dimensions
+ *        aggregateExpressions = [
+ *          region,                // dimensions become output columns
+ *          upper(region) AS region_upper,
+ *          sum(amount) AS total_sales,    // measures with their aggregations
+ *          avg(price) AS avg_price
+ *        ],
+ *        child = Filter(product = 'product_1', sales_table)
+ *      )
+ *    }}}
+ * 4. The analyzer resolves this plan to derive column data types
+ * 5. The resolved schema (with metadata about dimensions/measures) is stored 
in the catalog
+ *
+ * Key insight: We construct an Aggregate node even though it won't be 
executed. This allows
+ * the analyzer to infer proper data types for measures (e.g., sum(int) -> 
long).
+ *
+ * == Workflow 2: SELECT FROM METRIC VIEW ==
+ * Purpose: Rewrite user queries to replace MEASURE() function calls with 
actual aggregations.
+ *
+ * SQL Example:
+ * {{{
+ *   SELECT region, MEASURE(total_sales), MEASURE(avg_price)
+ *   FROM sales_metrics
+ *   WHERE region_upper = 'REGION_1'
+ *   GROUP BY region
+ * }}}
+ *
+ * Processing steps:
+ * 1. Detect queries against metric views (identified by 
[[MetricViewReadOperation]])
+ * 2. Load and parse the stored metric view definition from catalog metadata
+ * 3. Build a [[Project]] node that:
+ *    - Projects dimension expressions: [region, upper(region) AS region_upper]
+ *    - Includes non-conflicting source columns for measure aggregate 
functions to reference
+ *    - Result: The metric view now exposes dimensions as queryable columns
+ * 4. Locate [[Aggregate]] nodes containing MEASURE() function calls
+ * 5. Substitute each MEASURE() call with its corresponding aggregate 
expression:
+ *    {{{
+ *      Before substitution:
+ *        Aggregate(
+ *          groupingExpressions = [region],
+ *          aggregateExpressions = [region, MEASURE(total_sales), 
MEASURE(avg_price)],
+ *          child = Filter(region_upper = 'REGION_1', sales_metrics)
+ *        )
+ *
+ *      After substitution:
+ *        Aggregate(
+ *          groupingExpressions = [region],
+ *          aggregateExpressions = [region, sum(amount), avg(price)],
+ *          child = Filter(region_upper = 'REGION_1',
+ *                   Project([upper(region) AS region_upper, region, amount, 
price],
+ *                     Filter(product = 'product_1', sales_table)))
+ *        )
+ *    }}}
+ * 6. Return the rewritten plan for further optimization and execution
+ *
+ * Key behaviors:
+ * - Dimensions can be used directly in SELECT, WHERE, GROUP BY, ORDER BY
+ * - Measures must be accessed via MEASURE() function and can only appear in 
aggregate context
+ * - The WHERE clause from the metric view definition is automatically applied
+ * - Source table columns are hidden from the metric view
+ *
+ * Example query patterns:
+ * {{{
+ *   -- Dimension only (no aggregation needed)
+ *   SELECT region_upper FROM sales_metrics GROUP BY 1
+ *   => SELECT upper(region) FROM sales_table WHERE product = 'product_1' 
GROUP BY 1
+ *
+ *   -- Measure only (aggregates entire dataset)
+ *   SELECT MEASURE(total_sales) FROM sales_metrics
+ *   => SELECT sum(amount) FROM sales_table WHERE product = 'product_1'
+ *
+ *   -- Dimensions + Measures (group by dimensions)
+ *   SELECT region, MEASURE(total_sales) FROM sales_metrics GROUP BY region
+ *   => SELECT region, sum(amount) FROM sales_table
+ *      WHERE product = 'product_1' GROUP BY region
+ * }}}
+ *
+ * The rule operates on unresolved plans and transforms 
[[MetricViewPlaceholder]] nodes
+ * into resolved logical plans that can be further optimized and executed.
+ */
+case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
+  private def parser: ParserInterface = session.sessionState.sqlParser
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!plan.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
+      return plan
+    }
+    plan.resolveOperatorsUp {
+      // CREATE PATH: to create a metric view, we need to analyze the metric 
view
+      // definition and get the output schema (with column metadata). Since 
the measures
+      // are aggregate functions, we need to use an Aggregate node and group 
by all
+      // dimensions to get the output schema.
+      case mvp: MetricViewPlaceholder if mvp.isCreate && mvp.child.resolved =>
+        val (dimensions, measures) = buildMetricViewOutput(mvp.desc)
+        Aggregate(
+          // group by all dimensions
+          dimensions.map(_.toAttribute).toSeq,
+          // select all dimensions and measures to get the final output 
(mostly data types)
+          (dimensions ++ measures).toSeq,
+          mvp.child
+        )
+
+      // SELECT PATH: to read a metric view, user will use the `MEASURE` 
aggregate function
+      // to read the measures, so it'll lead to an Aggregate node. This way, 
we only need to
+      // Resolve the Aggregate node based on the metric view output and then 
replace
+      // the AttributeReference of the metric view output to the actual 
expressions.
+      case node @ MetricViewReadOperation(metricView) =>
+        // step 1: parse the metric view definition
+        val (dimensions, measures) =
+          parseMetricViewColumns(metricView.outputMetrics, 
metricView.desc.select)
+
+        // step 2: build the Project node containing the dimensions
+        val dimensionExprs = dimensions.map(_.namedExpr)
+        // Drop the source columns if it conflicts with dimensions
+        val sourceOutput = metricView.child.output
+        // 1. hide the column conflict with dimensions
+        // 2. add an alias to the source column so they are stable with 
DeduplicateRelation
+        // 3. metric view output should use the same exprId
+        val dimensionAttrs = metricView.outputMetrics.filter(a =>
+          dimensions.exists(_.exprId == a.exprId)
+        )
+        val sourceProjList = sourceOutput.filterNot { attr =>
+          // conflict with dimensions
+          dimensionAttrs
+            .resolve(Seq(attr.name), session.sessionState.conf.resolver)
+            .nonEmpty
+        }.map { attr =>
+          // add an alias to the source column so they are stable with 
DeduplicateRelation
+          Alias(attr, attr.name)()
+        }
+        val withDimensions = node.transformDownWithPruning(
+          _.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
+          case mv: MetricViewPlaceholder
+            if mv.metadata.identifier == metricView.metadata.identifier =>
+            ResolvedMetricView(
+              mv.metadata.identifier,
+              Project(sourceProjList ++ dimensionExprs, mv.child)
+            )
+        }
+
+        // step 3: resolve the measure references in Aggregate node
+        withDimensions match {
+          case aggregate: Aggregate => transformAggregateWithMeasures(
+            aggregate,
+            measures
+          )
+          case other =>
+            throw SparkException.internalError("ran into unexpected node: " + 
other)
+        }
+    }
+  }
+
+  private def buildMetricViewOutput(metricView: CanonicalMetricView)
+  : (Seq[NamedExpression], Seq[NamedExpression]) = {
+    val dimensions = new mutable.ArrayBuffer[NamedExpression]()
+    val measures = new mutable.ArrayBuffer[NamedExpression]()
+    metricView.select.foreach { col =>
+      val metadata = new MetadataBuilder()
+        
.withMetadata(Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata)))
+        .build()
+      col.expression match {
+        case DimensionExpression(expr) =>
+          dimensions.append(
+            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = 
Some(metadata)))
+        case MeasureExpression(expr) =>
+          measures.append(
+            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = 
Some(metadata)))
+      }
+    }
+    (dimensions.toSeq, measures.toSeq)
+  }
+
+  private def parseMetricViewColumns(
+      metricViewOutput: Seq[Attribute],
+      columns: Seq[CanonicalColumn]
+  ): (Seq[MetricViewDimension], Seq[MetricViewMeasure]) = {
+    val dimensions = new mutable.ArrayBuffer[MetricViewDimension]()
+    val measures = new mutable.ArrayBuffer[MetricViewMeasure]()
+    metricViewOutput.zip(columns).foreach { case (attr, column) =>
+      column.expression match {
+        case DimensionExpression(expr) =>
+          dimensions.append(
+            MetricViewDimension(
+              attr.name,
+              parser.parseExpression(expr),
+              attr.exprId,
+              attr.dataType)
+          )
+        case MeasureExpression(expr) =>
+          measures.append(
+            MetricViewMeasure(
+              attr.name,
+              parser.parseExpression(expr),
+              attr.exprId,
+              attr.dataType)
+          )
+      }
+    }
+    (dimensions.toSeq, measures.toSeq)
+  }
+
+  private def transformAggregateWithMeasures(
+      aggregate: Aggregate,
+      measures: Seq[MetricViewMeasure]): LogicalPlan = {
+    val measuresMap = measures.map(m => m.exprId -> m).toMap
+    val newAggExprs = aggregate.aggregateExpressions.map { expr =>
+      expr.transform {
+        case AggregateExpression(Measure(a: AttributeReference), _, _, _, _) =>
+          measuresMap(a.exprId).expr
+      }.asInstanceOf[NamedExpression]
+    }
+    aggregate.copy(aggregateExpressions = newAggExprs)
+  }
+}
+
+object MetricViewReadOperation {
+  def unapply(plan: LogicalPlan): Option[MetricViewPlaceholder] = {
+    plan match {
+      case a: Aggregate if a.resolved && 
a.containsPattern(METRIC_VIEW_PLACEHOLDER) =>
+        collectMetricViewNode(a.child)
+      case _ =>
+        None
+    }
+  }
+
+  @scala.annotation.tailrec
+  private def collectMetricViewNode(plan: LogicalPlan): 
Option[MetricViewPlaceholder] = {
+    plan match {
+      case f: Filter => collectMetricViewNode(f.child)
+      case s: Expand => collectMetricViewNode(s.child)
+      case s: Project => collectMetricViewNode(s.child)
+      case s: SubqueryAlias => collectMetricViewNode(s.child)
+      case m: MetricViewPlaceholder => Some(m)
+      case _ => None
+    }
+  }
+}
+
+sealed trait MetricViewColumn {
+  def name: String
+  def expr: Expression
+  def exprId: ExprId
+  def dataType: DataType
+  def namedExpr: NamedExpression = {
+    Alias(UpCast(expr, dataType), name)(exprId = exprId)
+  }
+}
+
+case class MetricViewDimension(
+    name: String,
+    expr: Expression,
+    exprId: ExprId,
+    dataType: DataType) extends MetricViewColumn
+
+case class MetricViewMeasure(
+    name: String,
+    expr: Expression,
+    exprId: ExprId,
+    dataType: DataType) extends MetricViewColumn
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index f8f6e31be1bc..2885d215ee34 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -724,6 +724,56 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
   }
 
+  override def visitCreateMetricView(ctx: CreateMetricViewContext): 
LogicalPlan = withOrigin(ctx) {
+    checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
+    checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+    checkDuplicateClauses(ctx.routineLanguage(), "LANGUAGE", ctx)
+    checkDuplicateClauses(ctx.METRICS(), "WITH METRICS", ctx)
+    val userSpecifiedColumns = Option(ctx.identifierCommentList).toSeq.flatMap 
{ icl =>
+      icl.identifierComment.asScala.map { ic =>
+        ic.identifier.getText -> Option(ic.commentSpec()).map(visitCommentSpec)
+      }
+    }
+
+    if (ctx.EXISTS != null && ctx.REPLACE != null) {
+      throw 
QueryParsingErrors.createViewWithBothIfNotExistsAndReplaceError(ctx)
+    }
+
+    if (ctx.METRICS(0) == null) {
+      throw QueryParsingErrors.missingClausesForOperation(
+        ctx, "WITH METRICS", "METRIC VIEW CREATION")
+    }
+
+    if (ctx.routineLanguage(0) == null) {
+      throw QueryParsingErrors.missingClausesForOperation(
+        ctx, "LANGUAGE", "METRIC VIEW CREATION")
+    }
+
+    val languageCtx = ctx.routineLanguage(0)
+    if (languageCtx.SQL() != null) {
+      operationNotAllowed("Unsupported language for metric view: SQL", 
languageCtx)
+    }
+    val name: String = languageCtx.IDENTIFIER().getText
+    if (!name.equalsIgnoreCase("YAML")) {
+      operationNotAllowed(s"Unsupported language for metric view: $name", 
languageCtx)
+    }
+
+    val properties = ctx.propertyList.asScala.headOption
+      .map(visitPropertyKeyValues)
+      .getOrElse(Map.empty)
+    val codeLiteral = visitCodeLiteral(ctx.codeLiteral())
+
+    CreateMetricViewCommand(
+      withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
+      userSpecifiedColumns,
+      visitCommentSpecList(ctx.commentSpec()),
+      properties,
+      codeLiteral,
+      allowExisting = ctx.EXISTS != null,
+      replace = ctx.REPLACE != null
+    )
+  }
+
   /**
    * Create a [[CreateFunctionCommand]].
    *
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
new file mode 100644
index 000000000000..5fb36b429c4a
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, 
SchemaUnsupported}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, 
LogicalPlan}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.metricview.util.MetricViewPlanner
+import org.apache.spark.sql.types.StructType
+
+case class CreateMetricViewCommand(
+    child: LogicalPlan,
+    userSpecifiedColumns: Seq[(String, Option[String])],
+    comment: Option[String],
+    properties: Map[String, String],
+    originalText: String,
+    allowExisting: Boolean,
+    replace: Boolean) extends UnaryRunnableCommand with IgnoreCachedData {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val name = child match {
+      case v: ResolvedIdentifier =>
+        v.identifier.asTableIdentifier
+      case _ => throw SparkException.internalError(
+        s"Failed to resolve identifier for creating metric view")
+    }
+    val analyzed = MetricViewHelper.analyzeMetricViewText(sparkSession, name, 
originalText)
+
+    if (userSpecifiedColumns.nonEmpty) {
+      if (userSpecifiedColumns.length > analyzed.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+          name, userSpecifiedColumns.map(_._1), analyzed)
+      } else if (userSpecifiedColumns.length < analyzed.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+          name, userSpecifiedColumns.map(_._1), analyzed)
+      }
+    }
+    catalog.createTable(
+      ViewHelper.prepareTable(
+        sparkSession, name, Some(originalText), analyzed, userSpecifiedColumns,
+        properties, SchemaUnsupported, comment,
+        None, isMetricView = true),
+      ignoreIfExists = allowExisting)
+    Seq.empty
+  }
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+case class AlterMetricViewCommand(child: LogicalPlan, originalText: String)
+
+object MetricViewHelper {
+  def analyzeMetricViewText(
+      session: SparkSession,
+      name: TableIdentifier,
+      viewText: String): LogicalPlan = {
+    val analyzer = session.sessionState.analyzer
+    // this metadata is used for analysis check, it'll be replaced during 
create/update with
+    // more accurate information
+    val tableMeta = CatalogTable(
+      identifier = name,
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType(),
+      viewOriginalText = Some(viewText),
+      viewText = Some(viewText))
+    val metricViewNode = MetricViewPlanner.planWrite(
+      tableMeta, viewText, session.sessionState.sqlParser)
+    val analyzed = analyzer.executeAndCheck(metricViewNode, new 
QueryPlanningTracker)
+    ViewHelper.verifyTemporaryObjectsNotExists(isTemporary = false, name, 
analyzed, Seq.empty)
+    analyzed
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 11ec17ca57fd..95d76c72d295 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -133,7 +133,7 @@ case class CreateViewCommand(
     SchemaUtils.checkIndeterminateCollationInSchema(plan.schema)
 
     if (viewType == LocalTempView) {
-      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, 
userSpecifiedColumns)
       val tableDefinition = createTemporaryViewRelation(
         name,
         sparkSession,
@@ -148,7 +148,7 @@ case class CreateViewCommand(
     } else if (viewType == GlobalTempView) {
       val db = 
sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
-      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, 
userSpecifiedColumns)
       val tableDefinition = createTemporaryViewRelation(
         viewIdent,
         sparkSession,
@@ -178,7 +178,10 @@ case class CreateViewCommand(
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
         // Nothing we need to retain from the old view, so just drop and 
create a new one
         catalog.dropTable(viewIdent, ignoreIfNotExists = false, purge = false)
-        catalog.createTable(prepareTable(sparkSession, analyzedPlan), 
ignoreIfExists = false)
+        catalog.createTable(
+          prepareTable(
+            sparkSession, name, originalText, analyzedPlan, 
userSpecifiedColumns, properties,
+            viewSchemaMode, comment, collation), ignoreIfExists = false)
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the 
target view already
         // exists.
@@ -186,56 +189,14 @@ case class CreateViewCommand(
       }
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, analyzedPlan), 
ignoreIfExists = allowExisting)
+      catalog.createTable(
+        prepareTable(
+          sparkSession, name, originalText, analyzedPlan, 
userSpecifiedColumns, properties,
+          viewSchemaMode, comment, collation), ignoreIfExists = allowExisting)
     }
     Seq.empty[Row]
   }
 
-  /**
-   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user 
specified columns,
-   * else return the analyzed plan directly.
-   */
-  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): 
LogicalPlan = {
-    if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", 
colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      session.sessionState.executePlan(Project(projectList, 
analyzedPlan)).analyzed
-    }
-  }
-
-  /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. 
Generate the view-specific
-   * properties(e.g. view default database, view query output column names) 
and store them as
-   * properties in the CatalogTable, and also creates the proper schema for 
the view.
-   */
-  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): 
CatalogTable = {
-    if (originalText.isEmpty) {
-      throw 
QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
-    }
-    val aliasedSchema = CharVarcharUtils.getRawSchema(
-      aliasPlan(session, analyzedPlan).schema, session.sessionState.conf)
-    val newProperties = generateViewProperties(
-      properties, session, analyzedPlan.schema.fieldNames, 
aliasedSchema.fieldNames, viewSchemaMode)
-
-    CatalogTable(
-      identifier = name,
-      tableType = CatalogTableType.VIEW,
-      storage = CatalogStorageFormat.empty,
-      schema = aliasedSchema,
-      properties = newProperties,
-      viewOriginalText = originalText,
-      viewText = originalText,
-      comment = comment,
-      collation = collation
-    )
-  }
-
   override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
     copy(plan = WithCTE(plan, cteDefs))
   }
@@ -812,4 +773,70 @@ object ViewHelper extends SQLConfHelper with Logging with 
CapturesConfig {
       properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")),
       collation = collation)
   }
+
+
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. 
Generate the view-specific
+   * properties(e.g. view default database, view query output column names) 
and store them as
+   * properties in the CatalogTable, and also creates the proper schema for 
the view.
+   */
+  def prepareTable(
+      session: SparkSession,
+      name: TableIdentifier,
+      originalText: Option[String],
+      analyzedPlan: LogicalPlan,
+      userSpecifiedColumns: Seq[(String, Option[String])],
+      properties: Map[String, String],
+      viewSchemaMode: ViewSchemaMode,
+      comment: Option[String],
+      collation: Option[String],
+      isMetricView: Boolean = false): CatalogTable = {
+    if (originalText.isEmpty) {
+      throw 
QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
+    }
+    val aliasedSchema = CharVarcharUtils.getRawSchema(
+      aliasPlan(session, analyzedPlan, userSpecifiedColumns).schema, 
session.sessionState.conf)
+    val newProperties = generateViewProperties(
+      properties, session, analyzedPlan.schema.fieldNames, 
aliasedSchema.fieldNames, viewSchemaMode)
+
+    // Add property to indicate if this is a metric view
+    val finalProperties = if (isMetricView) {
+      newProperties + (CatalogTable.VIEW_WITH_METRICS -> "true")
+    } else {
+      newProperties
+    }
+
+    CatalogTable(
+      identifier = name,
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat.empty,
+      schema = aliasedSchema,
+      properties = finalProperties,
+      viewOriginalText = originalText,
+      viewText = originalText,
+      comment = comment,
+      collation = collation
+    )
+  }
+
+  /**
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user 
specified columns,
+   * else return the analyzed plan directly.
+   */
+  def aliasPlan(
+      session: SparkSession,
+      analyzedPlan: LogicalPlan,
+      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", 
colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      session.sessionState.executePlan(Project(projectList, 
analyzedPlan)).analyzed
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 00c9a26cb5bf..24bf618ee861 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, 
SparkSessionExtensions, UDTFRegistration}
 import org.apache.spark.sql.artifact.ArtifactManager
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, 
ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, 
ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, 
TableFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, 
ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, 
ResolveExecuteImmediate, ResolveMetricView, ResolveSessionCatalog, 
ResolveTranspose, TableFunctionRegistry}
 import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension
 import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExtractSemiStructuredFields}
@@ -245,6 +245,7 @@ abstract class BaseSessionStateBuilder(
         ResolveWriteToStream +:
         new EvalSubqueriesForTimeTravel +:
         new ResolveTranspose(session) +:
+        ResolveMetricView(session) +:
         new InvokeProcedures(session) +:
         ResolveExecuteImmediate(session, this.catalogManager) +:
         ExtractSemiStructuredFields +:
diff --git 
a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out 
b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
index b8443e417caf..c363cf2f6985 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
@@ -210,7 +210,9 @@ MAP false
 MATCHED        false
 MATERIALIZED   false
 MAX    false
+MEASURE        false
 MERGE  false
+METRICS        false
 MICROSECOND    false
 MICROSECONDS   false
 MILLISECOND    false
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index 00baa0c7e725..ddc5b05b3550 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -210,7 +210,9 @@ MAP false
 MATCHED        false
 MATERIALIZED   false
 MAX    false
+MEASURE        false
 MERGE  false
+METRICS        false
 MICROSECOND    false
 MICROSECONDS   false
 MILLISECOND    false
diff --git 
a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
index 00baa0c7e725..ddc5b05b3550 100644
--- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
@@ -210,7 +210,9 @@ MAP false
 MATCHED        false
 MATERIALIZED   false
 MAX    false
+MEASURE        false
 MERGE  false
+METRICS        false
 MICROSECOND    false
 MICROSECONDS   false
 MILLISECOND    false
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala
new file mode 100644
index 000000000000..5e6033aeaa75
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, 
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, 
SQLSource}
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SimpleMetricViewSuite extends MetricViewSuite with SharedSparkSession
+
+/**
+ * A suite for testing metric view related functionality.
+ */
+abstract class MetricViewSuite extends QueryTest with SQLTestUtils {
+  import testImplicits._
+
+  protected val testMetricViewName = "test_metric_view"
+  protected val testTableName = "test_table"
+  protected val testTableData = Seq(
+    ("region_1", "product_1", 80, 5.0),
+    ("region_1", "product_2", 70, 10.0),
+    ("REGION_1", "product_3", 60, 15.0),
+    ("REGION_1", "product_4", 50, 20.0),
+    ("region_2", "product_1", 40, 25.0),
+    ("region_2", "product_2", 30, 30.0),
+    ("REGION_2", "product_3", 20, 35.0),
+    ("REGION_2", "product_4", 10, 40.0)
+  )
+  protected val testMetricViewColumns = Seq(
+    Column("region", DimensionExpression("region"), 0),
+    Column("product", DimensionExpression("product"), 1),
+    Column("region_upper", DimensionExpression("upper(region)"), 2),
+    Column("count_sum", MeasureExpression("sum(count)"), 3),
+    Column("price_avg", MeasureExpression("avg(price)"), 4)
+  )
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testTableData
+      .toDF("region", "product", "count", "price")
+      .write
+      .saveAsTable(testTableName)
+  }
+
+  protected def createMetricView(
+      metricViewName: String,
+      metricViewDefinition: MetricView): Unit = {
+    val yaml = MetricViewFactory.toYAML(metricViewDefinition)
+    sql(s"""
+        |CREATE VIEW $metricViewName
+        |WITH METRICS
+        |LANGUAGE YAML
+        |AS
+        |$$$$
+        |$yaml
+        |$$$$
+        |""".stripMargin)
+  }
+
+  protected def withMetricView(
+      viewName: String,
+      metricViewDefinition: MetricView)(body: => Unit): Unit = {
+    createMetricView(viewName, metricViewDefinition)
+    withView(viewName) {
+      body
+    }
+  }
+
+  test("test source type") {
+    val sources = Seq(
+      AssetSource(testTableName),
+      SQLSource("SELECT * FROM test_table")
+    )
+    sources.foreach { source =>
+      val metricView = MetricView("0.1", source, None, testMetricViewColumns)
+      withMetricView(testMetricViewName, metricView) {
+        checkAnswer(
+          sql("SELECT measure(count_sum), measure(price_avg) FROM 
test_metric_view"),
+          sql("SELECT sum(count), avg(price) FROM test_table")
+        )
+        checkAnswer(
+          sql("SELECT measure(count_sum), measure(price_avg) " +
+            "FROM test_metric_view WHERE region_upper = 'REGION_1'"),
+          sql("SELECT sum(count), avg(price) FROM test_table WHERE 
upper(region) = 'REGION_1'")
+        )
+      }
+    }
+  }
+
+  test("test where clause") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName),
+      Some("product = 'product_1'"), testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT measure(count_sum), measure(price_avg) FROM 
test_metric_view"),
+        sql("SELECT sum(count), avg(price) FROM test_table WHERE product = 
'product_1'")
+      )
+      checkAnswer(
+        sql("SELECT measure(count_sum), measure(price_avg) " +
+          "FROM test_metric_view WHERE region_upper = 'REGION_1'"),
+        sql("SELECT sum(count), avg(price) FROM test_table WHERE " +
+          "product = 'product_1' AND upper(region) = 'REGION_1'")
+      )
+    }
+  }
+
+  test("test dimensions and measures") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      // dimension and measure
+      checkAnswer(
+        sql("SELECT region, product, measure(count_sum), measure(price_avg) " +
+          "FROM test_metric_view GROUP BY region, product"),
+        sql("SELECT region, product, sum(count), avg(price) " +
+          "FROM test_table GROUP BY region, product")
+      )
+      // dimension only
+      checkAnswer(
+        sql("SELECT region_upper FROM test_metric_view GROUP BY 1"),
+        sql("SELECT upper(region) FROM test_table GROUP BY 1")
+      )
+      // measure only
+      checkAnswer(
+        sql("SELECT measure(count_sum) FROM test_metric_view"),
+        sql("SELECT sum(count) FROM test_table")
+      )
+    }
+  }
+
+  test("column from source cannot be used when query metric view") {
+    val metricView = MetricView("0.1", AssetSource(testTableName), None, 
testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT sum(count) FROM test_metric_view").collect()
+        },
+        condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map(
+          "objectName" -> "`count`",
+          "proposal" -> "`count_sum`, `product`, `region`, `price_avg`, 
`region_upper`"
+        ),
+        queryContext = Array(ExpectedContext(
+          fragment = "count",
+          start = 11,
+          stop = 15
+        ))
+      )
+    }
+  }
+
+  test("test ORDER BY and LIMIT clauses") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT region, measure(count_sum) " +
+          "FROM test_metric_view GROUP BY region ORDER BY 2 DESC"),
+        sql("SELECT region, sum(count) " +
+          "FROM test_table GROUP BY region ORDER BY 2 DESC")
+      )
+      checkAnswer(
+        sql("SELECT product, measure(price_avg) " +
+          "FROM test_metric_view GROUP BY product ORDER BY 2 ASC LIMIT 2"),
+        sql("SELECT product, avg(price) " +
+          "FROM test_table GROUP BY product ORDER BY 2 ASC LIMIT 2")
+      )
+    }
+  }
+
+  test("test complex WHERE conditions with dimensions") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT region, product, measure(count_sum) " +
+          "FROM test_metric_view WHERE region_upper = 'REGION_1' " +
+          "AND product IN ('product_1', 'product_2') " +
+          "GROUP BY region, product"),
+        sql("SELECT region, product, sum(count) " +
+          "FROM test_table WHERE upper(region) = 'REGION_1' " +
+          "AND product IN ('product_1', 'product_2') " +
+          "GROUP BY region, product")
+      )
+      checkAnswer(
+        sql("SELECT measure(count_sum), measure(price_avg) " +
+          "FROM test_metric_view WHERE region_upper LIKE 'REGION_%' AND 
product <> 'product_4'"),
+        sql("SELECT sum(count), avg(price) " +
+          "FROM test_table WHERE upper(region) LIKE 'REGION_%' AND product <> 
'product_4'")
+      )
+    }
+  }
+
+  test("test metric view with where clause and additional query filters") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName),
+      Some("product IN ('product_1', 'product_2')"), testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT region, measure(count_sum) " +
+          "FROM test_metric_view WHERE region_upper = 'REGION_1' GROUP BY 
region"),
+        sql("SELECT region, sum(count) " +
+          "FROM test_table WHERE product IN ('product_1', 'product_2') " +
+          "AND upper(region) = 'REGION_1' GROUP BY region")
+      )
+    }
+  }
+
+  test("test multiple measures with different aggregations") {
+    val columns = Seq(
+      Column("region", DimensionExpression("region"), 0),
+      Column("count_sum", MeasureExpression("sum(count)"), 1),
+      Column("count_avg", MeasureExpression("avg(count)"), 2),
+      Column("count_max", MeasureExpression("max(count)"), 3),
+      Column("count_min", MeasureExpression("min(count)"), 4),
+      Column("price_sum", MeasureExpression("sum(price)"), 5)
+    )
+    val metricView = MetricView("0.1", AssetSource(testTableName), None, 
columns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT measure(count_sum), measure(count_avg), 
measure(count_max), " +
+          "measure(count_min), measure(price_sum) FROM test_metric_view"),
+        sql("SELECT sum(count), avg(count), max(count), min(count), sum(price) 
FROM test_table")
+      )
+      checkAnswer(
+        sql("SELECT region, measure(count_sum), measure(count_max), 
measure(price_sum) " +
+          "FROM test_metric_view GROUP BY region"),
+        sql("SELECT region, sum(count), max(count), sum(price) FROM test_table 
GROUP BY region")
+      )
+    }
+  }
+
+  test("test dimension expressions with case statements") {
+    val columns = Seq(
+      Column("region", DimensionExpression("region"), 0),
+      Column("region_category", DimensionExpression(
+        "CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group B' END"), 1),
+      Column("count_sum", MeasureExpression("sum(count)"), 2)
+    )
+    val metricView = MetricView("0.1", AssetSource(testTableName), None, 
columns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT region_category, measure(count_sum) " +
+          "FROM test_metric_view GROUP BY region_category"),
+        sql("SELECT CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group 
B' END, " +
+          "sum(count) FROM test_table " +
+          "GROUP BY CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group 
B' END")
+      )
+    }
+  }
+
+  test("test measure expressions with arithmetic operations") {
+    val columns = Seq(
+      Column("region", DimensionExpression("region"), 0),
+      Column("total_revenue", MeasureExpression("sum(count * price)"), 1),
+      Column("avg_revenue", MeasureExpression("avg(count * price)"), 2)
+    )
+    val metricView = MetricView("0.1", AssetSource(testTableName), None, 
columns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT measure(total_revenue), measure(avg_revenue) FROM 
test_metric_view"),
+        sql("SELECT sum(count * price), avg(count * price) FROM test_table")
+      )
+      checkAnswer(
+        sql("SELECT region, measure(total_revenue) " +
+          "FROM test_metric_view GROUP BY region ORDER BY 2 DESC"),
+        sql("SELECT region, sum(count * price) " +
+          "FROM test_table GROUP BY region ORDER BY 2 DESC")
+      )
+    }
+  }
+
+  test("test dimensions with aggregate functions in GROUP BY") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT region_upper, product, measure(count_sum) " +
+          "FROM test_metric_view GROUP BY region_upper, product ORDER BY 
region_upper, product"),
+        sql("SELECT upper(region), product, sum(count) " +
+          "FROM test_table GROUP BY upper(region), product ORDER BY 
upper(region), product")
+      )
+    }
+  }
+
+  test("test WHERE clause with OR conditions") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT measure(count_sum), measure(price_avg) " +
+          "FROM test_metric_view WHERE region = 'region_1' OR product = 
'product_1'"),
+        sql("SELECT sum(count), avg(price) " +
+          "FROM test_table WHERE region = 'region_1' OR product = 'product_1'")
+      )
+    }
+  }
+
+  test("test dimension-only query with multiple dimensions") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT region_upper, product " +
+          "FROM test_metric_view GROUP BY region_upper, product"),
+        sql("SELECT upper(region), product FROM test_table GROUP BY 
upper(region), product")
+      )
+    }
+  }
+
+  test("test query with SELECT * should fail") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      intercept[Exception] {
+        sql("SELECT * FROM test_metric_view").collect()
+      }
+    }
+  }
+
+  test("test SQLSource with complex query") {
+    val sqlSource = SQLSource(
+      "SELECT region, product, count, price FROM test_table WHERE count > 20")
+    val metricView = MetricView("0.1", sqlSource, None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT measure(count_sum), measure(price_avg) FROM 
test_metric_view"),
+        sql("SELECT sum(count), avg(price) FROM test_table WHERE count > 20")
+      )
+      checkAnswer(
+        sql("SELECT region, measure(count_sum) FROM test_metric_view GROUP BY 
region"),
+        sql("SELECT region, sum(count) FROM test_table WHERE count > 20 GROUP 
BY region")
+      )
+    }
+  }
+
+  test("test measure function without GROUP BY") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT measure(count_sum) FROM test_metric_view"),
+        sql("SELECT sum(count) FROM test_table")
+      )
+      checkAnswer(
+        sql("SELECT measure(count_sum), measure(price_avg) FROM 
test_metric_view"),
+        sql("SELECT sum(count), avg(price) FROM test_table")
+      )
+    }
+  }
+
+  test("test combining multiple dimension expressions in WHERE") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT product, measure(count_sum) " +
+          "FROM test_metric_view WHERE region = 'region_1' AND region_upper = 
'REGION_1' " +
+          "GROUP BY product"),
+        sql("SELECT product, sum(count) " +
+          "FROM test_table WHERE region = 'region_1' AND upper(region) = 
'REGION_1' " +
+          "GROUP BY product")
+      )
+    }
+  }
+
+  test("test measure with COUNT DISTINCT") {
+    val columns = Seq(
+      Column("region", DimensionExpression("region"), 0),
+      Column("product_count", MeasureExpression("count(distinct product)"), 1),
+      Column("count_sum", MeasureExpression("sum(count)"), 2)
+    )
+    val metricView = MetricView("0.1", AssetSource(testTableName), None, 
columns)
+    withMetricView(testMetricViewName, metricView) {
+      checkAnswer(
+        sql("SELECT region, measure(product_count), measure(count_sum) " +
+          "FROM test_metric_view GROUP BY region"),
+        sql("SELECT region, count(distinct product), sum(count) " +
+          "FROM test_table GROUP BY region")
+      )
+    }
+  }
+
+  test("test union of same aggregated metric view dataframe") {
+    val metricView = MetricView(
+      "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+    withMetricView(testMetricViewName, metricView) {
+      // Create a DataFrame with aggregation and groupBy from metric view
+      val df = sql(
+        s"""SELECT region, measure(count_sum) as total_count, 
measure(price_avg) as avg_price
+           |FROM $testMetricViewName
+           |GROUP BY region
+           |""".stripMargin)
+
+      // Union the same DataFrame with itself - tests DeduplicateRelations
+      val unionDf = df.union(df)
+
+      // Expected result: each region should appear twice with identical values
+      val expectedDf = sql(
+        """
+          |SELECT region, sum(count) as total_count, avg(price) as avg_price
+          |FROM test_table
+          |GROUP BY region
+          |UNION ALL
+          |SELECT region, sum(count) as total_count, avg(price) as avg_price
+          |FROM test_table
+          |GROUP BY region
+          |""".stripMargin)
+
+      checkAnswer(unionDf, expectedDf)
+
+      // Verify the result has duplicate rows
+      assert(unionDf.count() == df.count() * 2,
+        "Union should double the row count")
+
+      // Verify that distinct values are the same as the original
+      checkAnswer(unionDf.distinct(), df)
+    }
+  }
+}
diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py
index b49124ece086..9cc478e5cadf 100644
--- a/sql/gen-sql-functions-docs.py
+++ b/sql/gen-sql-functions-docs.py
@@ -175,7 +175,7 @@ def _make_pretty_examples(jspark, infos):
     pretty_output = ""
     for info in infos:
         if (info.examples.startswith("\n    Examples:") and info.name.lower() 
not in
-                ("from_avro", "to_avro", "from_protobuf", "to_protobuf")):
+                ("from_avro", "to_avro", "from_protobuf", "to_protobuf", 
"measure")):
             output = []
             output.append("-- %s" % info.name)
             query_examples = filter(lambda x: x.startswith("      > "), 
info.examples.split("\n"))
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index d69f99a1e42f..a95eeee03221 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends 
SharedThriftServer {
       val sessionHandle = client.openSession(user, "")
       val infoValue = client.getInfo(sessionHandle, 
GetInfoType.CLI_ODBC_KEYWORDS)
       // scalastyle:off line.size.limit
-      assert(infoValue.getStringValue == 
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,C
 [...]
+      assert(infoValue.getStringValue == 
"ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,C
 [...]
       // scalastyle:on line.size.limit
     }
   }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index dec947651dd6..9f5566407e38 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, 
GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, 
ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, 
ResolveTranspose}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, 
ResolveDataSource, ResolveExecuteImmediate, ResolveMetricView, 
ResolveSessionCatalog, ResolveTranspose}
 import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, 
InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExtractSemiStructuredFields}
@@ -132,6 +132,7 @@ class HiveSessionStateBuilder(
         new EvalSubqueriesForTimeTravel +:
         new DetermineTableStats(session) +:
         new ResolveTranspose(session) +:
+        ResolveMetricView(session) +:
         new InvokeProcedures(session) +:
         ResolveExecuteImmediate(session, catalogManager) +:
         ExtractSemiStructuredFields +:
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala
new file mode 100644
index 000000000000..bccc2f629a15
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.execution.MetricViewSuite
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.tags.SlowHiveTest
+
+/**
+ * A test suite for Hive metric view related functionality.
+ */
+@SlowHiveTest
+class HiveMetricViewSuite extends MetricViewSuite with TestHiveSingleton


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to