This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7de318e61204 [SPARK-52158][CONNECT][SQL] Add InvalidInputErrors object to centralize errors in SparkConnectPlanner
7de318e61204 is described below
commit 7de318e61204a0fa04d184f2112a933c41cf802c
Author: Yihong He <[email protected]>
AuthorDate: Mon May 19 09:27:04 2025 +0900
[SPARK-52158][CONNECT][SQL] Add InvalidInputErrors object to centralize errors in SparkConnectPlanner
### What changes were proposed in this pull request?
- Add InvalidInputErrors object to centralize errors in SparkConnectPlanner
- Move InvalidPlanInput/InvalidCommandInput error messages into InvalidInputErrors (see the sketch below)
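In short, call sites stop building error message strings inline and instead call a named factory method on the shared object. A minimal sketch of the pattern, using one example taken from the diff below:

    // Before: the message string is built at each call site
    throw InvalidPlanInput(s"$query is not a valid relation for SQL with references")

    // After: the message lives in one place, behind a descriptive method name
    throw InvalidInputErrors.invalidSQLWithReferences(query)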
### Why are the changes needed?
- Improve error reuse and reduce code duplication
- Easier to support the error framework in the future
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor 0.49.6 (Universal)
Closes #50915 from heyihong/SPARK-52158.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connect/planner/InvalidInputErrors.scala | 229 +++++++++++++++++++++
.../sql/connect/planner/SparkConnectPlanner.scala | 188 +++++++----------
2 files changed, 306 insertions(+), 111 deletions(-)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
new file mode 100644
index 000000000000..38f762af2438
--- /dev/null
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.mutable
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
+import org.apache.spark.sql.types.DataType
+
+object InvalidInputErrors {
+
+ def unknownRelationNotSupported(rel: proto.Relation): InvalidPlanInput =
+ InvalidPlanInput(s"${rel.getUnknown} not supported.")
+
+ def noHandlerFoundForExtension(): InvalidPlanInput =
+ InvalidPlanInput("No handler found for extension")
+
+ def catalogTypeNotSupported(catType: proto.Catalog.CatTypeCase): InvalidPlanInput =
+ InvalidPlanInput(s"$catType not supported.")
+
+ def invalidSQLWithReferences(query: proto.WithRelations): InvalidPlanInput =
+ InvalidPlanInput(s"$query is not a valid relation for SQL with references")
+
+ def naFillValuesEmpty(): InvalidPlanInput =
+ InvalidPlanInput("values must contains at least 1 item!")
+
+ def naFillValuesLengthMismatch(): InvalidPlanInput =
+ InvalidPlanInput(
+ "When values contains more than 1 items, values and cols should have the same length!")
+
+ def deduplicateNeedsInput(): InvalidPlanInput =
+ InvalidPlanInput("Deduplicate needs a plan input")
+
+ def deduplicateAllColumnsAndSubset(): InvalidPlanInput =
+ InvalidPlanInput("Cannot deduplicate on both all columns and a subset of columns")
+
+ def deduplicateRequiresColumnsOrAll(): InvalidPlanInput =
+ InvalidPlanInput(
+ "Deduplicate requires to either deduplicate on all columns or a subset of columns")
+
+ def invalidDeduplicateColumn(colName: String): InvalidPlanInput =
+ InvalidPlanInput(s"Invalid deduplicate column $colName")
+
+ def functionEvalTypeNotSupported(evalType: Int): InvalidPlanInput =
+ InvalidPlanInput(s"Function with EvalType: $evalType is not supported")
+
+ def functionIdNotSupported(functionId: Int): InvalidPlanInput =
+ InvalidPlanInput(s"Function with ID: $functionId is not supported")
+
+ def groupingExpressionAbsentForKeyValueGroupedDataset(): InvalidPlanInput =
+ InvalidPlanInput("The grouping expression cannot be absent for KeyValueGroupedDataset")
+
+ def expectingScalaUdfButGot(exprType: proto.Expression.ExprTypeCase): InvalidPlanInput =
+ InvalidPlanInput(s"Expecting a Scala UDF, but get $exprType")
+
+ def rowNotSupportedForUdf(errorType: String): InvalidPlanInput =
+ InvalidPlanInput(s"Row is not a supported $errorType type for this UDF.")
+
+ def invalidUserDefinedOutputSchemaType(actualType: String): InvalidPlanInput =
+ InvalidPlanInput(
+ s"Invalid user-defined output schema type for TransformWithStateInPandas. " +
+ s"Expect a struct type, but got $actualType.")
+
+ def notFoundCachedLocalRelation(hash: String, sessionUUID: String): InvalidPlanInput =
+ InvalidPlanInput(
+ s"Not found any cached local relation with the hash: " +
+ s"$hash in the session with sessionUUID $sessionUUID.")
+
+ def withColumnsRequireSingleNamePart(got: String): InvalidPlanInput =
+ InvalidPlanInput(s"WithColumns require column name only contains one name part, but got $got")
+
+ def inputDataForLocalRelationNoSchema(): InvalidPlanInput =
+ InvalidPlanInput("Input data for LocalRelation does not produce a schema.")
+
+ def schemaRequiredForLocalRelation(): InvalidPlanInput =
+ InvalidPlanInput("Schema for LocalRelation is required when the input data is not provided.")
+
+ def invalidSchema(schema: DataType): InvalidPlanInput =
+ InvalidPlanInput(s"Invalid schema $schema")
+
+ def invalidJdbcParams(): InvalidPlanInput =
+ InvalidPlanInput("Invalid jdbc params, please specify jdbc url and table.")
+
+ def predicatesNotSupportedForDataSource(format: String): InvalidPlanInput =
+ InvalidPlanInput(s"Predicates are not supported for $format data sources.")
+
+ def multiplePathsNotSupportedForStreamingSource(): InvalidPlanInput =
+ InvalidPlanInput("Multiple paths are not supported for streaming source")
+
+ def doesNotSupport(what: String): InvalidPlanInput =
+ InvalidPlanInput(s"Does not support $what")
+
+ def invalidSchemaDataType(dataType: DataType): InvalidPlanInput =
+ InvalidPlanInput(s"Invalid schema dataType $dataType")
+
+ def expressionIdNotSupported(exprId: Int): InvalidPlanInput =
+ InvalidPlanInput(s"Expression with ID: $exprId is not supported")
+
+ def lambdaFunctionArgumentCountInvalid(got: Int): InvalidPlanInput =
+ InvalidPlanInput(s"LambdaFunction requires 1 ~ 3 arguments, but got $got ones!")
+
+ def aliasWithMultipleIdentifiersAndMetadata(): InvalidPlanInput =
+ InvalidPlanInput(
+ "Alias expressions with more than 1 identifier must not use optional metadata.")
+
+ def unresolvedStarTargetInvalid(target: String): InvalidPlanInput =
+ InvalidPlanInput(
+ s"UnresolvedStar requires a unparsed target ending with '.*', but got $target.")
+
+ def unresolvedStarWithBothTargetAndPlanId(): InvalidPlanInput =
+ InvalidPlanInput("UnresolvedStar with both target and plan id is not supported.")
+
+ def windowFunctionRequired(): InvalidPlanInput =
+ InvalidPlanInput("WindowFunction is required in WindowExpression")
+
+ def unknownFrameType(
+ frameType: proto.Expression.Window.WindowFrame.FrameType): InvalidPlanInput =
+ InvalidPlanInput(s"Unknown FrameType $frameType")
+
+ def lowerBoundRequiredInWindowFrame(): InvalidPlanInput =
+ InvalidPlanInput("LowerBound is required in WindowFrame")
+
+ def unknownFrameBoundary(
+ boundary: proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase)
+ : InvalidPlanInput =
+ InvalidPlanInput(s"Unknown FrameBoundary $boundary")
+
+ def upperBoundRequiredInWindowFrame(): InvalidPlanInput =
+ InvalidPlanInput("UpperBound is required in WindowFrame")
+
+ def setOperationMustHaveTwoInputs(): InvalidPlanInput =
+ InvalidPlanInput("Set operation must have 2 inputs")
+
+ def exceptDoesNotSupportUnionByName(): InvalidPlanInput =
+ InvalidPlanInput("Except does not support union_by_name")
+
+ def intersectDoesNotSupportUnionByName(): InvalidPlanInput =
+ InvalidPlanInput("Intersect does not support union_by_name")
+
+ def unsupportedSetOperation(op: Int): InvalidPlanInput =
+ InvalidPlanInput(s"Unsupported set operation $op")
+
+ def joinTypeNotSupported(t: proto.Join.JoinType): InvalidPlanInput =
+ InvalidPlanInput(s"Join type $t is not supported")
+
+ def aggregateNeedsPlanInput(): InvalidPlanInput =
+ InvalidPlanInput("Aggregate needs a plan input")
+
+ def aggregateWithPivotRequiresPivot(): InvalidPlanInput =
+ InvalidPlanInput("Aggregate with GROUP_TYPE_PIVOT requires a Pivot")
+
+ def runnerCannotBeEmptyInExecuteExternalCommand(): InvalidPlanInput =
+ InvalidPlanInput("runner cannot be empty in executeExternalCommand")
+
+ def commandCannotBeEmptyInExecuteExternalCommand(): InvalidPlanInput =
+ InvalidPlanInput("command cannot be empty in executeExternalCommand")
+
+ def unexpectedForeachBatchFunction(): InvalidPlanInput =
+ InvalidPlanInput("Unexpected foreachBatch function")
+
+ def invalidWithRelationReference(): InvalidPlanInput =
+ InvalidPlanInput("Invalid WithRelation reference")
+
+ def assertionFailure(message: String): InvalidPlanInput =
+ InvalidPlanInput(message)
+
+ def unsupportedMergeActionType(actionType: proto.MergeAction.ActionType): InvalidPlanInput =
+ InvalidPlanInput(s"Unsupported merge action type $actionType")
+
+ def unresolvedNamedLambdaVariableRequiresNamePart(): InvalidPlanInput =
+ InvalidPlanInput("UnresolvedNamedLambdaVariable requires at least one name part!")
+
+ def usingColumnsOrJoinConditionSetInJoin(): InvalidPlanInput =
+ InvalidPlanInput("Using columns or join conditions cannot be set at the same time in Join")
+
+ def invalidStateSchemaDataType(dataType: DataType): InvalidPlanInput =
+ InvalidPlanInput(s"Invalid state schema dataType $dataType for flatMapGroupsWithState")
+
+ def sqlCommandExpectsSqlOrWithRelations(other: proto.Relation.RelTypeCase): InvalidPlanInput =
+ InvalidPlanInput(s"SQL command expects either a SQL or a WithRelations, but got $other")
+
+ def unknownGroupType(groupType: proto.Aggregate.GroupType): InvalidPlanInput =
+ InvalidPlanInput(s"Unknown group type $groupType")
+
+ def dataSourceIdNotSupported(dataSourceId: Int): InvalidPlanInput =
+ InvalidPlanInput(s"Data source id $dataSourceId is not supported")
+
+ def unknownSubqueryType(subqueryType: proto.SubqueryExpression.SubqueryType): InvalidPlanInput =
+ InvalidPlanInput(s"Unknown subquery type $subqueryType")
+
+ def reduceShouldCarryScalarScalaUdf(got: mutable.Buffer[proto.Expression]): InvalidPlanInput =
+ InvalidPlanInput(s"reduce should carry a scalar scala udf, but got $got")
+
+ def unionByNameAllowMissingColRequiresByName(): InvalidPlanInput =
+ InvalidPlanInput("UnionByName `allowMissingCol` can be true only if `byName` is true.")
+
+ def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
+ InvalidCommandInput(s"BucketBy must specify a bucket count > 0, received $numBuckets instead.")
+
+ def invalidPythonUdtfReturnType(actualType: String): InvalidPlanInput =
+ InvalidPlanInput(
+ s"Invalid Python user-defined table function return type. " +
+ s"Expect a struct type, but got $actualType.")
+
+ def invalidUserDefinedOutputSchemaTypeForTransformWithState(
+ actualType: String): InvalidPlanInput =
+ InvalidPlanInput(
+ s"Invalid user-defined output schema type for TransformWithStateInPandas. " +
+ s"Expect a struct type, but got $actualType.")
+
+ def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
+ InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")
+}
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 687424459496..d821046d0831 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -62,7 +62,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.classic.{Catalog, Dataset, MergeIntoWriter, RelationalGroupedDataset, SparkSession, TypedAggUtils, UserDefinedFunctionUtils}
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
-import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, InvalidCommandInput, InvalidPlanInput, LiteralValueProtoConverter, StorageLevelProtoConverter, StreamingListenerPacket, UdfPacket}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, LiteralValueProtoConverter, StorageLevelProtoConverter, StreamingListenerPacket, UdfPacket}
import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
import org.apache.spark.sql.connect.ml.MLHandler
import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
@@ -233,7 +233,7 @@ class SparkConnectPlanner(
// Handle plugins for Spark Connect Relation types.
case proto.Relation.RelTypeCase.EXTENSION =>
transformRelationPlugin(rel.getExtension)
- case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+ case _ => throw InvalidInputErrors.unknownRelationNotSupported(rel)
}
if (rel.hasCommon && rel.getCommon.hasPlanId) {
plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId)
@@ -250,7 +250,7 @@ class SparkConnectPlanner(
.map(p => p.transform(extension.toByteArray, this))
// Find the first non-empty transformation or throw.
.find(_.isPresent)
- .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+ .getOrElse(throw InvalidInputErrors.noHandlerFoundForExtension())
.get()
}
@@ -297,7 +297,7 @@ class SparkConnectPlanner(
transformSetCurrentCatalog(catalog.getSetCurrentCatalog)
case proto.Catalog.CatTypeCase.LIST_CATALOGS =>
transformListCatalogs(catalog.getListCatalogs)
- case other => throw InvalidPlanInput(s"$other not supported.")
+ case other => throw InvalidInputErrors.catalogTypeNotSupported(other)
}
}
@@ -344,7 +344,7 @@ class SparkConnectPlanner(
private def transformSqlWithRefs(query: proto.WithRelations): LogicalPlan = {
if (!isValidSQLWithRefs(query)) {
- throw InvalidPlanInput(s"$query is not a valid relation for SQL with
references")
+ throw InvalidInputErrors.invalidSQLWithReferences(query)
}
executeSQLWithRefs(query).logicalPlan
}
@@ -413,12 +413,10 @@ class SparkConnectPlanner(
private def transformNAFill(rel: proto.NAFill): LogicalPlan = {
if (rel.getValuesCount == 0) {
- throw InvalidPlanInput(s"values must contains at least 1 item!")
+ throw InvalidInputErrors.naFillValuesEmpty()
}
if (rel.getValuesCount > 1 && rel.getValuesCount != rel.getColsCount) {
- throw InvalidPlanInput(
- s"When values contains more than 1 items, " +
- s"values and cols should have the same length!")
+ throw InvalidInputErrors.naFillValuesLengthMismatch()
}
val dataset = Dataset.ofRows(session, transformRelation(rel.getInput))
@@ -610,12 +608,10 @@ class SparkConnectPlanner(
isBarrier,
profile)
case _ =>
- throw InvalidPlanInput(
- s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+ throw InvalidInputErrors.functionEvalTypeNotSupported(pythonUdf.evalType)
}
case _ =>
- throw InvalidPlanInput(
- s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
+ throw InvalidInputErrors.functionIdNotSupported(commonUdf.getFunctionCase.getNumber)
}
}
@@ -668,13 +664,11 @@ class SparkConnectPlanner(
transformTransformWithStateInPySpark(pythonUdf, group, rel,
usePandas = false)
case _ =>
- throw InvalidPlanInput(
- s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+ throw InvalidInputErrors.functionEvalTypeNotSupported(pythonUdf.evalType)
}
case _ =>
- throw InvalidPlanInput(
- s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
+ throw InvalidInputErrors.functionIdNotSupported(commonUdf.getFunctionCase.getNumber)
}
}
@@ -779,8 +773,7 @@ class SparkConnectPlanner(
val stateSchema = DataTypeProtoConverter.toCatalystType(rel.getStateSchema) match {
case s: StructType => s
case other =>
- throw InvalidPlanInput(
- s"Invalid state schema dataType $other for flatMapGroupsWithState")
+ throw InvalidInputErrors.invalidStateSchemaDataType(other)
}
val stateEncoder = TypedScalaUdf.encoderFor(
// the state agnostic encoder is the second element in the input encoders.
@@ -875,13 +868,11 @@ class SparkConnectPlanner(
input.flatMapCoGroupsInArrow(other, Column(pythonUdf)).logicalPlan
case _ =>
- throw InvalidPlanInput(
- s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+ throw InvalidInputErrors.functionEvalTypeNotSupported(pythonUdf.evalType)
}
case _ =>
- throw InvalidPlanInput(
- s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
+ throw InvalidInputErrors.functionIdNotSupported(commonUdf.getFunctionCase.getNumber)
}
}
@@ -958,8 +949,7 @@ class SparkConnectPlanner(
} else if (groupingExprs.size() > 1) {
createFromRelationalDataset(logicalPlan, groupingExprs, sortOrder)
} else {
- throw InvalidPlanInput(
- "The grouping expression cannot be absent for KeyValueGroupedDataset")
+ throw InvalidInputErrors.groupingExpressionAbsentForKeyValueGroupedDataset()
}
}
@@ -1035,7 +1025,7 @@ class SparkConnectPlanner(
&& expr.getCommonInlineUserDefinedFunction.hasScalarScalaUdf) {
apply(expr.getCommonInlineUserDefinedFunction, inputAttrs)
} else {
- throw InvalidPlanInput(s"Expecting a Scala UDF, but get
${expr.getExprTypeCase}")
+ throw InvalidInputErrors.expectingScalaUdfButGot(expr.getExprTypeCase)
}
}
@@ -1065,8 +1055,7 @@ class SparkConnectPlanner(
.map(attrs =>
ExpressionEncoder(RowEncoder.encoderFor(StructType(attrs.map(a =>
StructField(a.name, a.dataType, a.nullable))))))
- .getOrElse(
- throw InvalidPlanInput(s"Row is not a supported $errorType type for this UDF."))
+ .getOrElse(throw InvalidInputErrors.rowNotSupportedForUdf(errorType))
} else {
// Nested unbound row encoders
val unboundTransformed = encoder match {
@@ -1116,9 +1105,8 @@ class SparkConnectPlanner(
transformDataType(twsInfo.getOutputSchema) match {
case s: StructType => s
case dt =>
- throw InvalidPlanInput(
- "Invalid user-defined output schema type for TransformWithStateInPandas. " +
- s"Expect a struct type, but got ${dt.typeName}.")
+ throw InvalidInputErrors.invalidUserDefinedOutputSchemaTypeForTransformWithState(
+ dt.typeName)
}
}
@@ -1194,8 +1182,7 @@ class SparkConnectPlanner(
fun.getArgumentsList.asScala.map(transformExpression).toSeq,
session.sessionState.sqlParser)
case _ =>
- throw InvalidPlanInput(
- s"Function with ID: ${fun.getFunctionCase.getNumber} is not supported")
+ throw InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
}
}
@@ -1251,8 +1238,7 @@ class SparkConnectPlanner(
val (colNames, exprs, metadata) =
rel.getAliasesList.asScala.toSeq.map { alias =>
if (alias.getNameCount != 1) {
- throw InvalidPlanInput(s"""WithColumns require column name only
contains one name part,
- |but got ${alias.getNameList.toString}""".stripMargin)
+ throw
InvalidInputErrors.withColumnsRequireSingleNamePart(alias.getNameList.toString)
}
val metadata = if (alias.hasMetadata && alias.getMetadata.nonEmpty) {
@@ -1298,9 +1284,7 @@ class SparkConnectPlanner(
}
}
.getOrElse {
- throw InvalidPlanInput(
- s"Not found any cached local relation with the hash: ${blockId.hash} in " +
- s"the session with sessionUUID ${blockId.sessionUUID}.")
+ throw InvalidInputErrors.notFoundCachedLocalRelation(blockId.hash, blockId.sessionUUID)
}
}
@@ -1387,14 +1371,13 @@ class SparkConnectPlanner(
private def transformDeduplicate(rel: proto.Deduplicate): LogicalPlan = {
if (!rel.hasInput) {
- throw InvalidPlanInput("Deduplicate needs a plan input")
+ throw InvalidInputErrors.deduplicateNeedsInput()
}
if (rel.getAllColumnsAsKeys && rel.getColumnNamesCount > 0) {
- throw InvalidPlanInput("Cannot deduplicate on both all columns and a
subset of columns")
+ throw InvalidInputErrors.deduplicateAllColumnsAndSubset()
}
if (!rel.getAllColumnsAsKeys && rel.getColumnNamesCount == 0) {
- throw InvalidPlanInput(
- "Deduplicate requires to either deduplicate on all columns or a subset
of columns")
+ throw InvalidInputErrors.deduplicateRequiresColumnsOrAll()
}
val queryExecution = new QueryExecution(session,
transformRelation(rel.getInput))
val resolver = session.sessionState.analyzer.resolver
@@ -1409,7 +1392,7 @@ class SparkConnectPlanner(
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))
if (cols.isEmpty) {
- throw InvalidPlanInput(s"Invalid deduplicate column ${colName}")
+ throw InvalidInputErrors.invalidDeduplicateColumn(colName)
}
cols
}
@@ -1463,7 +1446,7 @@ class SparkConnectPlanner(
Iterator(rel.getData.toByteArray),
TaskContext.get())
if (structType == null) {
- throw InvalidPlanInput(s"Input data for LocalRelation does not produce
a schema.")
+ throw InvalidInputErrors.inputDataForLocalRelationNoSchema()
}
val attributes = DataTypeUtils.toAttributes(structType)
val proj = UnsafeProjection.create(attributes, attributes)
@@ -1506,8 +1489,7 @@ class SparkConnectPlanner(
}
} else {
if (schema == null) {
- throw InvalidPlanInput(
- s"Schema for LocalRelation is required when the input data is not provided.")
+ throw InvalidInputErrors.schemaRequiredForLocalRelation()
}
LocalRelation(schema)
}
@@ -1520,7 +1502,7 @@ class SparkConnectPlanner(
StructType.fromDDL,
fallbackParser = DataType.fromJson) match {
case s: StructType => s
- case other => throw InvalidPlanInput(s"Invalid schema $other")
+ case other => throw InvalidInputErrors.invalidSchema(other)
}
}
@@ -1538,7 +1520,7 @@ class SparkConnectPlanner(
if (rel.getDataSource.getFormat == "jdbc" && rel.getDataSource.getPredicatesCount > 0) {
if (!rel.getDataSource.getOptionsMap.containsKey(JDBCOptions.JDBC_URL) ||
!rel.getDataSource.getOptionsMap.containsKey(JDBCOptions.JDBC_TABLE_NAME)) {
- throw InvalidPlanInput(s"Invalid jdbc params, please specify jdbc url and table.")
+ throw InvalidInputErrors.invalidJdbcParams()
}
val url = rel.getDataSource.getOptionsMap.get(JDBCOptions.JDBC_URL)
@@ -1564,8 +1546,8 @@ class SparkConnectPlanner(
reader.load(rel.getDataSource.getPathsList.asScala.toSeq: _*).queryExecution.analyzed
}
} else {
- throw InvalidPlanInput(
- s"Predicates are not supported for ${rel.getDataSource.getFormat} data sources.")
+ throw InvalidInputErrors.predicatesNotSupportedForDataSource(
+ rel.getDataSource.getFormat)
}
case proto.Read.ReadTypeCase.DATA_SOURCE if rel.getIsStreaming =>
@@ -1582,12 +1564,12 @@ class SparkConnectPlanner(
case 0 => reader.load()
case 1 => reader.load(streamSource.getPaths(0))
case _ =>
- throw InvalidPlanInput(s"Multiple paths are not supported for
streaming source")
+ throw
InvalidInputErrors.multiplePathsNotSupportedForStreamingSource()
}
streamDF.queryExecution.analyzed
- case _ => throw InvalidPlanInput(s"Does not support
${rel.getReadTypeCase.name()}")
+ case _ => throw
InvalidInputErrors.doesNotSupport(rel.getReadTypeCase.name())
}
}
@@ -1598,7 +1580,7 @@ class SparkConnectPlanner(
if (rel.hasSchema) {
DataTypeProtoConverter.toCatalystType(rel.getSchema) match {
case s: StructType => reader.schema(s)
- case other => throw InvalidPlanInput(s"Invalid schema dataType
$other")
+ case other => throw InvalidInputErrors.invalidSchemaDataType(other)
}
}
localMap.foreach { case (key, value) => reader.option(key, value) }
@@ -1611,7 +1593,7 @@ class SparkConnectPlanner(
dataFrameReader.csv(ds).queryExecution.analyzed
case ParseFormat.PARSE_FORMAT_JSON =>
dataFrameReader.json(ds).queryExecution.analyzed
- case _ => throw InvalidPlanInput("Does not support " +
rel.getFormat.name())
+ case _ => throw InvalidInputErrors.doesNotSupport(rel.getFormat.name())
}
}
@@ -1793,8 +1775,7 @@ class SparkConnectPlanner(
case proto.Expression.ExprTypeCase.SUBQUERY_EXPRESSION =>
transformSubqueryExpression(exp.getSubqueryExpression)
case _ =>
- throw InvalidPlanInput(
- s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not supported")
+ throw InvalidInputErrors.expressionIdNotSupported(exp.getExprTypeCase.getNumber)
}
}
@@ -1823,7 +1804,7 @@ class SparkConnectPlanner(
.map(p => p.transform(extension.toByteArray, this))
// Find the first non-empty transformation or throw.
.find(_.isPresent)
- .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+ .getOrElse(throw InvalidInputErrors.noHandlerFoundForExtension())
.get
}
@@ -1901,8 +1882,7 @@ class SparkConnectPlanner(
case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
transformScalaUDF(fun)
case _ =>
- throw InvalidPlanInput(
- s"Function with ID: ${fun.getFunctionCase.getNumber} is not supported")
+ throw InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
}
}
@@ -1974,8 +1954,7 @@ class SparkConnectPlanner(
case uda: UserDefinedAggregator[_, _, _] =>
ScalaAggregator(uda, children).toAggregateExpression(fun.getIsDistinct)
case other =>
- throw InvalidPlanInput(
- s"Unsupported UserDefinedFunction implementation: ${other.getClass}")
+ throw InvalidInputErrors.unsupportedUserDefinedFunctionImplementation(other.getClass)
}
}
@@ -2066,9 +2045,7 @@ class SparkConnectPlanner(
*/
private def transformLambdaFunction(lambda: proto.Expression.LambdaFunction): LambdaFunction = {
if (lambda.getArgumentsCount == 0 || lambda.getArgumentsCount > 3) {
- throw InvalidPlanInput(
- "LambdaFunction requires 1 ~ 3 arguments, " +
- s"but got ${lambda.getArgumentsCount} ones!")
+ throw InvalidInputErrors.lambdaFunctionArgumentCountInvalid(lambda.getArgumentsCount)
}
LambdaFunction(
@@ -2080,7 +2057,7 @@ class SparkConnectPlanner(
private def transformUnresolvedNamedLambdaVariable(
variable: proto.Expression.UnresolvedNamedLambdaVariable): UnresolvedNamedLambdaVariable = {
if (variable.getNamePartsCount == 0) {
- throw InvalidPlanInput("UnresolvedNamedLambdaVariable requires at least one name part!")
+ throw InvalidInputErrors.unresolvedNamedLambdaVariableRequiresNamePart()
}
UnresolvedNamedLambdaVariable(variable.getNamePartsList.asScala.toSeq)
@@ -2096,8 +2073,7 @@ class SparkConnectPlanner(
Alias(transformExpression(alias.getExpr), alias.getName(0))(explicitMetadata = metadata)
} else {
if (alias.hasMetadata) {
- throw InvalidPlanInput(
- "Alias expressions with more than 1 identifier must not use optional metadata.")
+ throw InvalidInputErrors.aliasWithMultipleIdentifiersAndMetadata()
}
MultiAlias(transformExpression(alias.getExpr), alias.getNameList.asScala.toSeq)
}
@@ -2117,8 +2093,7 @@ class SparkConnectPlanner(
// functions.col("s.*")
val target = star.getUnparsedTarget
if (!target.endsWith(".*")) {
- throw InvalidPlanInput(
- s"UnresolvedStar requires a unparsed target ending with '.*', but got $target.")
+ throw InvalidInputErrors.unresolvedStarTargetInvalid(target)
}
val parts = UnresolvedAttribute.parseAttributeName(target.dropRight(2))
UnresolvedStar(Some(parts))
@@ -2128,7 +2103,7 @@ class SparkConnectPlanner(
UnresolvedDataFrameStar(star.getPlanId)
case _ =>
- throw InvalidPlanInput("UnresolvedStar with both target and plan id is
not supported.")
+ throw InvalidInputErrors.unresolvedStarWithBothTargetAndPlanId()
}
}
@@ -2192,7 +2167,7 @@ class SparkConnectPlanner(
private def transformWindowExpression(window: proto.Expression.Window) = {
if (!window.hasWindowFunction) {
- throw InvalidPlanInput(s"WindowFunction is required in WindowExpression")
+ throw InvalidInputErrors.windowFunctionRequired()
}
val frameSpec = if (window.hasFrameSpec) {
@@ -2203,11 +2178,11 @@ class SparkConnectPlanner(
case proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_RANGE =>
RangeFrame
- case other => throw InvalidPlanInput(s"Unknown FrameType $other")
+ case other => throw InvalidInputErrors.unknownFrameType(other)
}
if (!protoFrameSpec.hasLower) {
- throw InvalidPlanInput(s"LowerBound is required in WindowFrame")
+ throw InvalidInputErrors.lowerBoundRequiredInWindowFrame()
}
val lower = protoFrameSpec.getLower.getBoundaryCase match {
case proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.CURRENT_ROW =>
@@ -2219,11 +2194,11 @@ class SparkConnectPlanner(
case proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.VALUE =>
transformExpression(protoFrameSpec.getLower.getValue)
- case other => throw InvalidPlanInput(s"Unknown FrameBoundary $other")
+ case other => throw InvalidInputErrors.unknownFrameBoundary(other)
}
if (!protoFrameSpec.hasUpper) {
- throw InvalidPlanInput(s"UpperBound is required in WindowFrame")
+ throw InvalidInputErrors.upperBoundRequiredInWindowFrame()
}
val upper = protoFrameSpec.getUpper.getBoundaryCase match {
case proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.CURRENT_ROW =>
@@ -2235,7 +2210,7 @@ class SparkConnectPlanner(
case proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.VALUE =>
transformExpression(protoFrameSpec.getUpper.getValue)
- case other => throw InvalidPlanInput(s"Unknown FrameBoundary $other")
+ case other => throw InvalidInputErrors.unknownFrameBoundary(other)
}
SpecifiedWindowFrame(frameType = frameType, lower = lower, upper = upper)
@@ -2256,7 +2231,7 @@ class SparkConnectPlanner(
private def transformSetOperation(u: proto.SetOperation): LogicalPlan = {
if (!u.hasLeftInput || !u.hasRightInput) {
- throw InvalidPlanInput("Set operation must have 2 inputs")
+ throw InvalidInputErrors.setOperationMustHaveTwoInputs()
}
val leftPlan = transformRelation(u.getLeftInput)
val rightPlan = transformRelation(u.getRightInput)
@@ -2265,18 +2240,17 @@ class SparkConnectPlanner(
u.getSetOpType match {
case proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT =>
if (u.getByName) {
- throw InvalidPlanInput("Except does not support union_by_name")
+ throw InvalidInputErrors.exceptDoesNotSupportUnionByName()
}
Except(leftPlan, rightPlan, isAll)
case proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT =>
if (u.getByName) {
- throw InvalidPlanInput("Intersect does not support union_by_name")
+ throw InvalidInputErrors.intersectDoesNotSupportUnionByName()
}
Intersect(leftPlan, rightPlan, isAll)
case proto.SetOperation.SetOpType.SET_OP_TYPE_UNION =>
if (!u.getByName && u.getAllowMissingColumns) {
- throw InvalidPlanInput(
- "UnionByName `allowMissingCol` can be true only if `byName` is true.")
+ throw InvalidInputErrors.unionByNameAllowMissingColRequiresByName()
}
val union = Union(Seq(leftPlan, rightPlan), u.getByName, u.getAllowMissingColumns)
if (isAll) {
@@ -2286,7 +2260,7 @@ class SparkConnectPlanner(
}
case _ =>
- throw InvalidPlanInput(s"Unsupported set operation
${u.getSetOpTypeValue}")
+ throw InvalidInputErrors.unsupportedSetOperation(u.getSetOpTypeValue)
}
}
@@ -2313,8 +2287,7 @@ class SparkConnectPlanner(
private def transformJoin(rel: proto.Join): LogicalPlan = {
assertPlan(rel.hasLeft && rel.hasRight, "Both join sides must be present")
if (rel.hasJoinCondition && rel.getUsingColumnsCount > 0) {
- throw InvalidPlanInput(
- s"Using columns or join conditions cannot be set at the same time in Join")
+ throw InvalidInputErrors.usingColumnsOrJoinConditionSetInJoin()
}
val joinCondition =
if (rel.hasJoinCondition) Some(transformExpression(rel.getJoinCondition)) else None
@@ -2342,7 +2315,7 @@ class SparkConnectPlanner(
case proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER => RightOuter
case proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI => LeftSemi
case proto.Join.JoinType.JOIN_TYPE_CROSS => Cross
- case _ => throw InvalidPlanInput(s"Join type ${t} is not supported")
+ case _ => throw InvalidInputErrors.joinTypeNotSupported(t)
}
}
@@ -2472,7 +2445,7 @@ class SparkConnectPlanner(
private def transformRelationalGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
if (!rel.hasInput) {
- throw InvalidPlanInput("Aggregate needs a plan input")
+ throw InvalidInputErrors.aggregateNeedsPlanInput()
}
val input = transformRelation(rel.getInput)
@@ -2511,7 +2484,7 @@ class SparkConnectPlanner(
case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
if (!rel.hasPivot) {
- throw InvalidPlanInput("Aggregate with GROUP_TYPE_PIVOT requires a
Pivot")
+ throw InvalidInputErrors.aggregateWithPivotRequiresPivot()
}
val pivotExpr = transformExpression(rel.getPivot.getCol)
@@ -2543,7 +2516,7 @@ class SparkConnectPlanner(
aggregateExpressions = aliasedAgg,
child = logicalPlan)
- case other => throw InvalidPlanInput(s"Unknown Group Type $other")
+ case other => throw InvalidInputErrors.unknownGroupType(other)
}
}
@@ -2573,7 +2546,7 @@ class SparkConnectPlanner(
e.getCommonInlineUserDefinedFunction.hasScalarScalaUdf =>
unpackUdf(e.getCommonInlineUserDefinedFunction)
case other =>
- throw InvalidPlanInput(s"reduce should carry a scalar scala udf, but
got $other")
+ throw InvalidInputErrors.reduceShouldCarryScalarScalaUdf(other)
}
val encoder = udf.outputEncoder
val reduce = ReduceAggregator(udf.function)(encoder).toColumn.expr
@@ -2646,7 +2619,7 @@ class SparkConnectPlanner(
assertPlan(assignments.isEmpty, "UpdateStar action should not have assignment.")
UpdateStarAction(condition)
case _ =>
- throw InvalidPlanInput(s"Unsupported merge action type ${action.getActionType}.")
+ throw InvalidInputErrors.unsupportedMergeActionType(action.getActionType)
}
}
@@ -2731,10 +2704,10 @@ class SparkConnectPlanner(
command: proto.ExecuteExternalCommand,
responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
if (command.getRunner.isEmpty) {
- throw InvalidPlanInput("runner cannot be empty in
executeExternalCommand")
+ throw InvalidInputErrors.runnerCannotBeEmptyInExecuteExternalCommand()
}
if (command.getCommand.isEmpty) {
- throw InvalidPlanInput("command cannot be empty in
executeExternalCommand")
+ throw InvalidInputErrors.commandCannotBeEmptyInExecuteExternalCommand()
}
val executor = ExternalCommandExecutor(
session,
@@ -2791,8 +2764,7 @@ class SparkConnectPlanner(
case proto.Relation.RelTypeCase.WITH_RELATIONS =>
executeSQLWithRefs(relation.getWithRelations, tracker)
case other =>
- throw InvalidPlanInput(
- s"SQL command expects either a SQL or a WithRelations, but got $other")
+ throw InvalidInputErrors.sqlCommandExpectsSqlOrWithRelations(other)
}
// Check if command or SQL Script has been executed.
@@ -2895,7 +2867,7 @@ class SparkConnectPlanner(
query: proto.WithRelations,
tracker: QueryPlanningTracker = new QueryPlanningTracker) = {
if (!isValidSQLWithRefs(query)) {
- throw InvalidPlanInput(s"$query is not a valid relation for SQL with
references")
+ throw InvalidInputErrors.invalidSQLWithReferences(query)
}
// Eagerly execute commands of the provided SQL string, with given
references.
@@ -2957,8 +2929,7 @@ class SparkConnectPlanner(
case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
handleRegisterScalaUDF(fun)
case _ =>
- throw InvalidPlanInput(
- s"Function with ID: ${fun.getFunctionCase.getNumber} is not supported")
+ throw InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
}
executeHolder.eventsManager.postFinished()
}
@@ -2970,8 +2941,7 @@ class SparkConnectPlanner(
val function = createPythonUserDefinedTableFunction(fun)
session.udtf.registerPython(fun.getFunctionName, function)
case _ =>
- throw InvalidPlanInput(
- s"Function with ID: ${fun.getFunctionCase.getNumber} is not supported")
+ throw InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
}
executeHolder.eventsManager.postFinished()
}
@@ -2984,8 +2954,7 @@ class SparkConnectPlanner(
val dataSource = UserDefinedPythonDataSource(transformPythonDataSource(ds))
session.dataSource.registerPython(fun.getName, dataSource)
case _ =>
- throw InvalidPlanInput(
- s"Data source with ID: ${fun.getDataSourceCase.getNumber} is not supported")
+ throw InvalidInputErrors.dataSourceIdNotSupported(fun.getDataSourceCase.getNumber)
}
executeHolder.eventsManager.postFinished()
}
@@ -2997,9 +2966,7 @@ class SparkConnectPlanner(
transformDataType(udtf.getReturnType) match {
case s: StructType => Some(s)
case dt =>
- throw InvalidPlanInput(
- "Invalid Python user-defined table function return type. " +
- s"Expect a struct type, but got ${dt.typeName}.")
+ throw InvalidInputErrors.invalidPythonUdtfReturnType(dt.typeName)
}
} else {
None
@@ -3045,7 +3012,7 @@ class SparkConnectPlanner(
.map(p => p.process(extension.toByteArray, this))
// Find the first non-empty transformation or throw.
.find(_ == true)
- .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+ .getOrElse(throw InvalidInputErrors.noHandlerFoundForExtension())
executeHolder.eventsManager.postFinished()
}
@@ -3111,8 +3078,7 @@ class SparkConnectPlanner(
val op = writeOperation.getBucketBy
val cols = op.getBucketColumnNamesList.asScala
if (op.getNumBuckets <= 0) {
- throw InvalidCommandInput(
- s"BucketBy must specify a bucket count > 0, received ${op.getNumBuckets} instead.")
+ throw InvalidInputErrors.invalidBucketCount(op.getNumBuckets)
}
w.bucketBy(op.getNumBuckets, cols.head, cols.tail.toSeq: _*)
}
@@ -3306,7 +3272,7 @@ class SparkConnectPlanner(
sessionHolder)
case StreamingForeachFunction.FunctionCase.FUNCTION_NOT_SET =>
- throw InvalidPlanInput("Unexpected foreachBatch function") //
Unreachable
+ throw InvalidInputErrors.unexpectedForeachBatchFunction() //
Unreachable
}
writer.foreachBatch(foreachBatchFn)
@@ -4094,7 +4060,7 @@ class SparkConnectPlanner(
transformExpression(value)
}.toSeq,
planId)
- case other => throw InvalidPlanInput(s"Unknown SubqueryType $other")
+ case other => throw InvalidInputErrors.unknownSubqueryType(other)
}
}
@@ -4111,7 +4077,7 @@ class SparkConnectPlanner(
val plan = transformRelation(ref)
planId -> plan
} else {
- throw InvalidPlanInput("Invalid WithRelation reference")
+ throw InvalidInputErrors.invalidWithRelationReference()
}
}.toMap
@@ -4147,6 +4113,6 @@ class SparkConnectPlanner(
}
private def assertPlan(assertion: Boolean, message: => String = ""): Unit = {
- if (!assertion) throw InvalidPlanInput(message)
+ if (!assertion) throw InvalidInputErrors.assertionFailure(message)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]