This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a5f019554991 [SPARK-45415] Allow selective disabling of "fallocate" in RocksDB statestore
a5f019554991 is described below
commit a5f01955499141c53c619ddf81d6846a72ad789a
Author: Scott Schenkein <[email protected]>
AuthorDate: Thu Oct 12 08:44:13 2023 +0900
[SPARK-45415] Allow selective disabling of "fallocate" in RocksDB statestore
### What changes were proposed in this pull request?
Our Spark environment features a number of parallel structured streaming
jobs, many of which use the state store. Most use it for dropDuplicates and
hold a tiny amount of state, but a few have substantially larger state stores
that require RocksDB. In such a configuration, Spark allocates a minimum of
`spark.sql.shuffle.partitions * queryCount` partitions, each of which
pre-allocates about 74mb (observed on EMR/Hadoop) of disk storage for
RocksDB. This allocati [...]
This PR provides users with the option to simply disable fallocate so
RocksDB uses far less space for the smaller state stores, reducing complexity
and disk storage at the expense of performance.
### Why are the changes needed?
As previously mentioned, these changes allow a Spark context to run
many parallel structured streaming jobs with RocksDB state stores without
allocating a glut of excess storage.
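For a sense of scale, the arithmetic above can be sketched in a few lines (the partition and query counts here are hypothetical example values; the ~74mb per-partition figure is the one observed on EMR/Hadoop):

```python
# Rough estimate of disk pre-allocated by RocksDB across parallel
# streaming queries. shuffle_partitions and query_count are hypothetical
# example values; 74 MB is the per-partition figure observed on EMR/Hadoop.
shuffle_partitions = 200  # spark.sql.shuffle.partitions
query_count = 10          # parallel structured streaming queries
mb_per_partition = 74     # observed RocksDB pre-allocation per partition

total_mb = shuffle_partitions * query_count * mb_per_partition
print(f"~{total_mb / 1024:.0f} GB pre-allocated")  # prints "~145 GB pre-allocated"
```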
### Does this PR introduce _any_ user-facing change?
Users can disable the fallocate RocksDB performance optimization by setting
`spark.sql.streaming.stateStore.rocksdb.allowFAllocate=false`
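For instance, both the RocksDB provider and the new flag could be supplied at submit time (a sketch; the application script name is a placeholder):

```shell
# Sketch: enable the RocksDB state store provider and disable fallocate.
# "my_streaming_app.py" is a hypothetical application script.
spark-submit \
  --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
  --conf spark.sql.streaming.stateStore.rocksdb.allowFAllocate=false \
  my_streaming_app.py
```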
### How was this patch tested?
1) A few test cases were added
2) The state store size was validated by running the following script with
fallocate enabled and disabled
```
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import datetime

# Toggle to compare disk usage with and without fallocate
disable_fallocate = True
if disable_fallocate:
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.allowFAllocate", "false")
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)

schema = StructType(
    [
        StructField("one", TimestampType(), False),
        StructField("two", StringType(), True),
    ]
)

now = datetime.datetime.now()
data = [(now, str(y)) for y in range(300)]
init_df = spark.createDataFrame(data, schema)

path = "/tmp/stream_try/test"
init_df.write.format("parquet").mode("append").save(path)

stream_df = spark.readStream.schema(schema).format("parquet").load(path)
stream_df = stream_df.dropDuplicates(["one"])

def foreach_batch_function(batch_df, epoch_id):
    batch_df.write.format("parquet").mode("append").option("path", path + "_out").save()

stream_df.writeStream.foreachBatch(foreach_batch_function).option(
    "checkpointLocation", path + "_checkpoint"
).start()
```
With these results (local run, Docker container with a small filesystem)
```
allowFAllocate=True (current default)
---------------------
root@0ef384f699e0:/tmp# du -sh spark-d43a2964-c92a-4d94-9fdd-f3557a651fd9
808M	spark-d43a2964-c92a-4d94-9fdd-f3557a651fd9
|
|-->4.1M StateStoreId(opId=0,partId=0,name=default)-d59b907c-8004-47f9-a8a1-dec131f73505
|--> <snip>
|-->4.1M StateStoreId(opId=0,partId=199,name=default)-b49a93fe-1007-4e92-8f8f-5767aef41e5c

allowFAllocate=False (new feature)
----------------------
root@0ef384f699e0:/tmp# du -sh spark-00cb768d-2659-453c-8670-4aaf70148041
7.9M	spark-00cb768d-2659-453c-8670-4aaf70148041
|
|-->40K StateStoreId(opId=0,partId=0,name=default)-45b38d9c-737b-49b1-bb82-
|--> <snip>
|-->40K StateStoreId(opId=0,partId=199,name=default)-28a6cc02-2693-4360-b47a-1f1ab0d54a61
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43202 from schenksj/feature/rocksdb_allow_fallocate.
Authored-by: Scott Schenkein <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
docs/structured-streaming-programming-guide.md | 5 +++++
.../spark/sql/execution/streaming/state/RocksDB.scala | 15 +++++++++++++--
.../streaming/state/RocksDBStateStoreSuite.scala | 2 ++
.../sql/execution/streaming/state/RocksDBSuite.scala | 6 ++++++
4 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 774422a9cd9d..9fb823abaa3a 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2385,6 +2385,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
    <td>Total memory to be occupied by blocks in high priority pool as a fraction of memory allocated across all RocksDB instances on a single node using maxMemoryUsageMB.</td>
<td>0.1</td>
</tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.allowFAllocate</td>
+    <td>Allow the rocksdb runtime to use fallocate to pre-allocate disk space for logs, etc... Disable for apps that have many smaller state stores to trade off disk space for write performance.</td>
+ <td>true</td>
+ </tr>
</table>
##### RocksDB State Store Memory Management
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index a2868df94117..60249550c4ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -109,6 +109,7 @@ class RocksDB(
dbOptions.setCreateIfMissing(true)
dbOptions.setTableFormatConfig(tableFormatConfig)
dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
+ dbOptions.setAllowFAllocate(conf.allowFAllocate)
if (conf.boundedMemoryUsage) {
dbOptions.setWriteBufferManager(writeBufferManager)
@@ -674,7 +675,8 @@ case class RocksDBConf(
totalMemoryUsageMB: Long,
writeBufferCacheRatio: Double,
highPriorityPoolRatio: Double,
- compressionCodec: String)
+ compressionCodec: String,
+ allowFAllocate: Boolean)
object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -757,6 +759,14 @@ object RocksDBConf {
  private val HIGH_PRIORITY_POOL_RATIO_CONF = SQLConfEntry(HIGH_PRIORITY_POOL_RATIO_CONF_KEY, "0.1")
+ // Allow files to be pre-allocated on disk using fallocate
+ // Disabling may slow writes, but can solve an issue where
+ // significant quantities of disk are wasted if there are
+ // many smaller concurrent state-stores running with the
+ // spark context
+ val ALLOW_FALLOCATE_CONF_KEY = "allowFAllocate"
+  private val ALLOW_FALLOCATE_CONF = SQLConfEntry(ALLOW_FALLOCATE_CONF_KEY, "true")
+
def apply(storeConf: StateStoreConf): RocksDBConf = {
val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -834,7 +844,8 @@ object RocksDBConf {
getLongConf(MAX_MEMORY_USAGE_MB_CONF),
getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
- storeConf.compressionCodec)
+ storeConf.compressionCodec,
+ getBooleanConf(ALLOW_FALLOCATE_CONF))
}
def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index d1cc7e0b3b9c..82f677a98162 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -86,6 +86,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
      (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
      (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"),
+      (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".allowFAllocate", "false"),
(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
)
testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -115,6 +116,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(rocksDBConfInTask.maxOpenFiles == 1000)
assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
+ assert(rocksDBConfInTask.allowFAllocate == false)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 764358dc1f09..b5e1eccba339 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1040,6 +1040,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ test("Verify that fallocate is allowed by default") {
+ val sqlConf = new SQLConf
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ assert(dbConf.allowFAllocate == true)
+ }
+
/** RocksDB memory management tests for bounded memory usage */
test("Memory mgmt - invalid config") {
withTempDir { dir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]