nicktelford commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2896796055


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -59,14 +60,14 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
     }
 
     @Override
-    public void open(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
+    public final void open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException {
         final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
-        if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+        if (ignoreInvalidState || (valueBytes == null || Arrays.equals(valueBytes, closedState))) {
             // If the status key is not present, we initialize it to "OPEN"
             accessor.put(offsetColumnFamilyHandle, statusKey, openState);
             open = true;
         } else {
-            throw new RocksDBException("Invalid state");
+            throw new StreamsException("Invalid state during store open. Expected state to be either empty or closed");

Review Comment:
   We should probably throw a `ProcessorStateException` here, instead of 
`StreamsException`.
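
A minimal sketch of what the suggested change might look like. It does not use the real Kafka Streams or RocksDB types: the nested `StreamsException` and `ProcessorStateException` classes are stand-ins that only mirror the fact that `ProcessorStateException` extends `StreamsException`, and the `OpenStateSketch` class, the in-memory map, the `"status"` key, and the byte encodings are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class OpenStateSketch {
    // Stand-in for org.apache.kafka.streams.errors.StreamsException (assumption: simplified).
    static class StreamsException extends RuntimeException {
        StreamsException(final String message) {
            super(message);
        }
    }

    // Stand-in for org.apache.kafka.streams.errors.ProcessorStateException,
    // which is a subclass of StreamsException in Kafka Streams.
    static class ProcessorStateException extends StreamsException {
        ProcessorStateException(final String message) {
            super(message);
        }
    }

    // Hypothetical encodings; the real values live in the accessor class.
    private static final byte[] OPEN_STATE = {1};
    private static final byte[] CLOSED_STATE = {0};
    private static final String STATUS_KEY = "status";

    // In-memory replacement for the offsets column family (assumption).
    private final Map<String, byte[]> offsets = new HashMap<>();
    private boolean open;

    public void open(final boolean ignoreInvalidState) {
        final byte[] valueBytes = offsets.get(STATUS_KEY);
        if (ignoreInvalidState || valueBytes == null || Arrays.equals(valueBytes, CLOSED_STATE)) {
            // Missing or closed status (or caller opted to ignore it): mark the store as open.
            offsets.put(STATUS_KEY, OPEN_STATE);
            open = true;
        } else {
            // The more specific exception type suggested in the review.
            throw new ProcessorStateException(
                "Invalid state during store open. Expected state to be either empty or closed");
        }
    }

    public boolean isOpen() {
        return open;
    }
}
```

Because `ProcessorStateException` is a subclass of `StreamsException`, callers that already catch `StreamsException` would keep working, while callers that care specifically about state store failures could catch the narrower type.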



