This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d61c1ea7f700 [SPARK-52335][CONNECT][SQL] Unify the 'invalid bucket count' error for both Connect and Classic
d61c1ea7f700 is described below

commit d61c1ea7f700ccf4a0ca37e9caef0ee811881b1e
Author: Yihong He <heyihong...@gmail.com>
AuthorDate: Fri May 30 08:30:51 2025 +0900

    [SPARK-52335][CONNECT][SQL] Unify the 'invalid bucket count' error for both Connect and Classic

    ### What changes were proposed in this pull request?

    This PR unifies the error handling for invalid bucket count validation between Spark Connect and Classic Spark. The main changes are:

    1. Updated the error message in `error-conditions.json` for `INVALID_BUCKET_COUNT` to be more descriptive and consistent
    2. Removed the legacy error condition `_LEGACY_ERROR_TEMP_1083`, since its functionality is now merged into `INVALID_BUCKET_COUNT`
    3. Removed the `InvalidCommandInput` class and its usage in Connect, since the standard `AnalysisException` with the `INVALID_BUCKET_COUNT` error condition is used instead
    4. Updated the bucket count validation in `SparkConnectPlanner` to rely on the standard error handling path
    5. Updated the test case in `SparkConnectProtoSuite` to verify the new unified error handling

    The key improvement is that both Connect and Classic now use the same error condition and message format for invalid bucket count errors, making error handling consistent across Spark's different interfaces. The error message now includes both the maximum allowed bucket count and the invalid value received, providing better guidance to users.

    This change also simplifies the error handling codebase by removing duplicate error definitions and standardizing on a single error condition for this validation case.

    ### Why are the changes needed?

    The changes are needed to:

    1. Provide consistent error messages across the Spark Connect and Classic interfaces
    2. Simplify error handling by removing duplicate error definitions
    3. Improve error message clarity by including the maximum allowed bucket count in the error message
    4. Improve maintainability by reducing code duplication in error handling

    The unified error message now clearly states both the requirement (bucket count > 0) and the upper limit (≤ bucketing.maxBuckets), making it easier for users to understand and fix the issue.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    `build/sbt "connect/testOnly *SparkConnectProtoSuite"`

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51039 from heyihong/SPARK-52335.

Authored-by: Yihong He <heyihong...@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
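As a rough sketch of the user-visible behavior being unified (not part of the patch; the local session, table name, and format here are illustrative, mirroring the values in the updated test further below):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object InvalidBucketCountRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    try {
      // A bucket count of 0 is rejected; both Classic and Connect now raise
      // the same INVALID_BUCKET_COUNT analysis error for this write.
      spark.range(10).write
        .format("parquet")
        .bucketBy(0, "id")
        .saveAsTable("testtable")
    } catch {
      case e: AnalysisException =>
        assert(e.getCondition == "INVALID_BUCKET_COUNT")
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}
```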
---
 .../src/main/resources/error/error-conditions.json |  7 +----
 python/pyspark/errors/exceptions/connect.py        |  7 -----
 .../spark/sql/errors/QueryCompilationErrors.scala  |  2 +-
 .../sql/connect/common/InvalidCommandInput.scala   | 36 ----------------------
 .../sql/connect/planner/InvalidInputErrors.scala   |  5 +--
 .../sql/connect/planner/SparkConnectPlanner.scala  |  3 --
 .../connect/planner/SparkConnectProtoSuite.scala   | 13 +++++---
 7 files changed, 12 insertions(+), 61 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 3e689cecac3b..8dddca5077a6 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2382,7 +2382,7 @@
   },
   "INVALID_BUCKET_COUNT" : {
     "message" : [
-      "BucketBy must specify a bucket count > 0, received <numBuckets> instead."
+      "Number of buckets should be greater than 0 but less than or equal to bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
     ],
     "sqlState" : "22003"
   },
@@ -7042,11 +7042,6 @@
       "Partition [<specString>] did not specify locationUri."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1083" : {
-    "message" : [
-      "Number of buckets should be greater than 0 but less than or equal to bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1089" : {
     "message" : [
       "Column statistics deserialization is not supported for column <name> of data type: <dataType>."
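For illustration, the new template can be rendered with the parameter values the updated test asserts. This is a sketch, not code from the patch: it assumes access to Spark's package-private `SparkThrowableHelper` (the same helper the deleted `InvalidCommandInput` below relied on), so the hypothetical enclosing package must sit under `org.apache.spark`:

```scala
// Hypothetical placement: must live under org.apache.spark to see the
// package-private helper.
package org.apache.spark.errors.demo

import org.apache.spark.SparkThrowableHelper

object RenderInvalidBucketCountMessage {
  def main(args: Array[String]): Unit = {
    // Fills the INVALID_BUCKET_COUNT template above with the values the
    // updated test asserts. Expected output:
    //   Number of buckets should be greater than 0 but less than or equal
    //   to bucketing.maxBuckets (`100000`). Got `0`.
    println(SparkThrowableHelper.getMessage(
      "INVALID_BUCKET_COUNT",
      Map("bucketingMaxBuckets" -> "100000", "numBuckets" -> "0")))
  }
}
```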
- """ - - class StreamingPythonRunnerInitializationException( SparkConnectGrpcException, BaseStreamingPythonRunnerInitException ): @@ -411,7 +405,6 @@ EXCEPTION_CLASS_MAPPING = { "org.apache.spark.SparkNoSuchElementException": SparkNoSuchElementException, "org.apache.spark.SparkException": SparkException, "org.apache.spark.sql.connect.common.InvalidPlanInput": InvalidPlanInput, - "org.apache.spark.sql.connect.common.InvalidCommandInput": InvalidCommandInput, "org.apache.spark.api.python.StreamingPythonRunner" "$StreamingPythonRunnerInitializationException": StreamingPythonRunnerInitializationException, } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 9de52533dbbb..14f279ad5ad7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1276,7 +1276,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat def invalidBucketNumberError(bucketingMaxBuckets: Int, numBuckets: Int): Throwable = { new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_1083", + errorClass = "INVALID_BUCKET_COUNT", messageParameters = Map( "bucketingMaxBuckets" -> bucketingMaxBuckets.toString, "numBuckets" -> numBuckets.toString)) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala deleted file mode 100644 index 38efa547e9dc..000000000000 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.connect.common - -import scala.jdk.CollectionConverters._ - -import org.apache.spark.{SparkThrowable, SparkThrowableHelper} - -/** - * Error thrown when a connect command is not valid. 
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala
deleted file mode 100644
index 38efa547e9dc..000000000000
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.connect.common
-
-import scala.jdk.CollectionConverters._
-
-import org.apache.spark.{SparkThrowable, SparkThrowableHelper}
-
-/**
- * Error thrown when a connect command is not valid.
- */
-final case class InvalidCommandInput(
-    private val errorCondition: String,
-    private val messageParameters: Map[String, String] = Map.empty,
-    private val cause: Throwable = null)
-    extends Exception(SparkThrowableHelper.getMessage(errorCondition, messageParameters), cause)
-    with SparkThrowable {
-
-  override def getCondition: String = errorCondition
-
-  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
-}

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
index 0f00d3643992..8076c62b0092 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable

 import org.apache.spark.SparkThrowableHelper
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
 import org.apache.spark.sql.types.DataType

@@ -214,9 +214,6 @@ object InvalidInputErrors {
   def unionByNameAllowMissingColRequiresByName(): InvalidPlanInput =
     InvalidPlanInput("UnionByName `allowMissingCol` can be true only if `byName` is true.")

-  def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
-    InvalidCommandInput("INVALID_BUCKET_COUNT", Map("numBuckets" -> numBuckets.toString))
-
   def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
     InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")
 }

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 199fe7da3291..5978560c67d4 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3077,9 +3077,6 @@ class SparkConnectPlanner(
     if (writeOperation.hasBucketBy) {
       val op = writeOperation.getBucketBy
       val cols = op.getBucketColumnNamesList.asScala
-      if (op.getNumBuckets <= 0) {
-        throw InvalidInputErrors.invalidBucketCount(op.getNumBuckets)
-      }
       w.bucketBy(op.getNumBuckets, cols.head, cols.tail.toSeq: _*)
     }
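With the planner-side check removed, Connect reaches the same guard Classic uses. A minimal sketch of that shared check, assuming package-private access to `QueryCompilationErrors` (hence the hypothetical package under `org.apache.spark.sql`) and a hardcoded `maxBuckets` of 100000 matching the value asserted in the test below:

```scala
package org.apache.spark.sql.demo // hypothetical: needs private[sql] access

import org.apache.spark.sql.errors.QueryCompilationErrors

object SharedBucketValidation {
  // 100000 mirrors the `bucketingMaxBuckets` value asserted in the test
  // below; the real write path reads the limit from SQL configuration.
  def validateBucketCount(numBuckets: Int, maxBuckets: Int = 100000): Unit = {
    // Both Classic and Connect writes now funnel through one check that
    // raises AnalysisException with condition INVALID_BUCKET_COUNT.
    if (numBuckets <= 0 || numBuckets > maxBuckets) {
      throw QueryCompilationErrors.invalidBucketNumberError(maxBuckets, numBuckets)
    }
  }
}
```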
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index e5f19e714895..5c43715d2dd1 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, Distinct, Lo
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.DataFrame
-import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
 import org.apache.spark.sql.connect.dsl.MockRemoteSession
 import org.apache.spark.sql.connect.dsl.commands._
@@ -657,13 +657,18 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
   }

   test("Write with invalid bucketBy configuration") {
-    val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0))
+    val cmd = localRelation.write(
+      tableName = Some("testtable"),
+      tableSaveMethod = Some("save_as_table"),
+      format = Some("parquet"),
+      bucketByCols = Seq("id"),
+      numBuckets = Some(0))
     checkError(
-      exception = intercept[InvalidCommandInput] {
+      exception = intercept[AnalysisException] {
         transform(cmd)
       },
       condition = "INVALID_BUCKET_COUNT",
-      parameters = Map("numBuckets" -> "0"))
+      parameters = Map("bucketingMaxBuckets" -> "100000", "numBuckets" -> "0"))
   }

   test("Write to Path") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org