This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0c31f5a807e7 [SPARK-50443][SS] Fixing Maven build errors introduced by
Guava cache in RocksDBStateStoreProvider
0c31f5a807e7 is described below
commit 0c31f5a807e7aa01cd46424d52441f514e491943
Author: Eric Marnadi <[email protected]>
AuthorDate: Thu Nov 28 12:48:24 2024 +0900
[SPARK-50443][SS] Fixing Maven build errors introduced by Guava cache in
RocksDBStateStoreProvider
### What changes were proposed in this pull request?
There are Maven build errors introduced by the Guava dependency in `sql/core`, as
we use the Guava cache to store the Avro encoders, outlined in this comment:
https://github.com/apache/spark/pull/48401#issuecomment-2504353098
Introduced a new constructor for `NonFateSharingCache` and used it in
`RocksDBStateStoreProvider`.
### Why are the changes needed?
To resolve Maven build errors, so that the Avro change here:
https://github.com/apache/spark/pull/48401 does not get reverted.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests are sufficient, and the Maven build works on a devbox
```
[INFO] Tests run: 47, Failures: 0, Errors: 0, Skipped: 0, Time elapsed:
17.64 s -- in test.org.apache.spark.sql.JavaDatasetSuite
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 47, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO]
[INFO] --- surefire:3.2.5:test (test) spark-sql_2.13 ---
[INFO] Skipping execution of surefire because it has already been run for
this configuration
[INFO]
[INFO] --- scalatest:2.2.0:test (test) spark-sql_2.13 ---
[INFO] ScalaTest report directory:
/home/eric.marnadi/spark/sql/core/target/surefire-reports
WARNING: Using incubator modules: jdk.incubator.vector
Discovery starting.
Discovery completed in 2 seconds, 737 milliseconds.
Run starting. Expected test count is: 0
DiscoverySuite:
Run completed in 2 seconds, 765 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
No tests were executed.
[INFO]
------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO]
------------------------------------------------------------------------
[INFO] Total time: 03:15 min
[INFO] Finished at: 2024-11-28T01:10:36Z
[INFO]
------------------------------------------------------------------------
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48996 from ericm-db/chm.
Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/util/NonFateSharingCache.scala | 16 +++++++++++++++-
.../streaming/state/RocksDBStateStoreProvider.scala | 17 ++++++++---------
2 files changed, 23 insertions(+), 10 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala
b/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala
index 21184d70b386..7d01facc1e42 100644
--- a/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala
+++ b/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util
-import java.util.concurrent.Callable
+import java.util.concurrent.{Callable, TimeUnit}
import com.google.common.cache.{Cache, CacheBuilder, CacheLoader, LoadingCache}
@@ -68,6 +68,20 @@ private[spark] object NonFateSharingCache {
override def load(k: K): V = loadingFunc.apply(k)
}))
}
+
+ def apply[K, V](
+ maximumSize: Long,
+ expireAfterAccessTime: Long,
+ expireAfterAccessTimeUnit: TimeUnit): NonFateSharingCache[K, V] = {
+ val builder = CacheBuilder.newBuilder().asInstanceOf[CacheBuilder[K, V]]
+ if (maximumSize > 0L) {
+ builder.maximumSize(maximumSize)
+ }
+ if(expireAfterAccessTime > 0) {
+ builder.expireAfterAccess(expireAfterAccessTime,
expireAfterAccessTimeUnit)
+ }
+ new NonFateSharingCache(builder.build[K, V]())
+ }
}
private[spark] class NonFateSharingCache[K, V](protected val cache: Cache[K,
V]) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index e5a4175aeec1..c9c987fa1620 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.util.control.NonFatal
-import com.google.common.cache.CacheBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -613,15 +612,15 @@ object RocksDBStateStoreProvider {
val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2
private val MAX_AVRO_ENCODERS_IN_CACHE = 1000
- // Add the cache at companion object level so it persists across provider
instances
- private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] = {
- val guavaCache = CacheBuilder.newBuilder()
- .maximumSize(MAX_AVRO_ENCODERS_IN_CACHE) // Adjust size based on your
needs
- .expireAfterAccess(1, TimeUnit.HOURS) // Optional: Add expiration if
needed
- .build[String, AvroEncoder]()
+ private val AVRO_ENCODER_LIFETIME_HOURS = 1L
- new NonFateSharingCache(guavaCache)
- }
+ // Add the cache at companion object level so it persists across provider
instances
+ private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] =
+ NonFateSharingCache(
+ maximumSize = MAX_AVRO_ENCODERS_IN_CACHE,
+ expireAfterAccessTime = AVRO_ENCODER_LIFETIME_HOURS,
+ expireAfterAccessTimeUnit = TimeUnit.HOURS
+ )
def getAvroEnc(
stateStoreEncoding: String,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]