This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7de318e61204 [SPARK-52158][CONNECT][SQL] Add InvalidInputErrors object 
to centralize errors in SparkConnectPlanner
7de318e61204 is described below

commit 7de318e61204a0fa04d184f2112a933c41cf802c
Author: Yihong He <[email protected]>
AuthorDate: Mon May 19 09:27:04 2025 +0900

    [SPARK-52158][CONNECT][SQL] Add InvalidInputErrors object to centralize 
errors in SparkConnectPlanner
    
    ### What changes were proposed in this pull request?
    
    - Add InvalidInputErrors object to centralize errors in SparkConnectPlanner
    - Move InvalidPlanInput/InvalidCommandInput error messages into 
InvalidInputErrors
    
    ### Why are the changes needed?
    
    - Improve error reuse and reduce code duplication
    - Easier to support the error framework in the future
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor  0.49.6 (Universal)
    
    Closes #50915 from heyihong/SPARK-52158.
    
    Authored-by: Yihong He <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/connect/planner/InvalidInputErrors.scala   | 229 +++++++++++++++++++++
 .../sql/connect/planner/SparkConnectPlanner.scala  | 188 +++++++----------
 2 files changed, 306 insertions(+), 111 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
new file mode 100644
index 000000000000..38f762af2438
--- /dev/null
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import scala.collection.mutable
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.connect.common.{InvalidCommandInput, 
InvalidPlanInput}
+import org.apache.spark.sql.types.DataType
+
+object InvalidInputErrors {
+
+  def unknownRelationNotSupported(rel: proto.Relation): InvalidPlanInput =
+    InvalidPlanInput(s"${rel.getUnknown} not supported.")
+
+  def noHandlerFoundForExtension(): InvalidPlanInput =
+    InvalidPlanInput("No handler found for extension")
+
+  def catalogTypeNotSupported(catType: proto.Catalog.CatTypeCase): 
InvalidPlanInput =
+    InvalidPlanInput(s"$catType not supported.")
+
+  def invalidSQLWithReferences(query: proto.WithRelations): InvalidPlanInput =
+    InvalidPlanInput(s"$query is not a valid relation for SQL with references")
+
+  def naFillValuesEmpty(): InvalidPlanInput =
+    InvalidPlanInput("values must contains at least 1 item!")
+
+  def naFillValuesLengthMismatch(): InvalidPlanInput =
+    InvalidPlanInput(
+      "When values contains more than 1 items, values and cols should have the 
same length!")
+
+  def deduplicateNeedsInput(): InvalidPlanInput =
+    InvalidPlanInput("Deduplicate needs a plan input")
+
+  def deduplicateAllColumnsAndSubset(): InvalidPlanInput =
+    InvalidPlanInput("Cannot deduplicate on both all columns and a subset of 
columns")
+
+  def deduplicateRequiresColumnsOrAll(): InvalidPlanInput =
+    InvalidPlanInput(
+      "Deduplicate requires to either deduplicate on all columns or a subset 
of columns")
+
+  def invalidDeduplicateColumn(colName: String): InvalidPlanInput =
+    InvalidPlanInput(s"Invalid deduplicate column $colName")
+
+  def functionEvalTypeNotSupported(evalType: Int): InvalidPlanInput =
+    InvalidPlanInput(s"Function with EvalType: $evalType is not supported")
+
+  def functionIdNotSupported(functionId: Int): InvalidPlanInput =
+    InvalidPlanInput(s"Function with ID: $functionId is not supported")
+
+  def groupingExpressionAbsentForKeyValueGroupedDataset(): InvalidPlanInput =
+    InvalidPlanInput("The grouping expression cannot be absent for 
KeyValueGroupedDataset")
+
+  def expectingScalaUdfButGot(exprType: proto.Expression.ExprTypeCase): 
InvalidPlanInput =
+    InvalidPlanInput(s"Expecting a Scala UDF, but get $exprType")
+
+  def rowNotSupportedForUdf(errorType: String): InvalidPlanInput =
+    InvalidPlanInput(s"Row is not a supported $errorType type for this UDF.")
+
+  def invalidUserDefinedOutputSchemaType(actualType: String): InvalidPlanInput 
=
+    InvalidPlanInput(
+      s"Invalid user-defined output schema type for 
TransformWithStateInPandas. " +
+        s"Expect a struct type, but got $actualType.")
+
+  def notFoundCachedLocalRelation(hash: String, sessionUUID: String): 
InvalidPlanInput =
+    InvalidPlanInput(
+      s"Not found any cached local relation with the hash: " +
+        s"$hash in the session with sessionUUID $sessionUUID.")
+
+  def withColumnsRequireSingleNamePart(got: String): InvalidPlanInput =
+    InvalidPlanInput(s"WithColumns require column name only contains one name 
part, but got $got")
+
+  def inputDataForLocalRelationNoSchema(): InvalidPlanInput =
+    InvalidPlanInput("Input data for LocalRelation does not produce a schema.")
+
+  def schemaRequiredForLocalRelation(): InvalidPlanInput =
+    InvalidPlanInput("Schema for LocalRelation is required when the input data 
is not provided.")
+
+  def invalidSchema(schema: DataType): InvalidPlanInput =
+    InvalidPlanInput(s"Invalid schema $schema")
+
+  def invalidJdbcParams(): InvalidPlanInput =
+    InvalidPlanInput("Invalid jdbc params, please specify jdbc url and table.")
+
+  def predicatesNotSupportedForDataSource(format: String): InvalidPlanInput =
+    InvalidPlanInput(s"Predicates are not supported for $format data sources.")
+
+  def multiplePathsNotSupportedForStreamingSource(): InvalidPlanInput =
+    InvalidPlanInput("Multiple paths are not supported for streaming source")
+
+  def doesNotSupport(what: String): InvalidPlanInput =
+    InvalidPlanInput(s"Does not support $what")
+
+  def invalidSchemaDataType(dataType: DataType): InvalidPlanInput =
+    InvalidPlanInput(s"Invalid schema dataType $dataType")
+
+  def expressionIdNotSupported(exprId: Int): InvalidPlanInput =
+    InvalidPlanInput(s"Expression with ID: $exprId is not supported")
+
+  def lambdaFunctionArgumentCountInvalid(got: Int): InvalidPlanInput =
+    InvalidPlanInput(s"LambdaFunction requires 1 ~ 3 arguments, but got $got 
ones!")
+
+  def aliasWithMultipleIdentifiersAndMetadata(): InvalidPlanInput =
+    InvalidPlanInput(
+      "Alias expressions with more than 1 identifier must not use optional 
metadata.")
+
+  def unresolvedStarTargetInvalid(target: String): InvalidPlanInput =
+    InvalidPlanInput(
+      s"UnresolvedStar requires a unparsed target ending with '.*', but got 
$target.")
+
+  def unresolvedStarWithBothTargetAndPlanId(): InvalidPlanInput =
+    InvalidPlanInput("UnresolvedStar with both target and plan id is not 
supported.")
+
+  def windowFunctionRequired(): InvalidPlanInput =
+    InvalidPlanInput("WindowFunction is required in WindowExpression")
+
+  def unknownFrameType(
+      frameType: proto.Expression.Window.WindowFrame.FrameType): 
InvalidPlanInput =
+    InvalidPlanInput(s"Unknown FrameType $frameType")
+
+  def lowerBoundRequiredInWindowFrame(): InvalidPlanInput =
+    InvalidPlanInput("LowerBound is required in WindowFrame")
+
+  def unknownFrameBoundary(
+      boundary: proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase)
+      : InvalidPlanInput =
+    InvalidPlanInput(s"Unknown FrameBoundary $boundary")
+
+  def upperBoundRequiredInWindowFrame(): InvalidPlanInput =
+    InvalidPlanInput("UpperBound is required in WindowFrame")
+
+  def setOperationMustHaveTwoInputs(): InvalidPlanInput =
+    InvalidPlanInput("Set operation must have 2 inputs")
+
+  def exceptDoesNotSupportUnionByName(): InvalidPlanInput =
+    InvalidPlanInput("Except does not support union_by_name")
+
+  def intersectDoesNotSupportUnionByName(): InvalidPlanInput =
+    InvalidPlanInput("Intersect does not support union_by_name")
+
+  def unsupportedSetOperation(op: Int): InvalidPlanInput =
+    InvalidPlanInput(s"Unsupported set operation $op")
+
+  def joinTypeNotSupported(t: proto.Join.JoinType): InvalidPlanInput =
+    InvalidPlanInput(s"Join type $t is not supported")
+
+  def aggregateNeedsPlanInput(): InvalidPlanInput =
+    InvalidPlanInput("Aggregate needs a plan input")
+
+  def aggregateWithPivotRequiresPivot(): InvalidPlanInput =
+    InvalidPlanInput("Aggregate with GROUP_TYPE_PIVOT requires a Pivot")
+
+  def runnerCannotBeEmptyInExecuteExternalCommand(): InvalidPlanInput =
+    InvalidPlanInput("runner cannot be empty in executeExternalCommand")
+
+  def commandCannotBeEmptyInExecuteExternalCommand(): InvalidPlanInput =
+    InvalidPlanInput("command cannot be empty in executeExternalCommand")
+
+  def unexpectedForeachBatchFunction(): InvalidPlanInput =
+    InvalidPlanInput("Unexpected foreachBatch function")
+
+  def invalidWithRelationReference(): InvalidPlanInput =
+    InvalidPlanInput("Invalid WithRelation reference")
+
+  def assertionFailure(message: String): InvalidPlanInput =
+    InvalidPlanInput(message)
+
+  def unsupportedMergeActionType(actionType: proto.MergeAction.ActionType): 
InvalidPlanInput =
+    InvalidPlanInput(s"Unsupported merge action type $actionType")
+
+  def unresolvedNamedLambdaVariableRequiresNamePart(): InvalidPlanInput =
+    InvalidPlanInput("UnresolvedNamedLambdaVariable requires at least one name 
part!")
+
+  def usingColumnsOrJoinConditionSetInJoin(): InvalidPlanInput =
+    InvalidPlanInput("Using columns or join conditions cannot be set at the 
same time in Join")
+
+  def invalidStateSchemaDataType(dataType: DataType): InvalidPlanInput =
+    InvalidPlanInput(s"Invalid state schema dataType $dataType for 
flatMapGroupsWithState")
+
+  def sqlCommandExpectsSqlOrWithRelations(other: proto.Relation.RelTypeCase): 
InvalidPlanInput =
+    InvalidPlanInput(s"SQL command expects either a SQL or a WithRelations, 
but got $other")
+
+  def unknownGroupType(groupType: proto.Aggregate.GroupType): InvalidPlanInput 
=
+    InvalidPlanInput(s"Unknown group type $groupType")
+
+  def dataSourceIdNotSupported(dataSourceId: Int): InvalidPlanInput =
+    InvalidPlanInput(s"Data source id $dataSourceId is not supported")
+
+  def unknownSubqueryType(subqueryType: 
proto.SubqueryExpression.SubqueryType): InvalidPlanInput =
+    InvalidPlanInput(s"Unknown subquery type $subqueryType")
+
+  def reduceShouldCarryScalarScalaUdf(got: mutable.Buffer[proto.Expression]): 
InvalidPlanInput =
+    InvalidPlanInput(s"reduce should carry a scalar scala udf, but got $got")
+
+  def unionByNameAllowMissingColRequiresByName(): InvalidPlanInput =
+    InvalidPlanInput("UnionByName `allowMissingCol` can be true only if 
`byName` is true.")
+
+  def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
+    InvalidCommandInput(s"BucketBy must specify a bucket count > 0, received 
$numBuckets instead.")
+
+  def invalidPythonUdtfReturnType(actualType: String): InvalidPlanInput =
+    InvalidPlanInput(
+      s"Invalid Python user-defined table function return type. " +
+        s"Expect a struct type, but got $actualType.")
+
+  def invalidUserDefinedOutputSchemaTypeForTransformWithState(
+      actualType: String): InvalidPlanInput =
+    InvalidPlanInput(
+      s"Invalid user-defined output schema type for 
TransformWithStateInPandas. " +
+        s"Expect a struct type, but got $actualType.")
+
+  def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): 
InvalidPlanInput =
+    InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: 
${clazz}")
+}
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 687424459496..d821046d0831 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -62,7 +62,7 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.classic.{Catalog, Dataset, MergeIntoWriter, 
RelationalGroupedDataset, SparkSession, TypedAggUtils, UserDefinedFunctionUtils}
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
-import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
ForeachWriterPacket, InvalidCommandInput, InvalidPlanInput, 
LiteralValueProtoConverter, StorageLevelProtoConverter, 
StreamingListenerPacket, UdfPacket}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
ForeachWriterPacket, LiteralValueProtoConverter, StorageLevelProtoConverter, 
StreamingListenerPacket, UdfPacket}
 import 
org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
 import org.apache.spark.sql.connect.ml.MLHandler
 import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
@@ -233,7 +233,7 @@ class SparkConnectPlanner(
         // Handle plugins for Spark Connect Relation types.
         case proto.Relation.RelTypeCase.EXTENSION =>
           transformRelationPlugin(rel.getExtension)
-        case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
+        case _ => throw InvalidInputErrors.unknownRelationNotSupported(rel)
       }
       if (rel.hasCommon && rel.getCommon.hasPlanId) {
         plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId)
@@ -250,7 +250,7 @@ class SparkConnectPlanner(
       .map(p => p.transform(extension.toByteArray, this))
       // Find the first non-empty transformation or throw.
       .find(_.isPresent)
-      .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+      .getOrElse(throw InvalidInputErrors.noHandlerFoundForExtension())
       .get()
   }
 
@@ -297,7 +297,7 @@ class SparkConnectPlanner(
         transformSetCurrentCatalog(catalog.getSetCurrentCatalog)
       case proto.Catalog.CatTypeCase.LIST_CATALOGS =>
         transformListCatalogs(catalog.getListCatalogs)
-      case other => throw InvalidPlanInput(s"$other not supported.")
+      case other => throw InvalidInputErrors.catalogTypeNotSupported(other)
     }
   }
 
@@ -344,7 +344,7 @@ class SparkConnectPlanner(
 
   private def transformSqlWithRefs(query: proto.WithRelations): LogicalPlan = {
     if (!isValidSQLWithRefs(query)) {
-      throw InvalidPlanInput(s"$query is not a valid relation for SQL with 
references")
+      throw InvalidInputErrors.invalidSQLWithReferences(query)
     }
     executeSQLWithRefs(query).logicalPlan
   }
@@ -413,12 +413,10 @@ class SparkConnectPlanner(
 
   private def transformNAFill(rel: proto.NAFill): LogicalPlan = {
     if (rel.getValuesCount == 0) {
-      throw InvalidPlanInput(s"values must contains at least 1 item!")
+      throw InvalidInputErrors.naFillValuesEmpty()
     }
     if (rel.getValuesCount > 1 && rel.getValuesCount != rel.getColsCount) {
-      throw InvalidPlanInput(
-        s"When values contains more than 1 items, " +
-          s"values and cols should have the same length!")
+      throw InvalidInputErrors.naFillValuesLengthMismatch()
     }
 
     val dataset = Dataset.ofRows(session, transformRelation(rel.getInput))
@@ -610,12 +608,10 @@ class SparkConnectPlanner(
               isBarrier,
               profile)
           case _ =>
-            throw InvalidPlanInput(
-              s"Function with EvalType: ${pythonUdf.evalType} is not 
supported")
+            throw 
InvalidInputErrors.functionEvalTypeNotSupported(pythonUdf.evalType)
         }
       case _ =>
-        throw InvalidPlanInput(
-          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not 
supported")
+        throw 
InvalidInputErrors.functionIdNotSupported(commonUdf.getFunctionCase.getNumber)
     }
   }
 
@@ -668,13 +664,11 @@ class SparkConnectPlanner(
             transformTransformWithStateInPySpark(pythonUdf, group, rel, 
usePandas = false)
 
           case _ =>
-            throw InvalidPlanInput(
-              s"Function with EvalType: ${pythonUdf.evalType} is not 
supported")
+            throw 
InvalidInputErrors.functionEvalTypeNotSupported(pythonUdf.evalType)
         }
 
       case _ =>
-        throw InvalidPlanInput(
-          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not 
supported")
+        throw 
InvalidInputErrors.functionIdNotSupported(commonUdf.getFunctionCase.getNumber)
     }
   }
 
@@ -779,8 +773,7 @@ class SparkConnectPlanner(
       val stateSchema = 
DataTypeProtoConverter.toCatalystType(rel.getStateSchema) match {
         case s: StructType => s
         case other =>
-          throw InvalidPlanInput(
-            s"Invalid state schema dataType $other for flatMapGroupsWithState")
+          throw InvalidInputErrors.invalidStateSchemaDataType(other)
       }
       val stateEncoder = TypedScalaUdf.encoderFor(
         // the state agnostic encoder is the second element in the input 
encoders.
@@ -875,13 +868,11 @@ class SparkConnectPlanner(
             input.flatMapCoGroupsInArrow(other, Column(pythonUdf)).logicalPlan
 
           case _ =>
-            throw InvalidPlanInput(
-              s"Function with EvalType: ${pythonUdf.evalType} is not 
supported")
+            throw 
InvalidInputErrors.functionEvalTypeNotSupported(pythonUdf.evalType)
         }
 
       case _ =>
-        throw InvalidPlanInput(
-          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not 
supported")
+        throw 
InvalidInputErrors.functionIdNotSupported(commonUdf.getFunctionCase.getNumber)
     }
   }
 
@@ -958,8 +949,7 @@ class SparkConnectPlanner(
       } else if (groupingExprs.size() > 1) {
         createFromRelationalDataset(logicalPlan, groupingExprs, sortOrder)
       } else {
-        throw InvalidPlanInput(
-          "The grouping expression cannot be absent for 
KeyValueGroupedDataset")
+        throw 
InvalidInputErrors.groupingExpressionAbsentForKeyValueGroupedDataset()
       }
     }
 
@@ -1035,7 +1025,7 @@ class SparkConnectPlanner(
         && expr.getCommonInlineUserDefinedFunction.hasScalarScalaUdf) {
         apply(expr.getCommonInlineUserDefinedFunction, inputAttrs)
       } else {
-        throw InvalidPlanInput(s"Expecting a Scala UDF, but get 
${expr.getExprTypeCase}")
+        throw InvalidInputErrors.expectingScalaUdfButGot(expr.getExprTypeCase)
       }
     }
 
@@ -1065,8 +1055,7 @@ class SparkConnectPlanner(
           .map(attrs =>
             ExpressionEncoder(RowEncoder.encoderFor(StructType(attrs.map(a =>
               StructField(a.name, a.dataType, a.nullable))))))
-          .getOrElse(
-            throw InvalidPlanInput(s"Row is not a supported $errorType type 
for this UDF."))
+          .getOrElse(throw InvalidInputErrors.rowNotSupportedForUdf(errorType))
       } else {
         // Nested unbound row encoders
         val unboundTransformed = encoder match {
@@ -1116,9 +1105,8 @@ class SparkConnectPlanner(
       transformDataType(twsInfo.getOutputSchema) match {
         case s: StructType => s
         case dt =>
-          throw InvalidPlanInput(
-            "Invalid user-defined output schema type for 
TransformWithStateInPandas. " +
-              s"Expect a struct type, but got ${dt.typeName}.")
+          throw 
InvalidInputErrors.invalidUserDefinedOutputSchemaTypeForTransformWithState(
+            dt.typeName)
       }
     }
 
@@ -1194,8 +1182,7 @@ class SparkConnectPlanner(
           fun.getArgumentsList.asScala.map(transformExpression).toSeq,
           session.sessionState.sqlParser)
       case _ =>
-        throw InvalidPlanInput(
-          s"Function with ID: ${fun.getFunctionCase.getNumber} is not 
supported")
+        throw 
InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
     }
   }
 
@@ -1251,8 +1238,7 @@ class SparkConnectPlanner(
     val (colNames, exprs, metadata) =
       rel.getAliasesList.asScala.toSeq.map { alias =>
         if (alias.getNameCount != 1) {
-          throw InvalidPlanInput(s"""WithColumns require column name only 
contains one name part,
-             |but got ${alias.getNameList.toString}""".stripMargin)
+          throw 
InvalidInputErrors.withColumnsRequireSingleNamePart(alias.getNameList.toString)
         }
 
         val metadata = if (alias.hasMetadata && alias.getMetadata.nonEmpty) {
@@ -1298,9 +1284,7 @@ class SparkConnectPlanner(
         }
       }
       .getOrElse {
-        throw InvalidPlanInput(
-          s"Not found any cached local relation with the hash: ${blockId.hash} 
in " +
-            s"the session with sessionUUID ${blockId.sessionUUID}.")
+        throw InvalidInputErrors.notFoundCachedLocalRelation(blockId.hash, 
blockId.sessionUUID)
       }
   }
 
@@ -1387,14 +1371,13 @@ class SparkConnectPlanner(
 
   private def transformDeduplicate(rel: proto.Deduplicate): LogicalPlan = {
     if (!rel.hasInput) {
-      throw InvalidPlanInput("Deduplicate needs a plan input")
+      throw InvalidInputErrors.deduplicateNeedsInput()
     }
     if (rel.getAllColumnsAsKeys && rel.getColumnNamesCount > 0) {
-      throw InvalidPlanInput("Cannot deduplicate on both all columns and a 
subset of columns")
+      throw InvalidInputErrors.deduplicateAllColumnsAndSubset()
     }
     if (!rel.getAllColumnsAsKeys && rel.getColumnNamesCount == 0) {
-      throw InvalidPlanInput(
-        "Deduplicate requires to either deduplicate on all columns or a subset 
of columns")
+      throw InvalidInputErrors.deduplicateRequiresColumnsOrAll()
     }
     val queryExecution = new QueryExecution(session, 
transformRelation(rel.getInput))
     val resolver = session.sessionState.analyzer.resolver
@@ -1409,7 +1392,7 @@ class SparkConnectPlanner(
         // so we call filter instead of find.
         val cols = allColumns.filter(col => resolver(col.name, colName))
         if (cols.isEmpty) {
-          throw InvalidPlanInput(s"Invalid deduplicate column ${colName}")
+          throw InvalidInputErrors.invalidDeduplicateColumn(colName)
         }
         cols
       }
@@ -1463,7 +1446,7 @@ class SparkConnectPlanner(
         Iterator(rel.getData.toByteArray),
         TaskContext.get())
       if (structType == null) {
-        throw InvalidPlanInput(s"Input data for LocalRelation does not produce 
a schema.")
+        throw InvalidInputErrors.inputDataForLocalRelationNoSchema()
       }
       val attributes = DataTypeUtils.toAttributes(structType)
       val proj = UnsafeProjection.create(attributes, attributes)
@@ -1506,8 +1489,7 @@ class SparkConnectPlanner(
       }
     } else {
       if (schema == null) {
-        throw InvalidPlanInput(
-          s"Schema for LocalRelation is required when the input data is not 
provided.")
+        throw InvalidInputErrors.schemaRequiredForLocalRelation()
       }
       LocalRelation(schema)
     }
@@ -1520,7 +1502,7 @@ class SparkConnectPlanner(
       StructType.fromDDL,
       fallbackParser = DataType.fromJson) match {
       case s: StructType => s
-      case other => throw InvalidPlanInput(s"Invalid schema $other")
+      case other => throw InvalidInputErrors.invalidSchema(other)
     }
   }
 
@@ -1538,7 +1520,7 @@ class SparkConnectPlanner(
         if (rel.getDataSource.getFormat == "jdbc" && 
rel.getDataSource.getPredicatesCount > 0) {
           if 
(!rel.getDataSource.getOptionsMap.containsKey(JDBCOptions.JDBC_URL) ||
             
!rel.getDataSource.getOptionsMap.containsKey(JDBCOptions.JDBC_TABLE_NAME)) {
-            throw InvalidPlanInput(s"Invalid jdbc params, please specify jdbc 
url and table.")
+            throw InvalidInputErrors.invalidJdbcParams()
           }
 
           val url = rel.getDataSource.getOptionsMap.get(JDBCOptions.JDBC_URL)
@@ -1564,8 +1546,8 @@ class SparkConnectPlanner(
             reader.load(rel.getDataSource.getPathsList.asScala.toSeq: 
_*).queryExecution.analyzed
           }
         } else {
-          throw InvalidPlanInput(
-            s"Predicates are not supported for ${rel.getDataSource.getFormat} 
data sources.")
+          throw InvalidInputErrors.predicatesNotSupportedForDataSource(
+            rel.getDataSource.getFormat)
         }
 
       case proto.Read.ReadTypeCase.DATA_SOURCE if rel.getIsStreaming =>
@@ -1582,12 +1564,12 @@ class SparkConnectPlanner(
           case 0 => reader.load()
           case 1 => reader.load(streamSource.getPaths(0))
           case _ =>
-            throw InvalidPlanInput(s"Multiple paths are not supported for 
streaming source")
+            throw 
InvalidInputErrors.multiplePathsNotSupportedForStreamingSource()
         }
 
         streamDF.queryExecution.analyzed
 
-      case _ => throw InvalidPlanInput(s"Does not support 
${rel.getReadTypeCase.name()}")
+      case _ => throw 
InvalidInputErrors.doesNotSupport(rel.getReadTypeCase.name())
     }
   }
 
@@ -1598,7 +1580,7 @@ class SparkConnectPlanner(
       if (rel.hasSchema) {
         DataTypeProtoConverter.toCatalystType(rel.getSchema) match {
           case s: StructType => reader.schema(s)
-          case other => throw InvalidPlanInput(s"Invalid schema dataType 
$other")
+          case other => throw InvalidInputErrors.invalidSchemaDataType(other)
         }
       }
       localMap.foreach { case (key, value) => reader.option(key, value) }
@@ -1611,7 +1593,7 @@ class SparkConnectPlanner(
         dataFrameReader.csv(ds).queryExecution.analyzed
       case ParseFormat.PARSE_FORMAT_JSON =>
         dataFrameReader.json(ds).queryExecution.analyzed
-      case _ => throw InvalidPlanInput("Does not support " + 
rel.getFormat.name())
+      case _ => throw InvalidInputErrors.doesNotSupport(rel.getFormat.name())
     }
   }
 
@@ -1793,8 +1775,7 @@ class SparkConnectPlanner(
       case proto.Expression.ExprTypeCase.SUBQUERY_EXPRESSION =>
         transformSubqueryExpression(exp.getSubqueryExpression)
       case _ =>
-        throw InvalidPlanInput(
-          s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not 
supported")
+        throw 
InvalidInputErrors.expressionIdNotSupported(exp.getExprTypeCase.getNumber)
     }
   }
 
@@ -1823,7 +1804,7 @@ class SparkConnectPlanner(
       .map(p => p.transform(extension.toByteArray, this))
       // Find the first non-empty transformation or throw.
       .find(_.isPresent)
-      .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+      .getOrElse(throw InvalidInputErrors.noHandlerFoundForExtension())
       .get
   }
 
@@ -1901,8 +1882,7 @@ class SparkConnectPlanner(
       case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF 
=>
         transformScalaUDF(fun)
       case _ =>
-        throw InvalidPlanInput(
-          s"Function with ID: ${fun.getFunctionCase.getNumber} is not 
supported")
+        throw 
InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
     }
   }
 
@@ -1974,8 +1954,7 @@ class SparkConnectPlanner(
       case uda: UserDefinedAggregator[_, _, _] =>
         ScalaAggregator(uda, children).toAggregateExpression(fun.getIsDistinct)
       case other =>
-        throw InvalidPlanInput(
-          s"Unsupported UserDefinedFunction implementation: ${other.getClass}")
+        throw 
InvalidInputErrors.unsupportedUserDefinedFunctionImplementation(other.getClass)
     }
   }
 
@@ -2066,9 +2045,7 @@ class SparkConnectPlanner(
    */
   private def transformLambdaFunction(lambda: 
proto.Expression.LambdaFunction): LambdaFunction = {
     if (lambda.getArgumentsCount == 0 || lambda.getArgumentsCount > 3) {
-      throw InvalidPlanInput(
-        "LambdaFunction requires 1 ~ 3 arguments, " +
-          s"but got ${lambda.getArgumentsCount} ones!")
+      throw 
InvalidInputErrors.lambdaFunctionArgumentCountInvalid(lambda.getArgumentsCount)
     }
 
     LambdaFunction(
@@ -2080,7 +2057,7 @@ class SparkConnectPlanner(
   private def transformUnresolvedNamedLambdaVariable(
       variable: proto.Expression.UnresolvedNamedLambdaVariable): 
UnresolvedNamedLambdaVariable = {
     if (variable.getNamePartsCount == 0) {
-      throw InvalidPlanInput("UnresolvedNamedLambdaVariable requires at least 
one name part!")
+      throw InvalidInputErrors.unresolvedNamedLambdaVariableRequiresNamePart()
     }
 
     UnresolvedNamedLambdaVariable(variable.getNamePartsList.asScala.toSeq)
@@ -2096,8 +2073,7 @@ class SparkConnectPlanner(
       Alias(transformExpression(alias.getExpr), 
alias.getName(0))(explicitMetadata = metadata)
     } else {
       if (alias.hasMetadata) {
-        throw InvalidPlanInput(
-          "Alias expressions with more than 1 identifier must not use optional 
metadata.")
+        throw InvalidInputErrors.aliasWithMultipleIdentifiersAndMetadata()
       }
       MultiAlias(transformExpression(alias.getExpr), 
alias.getNameList.asScala.toSeq)
     }
@@ -2117,8 +2093,7 @@ class SparkConnectPlanner(
         // functions.col("s.*")
         val target = star.getUnparsedTarget
         if (!target.endsWith(".*")) {
-          throw InvalidPlanInput(
-            s"UnresolvedStar requires a unparsed target ending with '.*', but 
got $target.")
+          throw InvalidInputErrors.unresolvedStarTargetInvalid(target)
         }
         val parts = UnresolvedAttribute.parseAttributeName(target.dropRight(2))
         UnresolvedStar(Some(parts))
@@ -2128,7 +2103,7 @@ class SparkConnectPlanner(
         UnresolvedDataFrameStar(star.getPlanId)
 
       case _ =>
-        throw InvalidPlanInput("UnresolvedStar with both target and plan id is 
not supported.")
+        throw InvalidInputErrors.unresolvedStarWithBothTargetAndPlanId()
     }
   }
 
@@ -2192,7 +2167,7 @@ class SparkConnectPlanner(
 
   private def transformWindowExpression(window: proto.Expression.Window) = {
     if (!window.hasWindowFunction) {
-      throw InvalidPlanInput(s"WindowFunction is required in WindowExpression")
+      throw InvalidInputErrors.windowFunctionRequired()
     }
 
     val frameSpec = if (window.hasFrameSpec) {
@@ -2203,11 +2178,11 @@ class SparkConnectPlanner(
 
         case proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_RANGE => 
RangeFrame
 
-        case other => throw InvalidPlanInput(s"Unknown FrameType $other")
+        case other => throw InvalidInputErrors.unknownFrameType(other)
       }
 
       if (!protoFrameSpec.hasLower) {
-        throw InvalidPlanInput(s"LowerBound is required in WindowFrame")
+        throw InvalidInputErrors.lowerBoundRequiredInWindowFrame()
       }
       val lower = protoFrameSpec.getLower.getBoundaryCase match {
         case 
proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.CURRENT_ROW =>
@@ -2219,11 +2194,11 @@ class SparkConnectPlanner(
         case proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.VALUE =>
           transformExpression(protoFrameSpec.getLower.getValue)
 
-        case other => throw InvalidPlanInput(s"Unknown FrameBoundary $other")
+        case other => throw InvalidInputErrors.unknownFrameBoundary(other)
       }
 
       if (!protoFrameSpec.hasUpper) {
-        throw InvalidPlanInput(s"UpperBound is required in WindowFrame")
+        throw InvalidInputErrors.upperBoundRequiredInWindowFrame()
       }
       val upper = protoFrameSpec.getUpper.getBoundaryCase match {
         case proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.CURRENT_ROW =>
@@ -2235,7 +2210,7 @@ class SparkConnectPlanner(
         case proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase.VALUE =>
           transformExpression(protoFrameSpec.getUpper.getValue)
 
-        case other => throw InvalidPlanInput(s"Unknown FrameBoundary $other")
+        case other => throw InvalidInputErrors.unknownFrameBoundary(other)
       }
 
       SpecifiedWindowFrame(frameType = frameType, lower = lower, upper = upper)
@@ -2256,7 +2231,7 @@ class SparkConnectPlanner(
 
   private def transformSetOperation(u: proto.SetOperation): LogicalPlan = {
     if (!u.hasLeftInput || !u.hasRightInput) {
-      throw InvalidPlanInput("Set operation must have 2 inputs")
+      throw InvalidInputErrors.setOperationMustHaveTwoInputs()
     }
     val leftPlan = transformRelation(u.getLeftInput)
     val rightPlan = transformRelation(u.getRightInput)
@@ -2265,18 +2240,17 @@ class SparkConnectPlanner(
     u.getSetOpType match {
       case proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT =>
         if (u.getByName) {
-          throw InvalidPlanInput("Except does not support union_by_name")
+          throw InvalidInputErrors.exceptDoesNotSupportUnionByName()
         }
         Except(leftPlan, rightPlan, isAll)
       case proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT =>
         if (u.getByName) {
-          throw InvalidPlanInput("Intersect does not support union_by_name")
+          throw InvalidInputErrors.intersectDoesNotSupportUnionByName()
         }
         Intersect(leftPlan, rightPlan, isAll)
       case proto.SetOperation.SetOpType.SET_OP_TYPE_UNION =>
         if (!u.getByName && u.getAllowMissingColumns) {
-          throw InvalidPlanInput(
-            "UnionByName `allowMissingCol` can be true only if `byName` is true.")
+          throw InvalidInputErrors.unionByNameAllowMissingColRequiresByName()
         }
         val union = Union(Seq(leftPlan, rightPlan), u.getByName, u.getAllowMissingColumns)
         if (isAll) {
@@ -2286,7 +2260,7 @@ class SparkConnectPlanner(
         }
 
       case _ =>
-        throw InvalidPlanInput(s"Unsupported set operation ${u.getSetOpTypeValue}")
+        throw InvalidInputErrors.unsupportedSetOperation(u.getSetOpTypeValue)
     }
   }
 
@@ -2313,8 +2287,7 @@ class SparkConnectPlanner(
   private def transformJoin(rel: proto.Join): LogicalPlan = {
     assertPlan(rel.hasLeft && rel.hasRight, "Both join sides must be present")
     if (rel.hasJoinCondition && rel.getUsingColumnsCount > 0) {
-      throw InvalidPlanInput(
-        s"Using columns or join conditions cannot be set at the same time in Join")
+      throw InvalidInputErrors.usingColumnsOrJoinConditionSetInJoin()
     }
     val joinCondition =
       if (rel.hasJoinCondition) Some(transformExpression(rel.getJoinCondition)) else None
@@ -2342,7 +2315,7 @@ class SparkConnectPlanner(
       case proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER => RightOuter
       case proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI => LeftSemi
       case proto.Join.JoinType.JOIN_TYPE_CROSS => Cross
-      case _ => throw InvalidPlanInput(s"Join type ${t} is not supported")
+      case _ => throw InvalidInputErrors.joinTypeNotSupported(t)
     }
   }
 
@@ -2472,7 +2445,7 @@ class SparkConnectPlanner(
 
   private def transformRelationalGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
     if (!rel.hasInput) {
-      throw InvalidPlanInput("Aggregate needs a plan input")
+      throw InvalidInputErrors.aggregateNeedsPlanInput()
     }
     val input = transformRelation(rel.getInput)
 
@@ -2511,7 +2484,7 @@ class SparkConnectPlanner(
 
       case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
         if (!rel.hasPivot) {
-          throw InvalidPlanInput("Aggregate with GROUP_TYPE_PIVOT requires a Pivot")
+          throw InvalidInputErrors.aggregateWithPivotRequiresPivot()
         }
 
         val pivotExpr = transformExpression(rel.getPivot.getCol)
@@ -2543,7 +2516,7 @@ class SparkConnectPlanner(
           aggregateExpressions = aliasedAgg,
           child = logicalPlan)
 
-      case other => throw InvalidPlanInput(s"Unknown Group Type $other")
+      case other => throw InvalidInputErrors.unknownGroupType(other)
     }
   }
 
@@ -2573,7 +2546,7 @@ class SparkConnectPlanner(
             e.getCommonInlineUserDefinedFunction.hasScalarScalaUdf =>
         unpackUdf(e.getCommonInlineUserDefinedFunction)
       case other =>
-        throw InvalidPlanInput(s"reduce should carry a scalar scala udf, but got $other")
+        throw InvalidInputErrors.reduceShouldCarryScalarScalaUdf(other)
     }
     val encoder = udf.outputEncoder
     val reduce = ReduceAggregator(udf.function)(encoder).toColumn.expr
@@ -2646,7 +2619,7 @@ class SparkConnectPlanner(
         assertPlan(assignments.isEmpty, "UpdateStar action should not have assignment.")
         UpdateStarAction(condition)
       case _ =>
-        throw InvalidPlanInput(s"Unsupported merge action type ${action.getActionType}.")
+        throw InvalidInputErrors.unsupportedMergeActionType(action.getActionType)
     }
   }
 
@@ -2731,10 +2704,10 @@ class SparkConnectPlanner(
       command: proto.ExecuteExternalCommand,
       responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
     if (command.getRunner.isEmpty) {
-      throw InvalidPlanInput("runner cannot be empty in executeExternalCommand")
+      throw InvalidInputErrors.runnerCannotBeEmptyInExecuteExternalCommand()
     }
     if (command.getCommand.isEmpty) {
-      throw InvalidPlanInput("command cannot be empty in executeExternalCommand")
+      throw InvalidInputErrors.commandCannotBeEmptyInExecuteExternalCommand()
     }
     val executor = ExternalCommandExecutor(
       session,
@@ -2791,8 +2764,7 @@ class SparkConnectPlanner(
       case proto.Relation.RelTypeCase.WITH_RELATIONS =>
         executeSQLWithRefs(relation.getWithRelations, tracker)
       case other =>
-        throw InvalidPlanInput(
-          s"SQL command expects either a SQL or a WithRelations, but got $other")
+        throw InvalidInputErrors.sqlCommandExpectsSqlOrWithRelations(other)
     }
 
     // Check if command or SQL Script has been executed.
@@ -2895,7 +2867,7 @@ class SparkConnectPlanner(
       query: proto.WithRelations,
       tracker: QueryPlanningTracker = new QueryPlanningTracker) = {
     if (!isValidSQLWithRefs(query)) {
-      throw InvalidPlanInput(s"$query is not a valid relation for SQL with references")
+      throw InvalidInputErrors.invalidSQLWithReferences(query)
     }
 
     // Eagerly execute commands of the provided SQL string, with given references.
@@ -2957,8 +2929,7 @@ class SparkConnectPlanner(
       case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
         handleRegisterScalaUDF(fun)
       case _ =>
-        throw InvalidPlanInput(
-          s"Function with ID: ${fun.getFunctionCase.getNumber} is not supported")
+        throw InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
     }
     executeHolder.eventsManager.postFinished()
   }
@@ -2970,8 +2941,7 @@ class SparkConnectPlanner(
         val function = createPythonUserDefinedTableFunction(fun)
         session.udtf.registerPython(fun.getFunctionName, function)
       case _ =>
-        throw InvalidPlanInput(
-          s"Function with ID: ${fun.getFunctionCase.getNumber} is not supported")
+        throw InvalidInputErrors.functionIdNotSupported(fun.getFunctionCase.getNumber)
     }
     executeHolder.eventsManager.postFinished()
   }
@@ -2984,8 +2954,7 @@ class SparkConnectPlanner(
         val dataSource = UserDefinedPythonDataSource(transformPythonDataSource(ds))
         session.dataSource.registerPython(fun.getName, dataSource)
       case _ =>
-        throw InvalidPlanInput(
-          s"Data source with ID: ${fun.getDataSourceCase.getNumber} is not supported")
+        throw InvalidInputErrors.dataSourceIdNotSupported(fun.getDataSourceCase.getNumber)
     }
     executeHolder.eventsManager.postFinished()
   }
@@ -2997,9 +2966,7 @@ class SparkConnectPlanner(
       transformDataType(udtf.getReturnType) match {
         case s: StructType => Some(s)
         case dt =>
-          throw InvalidPlanInput(
-            "Invalid Python user-defined table function return type. " +
-              s"Expect a struct type, but got ${dt.typeName}.")
+          throw InvalidInputErrors.invalidPythonUdtfReturnType(dt.typeName)
       }
     } else {
       None
@@ -3045,7 +3012,7 @@ class SparkConnectPlanner(
       .map(p => p.process(extension.toByteArray, this))
       // Find the first non-empty transformation or throw.
       .find(_ == true)
-      .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+      .getOrElse(throw InvalidInputErrors.noHandlerFoundForExtension())
     executeHolder.eventsManager.postFinished()
   }
 
@@ -3111,8 +3078,7 @@ class SparkConnectPlanner(
       val op = writeOperation.getBucketBy
       val cols = op.getBucketColumnNamesList.asScala
       if (op.getNumBuckets <= 0) {
-        throw InvalidCommandInput(
-          s"BucketBy must specify a bucket count > 0, received ${op.getNumBuckets} instead.")
+        throw InvalidInputErrors.invalidBucketCount(op.getNumBuckets)
       }
       w.bucketBy(op.getNumBuckets, cols.head, cols.tail.toSeq: _*)
     }
@@ -3306,7 +3272,7 @@ class SparkConnectPlanner(
             sessionHolder)
 
         case StreamingForeachFunction.FunctionCase.FUNCTION_NOT_SET =>
-          throw InvalidPlanInput("Unexpected foreachBatch function") // Unreachable
+          throw InvalidInputErrors.unexpectedForeachBatchFunction() // Unreachable
       }
 
       writer.foreachBatch(foreachBatchFn)
@@ -4094,7 +4060,7 @@ class SparkConnectPlanner(
             transformExpression(value)
           }.toSeq,
           planId)
-      case other => throw InvalidPlanInput(s"Unknown SubqueryType $other")
+      case other => throw InvalidInputErrors.unknownSubqueryType(other)
     }
   }
 
@@ -4111,7 +4077,7 @@ class SparkConnectPlanner(
           val plan = transformRelation(ref)
           planId -> plan
         } else {
-          throw InvalidPlanInput("Invalid WithRelation reference")
+          throw InvalidInputErrors.invalidWithRelationReference()
         }
       }.toMap
 
@@ -4147,6 +4113,6 @@ class SparkConnectPlanner(
     }
 
   private def assertPlan(assertion: Boolean, message: => String = ""): Unit = {
-    if (!assertion) throw InvalidPlanInput(message)
+    if (!assertion) throw InvalidInputErrors.assertionFailure(message)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to