This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b7fda7cb1128 [SPARK-54405][SQL][METRIC VIEW] CREATE command and SELECT query resolution
b7fda7cb1128 is described below
commit b7fda7cb1128c992e1b52b5c853225e4f2af0517
Author: Linhong Liu <[email protected]>
AuthorDate: Wed Dec 17 12:42:24 2025 +0800
[SPARK-54405][SQL][METRIC VIEW] CREATE command and SELECT query resolution
### What changes were proposed in this pull request?
This PR implements the command to create metric views and the analysis rule
to resolve a metric view query:
- CREATE metric view (see the sketch below)
  - Add SQL grammar to support `WITH METRICS` when creating a view
  - Add dollar-quoted string support for YAML definitions
  - Implement CreateMetricViewCommand to analyze the view body
  - Use a table property to indicate that a view is a metric view, since
    Hive has no dedicated table type
- SELECT from a metric view (see the query sketch below)
  - Update SessionCatalog to parse metric view definitions on read
  - Add MetricViewPlanner utility to parse the YAML definition and
    construct an unresolved plan
  - Add ResolveMetricView rule to substitute dimension and measure
    references with the actual expressions
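As an illustrative sketch, creating a metric view combines the new `WITH METRICS`
clause, the `LANGUAGE YAML` clause, and a dollar-quoted YAML body (the
`sales_table` source and its columns are hypothetical; the YAML shape follows the
example documented in ResolveMetricView):
```
CREATE VIEW sales_metrics
WITH METRICS
LANGUAGE YAML
AS $$
version: "0.1"
source:
  asset: "sales_table"
where: "product = 'product_1'"
select:
  - name: region
    expression: dimension(region)
  - name: total_sales
    expression: measure(sum(amount))
$$
```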
NOTE: This PR depends on https://github.com/apache/spark/pull/53146
This PR also marks `org.apache.spark.sql.metricview` as an internal package
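On the read path, measures are accessed through the new `MEASURE()` aggregate
function, which the ResolveMetricView rule substitutes with the measure's
underlying aggregate expression (again a sketch, using the hypothetical
`sales_metrics` view from above):
```
-- MEASURE(total_sales) is rewritten to sum(amount) during analysis
SELECT region, MEASURE(total_sales)
FROM sales_metrics
GROUP BY region;
```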
### Why are the changes needed?
[SPIP: Metrics & semantic modeling in Spark](https://docs.google.com/document/d/1xVTLijvDTJ90lZ_ujwzf9HvBJgWg0mY6cYM44Fcghl0/edit?tab=t.0#heading=h.4iogryr5qznc)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
build/sbt "hive/testOnly
org.apache.spark.sql.execution.SimpleMetricViewSuite"
build/sbt "hive/testOnly
org.apache.spark.sql.hive.execution.HiveMetricViewSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53158 from linhongliu-db/metric-view-create-and-select.
Authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
docs/sql-ref-ansi-compliance.md | 2 +
project/SparkBuild.scala | 2 +
.../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 30 ++
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 23 ++
.../spark/sql/errors/QueryParsingErrors.scala | 11 +-
.../sql/catalyst/analysis/FunctionRegistry.scala | 1 +
.../apache/spark/sql/catalyst/analysis/view.scala | 2 +
.../sql/catalyst/catalog/SessionCatalog.scala | 26 +-
.../spark/sql/catalyst/catalog/interface.scala | 12 +
.../catalyst/expressions/aggregate/Measure.scala | 60 +++
.../spark/sql/catalyst/parser/AstBuilder.scala | 10 +
.../spark/sql/catalyst/parser/ParserUtils.scala | 14 +-
.../spark/sql/catalyst/trees/TreePatterns.scala | 3 +
.../sql/metricview/logical/metricViewNodes.scala | 53 +++
.../sql/metricview/util/MetricViewPlanner.scala | 83 ++++
.../jdbc/SparkConnectDatabaseMetaDataSuite.scala | 2 +-
.../sql/catalyst/analysis/ResolveMetricView.scala | 344 ++++++++++++++++
.../spark/sql/execution/SparkSqlParser.scala | 50 +++
.../sql/execution/command/metricViewCommands.scala | 96 +++++
.../apache/spark/sql/execution/command/views.scala | 125 +++---
.../sql/internal/BaseSessionStateBuilder.scala | 3 +-
.../sql-tests/results/keywords-enforced.sql.out | 2 +
.../resources/sql-tests/results/keywords.sql.out | 2 +
.../sql-tests/results/nonansi/keywords.sql.out | 2 +
.../spark/sql/execution/MetricViewSuite.scala | 436 +++++++++++++++++++++
sql/gen-sql-functions-docs.py | 2 +-
.../ThriftServerWithSparkContextSuite.scala | 2 +-
.../spark/sql/hive/HiveSessionStateBuilder.scala | 3 +-
.../sql/hive/execution/HiveMetricViewSuite.scala | 28 ++
29 files changed, 1372 insertions(+), 57 deletions(-)
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index dcbe772dbe5c..23973b26a265 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -615,7 +615,9 @@ Below is a list of all the keywords in Spark SQL.
|MATCHED|non-reserved|non-reserved|non-reserved|
|MATERIALIZED|non-reserved|non-reserved|non-reserved|
|MAX|non-reserved|non-reserved|non-reserved|
+|MEASURE|non-reserved|non-reserved|non-reserved|
|MERGE|non-reserved|non-reserved|non-reserved|
+|METRICS|non-reserved|non-reserved|non-reserved|
|MICROSECOND|non-reserved|non-reserved|non-reserved|
|MICROSECONDS|non-reserved|non-reserved|non-reserved|
|MILLISECOND|non-reserved|non-reserved|non-reserved|
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6168ad4cf083..041fe78d5e54 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -367,6 +367,7 @@ object SparkBuild extends PomBuild {
"org.apache.spark.kafka010",
"org.apache.spark.network",
"org.apache.spark.sql.avro",
+ "org.apache.spark.sql.metricview",
"org.apache.spark.sql.pipelines",
"org.apache.spark.sql.scripting",
"org.apache.spark.types.variant",
@@ -1532,6 +1533,7 @@ object Unidoc {
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/classic/")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
+      .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/metricview")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/pipelines")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/scripting")))
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/ml")))
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 400461d2d497..f12e033e5c32 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -16,6 +16,11 @@
lexer grammar SqlBaseLexer;
+@header {
+import java.util.ArrayDeque;
+import java.util.Deque;
+}
+
@members {
/**
 * When true, parser should throw ParseException for unclosed bracketed comment.
@@ -70,6 +75,11 @@ lexer grammar SqlBaseLexer;
has_unclosed_bracketed_comment = true;
}
+ /**
+ * This field stores the tags which are used to detect the end of a dollar quoted string literal.
+ */
+ private final Deque<String> tags = new ArrayDeque<String>();
+
/**
 * When greater than zero, it's in the middle of parsing ARRAY/MAP/STRUCT type.
*/
@@ -324,7 +334,9 @@ MAP: 'MAP' {incComplexTypeLevelCounter();};
MATCHED: 'MATCHED';
MATERIALIZED: 'MATERIALIZED';
MAX: 'MAX';
+MEASURE: 'MEASURE';
MERGE: 'MERGE';
+METRICS: 'METRICS';
MICROSECOND: 'MICROSECOND';
MICROSECONDS: 'MICROSECONDS';
MILLISECOND: 'MILLISECOND';
@@ -557,6 +569,10 @@ STRING_LITERAL
| 'R"'(~'"')* '"'
;
+BEGIN_DOLLAR_QUOTED_STRING
+  : DOLLAR_QUOTED_TAG {tags.push(getText());} -> pushMode(DOLLAR_QUOTED_STRING_MODE)
+ ;
+
DOUBLEQUOTED_STRING
:'"' ( ~('"'|'\\') | '""' | ('\\' .) )* '"'
;
@@ -634,6 +650,10 @@ fragment LETTER
: [A-Z]
;
+fragment DOLLAR_QUOTED_TAG
+ : '$' LETTER* '$'
+ ;
+
fragment UNICODE_LETTER
: [\p{L}]
;
@@ -656,3 +676,13 @@ WS
UNRECOGNIZED
: .
;
+
+mode DOLLAR_QUOTED_STRING_MODE;
+DOLLAR_QUOTED_STRING_BODY
+ : ~'$'+
+ | '$' ~'$'*
+ ;
+
+END_DOLLAR_QUOTED_STRING
+  : DOLLAR_QUOTED_TAG {getText().equals(tags.peek())}? {tags.pop();} -> popMode
+ ;
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 132ced820e9a..9b7eaece945b 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -323,6 +323,14 @@ statement
(PARTITIONED ON identifierList) |
(TBLPROPERTIES propertyList))*
        AS query                                                       #createView
+    | CREATE (OR REPLACE)?
+        VIEW (IF errorCapturingNot EXISTS)? identifierReference
+        identifierCommentList?
+        ((WITH METRICS) |
+        routineLanguage |
+        commentSpec |
+        (TBLPROPERTIES propertyList))*
+        AS codeLiteral                                                 #createMetricView
    | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
        tableIdentifier (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider
        (OPTIONS propertyList)?                                        #createTempViewUsing
@@ -1523,6 +1531,17 @@ complexColType
    : errorCapturingIdentifier COLON? dataType (errorCapturingNot NULL)? commentSpec?
;
+// The code literal is defined as a dollar quoted string.
+// A dollar quoted string consists of
+// - a begin tag which contains a dollar sign, an optional tag, and another dollar sign,
+// - a string literal that is made up of an arbitrary sequence of characters, and
+// - an end tag which has to be exactly the same as the begin tag.
+// As the string literal can contain dollar signs, we add + to DOLLAR_QUOTED_STRING_BODY to avoid
+// the parser eagerly matching END_DOLLAR_QUOTED_STRING when seeing a dollar sign.
+codeLiteral
+    : BEGIN_DOLLAR_QUOTED_STRING DOLLAR_QUOTED_STRING_BODY+ END_DOLLAR_QUOTED_STRING
+    ;
+
routineCharacteristics
: (routineLanguage
| specificName
@@ -1997,7 +2016,9 @@ ansiNonReserved
| MATCHED
| MATERIALIZED
| MAX
+ | MEASURE
| MERGE
+ | METRICS
| MICROSECOND
| MICROSECONDS
| MILLISECOND
@@ -2387,7 +2408,9 @@ nonReserved
| MATCHED
| MATERIALIZED
| MAX
+ | MEASURE
| MERGE
+ | METRICS
| MICROSECOND
| MICROSECONDS
| MILLISECOND
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 553161ea2db0..696ef78a1a97 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -618,7 +618,7 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}
-  def createViewWithBothIfNotExistsAndReplaceError(ctx: CreateViewContext): Throwable = {
+  def createViewWithBothIfNotExistsAndReplaceError(ctx: ParserRuleContext): Throwable = {
new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx)
}
@@ -774,6 +774,15 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}
+ def missingClausesForOperation(
+ ctx: ParserRuleContext,
+ clauses: String,
+ operation: String): Throwable =
+ new ParseException(
+ errorClass = "MISSING_CLAUSES_FOR_OPERATION",
+ messageParameters = Map("clauses" -> clauses, "operation" -> operation),
+ ctx)
+
def invalidDatetimeUnitError(
ctx: ParserRuleContext,
functionName: String,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 199306104d85..0d5f30bd2d78 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -825,6 +825,7 @@ object FunctionRegistry {
expression[ThetaDifference]("theta_difference"),
expression[ThetaIntersection]("theta_intersection"),
expression[ApproxTopKEstimate]("approx_top_k_estimate"),
+ expression[Measure]("measure"),
// grouping sets
expression[Grouping]("grouping"),
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index ada47bd3a40c..25273c73eb7f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.metricview.logical.ResolvedMetricView
/**
* This file defines view types and analysis rules related to views.
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
object EliminateView extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case v: View => v.child
+ case rmv: ResolvedMetricView => rmv.child
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index be90c7ad3656..191e2091c40d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.metricview.util.MetricViewPlanner
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
import org.apache.spark.util.ArrayImplicits._
@@ -943,7 +944,9 @@ class SessionCatalog(
val table = qualifiedIdent.table
val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table)
- if (metadata.tableType == CatalogTableType.VIEW) {
+ if (CatalogTable.isMetricView(metadata)) {
+ parseMetricViewDefinition(metadata)
+ } else if (metadata.tableType == CatalogTableType.VIEW) {
// The relation is a view, so we wrap the relation by:
      // 1. Add a [[View]] operator over the relation to keep track of the view desc;
      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
@@ -953,6 +956,27 @@ class SessionCatalog(
}
}
+  private def parseMetricViewDefinition(metadata: CatalogTable): LogicalPlan = {
+ val viewDefinition = metadata.viewText.getOrElse {
+ throw SparkException.internalError("Invalid view without text.")
+ }
+ val viewConfigs = metadata.viewSQLConfigs
+ val origin = CurrentOrigin.get.copy(
+ objectType = Some("METRIC VIEW"),
+ objectName = Some(metadata.qualifiedName)
+ )
+ SQLConf.withExistingConf(
+ View.effectiveSQLConf(
+ configs = viewConfigs,
+ isTempView = false
+ )
+ ) {
+ CurrentOrigin.withOrigin(origin) {
+        MetricViewPlanner.planRead(metadata, viewDefinition, parser, metadata.schema)
+ }
+ }
+ }
+
  private def buildViewDDL(metadata: CatalogTable, isTempView: Boolean): Option[String] = {
if (isTempView) {
None
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index eab99a96f4c3..01153d516e5c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -722,6 +722,18 @@ object CatalogTable {
val VIEW_CATALOG_AND_NAMESPACE = VIEW_PREFIX + "catalogAndNamespace.numParts"
  val VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX = VIEW_PREFIX + "catalogAndNamespace.part."
+
+ // Property to indicate that a VIEW is actually a METRIC VIEW
+ val VIEW_WITH_METRICS = VIEW_PREFIX + "viewWithMetrics"
+
+ /**
+ * Check if a CatalogTable is a metric view by looking at its properties.
+ */
+ def isMetricView(table: CatalogTable): Boolean = {
+ table.tableType == CatalogTableType.VIEW &&
+ table.properties.get(VIEW_WITH_METRICS).contains("true")
+ }
+
// Convert the current catalog and namespace to properties.
def catalogAndNamespaceToProps(
currentCatalog: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
new file mode 100644
index 000000000000..e2d85cafa52f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, UnevaluableAggregateFunc}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{MEASURE, TreePattern}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
+
+// This function serves as an annotation to tell the analyzer to calculate
+// the measures defined in metric views. It cannot be evaluated in the execution phase;
+// instead it'll be replaced with the actual aggregate functions defined by
+// the measure (as input argument).
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - this function can only be used to calculate a measure defined in a metric view.",
+ examples = """
+ Examples:
+ > SELECT dimension_col, _FUNC_(measure_col)
+ FROM test_metric_view
+ GROUP BY dimension_col;
+ dim_1, 100
+ dim_2, 200
+ """,
+ group = "agg_funcs",
+ since = "4.2.0")
+// scalastyle:on line.size.limit
+case class Measure(child: Expression)
+ extends UnevaluableAggregateFunc with ExpectsInputTypes
+ with UnaryLike[Expression] {
+
+ override protected def withNewChildInternal(newChild: Expression): Measure =
+ copy(child = newChild)
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+
+ override def dataType: DataType = child.dataType
+
+  override def prettyName: String =
+    getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("measure")
+
+ override def nullable: Boolean = child.nullable
+
+ override val nodePatterns: Seq[TreePattern] = Seq(MEASURE)
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f918232c42ac..563eacab244d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3710,6 +3710,16 @@ class AstBuilder extends DataTypeAstBuilder
Literal.create(createString(ctx), StringType)
}
+ /**
+ * Create a String from a dollar-quoted string literal (e.g., $$text$$).
+   * This is used for code literals in features like metric views where the content
+   * may contain special characters that would be difficult to escape in regular strings.
+ */
+ override def visitCodeLiteral(ctx: CodeLiteralContext): String = {
+ assert(ctx != null)
+ dollarQuotedString(ctx.DOLLAR_QUOTED_STRING_BODY())
+ }
+
/**
* Create a String from a string literal context. This supports:
* - Consecutive string literals: `'hello' 'world'` becomes `'helloworld'`
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index 336db1382f89..0f7b6be765ee 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -24,7 +24,7 @@ import scala.util.matching.Regex
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.misc.Interval
-import org.antlr.v4.runtime.tree.{ParseTree, TerminalNodeImpl}
+import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode, TerminalNodeImpl}
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
@@ -88,6 +88,18 @@ object ParserUtils extends SparkParserUtils {
node.getText.slice(1, node.getText.length - 1)
}
+ /**
+ * Obtain the string literal provided as a dollar quoted string.
+ * A dollar quoted string is defined as {{{$[tag]$<string literal>$[tag]$}}},
+ * where the string literal is parsed as a list of body sections.
+   * This helper method concatenates all body sections to restore the original string literal.
+ */
+ def dollarQuotedString(sections: util.List[TerminalNode]): String = {
+ val sb = new StringBuilder()
+ sections forEach (body => sb.append(body.getText))
+ sb.toString()
+ }
+
/** Collect the entries if any. */
def entry(key: String, value: Token): Seq[(String, String)] = {
Option(value).toSeq.map(x => key -> string(x))
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 5ea93e74c5d7..a650eb8ed536 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -70,6 +70,7 @@ object TreePattern extends Enumeration {
val MAP_FROM_ARRAYS: Value = Value
val MAP_FROM_ENTRIES: Value = Value
val MAP_OBJECTS: Value = Value
+ val MEASURE: Value = Value
val MULTI_ALIAS: Value = Value
val NEW_INSTANCE: Value = Value
val NOT: Value = Value
@@ -149,6 +150,7 @@ object TreePattern extends Enumeration {
val LIMIT: Value = Value
val LOCAL_RELATION: Value = Value
val LOGICAL_QUERY_STAGE: Value = Value
+ val METRIC_VIEW_PLACEHOLDER: Value = Value
val NATURAL_LIKE_JOIN: Value = Value
val NO_GROUPING_AGGREGATE_REFERENCE: Value = Value
val OFFSET: Value = Value
@@ -162,6 +164,7 @@ object TreePattern extends Enumeration {
val RELATION_TIME_TRAVEL: Value = Value
val REPARTITION_OPERATION: Value = Value
val REBALANCE_PARTITIONS: Value = Value
+ val RESOLVED_METRIC_VIEW: Value = Value
val SERIALIZE_FROM_OBJECT: Value = Value
val SORT: Value = Value
val SQL_TABLE_FUNCTION: Value = Value
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
new file mode 100644
index 000000000000..a7fa037a4b33
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.logical
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{METRIC_VIEW_PLACEHOLDER, RESOLVED_METRIC_VIEW, TreePattern}
+import org.apache.spark.sql.metricview.serde.MetricView
+
+case class MetricViewPlaceholder(
+ metadata: CatalogTable,
+ desc: MetricView,
+ outputMetrics: Seq[Attribute],
+ child: LogicalPlan,
+ isCreate: Boolean = false) extends UnaryNode {
+  final override val nodePatterns: Seq[TreePattern] = Seq(METRIC_VIEW_PLACEHOLDER)
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+    copy(child = newChild)
+ }
+ override def output: Seq[Attribute] = outputMetrics
+ override lazy val resolved: Boolean = child.resolved
+  override def simpleString(maxFields: Int): String =
+    s"$nodeName ${output.mkString("[", ", ", "]")}".trim
+
+ override def producedAttributes: AttributeSet = AttributeSet(outputMetrics)
+}
+
+case class ResolvedMetricView(
+ identifier: TableIdentifier,
+ child: LogicalPlan) extends UnaryNode {
+ override def output: scala.Seq[Attribute] = child.output
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(child = newChild)
+ override lazy val resolved: Boolean = child.resolved
+ final override val nodePatterns: Seq[TreePattern] = Seq(RESOLVED_METRIC_VIEW)
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
new file mode 100644
index 000000000000..121d908eda90
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.metricview.logical.MetricViewPlaceholder
+import org.apache.spark.sql.metricview.serde.{AssetSource, MetricView, MetricViewFactory, MetricViewValidationException, MetricViewYAMLParsingException, SQLSource}
+import org.apache.spark.sql.types.StructType
+
+object MetricViewPlanner {
+
+ def planWrite(
+ metadata: CatalogTable,
+ yaml: String,
+ sqlParser: ParserInterface): MetricViewPlaceholder = {
+ val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+ MetricViewPlaceholder(
+ metadata,
+ metricView,
+ Seq.empty,
+ dataModelPlan,
+ isCreate = true
+ )
+ }
+
+ def planRead(
+ metadata: CatalogTable,
+ yaml: String,
+ sqlParser: ParserInterface,
+ expectedSchema: StructType): MetricViewPlaceholder = {
+ val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+ MetricViewPlaceholder(
+ metadata,
+ metricView,
+ DataTypeUtils.toAttributes(expectedSchema),
+ dataModelPlan
+ )
+ }
+
+ private def parseYAML(
+ yaml: String,
+ sqlParser: ParserInterface): (MetricView, LogicalPlan) = {
+ val metricView = try {
+ MetricViewFactory.fromYAML(yaml)
+ } catch {
+ case e: MetricViewValidationException =>
+ throw QueryCompilationErrors.invalidLiteralForWindowDurationError()
+ case e: MetricViewYAMLParsingException =>
+ throw QueryCompilationErrors.invalidLiteralForWindowDurationError()
+ }
+ val source = metricView.from match {
+      case asset: AssetSource =>
+        UnresolvedRelation(sqlParser.parseMultipartIdentifier(asset.name))
+      case sqlSource: SQLSource => sqlParser.parsePlan(sqlSource.sql)
+      case _ => throw SparkException.internalError("Either SQLSource or AssetSource")
+ }
+ // Compute filter here because all necessary information is available.
+ val parsedPlan = metricView.where.map { cond =>
+ Filter(sqlParser.parseExpression(cond), source)
+ }.getOrElse(source)
+ (metricView, parsedPlan)
+ }
+}
diff --git
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
index 4d66392109e7..1a8ad1b622b8 100644
---
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
+++
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
@@ -209,7 +209,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
withConnection { conn =>
val metadata = conn.getMetaData
// scalastyle:off line.size.limit
-    assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,E [...]
+    assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,ARCHIVE,ASC,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTRIBUTE,DIV,DO,ELSEIF,E [...]
// scalastyle:on line.size.limit
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
new file mode 100644
index 000000000000..a7763c2799a6
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Measure}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.METRIC_VIEW_PLACEHOLDER
+import org.apache.spark.sql.metricview.logical.{MetricViewPlaceholder, ResolvedMetricView}
+import org.apache.spark.sql.metricview.serde.{Column => CanonicalColumn, DimensionExpression, JsonUtils, MeasureExpression, MetricView => CanonicalMetricView}
+import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder}
+
+/**
+ * Analysis rule for resolving metric view operations (CREATE and SELECT).
+ *
+ * == Background ==
+ * A metric view is a special type of view that defines a semantic layer over raw data by
+ * declaring dimensions (grouping columns) and measures (pre-aggregated metrics). Users can
+ * query metric views using the MEASURE() function to access pre-defined aggregations without
+ * needing to know the underlying aggregation logic.
+ *
+ * == Metric View Definition (YAML) ==
+ * A metric view is defined using YAML syntax that specifies:
+ * - source: The underlying table or SQL query
+ * - where: Optional filter condition applied to the source
+ * - select: List of columns, each being either a dimension or measure
+ *   - Dimensions: Expressions used for grouping (e.g., "region", "upper(region)")
+ *   - Measures: Aggregate expressions (e.g., "sum(count)", "avg(price)")
+ *
+ * Example YAML definition:
+ * {{{
+ *   version: "0.1"
+ *   source:
+ *     asset: "sales_table"
+ *   where: "product = 'product_1'"
+ *   select:
+ *     - name: region
+ *       expression: dimension(region)
+ *     - name: region_upper
+ *       expression: dimension(upper(region))
+ *     - name: total_sales
+ *       expression: measure(sum(amount))
+ *     - name: avg_price
+ *       expression: measure(avg(price))
+ * }}}
+ *
+ * This rule handles two distinct workflows:
+ *
+ * == Workflow 1: CREATE METRIC VIEW ==
+ * Purpose: Analyze the metric view definition and derive the output schema for catalog storage.
+ *
+ * SQL Example:
+ * {{{
+ *   CREATE VIEW sales_metrics
+ *   WITH METRICS
+ *   LANGUAGE YAML
+ *   AS $$<yaml definition>$$
+ * }}}
+ *
+ * Processing steps:
+ * 1. Detect [[MetricViewPlaceholder]] nodes marked for creation (isCreate = true)
+ * 2. Parse the YAML definition to extract dimensions and measures
+ * 3. Build an [[Aggregate]] logical plan:
+ *    {{{
+ *      Aggregate(
+ *        groupingExpressions = [region, upper(region)],  // all dimensions
+ *        aggregateExpressions = [
+ *          region,                        // dimensions become output columns
+ *          upper(region) AS region_upper,
+ *          sum(amount) AS total_sales,    // measures with their aggregations
+ *          avg(price) AS avg_price
+ *        ],
+ *        child = Filter(product = 'product_1', sales_table)
+ *      )
+ *    }}}
+ * 4. The analyzer resolves this plan to derive column data types
+ * 5. The resolved schema (with metadata about dimensions/measures) is stored in the catalog
+ *
+ * Key insight: We construct an Aggregate node even though it won't be executed. This allows
+ * the analyzer to infer proper data types for measures (e.g., sum(int) -> long).
+ *
+ * == Workflow 2: SELECT FROM METRIC VIEW ==
+ * Purpose: Rewrite user queries to replace MEASURE() function calls with actual aggregations.
+ *
+ * SQL Example:
+ * {{{
+ *   SELECT region, MEASURE(total_sales), MEASURE(avg_price)
+ *   FROM sales_metrics
+ *   WHERE region_upper = 'REGION_1'
+ *   GROUP BY region
+ * }}}
+ *
+ * Processing steps:
+ * 1. Detect queries against metric views (identified by [[MetricViewReadOperation]])
+ * 2. Load and parse the stored metric view definition from catalog metadata
+ * 3. Build a [[Project]] node that:
+ *    - Projects dimension expressions: [region, upper(region) AS region_upper]
+ *    - Includes non-conflicting source columns for measure aggregate functions to reference
+ *    - Result: The metric view now exposes dimensions as queryable columns
+ * 4. Locate [[Aggregate]] nodes containing MEASURE() function calls
+ * 5. Substitute each MEASURE() call with its corresponding aggregate expression:
+ *    {{{
+ *      Before substitution:
+ *        Aggregate(
+ *          groupingExpressions = [region],
+ *          aggregateExpressions = [region, MEASURE(total_sales), MEASURE(avg_price)],
+ *          child = Filter(region_upper = 'REGION_1', sales_metrics)
+ *        )
+ *
+ *      After substitution:
+ *        Aggregate(
+ *          groupingExpressions = [region],
+ *          aggregateExpressions = [region, sum(amount), avg(price)],
+ *          child = Filter(region_upper = 'REGION_1',
+ *                    Project([upper(region) AS region_upper, region, amount, price],
+ *                      Filter(product = 'product_1', sales_table)))
+ *        )
+ *    }}}
+ * 6. Return the rewritten plan for further optimization and execution
+ *
+ * Key behaviors:
+ * - Dimensions can be used directly in SELECT, WHERE, GROUP BY, ORDER BY
+ * - Measures must be accessed via MEASURE() function and can only appear in aggregate context
+ * - The WHERE clause from the metric view definition is automatically applied
+ * - Source table columns are hidden from the metric view
+ *
+ * Example query patterns:
+ * {{{
+ *   -- Dimension only (no aggregation needed)
+ *   SELECT region_upper FROM sales_metrics GROUP BY 1
+ *   => SELECT upper(region) FROM sales_table WHERE product = 'product_1' GROUP BY 1
+ *
+ *   -- Measure only (aggregates entire dataset)
+ *   SELECT MEASURE(total_sales) FROM sales_metrics
+ *   => SELECT sum(amount) FROM sales_table WHERE product = 'product_1'
+ *
+ *   -- Dimensions + Measures (group by dimensions)
+ *   SELECT region, MEASURE(total_sales) FROM sales_metrics GROUP BY region
+ *   => SELECT region, sum(amount) FROM sales_table
+ *      WHERE product = 'product_1' GROUP BY region
+ * }}}
+ *
+ * The rule operates on unresolved plans and transforms [[MetricViewPlaceholder]] nodes
+ * into resolved logical plans that can be further optimized and executed.
+ */
+case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
+ private def parser: ParserInterface = session.sessionState.sqlParser
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (!plan.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
+ return plan
+ }
+    plan.resolveOperatorsUp {
+      // CREATE PATH: to create a metric view, we need to analyze the metric view
+      // definition and get the output schema (with column metadata). Since the measures
+      // are aggregate functions, we need to use an Aggregate node and group by all
+      // dimensions to get the output schema.
+      case mvp: MetricViewPlaceholder if mvp.isCreate && mvp.child.resolved =>
+        val (dimensions, measures) = buildMetricViewOutput(mvp.desc)
+        Aggregate(
+          // group by all dimensions
+          dimensions.map(_.toAttribute).toSeq,
+          // select all dimensions and measures to get the final output (mostly data types)
+          (dimensions ++ measures).toSeq,
+          mvp.child
+        )
+
+      // SELECT PATH: to read a metric view, the user will use the `MEASURE` aggregate function
+      // to read the measures, so it'll lead to an Aggregate node. This way, we only need to
+      // resolve the Aggregate node based on the metric view output and then replace
+      // the AttributeReference of the metric view output with the actual expressions.
+      case node @ MetricViewReadOperation(metricView) =>
+        // step 1: parse the metric view definition
+        val (dimensions, measures) =
+          parseMetricViewColumns(metricView.outputMetrics, metricView.desc.select)
+
+        // step 2: build the Project node containing the dimensions
+        val dimensionExprs = dimensions.map(_.namedExpr)
+        // Drop the source columns if they conflict with dimensions
+        val sourceOutput = metricView.child.output
+        // 1. hide the columns that conflict with dimensions
+        // 2. add an alias to the source columns so they are stable with DeduplicateRelation
+        // 3. metric view output should use the same exprId
+        val dimensionAttrs = metricView.outputMetrics.filter(a =>
+          dimensions.exists(_.exprId == a.exprId)
+        )
+        val sourceProjList = sourceOutput.filterNot { attr =>
+          // conflict with dimensions
+          dimensionAttrs
+            .resolve(Seq(attr.name), session.sessionState.conf.resolver)
+            .nonEmpty
+        }.map { attr =>
+          // add an alias to the source column so they are stable with DeduplicateRelation
+          Alias(attr, attr.name)()
+        }
+        val withDimensions = node.transformDownWithPruning(
+          _.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
+          case mv: MetricViewPlaceholder
+              if mv.metadata.identifier == metricView.metadata.identifier =>
+            ResolvedMetricView(
+              mv.metadata.identifier,
+              Project(sourceProjList ++ dimensionExprs, mv.child)
+            )
+        }
+
+        // step 3: resolve the measure references in Aggregate node
+        withDimensions match {
+          case aggregate: Aggregate => transformAggregateWithMeasures(
+            aggregate,
+            measures
+          )
+          case other =>
+            throw SparkException.internalError("ran into unexpected node: " + other)
+        }
+    }
+  }
+
+  private def buildMetricViewOutput(metricView: CanonicalMetricView)
+      : (Seq[NamedExpression], Seq[NamedExpression]) = {
+ val dimensions = new mutable.ArrayBuffer[NamedExpression]()
+ val measures = new mutable.ArrayBuffer[NamedExpression]()
+ metricView.select.foreach { col =>
+ val metadata = new MetadataBuilder()
+        .withMetadata(Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata)))
+ .build()
+ col.expression match {
+ case DimensionExpression(expr) =>
+ dimensions.append(
+            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata)))
+ case MeasureExpression(expr) =>
+ measures.append(
+            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata)))
+ }
+ }
+ (dimensions.toSeq, measures.toSeq)
+ }
+
+ private def parseMetricViewColumns(
+ metricViewOutput: Seq[Attribute],
+ columns: Seq[CanonicalColumn]
+ ): (Seq[MetricViewDimension], Seq[MetricViewMeasure]) = {
+ val dimensions = new mutable.ArrayBuffer[MetricViewDimension]()
+ val measures = new mutable.ArrayBuffer[MetricViewMeasure]()
+ metricViewOutput.zip(columns).foreach { case (attr, column) =>
+ column.expression match {
+ case DimensionExpression(expr) =>
+ dimensions.append(
+ MetricViewDimension(
+ attr.name,
+ parser.parseExpression(expr),
+ attr.exprId,
+ attr.dataType)
+ )
+ case MeasureExpression(expr) =>
+ measures.append(
+ MetricViewMeasure(
+ attr.name,
+ parser.parseExpression(expr),
+ attr.exprId,
+ attr.dataType)
+ )
+ }
+ }
+ (dimensions.toSeq, measures.toSeq)
+ }
+
+ private def transformAggregateWithMeasures(
+ aggregate: Aggregate,
+ measures: Seq[MetricViewMeasure]): LogicalPlan = {
+ val measuresMap = measures.map(m => m.exprId -> m).toMap
+ val newAggExprs = aggregate.aggregateExpressions.map { expr =>
+ expr.transform {
+ case AggregateExpression(Measure(a: AttributeReference), _, _, _, _) =>
+ measuresMap(a.exprId).expr
+ }.asInstanceOf[NamedExpression]
+ }
+ aggregate.copy(aggregateExpressions = newAggExprs)
+ }
+}
+
+object MetricViewReadOperation {
+ def unapply(plan: LogicalPlan): Option[MetricViewPlaceholder] = {
+ plan match {
+      case a: Aggregate if a.resolved && a.containsPattern(METRIC_VIEW_PLACEHOLDER) =>
+ collectMetricViewNode(a.child)
+ case _ =>
+ None
+ }
+ }
+
+ @scala.annotation.tailrec
+  private def collectMetricViewNode(plan: LogicalPlan): Option[MetricViewPlaceholder] = {
+ plan match {
+ case f: Filter => collectMetricViewNode(f.child)
+ case s: Expand => collectMetricViewNode(s.child)
+ case s: Project => collectMetricViewNode(s.child)
+ case s: SubqueryAlias => collectMetricViewNode(s.child)
+ case m: MetricViewPlaceholder => Some(m)
+ case _ => None
+ }
+ }
+}
+
+sealed trait MetricViewColumn {
+ def name: String
+ def expr: Expression
+ def exprId: ExprId
+ def dataType: DataType
+ def namedExpr: NamedExpression = {
+ Alias(UpCast(expr, dataType), name)(exprId = exprId)
+ }
+}
+
+case class MetricViewDimension(
+ name: String,
+ expr: Expression,
+ exprId: ExprId,
+ dataType: DataType) extends MetricViewColumn
+
+case class MetricViewMeasure(
+ name: String,
+ expr: Expression,
+ exprId: ExprId,
+ dataType: DataType) extends MetricViewColumn
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index f8f6e31be1bc..2885d215ee34 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -724,6 +724,56 @@ class SparkSqlAstBuilder extends AstBuilder {
}
}
+  override def visitCreateMetricView(ctx: CreateMetricViewContext): LogicalPlan = withOrigin(ctx) {
+ checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
+ checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+ checkDuplicateClauses(ctx.routineLanguage(), "LANGUAGE", ctx)
+ checkDuplicateClauses(ctx.METRICS(), "WITH METRICS", ctx)
+    val userSpecifiedColumns = Option(ctx.identifierCommentList).toSeq.flatMap { icl =>
+ icl.identifierComment.asScala.map { ic =>
+ ic.identifier.getText -> Option(ic.commentSpec()).map(visitCommentSpec)
+ }
+ }
+
+ if (ctx.EXISTS != null && ctx.REPLACE != null) {
+      throw QueryParsingErrors.createViewWithBothIfNotExistsAndReplaceError(ctx)
+ }
+
+ if (ctx.METRICS(0) == null) {
+ throw QueryParsingErrors.missingClausesForOperation(
+ ctx, "WITH METRICS", "METRIC VIEW CREATION")
+ }
+
+ if (ctx.routineLanguage(0) == null) {
+ throw QueryParsingErrors.missingClausesForOperation(
+ ctx, "LANGUAGE", "METRIC VIEW CREATION")
+ }
+
+ val languageCtx = ctx.routineLanguage(0)
+ if (languageCtx.SQL() != null) {
+ operationNotAllowed("Unsupported language for metric view: SQL",
languageCtx)
+ }
+ val name: String = languageCtx.IDENTIFIER().getText
+ if (!name.equalsIgnoreCase("YAML")) {
+ operationNotAllowed(s"Unsupported language for metric view: $name",
languageCtx)
+ }
+
+ val properties = ctx.propertyList.asScala.headOption
+ .map(visitPropertyKeyValues)
+ .getOrElse(Map.empty)
+ val codeLiteral = visitCodeLiteral(ctx.codeLiteral())
+
+ CreateMetricViewCommand(
+ withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
+ userSpecifiedColumns,
+ visitCommentSpecList(ctx.commentSpec()),
+ properties,
+ codeLiteral,
+ allowExisting = ctx.EXISTS != null,
+ replace = ctx.REPLACE != null
+ )
+ }
+
/**
* Create a [[CreateFunctionCommand]].
*
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
new file mode 100644
index 000000000000..5fb36b429c4a
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, SchemaUnsupported}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.metricview.util.MetricViewPlanner
+import org.apache.spark.sql.types.StructType
+
+case class CreateMetricViewCommand(
+ child: LogicalPlan,
+ userSpecifiedColumns: Seq[(String, Option[String])],
+ comment: Option[String],
+ properties: Map[String, String],
+ originalText: String,
+ allowExisting: Boolean,
+ replace: Boolean) extends UnaryRunnableCommand with IgnoreCachedData {
+
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ val name = child match {
+ case v: ResolvedIdentifier =>
+ v.identifier.asTableIdentifier
+ case _ => throw SparkException.internalError(
+ s"Failed to resolve identifier for creating metric view")
+ }
+    val analyzed = MetricViewHelper.analyzeMetricViewText(sparkSession, name, originalText)
+
+ if (userSpecifiedColumns.nonEmpty) {
+ if (userSpecifiedColumns.length > analyzed.output.length) {
+ throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+ name, userSpecifiedColumns.map(_._1), analyzed)
+ } else if (userSpecifiedColumns.length < analyzed.output.length) {
+ throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+ name, userSpecifiedColumns.map(_._1), analyzed)
+ }
+ }
+ catalog.createTable(
+ ViewHelper.prepareTable(
+ sparkSession, name, Some(originalText), analyzed, userSpecifiedColumns,
+ properties, SchemaUnsupported, comment,
+ None, isMetricView = true),
+ ignoreIfExists = allowExisting)
+ Seq.empty
+ }
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+ copy(child = newChild)
+ }
+}
+
+case class AlterMetricViewCommand(child: LogicalPlan, originalText: String)
+
+object MetricViewHelper {
+ def analyzeMetricViewText(
+ session: SparkSession,
+ name: TableIdentifier,
+ viewText: String): LogicalPlan = {
+ val analyzer = session.sessionState.analyzer
+    // this metadata is used for the analysis check; it'll be replaced during create/update
+    // with more accurate information
+ val tableMeta = CatalogTable(
+ identifier = name,
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType(),
+ viewOriginalText = Some(viewText),
+ viewText = Some(viewText))
+ val metricViewNode = MetricViewPlanner.planWrite(
+ tableMeta, viewText, session.sessionState.sqlParser)
+    val analyzed = analyzer.executeAndCheck(metricViewNode, new QueryPlanningTracker)
+    ViewHelper.verifyTemporaryObjectsNotExists(isTemporary = false, name, analyzed, Seq.empty)
+ analyzed
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 11ec17ca57fd..95d76c72d295 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -133,7 +133,7 @@ case class CreateViewCommand(
SchemaUtils.checkIndeterminateCollationInSchema(plan.schema)
if (viewType == LocalTempView) {
- val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, userSpecifiedColumns)
val tableDefinition = createTemporaryViewRelation(
name,
sparkSession,
@@ -148,7 +148,7 @@ case class CreateViewCommand(
} else if (viewType == GlobalTempView) {
      val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(name.table, Option(db))
- val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, userSpecifiedColumns)
val tableDefinition = createTemporaryViewRelation(
viewIdent,
sparkSession,
@@ -178,7 +178,10 @@ case class CreateViewCommand(
        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
        // Nothing we need to retain from the old view, so just drop and create a new one
        catalog.dropTable(viewIdent, ignoreIfNotExists = false, purge = false)
-        catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
+        catalog.createTable(
+          prepareTable(
+            sparkSession, name, originalText, analyzedPlan, userSpecifiedColumns, properties,
+            viewSchemaMode, comment, collation), ignoreIfExists = false)
      } else {
        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
        // exists.
@@ -186,56 +189,14 @@ case class CreateViewCommand(
}
    } else {
      // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = allowExisting)
+      catalog.createTable(
+        prepareTable(
+          sparkSession, name, originalText, analyzedPlan, userSpecifiedColumns, properties,
+          viewSchemaMode, comment, collation), ignoreIfExists = allowExisting)
    }
Seq.empty[Row]
}
-  /**
-   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
-   * else return the analyzed plan directly.
-   */
-  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
-    if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-  }
-
-  /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific
-   * properties(e.g. view default database, view query output column names) and store them as
-   * properties in the CatalogTable, and also creates the proper schema for the view.
-   */
-  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    if (originalText.isEmpty) {
-      throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
-    }
-    val aliasedSchema = CharVarcharUtils.getRawSchema(
-      aliasPlan(session, analyzedPlan).schema, session.sessionState.conf)
-    val newProperties = generateViewProperties(
-      properties, session, analyzedPlan.schema.fieldNames, aliasedSchema.fieldNames, viewSchemaMode)
-
-    CatalogTable(
-      identifier = name,
-      tableType = CatalogTableType.VIEW,
-      storage = CatalogStorageFormat.empty,
-      schema = aliasedSchema,
-      properties = newProperties,
-      viewOriginalText = originalText,
-      viewText = originalText,
-      comment = comment,
-      collation = collation
-    )
-  }
-
override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
copy(plan = WithCTE(plan, cteDefs))
}
@@ -812,4 +773,70 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig {
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")),
collation = collation)
}
+
+
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific
+   * properties(e.g. view default database, view query output column names) and store them as
+   * properties in the CatalogTable, and also creates the proper schema for the view.
+   */
+ def prepareTable(
+ session: SparkSession,
+ name: TableIdentifier,
+ originalText: Option[String],
+ analyzedPlan: LogicalPlan,
+ userSpecifiedColumns: Seq[(String, Option[String])],
+ properties: Map[String, String],
+ viewSchemaMode: ViewSchemaMode,
+ comment: Option[String],
+ collation: Option[String],
+ isMetricView: Boolean = false): CatalogTable = {
+    if (originalText.isEmpty) {
+      throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
+    }
+    val aliasedSchema = CharVarcharUtils.getRawSchema(
+      aliasPlan(session, analyzedPlan, userSpecifiedColumns).schema, session.sessionState.conf)
+    val newProperties = generateViewProperties(
+      properties, session, analyzedPlan.schema.fieldNames, aliasedSchema.fieldNames, viewSchemaMode)
+
+ // Add property to indicate if this is a metric view
+ val finalProperties = if (isMetricView) {
+ newProperties + (CatalogTable.VIEW_WITH_METRICS -> "true")
+ } else {
+ newProperties
+ }
+
+ CatalogTable(
+ identifier = name,
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = aliasedSchema,
+ properties = finalProperties,
+ viewOriginalText = originalText,
+ viewText = originalText,
+ comment = comment,
+ collation = collation
+ )
+ }
+
+  /**
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
+   * else return the analyzed plan directly.
+   */
+ def aliasPlan(
+ session: SparkSession,
+ analyzedPlan: LogicalPlan,
+ userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
+ if (userSpecifiedColumns.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+ case (attr, (colName, None)) => Alias(attr, colName)()
+ case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+ Alias(attr, colName)(explicitMetadata = Some(meta))
+ }
+ session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+ }
+ }
}
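
Side note on the hunk above: `prepareTable` and `aliasPlan` move from the CREATE VIEW command into the shared `ViewHelper` object, gaining explicit parameters plus an `isMetricView` flag that stamps the `VIEW_WITH_METRICS` table property. A minimal sketch of a call site, assuming `spark`, an analyzed view body, the YAML text, and a schema mode are already in scope (the identifier and bindings below are illustrative, not from this patch):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.ViewHelper

// Hypothetical call site for the now-shared helper; everything except
// `isMetricView = true` follows the plain CREATE VIEW path.
val catalogEntry = ViewHelper.prepareTable(
  session = spark,
  name = TableIdentifier("sales_metrics"),
  originalText = Some(yamlText),      // the dollar-quoted YAML definition
  analyzedPlan = analyzedBody,        // analyzed plan of the view body
  userSpecifiedColumns = Seq.empty,   // no user column aliases or comments
  properties = Map.empty,
  viewSchemaMode = schemaMode,        // e.g. the session's default mode
  comment = None,
  collation = None,
  isMetricView = true)                // adds VIEW_WITH_METRICS -> "true"
```
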
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 00c9a26cb5bf..24bf618ee861 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal
import org.apache.spark.annotation.Unstable
import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration}
import org.apache.spark.sql.artifact.ArtifactManager
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveMetricView, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension
import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields}
@@ -245,6 +245,7 @@ abstract class BaseSessionStateBuilder(
ResolveWriteToStream +:
new EvalSubqueriesForTimeTravel +:
new ResolveTranspose(session) +:
+ ResolveMetricView(session) +:
new InvokeProcedures(session) +:
ResolveExecuteImmediate(session, this.catalogManager) +:
ExtractSemiStructuredFields +:
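
For context on the wiring above: `ResolveMetricView(session)` is added directly to the builder's post-hoc resolution rules because it is a built-in, internal rule. A third-party rule of the same shape would instead go through the public extensions API; a sketch under that assumption, with `MyResolutionRule` as a hypothetical stand-in that is not part of this patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op rule with the same SparkSession => Rule shape as
// ResolveMetricView.
case class MyResolutionRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan  // placeholder
}

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(_.injectResolutionRule(MyResolutionRule))
  .getOrCreate()
```
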
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
index b8443e417caf..c363cf2f6985 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
@@ -210,7 +210,9 @@ MAP false
MATCHED false
MATERIALIZED false
MAX false
+MEASURE false
MERGE false
+METRICS false
MICROSECOND false
MICROSECONDS false
MILLISECOND false
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index 00baa0c7e725..ddc5b05b3550 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -210,7 +210,9 @@ MAP false
MATCHED false
MATERIALIZED false
MAX false
+MEASURE false
MERGE false
+METRICS false
MICROSECOND false
MICROSECONDS false
MILLISECOND false
diff --git a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
index 00baa0c7e725..ddc5b05b3550 100644
--- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out
@@ -210,7 +210,9 @@ MAP false
MATCHED false
MATERIALIZED false
MAX false
+MEASURE false
MERGE false
+METRICS false
MICROSECOND false
MICROSECONDS false
MILLISECOND false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala
new file mode 100644
index 000000000000..5e6033aeaa75
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, SQLSource}
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+
+class SimpleMetricViewSuite extends MetricViewSuite with SharedSparkSession
+
+/**
+ * A suite for testing metric view related functionality.
+ */
+abstract class MetricViewSuite extends QueryTest with SQLTestUtils {
+ import testImplicits._
+
+ protected val testMetricViewName = "test_metric_view"
+ protected val testTableName = "test_table"
+ protected val testTableData = Seq(
+ ("region_1", "product_1", 80, 5.0),
+ ("region_1", "product_2", 70, 10.0),
+ ("REGION_1", "product_3", 60, 15.0),
+ ("REGION_1", "product_4", 50, 20.0),
+ ("region_2", "product_1", 40, 25.0),
+ ("region_2", "product_2", 30, 30.0),
+ ("REGION_2", "product_3", 20, 35.0),
+ ("REGION_2", "product_4", 10, 40.0)
+ )
+ protected val testMetricViewColumns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("product", DimensionExpression("product"), 1),
+ Column("region_upper", DimensionExpression("upper(region)"), 2),
+ Column("count_sum", MeasureExpression("sum(count)"), 3),
+ Column("price_avg", MeasureExpression("avg(price)"), 4)
+ )
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ testTableData
+ .toDF("region", "product", "count", "price")
+ .write
+ .saveAsTable(testTableName)
+ }
+
+ protected def createMetricView(
+ metricViewName: String,
+ metricViewDefinition: MetricView): Unit = {
+ val yaml = MetricViewFactory.toYAML(metricViewDefinition)
+ sql(s"""
+ |CREATE VIEW $metricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$
+ |""".stripMargin)
+ }
+
+ protected def withMetricView(
+ viewName: String,
+ metricViewDefinition: MetricView)(body: => Unit): Unit = {
+ createMetricView(viewName, metricViewDefinition)
+ withView(viewName) {
+ body
+ }
+ }
+
+ test("test source type") {
+ val sources = Seq(
+ AssetSource(testTableName),
+ SQLSource("SELECT * FROM test_table")
+ )
+ sources.foreach { source =>
+ val metricView = MetricView("0.1", source, None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) FROM
test_metric_view"),
+ sql("SELECT sum(count), avg(price) FROM test_table")
+ )
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) " +
+ "FROM test_metric_view WHERE region_upper = 'REGION_1'"),
+ sql("SELECT sum(count), avg(price) FROM test_table WHERE
upper(region) = 'REGION_1'")
+ )
+ }
+ }
+ }
+
+ test("test where clause") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName),
+ Some("product = 'product_1'"), testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) FROM
test_metric_view"),
+ sql("SELECT sum(count), avg(price) FROM test_table WHERE product =
'product_1'")
+ )
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) " +
+ "FROM test_metric_view WHERE region_upper = 'REGION_1'"),
+ sql("SELECT sum(count), avg(price) FROM test_table WHERE " +
+ "product = 'product_1' AND upper(region) = 'REGION_1'")
+ )
+ }
+ }
+
+ test("test dimensions and measures") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ // dimension and measure
+ checkAnswer(
+ sql("SELECT region, product, measure(count_sum), measure(price_avg) " +
+ "FROM test_metric_view GROUP BY region, product"),
+ sql("SELECT region, product, sum(count), avg(price) " +
+ "FROM test_table GROUP BY region, product")
+ )
+ // dimension only
+ checkAnswer(
+ sql("SELECT region_upper FROM test_metric_view GROUP BY 1"),
+ sql("SELECT upper(region) FROM test_table GROUP BY 1")
+ )
+ // measure only
+ checkAnswer(
+ sql("SELECT measure(count_sum) FROM test_metric_view"),
+ sql("SELECT sum(count) FROM test_table")
+ )
+ }
+ }
+
+ test("column from source cannot be used when query metric view") {
+ val metricView = MetricView("0.1", AssetSource(testTableName), None,
testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql("SELECT sum(count) FROM test_metric_view").collect()
+ },
+ condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ parameters = Map(
+ "objectName" -> "`count`",
+ "proposal" -> "`count_sum`, `product`, `region`, `price_avg`,
`region_upper`"
+ ),
+ queryContext = Array(ExpectedContext(
+ fragment = "count",
+ start = 11,
+ stop = 15
+ ))
+ )
+ }
+ }
+
+ test("test ORDER BY and LIMIT clauses") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT region, measure(count_sum) " +
+ "FROM test_metric_view GROUP BY region ORDER BY 2 DESC"),
+ sql("SELECT region, sum(count) " +
+ "FROM test_table GROUP BY region ORDER BY 2 DESC")
+ )
+ checkAnswer(
+ sql("SELECT product, measure(price_avg) " +
+ "FROM test_metric_view GROUP BY product ORDER BY 2 ASC LIMIT 2"),
+ sql("SELECT product, avg(price) " +
+ "FROM test_table GROUP BY product ORDER BY 2 ASC LIMIT 2")
+ )
+ }
+ }
+
+ test("test complex WHERE conditions with dimensions") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT region, product, measure(count_sum) " +
+ "FROM test_metric_view WHERE region_upper = 'REGION_1' " +
+ "AND product IN ('product_1', 'product_2') " +
+ "GROUP BY region, product"),
+ sql("SELECT region, product, sum(count) " +
+ "FROM test_table WHERE upper(region) = 'REGION_1' " +
+ "AND product IN ('product_1', 'product_2') " +
+ "GROUP BY region, product")
+ )
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) " +
+ "FROM test_metric_view WHERE region_upper LIKE 'REGION_%' AND
product <> 'product_4'"),
+ sql("SELECT sum(count), avg(price) " +
+ "FROM test_table WHERE upper(region) LIKE 'REGION_%' AND product <>
'product_4'")
+ )
+ }
+ }
+
+ test("test metric view with where clause and additional query filters") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName),
+ Some("product IN ('product_1', 'product_2')"), testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT region, measure(count_sum) " +
+ "FROM test_metric_view WHERE region_upper = 'REGION_1' GROUP BY
region"),
+ sql("SELECT region, sum(count) " +
+ "FROM test_table WHERE product IN ('product_1', 'product_2') " +
+ "AND upper(region) = 'REGION_1' GROUP BY region")
+ )
+ }
+ }
+
+ test("test multiple measures with different aggregations") {
+ val columns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("count_sum", MeasureExpression("sum(count)"), 1),
+ Column("count_avg", MeasureExpression("avg(count)"), 2),
+ Column("count_max", MeasureExpression("max(count)"), 3),
+ Column("count_min", MeasureExpression("min(count)"), 4),
+ Column("price_sum", MeasureExpression("sum(price)"), 5)
+ )
+ val metricView = MetricView("0.1", AssetSource(testTableName), None,
columns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(count_avg),
measure(count_max), " +
+ "measure(count_min), measure(price_sum) FROM test_metric_view"),
+ sql("SELECT sum(count), avg(count), max(count), min(count), sum(price)
FROM test_table")
+ )
+ checkAnswer(
+ sql("SELECT region, measure(count_sum), measure(count_max),
measure(price_sum) " +
+ "FROM test_metric_view GROUP BY region"),
+ sql("SELECT region, sum(count), max(count), sum(price) FROM test_table
GROUP BY region")
+ )
+ }
+ }
+
+ test("test dimension expressions with case statements") {
+ val columns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("region_category", DimensionExpression(
+ "CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group B' END"), 1),
+ Column("count_sum", MeasureExpression("sum(count)"), 2)
+ )
+ val metricView = MetricView("0.1", AssetSource(testTableName), None,
columns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT region_category, measure(count_sum) " +
+ "FROM test_metric_view GROUP BY region_category"),
+ sql("SELECT CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group
B' END, " +
+ "sum(count) FROM test_table " +
+ "GROUP BY CASE WHEN region = 'region_1' THEN 'Group A' ELSE 'Group
B' END")
+ )
+ }
+ }
+
+ test("test measure expressions with arithmetic operations") {
+ val columns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("total_revenue", MeasureExpression("sum(count * price)"), 1),
+ Column("avg_revenue", MeasureExpression("avg(count * price)"), 2)
+ )
+ val metricView = MetricView("0.1", AssetSource(testTableName), None,
columns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT measure(total_revenue), measure(avg_revenue) FROM
test_metric_view"),
+ sql("SELECT sum(count * price), avg(count * price) FROM test_table")
+ )
+ checkAnswer(
+ sql("SELECT region, measure(total_revenue) " +
+ "FROM test_metric_view GROUP BY region ORDER BY 2 DESC"),
+ sql("SELECT region, sum(count * price) " +
+ "FROM test_table GROUP BY region ORDER BY 2 DESC")
+ )
+ }
+ }
+
+ test("test dimensions with aggregate functions in GROUP BY") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT region_upper, product, measure(count_sum) " +
+ "FROM test_metric_view GROUP BY region_upper, product ORDER BY
region_upper, product"),
+ sql("SELECT upper(region), product, sum(count) " +
+ "FROM test_table GROUP BY upper(region), product ORDER BY
upper(region), product")
+ )
+ }
+ }
+
+ test("test WHERE clause with OR conditions") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) " +
+ "FROM test_metric_view WHERE region = 'region_1' OR product =
'product_1'"),
+ sql("SELECT sum(count), avg(price) " +
+ "FROM test_table WHERE region = 'region_1' OR product = 'product_1'")
+ )
+ }
+ }
+
+ test("test dimension-only query with multiple dimensions") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT region_upper, product " +
+ "FROM test_metric_view GROUP BY region_upper, product"),
+ sql("SELECT upper(region), product FROM test_table GROUP BY
upper(region), product")
+ )
+ }
+ }
+
+ test("test query with SELECT * should fail") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ intercept[Exception] {
+ sql("SELECT * FROM test_metric_view").collect()
+ }
+ }
+ }
+
+ test("test SQLSource with complex query") {
+ val sqlSource = SQLSource(
+ "SELECT region, product, count, price FROM test_table WHERE count > 20")
+ val metricView = MetricView("0.1", sqlSource, None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) FROM
test_metric_view"),
+ sql("SELECT sum(count), avg(price) FROM test_table WHERE count > 20")
+ )
+ checkAnswer(
+ sql("SELECT region, measure(count_sum) FROM test_metric_view GROUP BY
region"),
+ sql("SELECT region, sum(count) FROM test_table WHERE count > 20 GROUP
BY region")
+ )
+ }
+ }
+
+ test("test measure function without GROUP BY") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT measure(count_sum) FROM test_metric_view"),
+ sql("SELECT sum(count) FROM test_table")
+ )
+ checkAnswer(
+ sql("SELECT measure(count_sum), measure(price_avg) FROM
test_metric_view"),
+ sql("SELECT sum(count), avg(price) FROM test_table")
+ )
+ }
+ }
+
+ test("test combining multiple dimension expressions in WHERE") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT product, measure(count_sum) " +
+ "FROM test_metric_view WHERE region = 'region_1' AND region_upper =
'REGION_1' " +
+ "GROUP BY product"),
+ sql("SELECT product, sum(count) " +
+ "FROM test_table WHERE region = 'region_1' AND upper(region) =
'REGION_1' " +
+ "GROUP BY product")
+ )
+ }
+ }
+
+ test("test measure with COUNT DISTINCT") {
+ val columns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("product_count", MeasureExpression("count(distinct product)"), 1),
+ Column("count_sum", MeasureExpression("sum(count)"), 2)
+ )
+ val metricView = MetricView("0.1", AssetSource(testTableName), None,
columns)
+ withMetricView(testMetricViewName, metricView) {
+ checkAnswer(
+ sql("SELECT region, measure(product_count), measure(count_sum) " +
+ "FROM test_metric_view GROUP BY region"),
+ sql("SELECT region, count(distinct product), sum(count) " +
+ "FROM test_table GROUP BY region")
+ )
+ }
+ }
+
+ test("test union of same aggregated metric view dataframe") {
+ val metricView = MetricView(
+ "0.1", AssetSource(testTableName), None, testMetricViewColumns)
+ withMetricView(testMetricViewName, metricView) {
+ // Create a DataFrame with aggregation and groupBy from metric view
+ val df = sql(
+ s"""SELECT region, measure(count_sum) as total_count,
measure(price_avg) as avg_price
+ |FROM $testMetricViewName
+ |GROUP BY region
+ |""".stripMargin)
+
+ // Union the same DataFrame with itself - tests DeduplicateRelations
+ val unionDf = df.union(df)
+
+ // Expected result: each region should appear twice with identical values
+ val expectedDf = sql(
+ """
+ |SELECT region, sum(count) as total_count, avg(price) as avg_price
+ |FROM test_table
+ |GROUP BY region
+ |UNION ALL
+ |SELECT region, sum(count) as total_count, avg(price) as avg_price
+ |FROM test_table
+ |GROUP BY region
+ |""".stripMargin)
+
+ checkAnswer(unionDf, expectedDf)
+
+ // Verify the result has duplicate rows
+ assert(unionDf.count() == df.count() * 2,
+ "Union should double the row count")
+
+ // Verify that distinct values are the same as the original
+ checkAnswer(unionDf.distinct(), df)
+ }
+ }
+}
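
The suite above pins down the user-visible contract: a metric view is created from a dollar-quoted YAML body and queried through `measure(...)`, while dimensions behave like ordinary columns in WHERE/GROUP BY. A condensed sketch of that flow, mirroring the suite's `createMetricView` helper and assuming `spark` and a `yaml` string (as produced by `MetricViewFactory.toYAML`; its exact keys are defined by the serde classes, not shown in this diff) are in scope; the view name is illustrative:

```scala
// Create the metric view; $$...$$ is the new dollar-quoted string syntax,
// written as $$$$ inside the s-interpolator to escape the dollar signs.
spark.sql(
  s"""CREATE VIEW sales_metrics
     |WITH METRICS
     |LANGUAGE YAML
     |AS
     |$$$$
     |$yaml
     |$$$$
     |""".stripMargin)

// Dimensions resolve to their underlying expressions; measures are only
// accessible through the measure() aggregate.
spark.sql("SELECT region, measure(count_sum) FROM sales_metrics GROUP BY region")
```
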
diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py
index b49124ece086..9cc478e5cadf 100644
--- a/sql/gen-sql-functions-docs.py
+++ b/sql/gen-sql-functions-docs.py
@@ -175,7 +175,7 @@ def _make_pretty_examples(jspark, infos):
pretty_output = ""
for info in infos:
if (info.examples.startswith("\n Examples:") and info.name.lower() not in
- ("from_avro", "to_avro", "from_protobuf", "to_protobuf")):
+ ("from_avro", "to_avro", "from_protobuf", "to_protobuf", "measure")):
output = []
output.append("-- %s" % info.name)
query_examples = filter(lambda x: x.startswith(" > "), info.examples.split("\n"))
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index d69f99a1e42f..a95eeee03221 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
val sessionHandle = client.openSession(user, "")
val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS)
// scalastyle:off line.size.limit
- assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,C
[...]
+ assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,ATOMIC,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,C
[...]
// scalastyle:on line.size.limit
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index dec947651dd6..9f5566407e38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveMetricView, ResolveSessionCatalog, ResolveTranspose}
import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, InvalidUDFClassException}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields}
@@ -132,6 +132,7 @@ class HiveSessionStateBuilder(
new EvalSubqueriesForTimeTravel +:
new DetermineTableStats(session) +:
new ResolveTranspose(session) +:
+ ResolveMetricView(session) +:
new InvokeProcedures(session) +:
ResolveExecuteImmediate(session, catalogManager) +:
ExtractSemiStructuredFields +:
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala
new file mode 100644
index 000000000000..bccc2f629a15
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.execution.MetricViewSuite
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.tags.SlowHiveTest
+
+/**
+ * A test suite for Hive metric view related functionality.
+ */
+@SlowHiveTest
+class HiveMetricViewSuite extends MetricViewSuite with TestHiveSingleton
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]