eduwercamacaro commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2873396543
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -667,6 +636,21 @@ private boolean isOverflowing(final long value) {
return value < 0;
}
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ try {
+ return cfAccessor.getCommitedOffset(dbAccessor, partition);
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error while getting committed offset for partition " + partition, e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public boolean managesOffsets() {
+ return true;
Review Comment:
Got it! I think I was confused with KIP-892. Thanks!
To solve this problem, I was thinking that we could persist the StateStore
`open` attribute in the Offset Column family or in a separate CF (but a new CF
seems like unnecessary overhead).
The idea is to persist `open=true` immediately after the store is
initialized, and `open=false` after a clean task shutdown, so that
during initialization we can tell whether the store was closed cleanly.
If the initialization (`RocksDBStore#init`) detects that the store is
already marked `open=true`, we can throw an exception and catch it in the PSM,
which can then wipe the StateStore because it is corrupted.
This is similar to the `DIRTY` concept that you introduced earlier
in this thread; the difference is in the semantics. I'm fine with either.
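
A rough sketch of the flow described above, in case it helps the discussion. It is not the PR's implementation: an in-memory `Map` stands in for the offsets column family, and `OpenMarkerStore`, `UncleanShutdownException`, `StateManagerSketch`, and the `__open__` key are hypothetical names made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception type; nothing with this name exists in the PR.
class UncleanShutdownException extends RuntimeException {
    UncleanShutdownException(final String message) {
        super(message);
    }
}

// Stand-in for a store that keeps an "open" marker next to its offsets.
// The Map simulates the offsets column family; a real store would use RocksDB.
class OpenMarkerStore {
    // Hypothetical key name, chosen only for this sketch.
    static final String OPEN_KEY = "__open__";
    private static final byte[] TRUE = {1};
    private static final byte[] FALSE = {0};

    private final Map<String, byte[]> offsetsCf;

    OpenMarkerStore(final Map<String, byte[]> offsetsCf) {
        this.offsetsCf = offsetsCf;
    }

    // Fails if the previous owner never recorded a clean close;
    // otherwise marks the store as open.
    void init() {
        if (Arrays.equals(offsetsCf.get(OPEN_KEY), TRUE)) {
            throw new UncleanShutdownException("store was not closed cleanly");
        }
        offsetsCf.put(OPEN_KEY, TRUE);
    }

    // Called only on a clean task shutdown.
    void closeClean() {
        offsetsCf.put(OPEN_KEY, FALSE);
    }
}

// Stand-in for the caller (the PSM in the comment above).
class StateManagerSketch {
    // Returns true if the store had to be wiped before it could be opened.
    static boolean initOrWipe(final Map<String, byte[]> offsetsCf) {
        try {
            new OpenMarkerStore(offsetsCf).init();
            return false;
        } catch (final UncleanShutdownException e) {
            offsetsCf.clear(); // wipe the (simulated) store contents
            new OpenMarkerStore(offsetsCf).init();
            return true;
        }
    }
}
```

With this shape, a missing marker (fresh store) and `open=false` both count as clean, and only a leftover `open=true` triggers the wipe.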