This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d61c1ea7f700 [SPARK-52335][CONNECT][SQL] Unify the 'invalid bucket count' error for both Connect and Classic
d61c1ea7f700 is described below

commit d61c1ea7f700ccf4a0ca37e9caef0ee811881b1e
Author: Yihong He <heyihong...@gmail.com>
AuthorDate: Fri May 30 08:30:51 2025 +0900

    [SPARK-52335][CONNECT][SQL] Unify the 'invalid bucket count' error for both Connect and Classic
    
    ### What changes were proposed in this pull request?
    
    This PR unifies the error handling for invalid bucket count validation between Spark Connect and Classic Spark. The main changes are:
    
    1. Updated the error message in `error-conditions.json` for `INVALID_BUCKET_COUNT` to be more descriptive and consistent
    2. Removed the legacy error condition `_LEGACY_ERROR_TEMP_1083` since its functionality is now merged into `INVALID_BUCKET_COUNT`
    3. Removed the `InvalidCommandInput` class and its usage in Connect since we're now using the standard `AnalysisException` with the `INVALID_BUCKET_COUNT` error condition
    4. Updated the bucket count validation in `SparkConnectPlanner` to rely on the standard error handling path
    5. Updated the test case in `SparkConnectProtoSuite` to verify the new unified error handling
    
    The key improvement is that both Connect and Classic now use the same error condition and message format for invalid bucket count errors, making the error handling more consistent across Spark's different interfaces. The error message now includes both the maximum allowed bucket count and the invalid value received, providing better guidance to users.
    
    This change simplifies the error handling codebase by removing duplicate error definitions and standardizing on a single error condition for this validation case.
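
    For illustration only (this snippet is not part of the patch), a minimal sketch of how user code would now hit the unified error; the `spark` session and the table name are assumed:

    ```scala
    import org.apache.spark.sql.AnalysisException

    try {
      // A bucket count of 0 is invalid, so the write fails during analysis.
      spark.range(10).write.bucketBy(0, "id").saveAsTable("bucketed_table")
    } catch {
      case e: AnalysisException =>
        // Both Classic and Connect now surface the same condition and message, e.g.
        // "Number of buckets should be greater than 0 but less than or equal to
        //  bucketing.maxBuckets (`100000`). Got `0`."
        assert(e.getCondition == "INVALID_BUCKET_COUNT")
    }
    ```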
    
    ### Why are the changes needed?
    
    The changes are needed to:
    1. Provide consistent error messages across the Spark Connect and Classic interfaces
    2. Simplify error handling by removing duplicate error definitions
    3. Improve error message clarity by including the maximum allowed bucket count in the error message
    4. Improve maintainability by reducing code duplication in error handling
    
    The unified error message now clearly indicates both the requirement (bucket count > 0) and the upper limit (≤ bucketing.maxBuckets), making it more helpful for users to understand and fix the issue.
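
    As a further sketch (also not part of this patch), the upper limit referenced above is the existing `spark.sql.sources.bucketing.maxBuckets` SQL conf (default 100000); the values and table name below are illustrative only:

    ```scala
    // Raising the limit is a deliberate opt-in; only the conf name and its
    // default of 100000 come from Spark, the rest is a hypothetical example.
    spark.conf.set("spark.sql.sources.bucketing.maxBuckets", "200000")

    spark.range(10).write
      .bucketBy(150000, "id")        // exceeds the default limit of 100000
      .saveAsTable("many_buckets")   // accepted only because the limit was raised above
    ```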
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    `build/sbt "connect/testOnly *SparkConnectProtoSuite"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51039 from heyihong/SPARK-52335.
    
    Authored-by: Yihong He <heyihong...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../src/main/resources/error/error-conditions.json |  7 +----
 python/pyspark/errors/exceptions/connect.py        |  7 -----
 .../spark/sql/errors/QueryCompilationErrors.scala  |  2 +-
 .../sql/connect/common/InvalidCommandInput.scala   | 36 ----------------------
 .../sql/connect/planner/InvalidInputErrors.scala   |  5 +--
 .../sql/connect/planner/SparkConnectPlanner.scala  |  3 --
 .../connect/planner/SparkConnectProtoSuite.scala   | 13 +++++---
 7 files changed, 12 insertions(+), 61 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 3e689cecac3b..8dddca5077a6 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2382,7 +2382,7 @@
   },
   "INVALID_BUCKET_COUNT" : {
     "message" : [
-      "BucketBy must specify a bucket count > 0, received <numBuckets> 
instead."
+      "Number of buckets should be greater than 0 but less than or equal to 
bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
     ],
     "sqlState" : "22003"
   },
@@ -7042,11 +7042,6 @@
       "Partition [<specString>] did not specify locationUri."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1083" : {
-    "message" : [
-      "Number of buckets should be greater than 0 but less than or equal to 
bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1089" : {
     "message" : [
       "Column statistics deserialization is not supported for column <name> of 
data type: <dataType>."
diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py
index fafdd6b84297..45c96a84f14d 100644
--- a/python/pyspark/errors/exceptions/connect.py
+++ b/python/pyspark/errors/exceptions/connect.py
@@ -371,12 +371,6 @@ class InvalidPlanInput(SparkConnectGrpcException):
     """
 
 
-class InvalidCommandInput(SparkConnectGrpcException):
-    """
-    Error thrown when a connect command is not valid.
-    """
-
-
 class StreamingPythonRunnerInitializationException(
     SparkConnectGrpcException, BaseStreamingPythonRunnerInitException
 ):
@@ -411,7 +405,6 @@ EXCEPTION_CLASS_MAPPING = {
     "org.apache.spark.SparkNoSuchElementException": 
SparkNoSuchElementException,
     "org.apache.spark.SparkException": SparkException,
     "org.apache.spark.sql.connect.common.InvalidPlanInput": InvalidPlanInput,
-    "org.apache.spark.sql.connect.common.InvalidCommandInput": 
InvalidCommandInput,
     "org.apache.spark.api.python.StreamingPythonRunner"
     "$StreamingPythonRunnerInitializationException": 
StreamingPythonRunnerInitializationException,
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9de52533dbbb..14f279ad5ad7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1276,7 +1276,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
 
   def invalidBucketNumberError(bucketingMaxBuckets: Int, numBuckets: Int): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1083",
+      errorClass = "INVALID_BUCKET_COUNT",
       messageParameters = Map(
         "bucketingMaxBuckets" -> bucketingMaxBuckets.toString,
         "numBuckets" -> numBuckets.toString))
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala
deleted file mode 100644
index 38efa547e9dc..000000000000
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.connect.common
-
-import scala.jdk.CollectionConverters._
-
-import org.apache.spark.{SparkThrowable, SparkThrowableHelper}
-
-/**
- * Error thrown when a connect command is not valid.
- */
-final case class InvalidCommandInput(
-    private val errorCondition: String,
-    private val messageParameters: Map[String, String] = Map.empty,
-    private val cause: Throwable = null)
-    extends Exception(SparkThrowableHelper.getMessage(errorCondition, messageParameters), cause)
-    with SparkThrowable {
-
-  override def getCondition: String = errorCondition
-
-  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
-}
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
index 0f00d3643992..8076c62b0092 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkThrowableHelper
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
 import org.apache.spark.sql.types.DataType
 
@@ -214,9 +214,6 @@ object InvalidInputErrors {
   def unionByNameAllowMissingColRequiresByName(): InvalidPlanInput =
     InvalidPlanInput("UnionByName `allowMissingCol` can be true only if 
`byName` is true.")
 
-  def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
-    InvalidCommandInput("INVALID_BUCKET_COUNT", Map("numBuckets" -> numBuckets.toString))
-
   def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
     InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")
 }
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 199fe7da3291..5978560c67d4 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3077,9 +3077,6 @@ class SparkConnectPlanner(
     if (writeOperation.hasBucketBy) {
       val op = writeOperation.getBucketBy
       val cols = op.getBucketColumnNamesList.asScala
-      if (op.getNumBuckets <= 0) {
-        throw InvalidInputErrors.invalidBucketCount(op.getNumBuckets)
-      }
       w.bucketBy(op.getNumBuckets, cols.head, cols.tail.toSeq: _*)
     }
 
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index e5f19e714895..5c43715d2dd1 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, Distinct, Lo
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.DataFrame
-import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
 import org.apache.spark.sql.connect.dsl.MockRemoteSession
 import org.apache.spark.sql.connect.dsl.commands._
@@ -657,13 +657,18 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
   }
 
   test("Write with invalid bucketBy configuration") {
-    val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0))
+    val cmd = localRelation.write(
+      tableName = Some("testtable"),
+      tableSaveMethod = Some("save_as_table"),
+      format = Some("parquet"),
+      bucketByCols = Seq("id"),
+      numBuckets = Some(0))
     checkError(
-      exception = intercept[InvalidCommandInput] {
+      exception = intercept[AnalysisException] {
         transform(cmd)
       },
       condition = "INVALID_BUCKET_COUNT",
-      parameters = Map("numBuckets" -> "0"))
+      parameters = Map("bucketingMaxBuckets" -> "100000", "numBuckets" -> "0"))
   }
 
   test("Write to Path") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
