This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 8f9b2102e00e [SPARK-53991][SQL][TEST][FOLLOW-UP] Improve KLL quantile
errors by removing internal details
8f9b2102e00e is described below
commit 8f9b2102e00ef1c96a4f630252fb173f7c6787a5
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Wed Jan 7 14:45:29 2026 -0800
[SPARK-53991][SQL][TEST][FOLLOW-UP] Improve KLL quantile errors by removing
internal details
### What changes were proposed in this pull request?
This PR improves error messages from the new KLL quantile sketch functions
added in https://github.com/apache/spark/pull/52800.
### Why are the changes needed?
The previous error messages exposed internal DataSketches library state (such
as buffer offsets and allocation sizes), which is not meaningful to end users
of the SQL/DataFrame functions in Apache Spark.
### Does this PR introduce _any_ user-facing change?
Yes, error messages are improved. For example, before this change, we
observed the following:
```
SELECT kll_sketch_get_rank_bigint(agg, 5) AS wrong_type
FROM (
SELECT kll_sketch_agg_float(col1) AS agg
FROM t_float_1_5_through_7_11
)
> "For function `kll_sketch_get_rank_bigint`, invalid KLL sketch binary
data: reqOffset: 40, reqLength: 56, (reqOff + reqLen): 96, allocSize: 60"
```
Now the error message becomes:
```
> "Invalid call to `kll_sketch_get_rank_bigint`; only valid KLL sketch
buffers are supported as inputs (such as those produced by the `kll_sketch_agg`
function)."
```
### How was this patch tested?
This PR updates the golden file test results (`kllquantiles.sql.out`) to
reflect the improved error messages.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53702 from dtenedor/kll-quantiles-golden-files.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
(cherry picked from commit 9aea212dfd4fe8b092eb44237c15db9cfd170db1)
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 12 ++----
python/pyspark/sql/functions/builtin.py | 6 +--
python/pyspark/sql/tests/test_functions.py | 6 +--
.../expressions/aggregate/kllAggregates.scala | 24 +++++------
.../sql/catalyst/expressions/kllExpressions.scala | 48 +++++++++++-----------
.../spark/sql/errors/QueryExecutionErrors.scala | 28 ++++---------
.../sql-tests/results/kllquantiles.sql.out | 39 +++++++-----------
7 files changed, 69 insertions(+), 94 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index dfaf8425a1a0..aa0f0a89f97c 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4228,21 +4228,15 @@
],
"sqlState" : "42K0E"
},
- "KLL_SKETCH_INCOMPATIBLE_MERGE" : {
+ "KLL_INVALID_INPUT_SKETCH_BUFFER" : {
"message" : [
- "For function <functionName>, cannot merge KLL sketches: <reason>"
- ],
- "sqlState" : "22000"
- },
- "KLL_SKETCH_INVALID_INPUT" : {
- "message" : [
- "For function <functionName>, invalid KLL sketch binary data: <reason>"
+ "Invalid call to <function>; only valid KLL sketch buffers are supported
as inputs (such as those produced by the `kll_sketch_agg` function)."
],
"sqlState" : "22000"
},
"KLL_SKETCH_INVALID_QUANTILE_RANGE" : {
"message" : [
- "For function <functionName>, the quantile value must be between 0.0 and
1.0 (inclusive), but got <quantile>."
+ "For function <functionName>, the quantile value must be between 0.0 and
1.0 (inclusive)."
],
"sqlState" : "22003"
},
diff --git a/python/pyspark/sql/functions/builtin.py
b/python/pyspark/sql/functions/builtin.py
index 12bac34289d6..5bb1b2d8b5ef 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -26717,7 +26717,7 @@ def kll_sketch_to_string_bigint(col: "ColumnOrName") ->
Column:
>>> df = spark.createDataFrame([1,2,3,4,5], "INT")
>>> sketch_df = df.agg(sf.kll_sketch_agg_bigint("value").alias("sketch"))
>>> result =
sketch_df.select(sf.kll_sketch_to_string_bigint("sketch")).first()[0]
- >>> "Kll" in result and "N" in result
+ >>> "kll" in result.lower()
True
"""
fn = "kll_sketch_to_string_bigint"
@@ -26747,7 +26747,7 @@ def kll_sketch_to_string_float(col: "ColumnOrName") ->
Column:
>>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "FLOAT")
>>> sketch_df = df.agg(sf.kll_sketch_agg_float("value").alias("sketch"))
>>> result =
sketch_df.select(sf.kll_sketch_to_string_float("sketch")).first()[0]
- >>> "Kll" in result and "N" in result
+ >>> "kll" in result.lower()
True
"""
fn = "kll_sketch_to_string_float"
@@ -26777,7 +26777,7 @@ def kll_sketch_to_string_double(col: "ColumnOrName") ->
Column:
>>> df = spark.createDataFrame([1.0,2.0,3.0,4.0,5.0], "DOUBLE")
>>> sketch_df = df.agg(sf.kll_sketch_agg_double("value").alias("sketch"))
>>> result =
sketch_df.select(sf.kll_sketch_to_string_double("sketch")).first()[0]
- >>> "Kll" in result and "N" in result
+ >>> "kll" in result.lower()
True
"""
fn = "kll_sketch_to_string_double"
diff --git a/python/pyspark/sql/tests/test_functions.py
b/python/pyspark/sql/tests/test_functions.py
index 23c895f5629e..b6d4e3a0547b 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -2151,7 +2151,7 @@ class FunctionsTestsMixin:
result =
sketch_df.select(F.kll_sketch_to_string_bigint("sketch")).first()[0]
self.assertIsNotNone(result)
self.assertIsInstance(result, str)
- self.assertIn("Kll", result)
+ self.assertIn("kll", result.lower())
def test_kll_sketch_get_n_bigint(self):
"""Test kll_sketch_get_n_bigint function"""
@@ -2212,7 +2212,7 @@ class FunctionsTestsMixin:
# Test to_string
string_result =
sketch_df.select(F.kll_sketch_to_string_float("sketch")).first()[0]
- self.assertIn("Kll", string_result)
+ self.assertIn("kll", string_result.lower())
# Test get_n
n = sketch_df.select(F.kll_sketch_get_n_float("sketch")).first()[0]
@@ -2240,7 +2240,7 @@ class FunctionsTestsMixin:
# Test to_string
string_result =
sketch_df.select(F.kll_sketch_to_string_double("sketch")).first()[0]
- self.assertIn("Kll", string_result)
+ self.assertIn("kll", string_result.lower())
# Test get_n
n = sketch_df.select(F.kll_sketch_get_n_double("sketch")).first()[0]
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala
index 5891155cf753..e74b22219cf6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/kllAggregates.scala
@@ -151,8 +151,8 @@ case class KllSketchAggBigint(
updateBuffer.merge(input)
updateBuffer
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchIncompatibleMergeError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
@@ -167,8 +167,8 @@ case class KllSketchAggBigint(
try {
KllLongsSketch.heapify(Memory.wrap(buffer))
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
} else {
this.createAggregationBuffer()
@@ -287,8 +287,8 @@ case class KllSketchAggFloat(
updateBuffer.merge(input)
updateBuffer
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchIncompatibleMergeError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
@@ -303,8 +303,8 @@ case class KllSketchAggFloat(
try {
KllFloatsSketch.heapify(Memory.wrap(buffer))
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
} else {
this.createAggregationBuffer()
@@ -425,8 +425,8 @@ case class KllSketchAggDouble(
updateBuffer.merge(input)
updateBuffer
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchIncompatibleMergeError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
@@ -441,8 +441,8 @@ case class KllSketchAggDouble(
try {
KllDoublesSketch.heapify(Memory.wrap(buffer))
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
} else {
this.createAggregationBuffer()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/kllExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/kllExpressions.scala
index b446b2d0d443..af6c1a32e229 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/kllExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/kllExpressions.scala
@@ -50,8 +50,8 @@ case class KllSketchToStringBigint(child: Expression) extends
KllSketchToStringB
val sketch = KllLongsSketch.heapify(Memory.wrap(buffer))
UTF8String.fromString(sketch.toString())
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -78,8 +78,8 @@ case class KllSketchToStringFloat(child: Expression) extends
KllSketchToStringBa
val sketch = KllFloatsSketch.heapify(Memory.wrap(buffer))
UTF8String.fromString(sketch.toString())
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -106,8 +106,8 @@ case class KllSketchToStringDouble(child: Expression)
extends KllSketchToStringB
val sketch = KllDoublesSketch.heapify(Memory.wrap(buffer))
UTF8String.fromString(sketch.toString())
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -144,8 +144,8 @@ case class KllSketchGetNBigint(child: Expression) extends
KllSketchGetNBase {
val sketch = KllLongsSketch.heapify(Memory.wrap(buffer))
sketch.getN()
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -172,8 +172,8 @@ case class KllSketchGetNFloat(child: Expression) extends
KllSketchGetNBase {
val sketch = KllFloatsSketch.heapify(Memory.wrap(buffer))
sketch.getN()
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -200,8 +200,8 @@ case class KllSketchGetNDouble(child: Expression) extends
KllSketchGetNBase {
val sketch = KllDoublesSketch.heapify(Memory.wrap(buffer))
sketch.getN()
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -241,8 +241,8 @@ case class KllSketchMergeBigint(left: Expression, right:
Expression) extends Kll
leftSketch.merge(rightSketch)
leftSketch.toByteArray
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchIncompatibleMergeError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -272,8 +272,8 @@ case class KllSketchMergeFloat(left: Expression, right:
Expression) extends KllS
leftSketch.merge(rightSketch)
leftSketch.toByteArray
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchIncompatibleMergeError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -303,8 +303,8 @@ case class KllSketchMergeDouble(left: Expression, right:
Expression) extends Kll
leftSketch.merge(rightSketch)
leftSketch.toByteArray
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchIncompatibleMergeError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
}
@@ -456,12 +456,12 @@ abstract class KllSketchGetQuantileBase
} catch {
case e: org.apache.datasketches.common.SketchesArgumentException =>
if (e.getMessage.contains("normalized rank")) {
- throw
QueryExecutionErrors.kllSketchInvalidQuantileRangeError(prettyName,
rankForError)
+ throw
QueryExecutionErrors.kllSketchInvalidQuantileRangeError(prettyName)
} else {
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
@@ -617,8 +617,8 @@ abstract class KllSketchGetRankBase
try {
operation
} catch {
- case e: Exception =>
- throw QueryExecutionErrors.kllSketchInvalidInputError(prettyName,
e.getMessage)
+ case _: Exception =>
+ throw QueryExecutionErrors.kllInvalidInputSketchBuffer(prettyName)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 27aba1f7f2df..351868fcc2e2 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2830,6 +2830,13 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
"function" -> toSQLId(function)))
}
+ def kllInvalidInputSketchBuffer(function: String, reason: String = ""):
Throwable = {
+ new SparkRuntimeException(
+ errorClass = "KLL_INVALID_INPUT_SKETCH_BUFFER",
+ messageParameters = Map(
+ "function" -> toSQLId(function)))
+ }
+
def hllUnionDifferentLgK(left: Int, right: Int, function: String): Throwable
= {
new SparkRuntimeException(
errorClass = "HLL_UNION_DIFFERENT_LG_K",
@@ -3195,28 +3202,11 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
messageParameters = Map("function" -> toSQLId(function)))
}
- def kllSketchInvalidQuantileRangeError(function: String, quantile: Double):
Throwable = {
+ def kllSketchInvalidQuantileRangeError(function: String): Throwable = {
new SparkRuntimeException(
errorClass = "KLL_SKETCH_INVALID_QUANTILE_RANGE",
messageParameters = Map(
- "functionName" -> toSQLId(function),
- "quantile" -> toSQLValue(quantile, DoubleType)))
- }
-
- def kllSketchInvalidInputError(function: String, reason: String): Throwable
= {
- new SparkRuntimeException(
- errorClass = "KLL_SKETCH_INVALID_INPUT",
- messageParameters = Map(
- "functionName" -> toSQLId(function),
- "reason" -> reason))
- }
-
- def kllSketchIncompatibleMergeError(function: String, reason: String):
Throwable = {
- new SparkRuntimeException(
- errorClass = "KLL_SKETCH_INCOMPATIBLE_MERGE",
- messageParameters = Map(
- "functionName" -> toSQLId(function),
- "reason" -> reason))
+ "functionName" -> toSQLId(function)))
}
def kllSketchKMustBeConstantError(function: String): Throwable = {
diff --git a/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
b/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
index 863bded1599b..fef44781e17c 100644
--- a/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/kllquantiles.sql.out
@@ -837,8 +837,7 @@ org.apache.spark.SparkRuntimeException
"errorClass" : "KLL_SKETCH_INVALID_QUANTILE_RANGE",
"sqlState" : "22003",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_quantile_bigint`",
- "quantile" : "-0.5D"
+ "functionName" : "`kll_sketch_get_quantile_bigint`"
}
}
@@ -857,8 +856,7 @@ org.apache.spark.SparkRuntimeException
"errorClass" : "KLL_SKETCH_INVALID_QUANTILE_RANGE",
"sqlState" : "22003",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_quantile_bigint`",
- "quantile" : "1.5D"
+ "functionName" : "`kll_sketch_get_quantile_bigint`"
}
}
@@ -877,8 +875,7 @@ org.apache.spark.SparkRuntimeException
"errorClass" : "KLL_SKETCH_INVALID_QUANTILE_RANGE",
"sqlState" : "22003",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_quantile_float`",
- "quantile" : "-0.1D"
+ "functionName" : "`kll_sketch_get_quantile_float`"
}
}
@@ -894,11 +891,10 @@ struct<>
-- !query output
org.apache.spark.SparkRuntimeException
{
- "errorClass" : "KLL_SKETCH_INVALID_INPUT",
+ "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
"sqlState" : "22000",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_rank_bigint`",
- "reason" : "reqOffset: 40, reqLength: 56, (reqOff + reqLen): 96,
allocSize: 60"
+ "function" : "`kll_sketch_get_rank_bigint`"
}
}
@@ -915,11 +911,10 @@ struct<>
-- !query output
org.apache.spark.SparkRuntimeException
{
- "errorClass" : "KLL_SKETCH_INCOMPATIBLE_MERGE",
+ "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
"sqlState" : "22000",
"messageParameters" : {
- "functionName" : "`kll_sketch_merge_bigint`",
- "reason" : "reqOffset: 40, reqLength: 56, (reqOff + reqLen): 96,
allocSize: 60"
+ "function" : "`kll_sketch_merge_bigint`"
}
}
@@ -931,11 +926,10 @@ struct<>
-- !query output
org.apache.spark.SparkRuntimeException
{
- "errorClass" : "KLL_SKETCH_INVALID_INPUT",
+ "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
"sqlState" : "22000",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_quantile_bigint`",
- "reason" : "Error combination of PreInts and SerVer: PreInts: 110, SerVer:
111"
+ "function" : "`kll_sketch_get_quantile_bigint`"
}
}
@@ -1098,11 +1092,10 @@ struct<>
-- !query output
org.apache.spark.SparkRuntimeException
{
- "errorClass" : "KLL_SKETCH_INVALID_INPUT",
+ "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
"sqlState" : "22000",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_n_bigint`",
- "reason" : "A sketch memory image must be at least 8 bytes. 4"
+ "function" : "`kll_sketch_get_n_bigint`"
}
}
@@ -1114,11 +1107,10 @@ struct<>
-- !query output
org.apache.spark.SparkRuntimeException
{
- "errorClass" : "KLL_SKETCH_INVALID_INPUT",
+ "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
"sqlState" : "22000",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_n_float`",
- "reason" : "A sketch memory image must be at least 8 bytes. 4"
+ "function" : "`kll_sketch_get_n_float`"
}
}
@@ -1130,11 +1122,10 @@ struct<>
-- !query output
org.apache.spark.SparkRuntimeException
{
- "errorClass" : "KLL_SKETCH_INVALID_INPUT",
+ "errorClass" : "KLL_INVALID_INPUT_SKETCH_BUFFER",
"sqlState" : "22000",
"messageParameters" : {
- "functionName" : "`kll_sketch_get_n_double`",
- "reason" : "A sketch memory image must be at least 8 bytes. 4"
+ "function" : "`kll_sketch_get_n_double`"
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]