This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e778ce689dcb [SPARK-47250][SS] Add additional validations and NERF
changes for RocksDB state provider and use of column families
e778ce689dcb is described below
commit e778ce689dcbe5e75ce5781a03cf9d8466910cd2
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue Mar 12 14:27:50 2024 +0900
[SPARK-47250][SS] Add additional validations and NERF changes for RocksDB
state provider and use of column families
### What changes were proposed in this pull request?
Add additional validations and NERF changes for RocksDB state provider and
use of column families
### Why are the changes needed?
Improve error handling and migrate errors to NERF.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new unit tests
StateStoreSuite
```
===== POSSIBLE THREAD LEAK IN SUITE
o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: shuffle-boss-36-1
(daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true),
ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-33-1 (daemon=true),
ForkJoinPool.commonPool-worker-2 (daemon=true) =====
[info] Run completed in 2 minutes, 57 seconds.
[info] Total number of tests run: 151
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 151, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
RocksDBSuite
```
[info] Run completed in 4 minutes, 54 seconds.
[info] Total number of tests run: 188
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 188, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45360 from anishshri-db/task/SPARK-47250.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-classes.json | 10 +-
docs/sql-error-conditions.md | 10 +-
.../state/HDFSBackedStateStoreProvider.scala | 33 ++++--
.../sql/execution/streaming/state/RocksDB.scala | 92 +++++++++++----
.../state/RocksDBStateStoreProvider.scala | 4 +-
.../streaming/state/StateStoreErrors.scala | 49 ++++----
.../execution/streaming/state/RocksDBSuite.scala | 124 +++++++++++++++++++--
.../streaming/state/StateStoreSuite.scala | 63 +++++++++++
.../streaming/state/ValueStateSuite.scala | 2 +-
9 files changed, 318 insertions(+), 69 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index 3d130fdce301..99fbc585f981 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3371,9 +3371,9 @@
],
"sqlState" : "0A000"
},
- "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY" : {
+ "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME" : {
"message" : [
- "Failed to remove default column family with reserved
name=<colFamilyName>."
+ "Failed to perform column family operation=<operationName> with invalid
name=<colFamilyName>. Column family name cannot be empty or include
leading/trailing spaces or use the reserved keyword=default"
],
"sqlState" : "42802"
},
@@ -3396,6 +3396,12 @@
],
"sqlState" : "XXKST"
},
+ "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : {
+ "message" : [
+ "State store operation=<operationType> not supported on missing column
family=<colFamilyName>."
+ ],
+ "sqlState" : "42802"
+ },
"STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST" : {
"message" : [
"Static partition column <staticName> is also specified in the column
list."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 2cddb6a94c14..b6b159f277c0 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2105,11 +2105,11 @@ The SQL config `<sqlConf>` cannot be found. Please
verify that the config exists
Star (*) is not allowed in a select list when GROUP BY an ordinal position is
used.
-### STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY
+### STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME
[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
-Failed to remove default column family with reserved name=`<colFamilyName>`.
+Failed to perform column family operation=`<operationName>` with invalid
name=`<colFamilyName>`. Column family name cannot be empty or include
leading/trailing spaces or use the reserved keyword=default
### STATE_STORE_HANDLE_NOT_INITIALIZED
@@ -2130,6 +2130,12 @@ Store does not support multiple values per key
`<operationType>` operation not supported with `<entity>`
+### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY
+
+[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+State store operation=`<operationType>` not supported on missing column
family=`<colFamilyName>`.
+
### STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST
[SQLSTATE:
42713](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 01e2e7f26083..edb95615d588 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -73,6 +73,8 @@ import org.apache.spark.util.ArrayImplicits._
*/
private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider
with Logging {
+ private val providerName = "HDFSBackedStateStoreProvider"
+
class HDFSBackedReadStateStore(val version: Long, map:
HDFSBackedStateStoreMap)
extends ReadStateStore {
@@ -124,14 +126,25 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
numColsPrefixKey: Int,
valueSchema: StructType,
useMultipleValuesPerKey: Boolean = false): Unit = {
- throw
StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider")
+ throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
+ }
+
+ // Multiple col families are not supported with
HDFSBackedStateStoreProvider. Throw an exception
+ // if the user tries to use a non-default col family.
+ private def assertUseOfDefaultColFamily(colFamilyName: String): Unit = {
+ if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+
+ throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
+ }
}
override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = {
+ assertUseOfDefaultColFamily(colFamilyName)
mapToUpdate.get(key)
}
override def put(key: UnsafeRow, value: UnsafeRow, colFamilyName: String):
Unit = {
+ assertUseOfDefaultColFamily(colFamilyName)
require(value != null, "Cannot put a null value")
verify(state == UPDATING, "Cannot put after already committed or
aborted")
val keyCopy = key.copy()
@@ -141,6 +154,7 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
}
override def remove(key: UnsafeRow, colFamilyName: String): Unit = {
+ assertUseOfDefaultColFamily(colFamilyName)
verify(state == UPDATING, "Cannot remove after already committed or
aborted")
val prevValue = mapToUpdate.remove(key)
if (prevValue != null) {
@@ -179,10 +193,14 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
* Get an iterator of all the store data.
* This can be called only after committing all the updates made in the
current thread.
*/
- override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] =
mapToUpdate.iterator()
+ override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = {
+ assertUseOfDefaultColFamily(colFamilyName)
+ mapToUpdate.iterator()
+ }
override def prefixScan(prefixKey: UnsafeRow, colFamilyName: String):
Iterator[UnsafeRowPair] = {
+ assertUseOfDefaultColFamily(colFamilyName)
mapToUpdate.prefixScan(prefixKey)
}
@@ -211,18 +229,17 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
}
override def removeColFamilyIfExists(colFamilyName: String): Unit = {
- throw StateStoreErrors.removingColumnFamiliesNotSupported(
- "HDFSBackedStateStoreProvider")
+ throw StateStoreErrors.removingColumnFamiliesNotSupported(providerName)
}
override def valuesIterator(key: UnsafeRow, colFamilyName: String):
Iterator[UnsafeRow] = {
- throw
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey",
"HDFSStateStore")
+ throw
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey",
providerName)
}
override def merge(key: UnsafeRow,
value: UnsafeRow,
colFamilyName: String): Unit = {
- throw StateStoreErrors.unsupportedOperationException("merge",
"HDFSStateStore")
+ throw StateStoreErrors.unsupportedOperationException("merge",
providerName)
}
}
@@ -280,11 +297,11 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
// TODO: add support for multiple col families with
HDFSBackedStateStoreProvider
if (useColumnFamilies) {
- throw
StateStoreErrors.multipleColumnFamiliesNotSupported("HDFSStateStoreProvider")
+ throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
}
if (useMultipleValuesPerKey) {
- throw
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey",
"HDFSStateStore")
+ throw
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey",
providerName)
}
require((keySchema.length == 0 && numColsPrefixKey == 0) ||
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 41cab78df195..4437cc5583d4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -34,7 +34,7 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _}
import org.rocksdb.CompressionType._
import org.rocksdb.TickerType._
-import org.apache.spark.{SparkUnsupportedOperationException, TaskContext}
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -242,26 +242,75 @@ class RocksDB(
loadedVersion = endVersion
}
+ /**
+ * Function to check if the column family exists in the state store instance.
+ * @param colFamilyName - name of the column family
+ * @return - true if the column family exists, false otherwise
+ */
private def checkColFamilyExists(colFamilyName: String): Boolean = {
colFamilyNameToHandleMap.contains(colFamilyName)
}
- private def verifyColFamilyExists(colFamilyName: String): Unit = {
- if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
- throw new RuntimeException(s"Column family with name=$colFamilyName does
not exist")
+ private val multColFamiliesDisabledStr = "multiple column families disabled
in " +
+ "RocksDBStateStoreProvider"
+
+ /**
+ * Function to verify invariants for column family based operations such as
get, put, remove etc.
+ * @param operationName - name of the store operation
+ * @param colFamilyName - name of the column family
+ */
+ private def verifyColFamilyOperations(
+ operationName: String,
+ colFamilyName: String): Unit = {
+ if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+ // if the state store instance does not support multiple column
families, throw an exception
+ if (!useColumnFamilies) {
+ throw StateStoreErrors.unsupportedOperationException(operationName,
+ multColFamiliesDisabledStr)
+ }
+
+ // if the column family name is empty or contains leading/trailing
whitespaces, throw an
+ // exception
+ if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) {
+ throw
StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName,
colFamilyName)
+ }
+
+ // if the column family does not exist, throw an exception
+ if (!checkColFamilyExists(colFamilyName)) {
+ throw
StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+ colFamilyName)
+ }
}
}
/**
- * Create RocksDB column family, if not created already
+ * Function to verify invariants for column family creation or deletion
operations.
+ * @param operationName - name of the store operation
+ * @param colFamilyName - name of the column family
*/
- def createColFamilyIfAbsent(colFamilyName: String): Unit = {
- if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
- throw new SparkUnsupportedOperationException(
- errorClass = "_LEGACY_ERROR_TEMP_3197",
- messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+ private def verifyColFamilyCreationOrDeletion(
+ operationName: String,
+ colFamilyName: String): Unit = {
+ // if the state store instance does not support multiple column families,
throw an exception
+ if (!useColumnFamilies) {
+ throw StateStoreErrors.unsupportedOperationException(operationName,
+ multColFamiliesDisabledStr)
+ }
+
+ // if the column family name is empty or contains leading/trailing
whitespaces
+ // or using the reserved "default" column family, throw an exception
+ if (colFamilyName.isEmpty
+ || colFamilyName.trim != colFamilyName
+ || colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+ throw
StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName,
colFamilyName)
}
+ }
+ /**
+ * Create RocksDB column family, if not created already
+ */
+ def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+ verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName)
if (!checkColFamilyExists(colFamilyName)) {
assert(db != null)
val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes,
columnFamilyOptions)
@@ -274,10 +323,7 @@ class RocksDB(
* Remove RocksDB column family, if exists
*/
def removeColFamilyIfExists(colFamilyName: String): Unit = {
- if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
- throw StateStoreErrors.cannotRemoveDefaultColumnFamily(colFamilyName)
- }
-
+ verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName)
if (checkColFamilyExists(colFamilyName)) {
assert(db != null)
val handle = colFamilyNameToHandleMap(colFamilyName)
@@ -293,7 +339,7 @@ class RocksDB(
def get(
key: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte]
= {
- verifyColFamilyExists(colFamilyName)
+ verifyColFamilyOperations("get", colFamilyName)
db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
}
@@ -305,7 +351,7 @@ class RocksDB(
key: Array[Byte],
value: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
- verifyColFamilyExists(colFamilyName)
+ verifyColFamilyOperations("put", colFamilyName)
if (conf.trackTotalNumberOfRows) {
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName),
readOptions, key)
if (oldValue == null) {
@@ -337,10 +383,10 @@ class RocksDB(
value: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
if (!useColumnFamilies) {
- throw new RuntimeException("Merge operation uses changelog checkpointing
v2 which" +
- " requires column families to be enabled.")
+ throw StateStoreErrors.unsupportedOperationException("merge",
+ multColFamiliesDisabledStr)
}
- verifyColFamilyExists(colFamilyName)
+ verifyColFamilyOperations("merge", colFamilyName)
if (conf.trackTotalNumberOfRows) {
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName),
readOptions, key)
@@ -360,7 +406,7 @@ class RocksDB(
def remove(
key: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
- verifyColFamilyExists(colFamilyName)
+ verifyColFamilyOperations("remove", colFamilyName)
if (conf.trackTotalNumberOfRows) {
val value = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions,
key)
if (value != null) {
@@ -380,7 +426,7 @@ class RocksDB(
*/
def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
Iterator[ByteArrayPair] = {
- verifyColFamilyExists(colFamilyName)
+ verifyColFamilyOperations("iterator", colFamilyName)
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
logInfo(s"Getting iterator from version $loadedVersion")
@@ -409,7 +455,7 @@ class RocksDB(
}
private def countKeys(colFamilyName: String =
StateStore.DEFAULT_COL_FAMILY_NAME): Long = {
- verifyColFamilyExists(colFamilyName)
+ verifyColFamilyOperations("countKeys", colFamilyName)
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
try {
@@ -431,7 +477,7 @@ class RocksDB(
def prefixScan(prefix: Array[Byte], colFamilyName: String =
StateStore.DEFAULT_COL_FAMILY_NAME):
Iterator[ByteArrayPair] = {
- verifyColFamilyExists(colFamilyName)
+ verifyColFamilyOperations("prefixScan", colFamilyName)
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
iter.seek(prefix)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 7374abdbde98..721d8aa03079 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -101,14 +101,14 @@ private[sql] class RocksDBStateStoreProvider
override def merge(key: UnsafeRow, value: UnsafeRow,
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
- verify(state == UPDATING, "Cannot put after already committed or
aborted")
+ verify(state == UPDATING, "Cannot merge after already committed or
aborted")
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
val keyEncoder = kvEncoder._1
val valueEncoder = kvEncoder._2
verify(valueEncoder.supportsMultipleValuesPerKey, "Merge operation
requires an encoder" +
" which supports multiple values for a single key")
verify(key != null, "Key cannot be null")
- require(value != null, "Cannot put a null value")
+ require(value != null, "Cannot merge a null value")
rocksDB.merge(keyEncoder.encodeKey(key),
valueEncoder.encodeValue(value), colFamilyName)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 6f4c3d4c9675..8a0276557f8f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -32,25 +32,30 @@ object StateStoreErrors {
)
}
+ def unsupportedOperationOnMissingColumnFamily(operationName: String,
colFamilyName: String):
+ StateStoreUnsupportedOperationOnMissingColumnFamily = {
+ new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName,
colFamilyName)
+ }
+
def multipleColumnFamiliesNotSupported(stateStoreProvider: String):
StateStoreMultipleColumnFamiliesNotSupportedException = {
- new
StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider)
- }
+ new
StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider)
+ }
def removingColumnFamiliesNotSupported(stateStoreProvider: String):
StateStoreRemovingColumnFamiliesNotSupportedException = {
- new
StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
- }
+ new
StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
+ }
- def cannotRemoveDefaultColumnFamily(colFamilyName: String):
- StateStoreCannotRemoveDefaultColumnFamily = {
- new StateStoreCannotRemoveDefaultColumnFamily(colFamilyName)
- }
+ def cannotUseColumnFamilyWithInvalidName(operationName: String,
colFamilyName: String):
+ StateStoreCannotUseColumnFamilyWithInvalidName = {
+ new StateStoreCannotUseColumnFamilyWithInvalidName(operationName,
colFamilyName)
+ }
def unsupportedOperationException(operationName: String, entity: String):
StateStoreUnsupportedOperationException = {
- new StateStoreUnsupportedOperationException(operationName, entity)
- }
+ new StateStoreUnsupportedOperationException(operationName, entity)
+ }
def requireNonNullStateValue(value: Any, stateName: String): Unit = {
SparkException.require(value != null,
@@ -68,23 +73,25 @@ object StateStoreErrors {
class
StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider:
String)
extends SparkUnsupportedOperationException(
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
- messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
- )
+ messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
+
class
StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider:
String)
extends SparkUnsupportedOperationException(
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES",
- messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
- )
+ messageParameters = Map("stateStoreProvider" -> stateStoreProvider))
-class StateStoreCannotRemoveDefaultColumnFamily(colFamilyName: String)
+class StateStoreCannotUseColumnFamilyWithInvalidName(operationName: String,
colFamilyName: String)
extends SparkUnsupportedOperationException(
- errorClass = "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY",
- messageParameters = Map("colFamilyName" -> colFamilyName)
- )
-
+ errorClass = "STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",
+ messageParameters = Map("operationName" -> operationName, "colFamilyName"
-> colFamilyName))
class StateStoreUnsupportedOperationException(operationType: String, entity:
String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
- messageParameters = Map("operationType" -> operationType, "entity" ->
entity)
- )
+ messageParameters = Map("operationType" -> operationType, "entity" ->
entity))
+
+class StateStoreUnsupportedOperationOnMissingColumnFamily(
+ operationType: String,
+ colFamilyName: String) extends SparkUnsupportedOperationException(
+ errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
+ messageParameters = Map("operationType" -> operationType, "colFamilyName" ->
colFamilyName))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 0bc3828318da..a7d4ab362340 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -536,6 +536,110 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+ val remoteDir = Utils.createTempDir().toString
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+
+ val conf = RocksDBConf().copy()
+ withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) {
db =>
+ Seq("default", "", " ", " ", " default", " default ").foreach {
colFamilyName =>
+ val ex = intercept[SparkUnsupportedOperationException] {
+ db.createColFamilyIfAbsent(colFamilyName)
+ }
+
+ if (!colFamiliesEnabled) {
+ checkError(
+ ex,
+ errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+ parameters = Map(
+ "operationType" -> "create_col_family",
+ "entity" -> "multiple column families disabled in
RocksDBStateStoreProvider"
+ ),
+ matchPVals = true
+ )
+ } else {
+ checkError(
+ ex,
+ errorClass =
"STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME",
+ parameters = Map(
+ "operationName" -> "create_col_family",
+ "colFamilyName" -> colFamilyName
+ ),
+ matchPVals = true
+ )
+ }
+ }
+ }
+ }
+
+ private def verifyStoreOperationUnsupported(
+ operationName: String,
+ colFamiliesEnabled: Boolean,
+ colFamilyName: String)
+ (testFn: => Unit): Unit = {
+ val ex = intercept[SparkUnsupportedOperationException] {
+ testFn
+ }
+
+ if (!colFamiliesEnabled) {
+ checkError(
+ ex,
+ errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+ parameters = Map(
+ "operationType" -> operationName,
+ "entity" -> "multiple column families disabled in
RocksDBStateStoreProvider"
+ ),
+ matchPVals = true
+ )
+ } else {
+ checkError(
+ ex,
+ errorClass =
"STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
+ parameters = Map(
+ "operationType" -> operationName,
+ "colFamilyName" -> colFamilyName
+ ),
+ matchPVals = true
+ )
+ }
+ }
+
+ testWithColumnFamilies(s"RocksDB: operations on absent column family",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+ val remoteDir = Utils.createTempDir().toString
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+
+ val conf = RocksDBConf().copy()
+ withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) {
db =>
+ db.load(0)
+ val colFamilyName = "test"
+ verifyStoreOperationUnsupported("put", colFamiliesEnabled,
colFamilyName) {
+ db.put("a", "1", colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("remove", colFamiliesEnabled,
colFamilyName) {
+ db.remove("a", colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("get", colFamiliesEnabled,
colFamilyName) {
+ db.get("a", colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("iterator", colFamiliesEnabled,
colFamilyName) {
+ db.iterator(colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("merge", colFamiliesEnabled,
colFamilyName) {
+ db.merge("a", "1", colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("prefixScan", colFamiliesEnabled,
colFamilyName) {
+ db.prefixScan("a", colFamilyName)
+ }
+ }
+ }
+
testWithColumnFamilies(s"RocksDB: get, put, iterator, commit, load " +
s"with multiple column families",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
@@ -545,13 +649,6 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
val colFamily2: String = "xyz"
val conf = RocksDBConf().copy()
- withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
- val ex = intercept[Exception] {
- db.createColFamilyIfAbsent("default")
- }
- ex.getCause.isInstanceOf[UnsupportedOperationException]
- }
-
withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
db.createColFamilyIfAbsent(colFamily1)
db.createColFamilyIfAbsent(colFamily2)
@@ -572,7 +669,7 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
withDB(remoteDir, conf = conf, version = 0, useColumnFamilies = true) { db
=>
- val ex = intercept[Exception] {
+ val ex = intercept[SparkUnsupportedOperationException] {
// version 0 can be loaded again
assert(toStr(db.get("a", colFamily1)) === null)
assert(iterator(db, colFamily1).isEmpty)
@@ -581,8 +678,15 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
assert(toStr(db.get("a", colFamily2)) === null)
assert(iterator(db, colFamily2).isEmpty)
}
- assert(ex.isInstanceOf[RuntimeException])
- assert(ex.getMessage.contains("does not exist"))
+ checkError(
+ ex,
+ errorClass =
"STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY",
+ parameters = Map(
+ "operationType" -> "get",
+ "colFamilyName" -> colFamily1
+ ),
+ matchPVals = true
+ )
}
withDB(remoteDir, conf = conf, version = 1, useColumnFamilies = true) { db
=>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index a8c7fc05f21e..64b3e75ea976 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -134,6 +134,69 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
+ private def verifyStoreOperationUnsupported(operationName: String)(testFn:
=> Unit): Unit = {
+ if (operationName != "merge") {
+ val ex = intercept[SparkUnsupportedOperationException] {
+ testFn
+ }
+ checkError(
+ ex,
+ errorClass =
"UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
+ parameters = Map(
+ "stateStoreProvider" -> "HDFSBackedStateStoreProvider"
+ ),
+ matchPVals = true
+ )
+ } else {
+ val ex = intercept[SparkUnsupportedOperationException] {
+ testFn
+ }
+ checkError(
+ ex,
+ errorClass = "STATE_STORE_UNSUPPORTED_OPERATION",
+ parameters = Map(
+ "operationType" -> operationName,
+ "entity" -> "HDFSBackedStateStoreProvider"
+ ),
+ matchPVals = true
+ )
+
+ }
+ }
+
+ test("get, put, remove etc operations on non-default col family should
fail") {
+ tryWithProviderResource(newStoreProvider(opId = Random.nextInt(),
partition = 0,
+ minDeltasForSnapshot = 5)) { provider =>
+ val store = provider.getStore(0)
+ val keyRow = dataToKeyRow("a", 0)
+ val valueRow = dataToValueRow(1)
+ val colFamilyName = "test"
+ verifyStoreOperationUnsupported("put") {
+ store.put(keyRow, valueRow, colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("remove") {
+ store.remove(keyRow, colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("get") {
+ store.get(keyRow, colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("merge") {
+ store.merge(keyRow, valueRow, colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("iterator") {
+ store.iterator(colFamilyName)
+ }
+
+ verifyStoreOperationUnsupported("prefixScan") {
+ store.prefixScan(keyRow, colFamilyName)
+ }
+ }
+ }
+
test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to
1") {
tryWithProviderResource(newStoreProvider(opId = Random.nextInt(),
partition = 0,
numOfVersToRetainInMemory = 1)) { provider =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 71462cb4b643..40e31239895c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -217,7 +217,7 @@ class ValueStateSuite extends SharedSparkSession
ex,
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
parameters = Map(
- "stateStoreProvider" -> "HDFSStateStoreProvider"
+ "stateStoreProvider" -> "HDFSBackedStateStoreProvider"
),
matchPVals = true
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]