This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fd7acd32895 [SPARK-44238][CORE][SQL] Introduce a new `readFrom` method
with byte array input for `BloomFilter`
fd7acd32895 is described below
commit fd7acd32895ed79094ff75aef8ff133966627ee4
Author: yangjie01 <[email protected]>
AuthorDate: Thu Aug 31 23:19:41 2023 -0500
[SPARK-44238][CORE][SQL] Introduce a new `readFrom` method with byte array
input for `BloomFilter`
### What changes were proposed in this pull request?
This PR introduces a new `readFrom` method for `BloomFilter` that takes a
byte array as input.
### Why are the changes needed?
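To de-duplicate code: `BloomFilterMightContain` and `BloomFilterAggregate` each
defined an identical `deserialize(bytes)` helper that wrapped the bytes in a
`ByteArrayInputStream` before calling `BloomFilter.readFrom`. Both helpers now
delegate to the new `BloomFilter.readFrom(bytes)`.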
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
Closes #41781 from LuciferYang/bloomfilter-readFrom.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../main/java/org/apache/spark/util/sketch/BloomFilter.java | 7 +++++++
.../java/org/apache/spark/util/sketch/BloomFilterImpl.java | 6 ++++++
.../scala/org/apache/spark/util/sketch/BloomFilterSuite.scala | 6 ++----
.../sql/catalyst/expressions/BloomFilterMightContain.scala | 10 +---------
.../catalyst/expressions/aggregate/BloomFilterAggregate.scala | 8 +-------
5 files changed, 17 insertions(+), 20 deletions(-)
diff --git
a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
index f3c2b05e7af..172b394689c 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
@@ -178,6 +178,13 @@ public abstract class BloomFilter {
return BloomFilterImpl.readFrom(in);
}
+ /**
+ * Reads in a {@link BloomFilter} from a byte array.
+ */
+ public static BloomFilter readFrom(byte[] bytes) throws IOException {
+ return BloomFilterImpl.readFrom(bytes);
+ }
+
/**
* Computes the optimal k (number of hashes per item inserted in Bloom
filter), given the
* expected insertions and total number of bits in the Bloom filter.
diff --git
a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
index ccf1833af99..3fba5e33252 100644
---
a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
+++
b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
@@ -266,6 +266,12 @@ class BloomFilterImpl extends BloomFilter implements
Serializable {
return filter;
}
+ public static BloomFilterImpl readFrom(byte[] bytes) throws IOException {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
+ return readFrom(bis);
+ }
+ }
+
private void writeObject(ObjectOutputStream out) throws IOException {
writeTo(out);
}
diff --git
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
index cfdc9954772..4d0ba66637b 100644
---
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
+++
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util.sketch
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.ByteArrayOutputStream
import scala.reflect.ClassTag
import scala.util.Random
@@ -34,9 +34,7 @@ class BloomFilterSuite extends AnyFunSuite { //
scalastyle:ignore funsuite
filter.writeTo(out)
out.close()
- val in = new ByteArrayInputStream(out.toByteArray)
- val deserialized = BloomFilter.readFrom(in)
- in.close()
+ val deserialized = BloomFilter.readFrom(out.toByteArray)
assert(filter == deserialized)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
index b2273b6a6d1..784bea899c4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.expressions
-import java.io.ByteArrayInputStream
-
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -119,11 +117,5 @@ case class BloomFilterMightContain(
}
}
- final def deserialize(bytes: Array[Byte]): BloomFilter = {
- val in = new ByteArrayInputStream(bytes)
- val bloomFilter = BloomFilter.readFrom(in)
- in.close()
- bloomFilter
- }
-
+ final def deserialize(bytes: Array[Byte]): BloomFilter =
BloomFilter.readFrom(bytes)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
index 7cba462ce2c..424e191a0c9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.catalyst.expressions.aggregate
-import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import org.apache.spark.sql.catalyst.InternalRow
@@ -227,12 +226,7 @@ object BloomFilterAggregate {
out.toByteArray
}
- final def deserialize(bytes: Array[Byte]): BloomFilter = {
- val in = new ByteArrayInputStream(bytes)
- val bloomFilter = BloomFilter.readFrom(in)
- in.close()
- bloomFilter
- }
+ final def deserialize(bytes: Array[Byte]): BloomFilter =
BloomFilter.readFrom(bytes)
}
private trait BloomFilterUpdater {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]