This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 29e09473a81 [SPARK-44154] Implement bitmap functions
29e09473a81 is described below
commit 29e09473a816f1cb0ddea3e49d6dcb492ad473d9
Author: Gene Pang <[email protected]>
AuthorDate: Wed Jul 5 17:39:03 2023 -0700
[SPARK-44154] Implement bitmap functions
### What changes were proposed in this pull request?
Implemented bitmap functions. The functions are:
- `bitmap_bucket_number()`: returns the bucket number for a given input
number
- `bitmap_bit_position()`: returns the bit position for a given input number
- `bitmap_count()`: returns the number of set bits in an input bitmap
- `bitmap_construct_agg()`: aggregation function that aggregates input bit
positions, and creates a bitmap
- `bitmap_or_agg()`: aggregation function that performs a bitwise OR on all
the input bitmaps
### Why are the changes needed?
These functions can be used to count distinct values for integer columns.
For example:
```sql
SELECT sum(cnt) FROM (
SELECT bitmap_bucket_number(c),
bitmap_count(bitmap_construct_agg(bitmap_bit_position(c))) cnt
FROM table GROUP BY 1
)
```
is equivalent to:
```sql
SELECT count(distinct c) FROM table
```
### Does this PR introduce _any_ user-facing change?
Yes. After this PR, these functions are usable in queries.
### How was this patch tested?
New tests were added.
Closes #41623 from gene-db/bitmap-fns.
Authored-by: Gene Pang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-classes.json | 6 +
.../expressions/BitmapExpressionUtils.java | 59 ++++
.../sql/catalyst/analysis/FunctionRegistry.scala | 7 +
.../catalyst/expressions/bitmapExpressions.scala | 305 +++++++++++++++++++++
.../spark/sql/errors/QueryExecutionErrors.scala | 12 +
.../expressions/BitmapExpressionUtilsSuite.scala | 90 ++++++
.../sql-functions/sql-expression-schema.md | 5 +
.../spark/sql/BitmapExpressionsQuerySuite.scala | 151 ++++++++++
.../sql/errors/QueryExecutionErrorsSuite.scala | 28 ++
9 files changed, 663 insertions(+)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index 3126fb9519b..8bdb02470ef 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1038,6 +1038,12 @@
],
"sqlState" : "22003"
},
+ "INVALID_BITMAP_POSITION" : {
+ "message" : [
+ "The 0-indexed bitmap position <bitPosition> is out of bounds. The
bitmap has <bitmapNumBits> bits (<bitmapNumBytes> bytes)."
+ ],
+ "sqlState" : "22003"
+ },
"INVALID_BOUNDARY" : {
"message" : [
"The boundary <boundary> is invalid: <invalidValue>."
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/BitmapExpressionUtils.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/BitmapExpressionUtils.java
new file mode 100644
index 00000000000..e11aea62664
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/BitmapExpressionUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+/**
+ * A utility class for constructing bitmap expressions.
+ *
+ * A long value is split into a bucket number ({@link #bitmapBucketNumber}) and a bit position
+ * ({@link #bitmapBitPosition}) inside a fixed-size bitmap of {@link #NUM_BITS} bits.
+ */
+public class BitmapExpressionUtils {
+  /** Number of bytes in a bitmap. */
+  public static final int NUM_BYTES = 4 * 1024;
+
+  /** Number of bits in a bitmap. */
+  public static final int NUM_BITS = 8 * NUM_BYTES;
+
+  /**
+   * Returns the bucket number for the given value.
+   *
+   * Positive values are grouped 1-based: 1 to NUM_BITS fall into bucket 1, the next NUM_BITS
+   * values into bucket 2, and so on. Zero and negative values use truncating division, so
+   * 0 down to -(NUM_BITS - 1) fall into bucket 0, the next NUM_BITS values into bucket -1, etc.
+   *
+   * @param value the input number
+   * @return the bucket number the value belongs to
+   */
+  public static long bitmapBucketNumber(long value) {
+    if (value > 0) {
+      return 1 + (value - 1) / NUM_BITS;
+    }
+    return value / NUM_BITS;
+  }
+
+  /**
+   * Returns the 0-indexed bit position of the given value inside its bucket's bitmap.
+   *
+   * @param value the input number
+   * @return a position in the range [0, NUM_BITS - 1]
+   */
+  public static long bitmapBitPosition(long value) {
+    if (value > 0) {
+      // inputs: (1 -> NUM_BITS) map to positions (0 -> NUM_BITS - 1)
+      return (value - 1) % NUM_BITS;
+    }
+    // Negating Long.MIN_VALUE overflows back to Long.MIN_VALUE, which is a multiple of
+    // NUM_BITS, so the remainder is still a valid position (0).
+    return (-value) % NUM_BITS;
+  }
+
+  /**
+   * Counts the bits that are set in the bitmap.
+   *
+   * @param bitmap the bitmap bytes; any length is accepted
+   * @return the total number of set bits across all bytes
+   */
+  public static long bitmapCount(byte[] bitmap) {
+    long count = 0;
+    for (byte b : bitmap) {
+      // Mask to the low 8 bits so that sign extension of negative bytes is not counted.
+      count += Integer.bitCount(b & 0x0FF);
+    }
+    return count;
+  }
+
+  /**
+   * Merges both bitmaps with a bitwise OR and writes the result into bitmap1.
+   *
+   * Only the first min(bitmap1.length, bitmap2.length) bytes are merged; any remaining bytes
+   * of bitmap1 are left untouched and bitmap2 is never modified.
+   *
+   * @param bitmap1 the destination bitmap, updated in place
+   * @param bitmap2 the bitmap to OR into bitmap1
+   */
+  public static void bitmapMerge(byte[] bitmap1, byte[] bitmap2) {
+    // The bound does not change while merging, so compute it once.
+    final int numBytes = java.lang.Math.min(bitmap1.length, bitmap2.length);
+    for (int i = 0; i < numBytes; ++i) {
+      // The compound assignment narrows back to byte, which makes explicit masking unnecessary.
+      bitmap1[i] |= bitmap2[i];
+    }
+  }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index eaf1bf21d34..a9bda2e0b7c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -792,6 +792,13 @@ object FunctionRegistry {
expression[BitwiseGet]("bit_get"),
expression[BitwiseGet]("getbit", true),
+ // bitmap functions and aggregates
+ expression[BitmapBucketNumber]("bitmap_bucket_number"),
+ expression[BitmapBitPosition]("bitmap_bit_position"),
+ expression[BitmapConstructAgg]("bitmap_construct_agg"),
+ expression[BitmapCount]("bitmap_count"),
+ expression[BitmapOrAgg]("bitmap_or_agg"),
+
// json
expression[StructsToJson]("to_json"),
expression[JsonToStructs]("from_json"),
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitmapExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitmapExpressions.scala
new file mode 100644
index 00000000000..350ca5c2525
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitmapExpressions.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType,
LongType, StructType}
+
+/**
+ * Expression behind the SQL function `bitmap_bucket_number`.
+ *
+ * It declares a single [[LongType]] input and a [[LongType]] result, and is replaced by a
+ * static call to `BitmapExpressionUtils.bitmapBucketNumber`.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(child) - Returns the bucket number for the given input child expression.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(123);
+       1
+      > SELECT _FUNC_(0);
+       0
+  """,
+  since = "3.5.0",
+  group = "misc_funcs"
+)
+case class BitmapBucketNumber(child: Expression)
+  extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes {
+
+  override def prettyName: String = "bitmap_bucket_number"
+
+  override def dataType: DataType = LongType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(LongType)
+
+  // The actual computation lives in the Java helper; its result is declared non-nullable.
+  override lazy val replacement: Expression = {
+    val helperClass = classOf[BitmapExpressionUtils]
+    StaticInvoke(
+      helperClass,
+      LongType,
+      "bitmapBucketNumber",
+      child :: Nil,
+      inputTypes,
+      returnNullable = false)
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): BitmapBucketNumber = {
+    copy(child = newChild)
+  }
+}
+
+/**
+ * Expression behind the SQL function `bitmap_bit_position`.
+ *
+ * It declares a single [[LongType]] input and a [[LongType]] result, and is replaced by a
+ * static call to `BitmapExpressionUtils.bitmapBitPosition`.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(child) - Returns the bit position for the given input child expression.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(1);
+       0
+      > SELECT _FUNC_(123);
+       122
+  """,
+  since = "3.5.0",
+  group = "misc_funcs"
+)
+case class BitmapBitPosition(child: Expression)
+  extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes {
+
+  override def prettyName: String = "bitmap_bit_position"
+
+  override def dataType: DataType = LongType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(LongType)
+
+  // The actual computation lives in the Java helper; its result is declared non-nullable.
+  override lazy val replacement: Expression = {
+    val helperClass = classOf[BitmapExpressionUtils]
+    StaticInvoke(
+      helperClass,
+      LongType,
+      "bitmapBitPosition",
+      child :: Nil,
+      inputTypes,
+      returnNullable = false)
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): BitmapBitPosition = {
+    copy(child = newChild)
+  }
+}
+
+/**
+ * Expression behind the SQL function `bitmap_count`.
+ *
+ * The child must be exactly [[BinaryType]]; no implicit cast is declared. The expression is
+ * replaced by a static call to `BitmapExpressionUtils.bitmapCount`.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(child) - Returns the number of set bits in the child bitmap.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(X '1010');
+       2
+      > SELECT _FUNC_(X 'FFFF');
+       16
+      > SELECT _FUNC_(X '0');
+       0
+  """,
+  since = "3.5.0",
+  group = "misc_funcs"
+)
+case class BitmapCount(child: Expression)
+  extends UnaryExpression with RuntimeReplaceable {
+
+  override def prettyName: String = "bitmap_count"
+
+  override def dataType: DataType = LongType
+
+  // Anything other than a binary child is rejected during type checking.
+  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+    case BinaryType => TypeCheckResult.TypeCheckSuccess
+    case _ => TypeCheckResult.TypeCheckFailure("Bitmap must be a BinaryType")
+  }
+
+  // The actual computation lives in the Java helper; its result is declared non-nullable.
+  override lazy val replacement: Expression = {
+    val helperClass = classOf[BitmapExpressionUtils]
+    StaticInvoke(
+      helperClass,
+      LongType,
+      "bitmapCount",
+      child :: Nil,
+      Seq(BinaryType),
+      returnNullable = false)
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): BitmapCount = {
+    copy(child = newChild)
+  }
+}
+
+/**
+ * Aggregate behind the SQL function `bitmap_construct_agg`.
+ *
+ * The aggregation buffer holds a single binary column that is initialized to
+ * `BitmapExpressionUtils.NUM_BYTES` zero bytes. Each non-null input is read as a 0-indexed
+ * bit position and the matching bit is set; positions outside the bitmap raise
+ * `QueryExecutionErrors.invalidBitmapPositionError`. Partial buffers are combined with
+ * `BitmapExpressionUtils.bitmapMerge`.
+ */
+@ExpressionDescription(
+ usage = """
+ _FUNC_(child) - Returns a bitmap with the positions of the bits set from
all the values from
+ the child expression. The child expression will most likely be
bitmap_bit_position().
+ """,
+ // scalastyle:off line.size.limit
+ examples = """
+ Examples:
+ > SELECT substring(hex(_FUNC_((bitmap_bit_position(col)))), 0, 6) FROM
VALUES (1), (2), (3) AS tab(col);
+ 070000
+ > SELECT substring(hex(_FUNC_((bitmap_bit_position(col)))), 0, 6) FROM
VALUES (1), (1), (1) AS tab(col);
+ 010000
+ """,
+ // scalastyle:on line.size.limit
+ since = "3.5.0",
+ group = "agg_funcs"
+)
+case class BitmapConstructAgg(child: Expression,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends ImperativeAggregate with ImplicitCastInputTypes with
UnaryLike[Expression] {
+
+ // Single-argument constructor: both buffer offsets start at 0.
+ // NOTE(review): presumably the form used when the function is built from its registered
+ // name -- confirm against FunctionRegistry.
+ def this(child: Expression) = {
+ this(child = child, mutableAggBufferOffset = 0, inputAggBufferOffset = 0)
+ }
+
+ // The child is declared as a single LongType input (the bit position).
+ override def inputTypes: Seq[AbstractDataType] = Seq(LongType)
+
+ // The result is the bitmap itself, as binary.
+ override def dataType: DataType = BinaryType
+
+ override def prettyName: String = "bitmap_construct_agg"
+
+ override protected def withNewChildInternal(newChild: Expression):
BitmapConstructAgg =
+ copy(child = newChild)
+
+ // Returns a copy of this aggregate that uses the given mutable buffer offset.
+ override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int):
ImperativeAggregate =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ // Returns a copy of this aggregate that uses the given input buffer offset.
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int):
ImperativeAggregate =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override def nullable: Boolean = false
+
+ // The buffer schema is derived from the single bitmap attribute below.
+ override def aggBufferSchema: StructType =
StructType.fromAttributes(aggBufferAttributes)
+
+ override def aggBufferAttributes: Seq[AttributeReference] = bitmapAttr :: Nil
+
+ // An all-zero bitmap of NUM_BYTES bytes.
+ override def defaultResult: Option[Literal] =
+ Option(Literal(Array.fill[Byte](BitmapExpressionUtils.NUM_BYTES)(0)))
+
+ // Fresh instances (newInstance()) of the buffer attributes, used for the input side.
+ override def inputAggBufferAttributes: Seq[AttributeReference] =
+ aggBufferAttributes.map(_.newInstance())
+
+ // The aggregation buffer is a fixed size binary.
+ private val bitmapAttr = AttributeReference("bitmap", BinaryType, nullable =
false)()
+
+ // Stores a new all-zero bitmap of NUM_BYTES bytes at this aggregate's buffer slot.
+ override def initialize(buffer: InternalRow): Unit = {
+ buffer.update(mutableAggBufferOffset,
Array.fill[Byte](BitmapExpressionUtils.NUM_BYTES)(0))
+ }
+
+ // Sets the bit addressed by the child's value; null values are skipped.
+ override def update(buffer: InternalRow, input: InternalRow): Unit = {
+ val position = child.eval(input)
+ if (position != null) {
+ // The array read from the buffer is modified in place; nothing is written back with
+ // buffer.update. NOTE(review): this relies on getBinary handing out the stored array
+ // rather than a copy -- confirm for the row implementation used for the buffer.
+ val bitmap = buffer.getBinary(mutableAggBufferOffset)
+ val bitPosition = position.asInstanceOf[Long]
+
+ // Valid positions are 0 until (8 * number of bytes in the bitmap).
+ if (bitPosition < 0 || bitPosition >= (8 * bitmap.length)) {
+ throw QueryExecutionErrors.invalidBitmapPositionError(bitPosition,
bitmap.length)
+ }
+
+ // Byte index first, then the bit inside that byte (bit 0 is the least significant bit).
+ val bytePosition = (bitPosition / 8).toInt
+ val bit = (bitPosition % 8).toInt
+ bitmap.update(bytePosition, (bitmap(bytePosition) | (1 << bit)).toByte)
+ }
+ }
+
+ // ORs buffer2's bitmap (read at inputAggBufferOffset) into buffer1's bitmap (read at
+ // mutableAggBufferOffset); bitmapMerge writes the result into the first array.
+ override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
+ val bitmap1 = buffer1.getBinary(mutableAggBufferOffset)
+ val bitmap2 = buffer2.getBinary(inputAggBufferOffset)
+ BitmapExpressionUtils.bitmapMerge(bitmap1, bitmap2)
+ }
+
+ // The final result is the bitmap stored in the buffer.
+ override def eval(buffer: InternalRow): Any = {
+ buffer.getBinary(mutableAggBufferOffset)
+ }
+}
+
+/**
+ * Aggregate behind the SQL function `bitmap_or_agg`.
+ *
+ * The aggregation buffer holds a single binary column that is initialized to
+ * `BitmapExpressionUtils.NUM_BYTES` zero bytes. Each non-null input bitmap is OR-ed into the
+ * buffer with `BitmapExpressionUtils.bitmapMerge`, and partial buffers are combined the same
+ * way. The child must be exactly `BinaryType`.
+ */
+@ExpressionDescription(
+ usage = """
+ _FUNC_(child) - Returns a bitmap that is the bitwise OR of all of the
bitmaps from the child
+ expression. The input should be bitmaps created from
bitmap_construct_agg().
+ """,
+ // scalastyle:off line.size.limit
+ examples = """
+ Examples:
+ > SELECT substring(hex(_FUNC_(col)), 0, 6) FROM VALUES (X '10'), (X
'20'), (X '40') AS tab(col);
+ 700000
+ > SELECT substring(hex(_FUNC_(col)), 0, 6) FROM VALUES (X '10'), (X
'10'), (X '10') AS tab(col);
+ 100000
+ """,
+ // scalastyle:on line.size.limit
+ since = "3.5.0",
+ group = "agg_funcs"
+)
+case class BitmapOrAgg(child: Expression,
+ mutableAggBufferOffset: Int = 0,
+ inputAggBufferOffset: Int = 0)
+ extends ImperativeAggregate with UnaryLike[Expression] {
+
+ // Single-argument constructor: both buffer offsets start at 0.
+ // NOTE(review): presumably the form used when the function is built from its registered
+ // name -- confirm against FunctionRegistry.
+ def this(child: Expression) = {
+ this(child = child, mutableAggBufferOffset = 0, inputAggBufferOffset = 0)
+ }
+
+ // Anything other than a binary child is rejected during type checking.
+ override def checkInputDataTypes(): TypeCheckResult = {
+ if (child.dataType != BinaryType) {
+ TypeCheckResult.TypeCheckFailure("Bitmap must be a BinaryType")
+ } else {
+ TypeCheckResult.TypeCheckSuccess
+ }
+ }
+
+ // The result is the merged bitmap, as binary.
+ override def dataType: DataType = BinaryType
+
+ override def prettyName: String = "bitmap_or_agg"
+
+ override protected def withNewChildInternal(newChild: Expression):
BitmapOrAgg =
+ copy(child = newChild)
+
+ // Returns a copy of this aggregate that uses the given mutable buffer offset.
+ override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int):
ImperativeAggregate =
+ copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+ // Returns a copy of this aggregate that uses the given input buffer offset.
+ override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int):
ImperativeAggregate =
+ copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+ override def nullable: Boolean = false
+
+ // The buffer schema is derived from the single bitmap attribute below.
+ override def aggBufferSchema: StructType =
StructType.fromAttributes(aggBufferAttributes)
+
+ override def aggBufferAttributes: Seq[AttributeReference] = bitmapAttr :: Nil
+
+ // An all-zero bitmap of NUM_BYTES bytes.
+ override def defaultResult: Option[Literal] =
+ Option(Literal(Array.fill[Byte](BitmapExpressionUtils.NUM_BYTES)(0)))
+
+ // Fresh instances (newInstance()) of the buffer attributes, used for the input side.
+ override def inputAggBufferAttributes: Seq[AttributeReference] =
+ aggBufferAttributes.map(_.newInstance())
+
+ // The aggregation buffer is a fixed size binary.
+ private val bitmapAttr = AttributeReference("bitmap", BinaryType, false)()
+
+ // Stores a new all-zero bitmap of NUM_BYTES bytes at this aggregate's buffer slot.
+ override def initialize(buffer: InternalRow): Unit = {
+ buffer.update(mutableAggBufferOffset,
Array.fill[Byte](BitmapExpressionUtils.NUM_BYTES)(0))
+ }
+
+ // ORs the input bitmap into the buffered bitmap; null inputs are skipped.
+ override def update(buffer: InternalRow, input: InternalRow): Unit = {
+ val input_bitmap = child.eval(input).asInstanceOf[Array[Byte]]
+ if (input_bitmap != null) {
+ // bitmapMerge writes into the array read from the buffer and only covers the shorter of
+ // the two lengths, so input bytes beyond the buffer's size are ignored.
+ // NOTE(review): nothing is written back with buffer.update; this relies on getBinary
+ // handing out the stored array rather than a copy -- confirm.
+ val bitmap = buffer.getBinary(mutableAggBufferOffset)
+ BitmapExpressionUtils.bitmapMerge(bitmap, input_bitmap)
+ }
+ }
+
+ // ORs buffer2's bitmap (read at inputAggBufferOffset) into buffer1's bitmap (read at
+ // mutableAggBufferOffset); bitmapMerge writes the result into the first array.
+ override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
+ val bitmap1 = buffer1.getBinary(mutableAggBufferOffset)
+ val bitmap2 = buffer2.getBinary(inputAggBufferOffset)
+ BitmapExpressionUtils.bitmapMerge(bitmap1, bitmap2)
+ }
+
+ // The final result is the bitmap stored in the buffer.
+ override def eval(buffer: InternalRow): Any = {
+ buffer.getBinary(mutableAggBufferOffset)
+ }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 630cf2fa55a..eded5e6534f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -262,6 +262,18 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
summary = getSummary(context))
}
+  /**
+   * Builds the INVALID_BITMAP_POSITION error for a bit position that lies outside a bitmap.
+   *
+   * @param bitPosition the rejected 0-indexed bit position
+   * @param bitmapNumBytes the size of the bitmap in bytes; the bit count reported in the
+   *                       message is derived from it
+   */
+  def invalidBitmapPositionError(
+      bitPosition: Long,
+      bitmapNumBytes: Long): ArrayIndexOutOfBoundsException = {
+    val bitmapNumBits = bitmapNumBytes * 8
+    new SparkArrayIndexOutOfBoundsException(
+      errorClass = "INVALID_BITMAP_POSITION",
+      messageParameters = Map(
+        "bitPosition" -> bitPosition.toString,
+        "bitmapNumBytes" -> bitmapNumBytes.toString,
+        "bitmapNumBits" -> bitmapNumBits.toString),
+      context = Array.empty,
+      summary = "")
+  }
+
def invalidFractionOfSecondError(): DateTimeException = {
new SparkDateTimeException(
errorClass = "INVALID_FRACTION_OF_SECOND",
diff --git
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/BitmapExpressionUtilsSuite.scala
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/BitmapExpressionUtilsSuite.scala
new file mode 100644
index 00000000000..ee1f4026fed
--- /dev/null
+++
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/BitmapExpressionUtilsSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Unit tests for the static helpers in `BitmapExpressionUtils`.
+ */
+class BitmapExpressionUtilsSuite extends SparkFunSuite {
+
+  test("bitmap_bucket_number with positive inputs") {
+    val cases = Seq(
+      (0L, 0L), (1L, 1L), (2L, 1L), (3L, 1L),
+      (32768L, 1L), (32769L, 2L), (32770L, 2L))
+    for ((input, expected) <- cases) {
+      assert(BitmapExpressionUtils.bitmapBucketNumber(input) == expected)
+    }
+  }
+
+  test("bitmap_bucket_number with negative inputs") {
+    val cases = Seq(
+      (-1L, 0L), (-2L, 0L), (-3L, 0L),
+      (-32767L, 0L), (-32768L, -1L), (-32769L, -1L))
+    for ((input, expected) <- cases) {
+      assert(BitmapExpressionUtils.bitmapBucketNumber(input) == expected)
+    }
+  }
+
+  test("bitmap_bit_position with positive inputs") {
+    val cases = Seq(
+      (0L, 0L), (1L, 0L), (2L, 1L), (3L, 2L),
+      (32768L, 32767L), (32769L, 0L), (32770L, 1L))
+    for ((input, expected) <- cases) {
+      assert(BitmapExpressionUtils.bitmapBitPosition(input) == expected)
+    }
+  }
+
+  test("bitmap_bit_position with negative inputs") {
+    val cases = Seq(
+      (-1L, 1L), (-2L, 2L), (-3L, 3L),
+      (-32767L, 32767L), (-32768L, 0L), (-32769L, 1L))
+    for ((input, expected) <- cases) {
+      assert(BitmapExpressionUtils.bitmapBitPosition(input) == expected)
+    }
+  }
+
+  /** Allocates a zero-filled bitmap of `BitmapExpressionUtils.NUM_BYTES` bytes. */
+  private def createBitmap(): Array[Byte] =
+    new Array[Byte](BitmapExpressionUtils.NUM_BYTES)
+
+  /** Resets every byte of the bitmap to zero. */
+  private def clearBitmap(bitmap: Array[Byte]): Unit = {
+    bitmap.indices.foreach { i => bitmap(i) = 0 }
+  }
+
+  /** ORs the low 8 bits of `bits` into the byte stored at `bytePos`. */
+  private def setBitmapBits(bitmap: Array[Byte], bytePos: Int, bits: Int): Unit = {
+    val current = bitmap(bytePos) & 0x0ff
+    val added = bits & 0x0ff
+    bitmap(bytePos) = (current | added).toByte
+  }
+
+  test("bitmap_count empty") {
+    val emptyBitmap = createBitmap()
+    assert(BitmapExpressionUtils.bitmapCount(emptyBitmap) == 0L)
+  }
+
+  test("bitmap_count") {
+    val bitmap = createBitmap()
+
+    // A single bit in the first byte.
+    setBitmapBits(bitmap, 0, 0x01)
+    assert(BitmapExpressionUtils.bitmapCount(bitmap) == 1L)
+
+    // Start over with the first byte fully set.
+    clearBitmap(bitmap)
+    setBitmapBits(bitmap, 0, 0xff)
+    assert(BitmapExpressionUtils.bitmapCount(bitmap) == 8L)
+
+    // 0x22 adds two more bits in the second byte.
+    setBitmapBits(bitmap, 1, 0x22)
+    assert(BitmapExpressionUtils.bitmapCount(bitmap) == 10L)
+
+    // 0x67 adds five more bits in the last byte.
+    setBitmapBits(bitmap, bitmap.length - 1, 0x67)
+    assert(BitmapExpressionUtils.bitmapCount(bitmap) == 15L)
+  }
+}
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 32c4c02b1b2..f979a138e20 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -46,6 +46,11 @@
| org.apache.spark.sql.catalyst.expressions.Base64 | base64 | SELECT
base64('Spark SQL') | struct<base64(Spark SQL):string> |
| org.apache.spark.sql.catalyst.expressions.Bin | bin | SELECT bin(13) |
struct<bin(13):string> |
| org.apache.spark.sql.catalyst.expressions.BitLength | bit_length | SELECT
bit_length('Spark SQL') | struct<bit_length(Spark SQL):int> |
+| org.apache.spark.sql.catalyst.expressions.BitmapBitPosition |
bitmap_bit_position | SELECT bitmap_bit_position(1) |
struct<bitmap_bit_position(1):bigint> |
+| org.apache.spark.sql.catalyst.expressions.BitmapBucketNumber |
bitmap_bucket_number | SELECT bitmap_bucket_number(123) |
struct<bitmap_bucket_number(123):bigint> |
+| org.apache.spark.sql.catalyst.expressions.BitmapConstructAgg |
bitmap_construct_agg | SELECT
substring(hex(bitmap_construct_agg((bitmap_bit_position(col)))), 0, 6) FROM
VALUES (1), (2), (3) AS tab(col) |
struct<substring(hex(bitmap_construct_agg(bitmap_bit_position(col))), 0,
6):string> |
+| org.apache.spark.sql.catalyst.expressions.BitmapCount | bitmap_count |
SELECT bitmap_count(X '1010') | struct<bitmap_count(X'1010'):bigint> |
+| org.apache.spark.sql.catalyst.expressions.BitmapOrAgg | bitmap_or_agg |
SELECT substring(hex(bitmap_or_agg(col)), 0, 6) FROM VALUES (X '10'), (X '20'),
(X '40') AS tab(col) | struct<substring(hex(bitmap_or_agg(col)), 0, 6):string> |
| org.apache.spark.sql.catalyst.expressions.BitwiseAnd | & | SELECT 3 & 5 |
struct<(3 & 5):int> |
| org.apache.spark.sql.catalyst.expressions.BitwiseCount | bit_count | SELECT
bit_count(0) | struct<bit_count(0):int> |
| org.apache.spark.sql.catalyst.expressions.BitwiseGet | bit_get | SELECT
bit_get(11, 0) | struct<bit_get(11, 0):tinyint> |
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/BitmapExpressionsQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/BitmapExpressionsQuerySuite.scala
new file mode 100644
index 00000000000..76b9019475a
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/BitmapExpressionsQuerySuite.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * End-to-end tests checking that the bitmap functions reproduce `count(distinct ...)` results.
+ *
+ * Every fixture is registered with `createOrReplaceTempView`, so cleanup goes through
+ * `withTempView` rather than `withTable`.
+ */
+class BitmapExpressionsQuerySuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  test("bitmap_construct_agg") {
+    val table = "bitmaps_table"
+    withTempView(table) {
+      // 5000 distinct multiples of 100, each appearing twice.
+      (0 until 10000).toDF("id").selectExpr("100 * cast(id / 2 as int) col")
+        .createOrReplaceTempView(table)
+
+      val expected = spark.sql(
+        s"""
+           | select count (distinct col) c from $table
+           |""".stripMargin).collect()
+
+      // One bitmap per bucket; the per-bucket bit counts add up to the distinct count.
+      val df = spark.sql(
+        s"""
+           | select sum(c) from (
+           | select bitmap_bucket_number(col) bn,
+           | bitmap_count(bitmap_construct_agg(bitmap_bit_position(col))) c
+           | from $table
+           | group by 1
+           | )
+           |""".stripMargin)
+      checkAnswer(df, expected)
+    }
+  }
+
+  test("grouping bitmap_construct_agg") {
+    val table = "bitmaps_table"
+    withTempView(table) {
+      (0 until 10000).toDF("id").selectExpr(
+        "(id % 4) part",
+        "100 * cast(id / 8 as int) col")
+        .createOrReplaceTempView(table)
+
+      val expected = spark.sql(
+        s"""
+           | select part, count (distinct col) c from $table group by 1 order by 1
+           |""".stripMargin).collect()
+
+      val df = spark.sql(
+        s"""
+           | select part, sum(c) from (
+           | select part, bitmap_bucket_number(col) bn,
+           | bitmap_count(bitmap_construct_agg(bitmap_bit_position(col))) c
+           | from $table group by 1, 2 order by 1, 2
+           | ) group by 1 order by 1
+           |""".stripMargin)
+      checkAnswer(df, expected)
+    }
+  }
+
+  test("precomputed bitmaps") {
+    val table = "bitmaps_table"
+    val precomputed = "precomputed_table"
+    withTempView(table, precomputed) {
+      (0 until 10000).toDF("id").selectExpr(
+        "(id % 4) part1",
+        "((id + 7) % 3) part2",
+        "100 * cast(id / 17 as int) col")
+        .createOrReplaceTempView(table)
+
+      // One bitmap per (part1, part2, bucket).
+      spark.sql(
+        s"""
+           | select part1, part2, bitmap_bucket_number(col) bn,
+           | bitmap_construct_agg(bitmap_bit_position(col)) bm
+           | from $table group by 1, 2, 3
+           |""".stripMargin).createOrReplaceTempView(precomputed)
+
+      // Compute over both partitions
+      val expectedBoth = spark.sql(
+        s"""
+           | select part1, part2, count (distinct col) c from $table group by 1, 2 order by 1, 2
+           |""".stripMargin).collect()
+
+      val dfBoth = spark.sql(
+        s"""
+           | select part1, part2, sum(bitmap_count(bm))
+           | from $precomputed group by 1, 2 order by 1, 2
+           |""".stripMargin)
+      checkAnswer(dfBoth, expectedBoth)
+
+      // Compute over one of the partitions
+      Seq("part1", "part2").foreach { part =>
+        val expected = spark.sql(
+          s"""
+             | select $part, count (distinct col) c from $table group by 1 order by 1
+             |""".stripMargin).collect()
+
+        // Bitmaps of the same bucket are OR-ed across the other partition column first.
+        val df = spark.sql(
+          s"""
+             | select $part, sum(c) from (
+             | select $part, bn, bitmap_count(bitmap_or_agg(bm)) c
+             | from $precomputed group by 1, 2
+             | ) group by 1 order by 1
+             |""".stripMargin)
+        checkAnswer(df, expected)
+      }
+    }
+  }
+
+  test("bitmap functions with floats") {
+    val table = "bitmaps_table"
+    withTempView(table) {
+      (0 until 10000).toDF("id").selectExpr(
+        "(id % 4) part",
+        "100 * id + cast(id / 8.0 as float) col")
+        .createOrReplaceTempView(table)
+
+      val expected = spark.sql(
+        s"""
+           | select part, count (distinct col) c from $table group by 1 order by 1
+           |""".stripMargin).collect()
+
+      val df = spark.sql(
+        s"""
+           | select part, sum(c) from (
+           | select part, bitmap_bucket_number(col) bn,
+           | bitmap_count(bitmap_construct_agg(bitmap_bit_position(col))) c
+           | from $table group by 1, 2 order by 1, 2
+           | ) group by 1 order by 1
+           |""".stripMargin)
+      checkAnswer(df, expected)
+    }
+  }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 37cdfcf10c7..df82e3c268f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -937,6 +937,34 @@ class QueryExecutionErrorsSuite
sqlState = "XX000")
}
+  test("INVALID_BITMAP_POSITION: position out of bounds") {
+    // 32768 is one past the last valid 0-indexed position of a 4096-byte bitmap.
+    val query = "select bitmap_construct_agg(col) from values (32768) as tab(col)"
+    val thrown = intercept[SparkException] {
+      sql(query).collect()
+    }
+    val cause = thrown.getCause.asInstanceOf[SparkArrayIndexOutOfBoundsException]
+    checkError(
+      exception = cause,
+      errorClass = "INVALID_BITMAP_POSITION",
+      parameters = Map(
+        "bitPosition" -> "32768",
+        "bitmapNumBytes" -> "4096",
+        "bitmapNumBits" -> "32768"),
+      sqlState = "22003")
+  }
+
+  test("INVALID_BITMAP_POSITION: negative position") {
+    // Negative positions are rejected as well.
+    val query = "select bitmap_construct_agg(col) from values (-1) as tab(col)"
+    val thrown = intercept[SparkException] {
+      sql(query).collect()
+    }
+    val cause = thrown.getCause.asInstanceOf[SparkArrayIndexOutOfBoundsException]
+    checkError(
+      exception = cause,
+      errorClass = "INVALID_BITMAP_POSITION",
+      parameters = Map(
+        "bitPosition" -> "-1",
+        "bitmapNumBytes" -> "4096",
+        "bitmapNumBits" -> "32768"),
+      sqlState = "22003")
+  }
+
test("SPARK-43589: Use bytesToString instead of shift operation") {
checkError(
exception = intercept[SparkException] {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]