nicktelford commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2879506037
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor
accessor, final Map<Topic
}
@Override
- public void close() {
+ public void open(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
+ final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle,
statusKey);
+ if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+ // If the status key is not present, we initialize it to "OPEN"
+ accessor.put(offsetColumnFamilyHandle, statusKey, openState);
+ // Store the new status on disk
+ accessor.flush(offsetColumnFamilyHandle);
+ open.set(true);
+ } else {
+ throw new RocksDBException("Invalid state");
+ }
+ }
+
+ @Override
+ public void close(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
+ accessor.put(offsetColumnFamilyHandle, statusKey, closedState);
+ accessor.flush(offsetColumnFamilyHandle);
Review Comment:
Again, is this `flush` necessary? Doesn't RocksDB guarantee that it will flush
any unwritten data to disk on close?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor
accessor, final Map<Topic
}
@Override
- public void close() {
+ public void open(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
+ final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle,
statusKey);
+ if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+ // If the status key is not present, we initialize it to "OPEN"
+ accessor.put(offsetColumnFamilyHandle, statusKey, openState);
+ // Store the new status on disk
+ accessor.flush(offsetColumnFamilyHandle);
Review Comment:
I don't think it's necessary to explicitly `flush` here. Since the status
flag is written to the offsets CF, we can guarantee that if the status flag
hasn't been flushed to disk, then no offsets written to the CF after the store
was opened were flushed either. Since we also have Atomic Flush, we can
guarantee that no _records_ have been flushed if the status flag has also not
been flushed.
Conversely, Atomic Flush guarantees that if any records or offsets written
since the store was opened have been flushed to disk, for any reason, then the
status flag has been flushed too.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor
accessor, final Map<Topic
}
@Override
- public void close() {
+ public void open(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
+ final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle,
statusKey);
+ if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+ // If the status key is not present, we initialize it to "OPEN"
+ accessor.put(offsetColumnFamilyHandle, statusKey, openState);
+ // Store the new status on disk
+ accessor.flush(offsetColumnFamilyHandle);
+ open.set(true);
+ } else {
+ throw new RocksDBException("Invalid state");
Review Comment:
Can we be a bit more descriptive with this error? Perhaps indicate that this
store was "dirty" and, under EOS, needs to be wiped.
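Something along these lines, perhaps. This is only a rough sketch of the wording: `DirtyStoreMessage` and `forStatus` are hypothetical names, not anything in the PR, and the resulting string would be passed to the `RocksDBException` in place of "Invalid state".

```java
import java.util.Arrays;

// Hypothetical helper, for illustration only: builds a more descriptive
// message for the case where the status key is neither absent nor "closed".
final class DirtyStoreMessage {

    static String forStatus(final byte[] status) {
        return "State store was not closed cleanly (status=" + Arrays.toString(status) + "). "
            + "Its contents may be ahead of the committed offsets; "
            + "under exactly-once semantics the store needs to be wiped and restored.";
    }
}
```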
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -35,6 +37,10 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
private final ColumnFamilyHandle offsetColumnFamilyHandle;
private final StringSerializer stringSerializer = new StringSerializer();
private final Serdes.LongSerde longSerde = new Serdes.LongSerde();
+ private final byte[] statusKey = stringSerializer.serialize(null,
"status");
+ private final byte[] openState = longSerde.serializer().serialize(null,
1L);
+ private final byte[] closedState = longSerde.serializer().serialize(null,
0L);
+ private final AtomicBoolean open = new AtomicBoolean(false);
Review Comment:
This was previously a `volatile boolean`. Is it necessary to use an
`AtomicBoolean` instead?
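To illustrate the distinction I have in mind, here is a minimal sketch (the class and method names are made up for the example). A `volatile boolean` is enough when the flag is only ever set and read; `AtomicBoolean` earns its keep when an atomic check-then-set such as `compareAndSet` is needed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: contrasts the two kinds of flag.
final class OpenFlagSketch {

    // Plain set/get: volatile already gives cross-thread visibility.
    private volatile boolean open = false;

    // Check-then-set in one atomic step: this is what needs AtomicBoolean.
    private final AtomicBoolean openedOnce = new AtomicBoolean(false);

    void markOpen() {
        open = true;
    }

    boolean isOpen() {
        return open;
    }

    // Returns true only for the first caller, even under contention.
    boolean tryOpenOnce() {
        return openedOnce.compareAndSet(false, true);
    }
}
```

If the accessor only ever calls `set` and `get`, the first form should behave the same as the `AtomicBoolean`.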
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java:
##########
@@ -356,6 +356,7 @@ public
RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig() {}
public void setConfig(final String storeName, final Options options,
final Map<String, Object> configs) {
options.setTableFormatConfig(new PlainTableConfig());
+ options.useFixedLengthPrefixExtractor(1);
Review Comment:
What does this do? 🤔
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -54,8 +60,30 @@ public final void commit(final RocksDBStore.DBAccessor
accessor, final Map<Topic
}
@Override
- public void close() {
+ public void open(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
+ final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle,
statusKey);
+ if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
Review Comment:
IIRC, we only care about the "status" flag under EOS. Under ALOS, even if
the store was previously opened, we want to use the committed offsets as our
starting point and restore forwards from there.
One wrinkle here is that `RocksDBStore` is not _currently_ aware of the
processing mode, but it can be determined via `stateStoreContext.appConfigs()`.
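A rough sketch of that lookup, assuming the map returned by `appConfigs()` carries the `processing.guarantee` setting. The class and method names are hypothetical, and the config key and value are inlined as string literals (mirroring `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.EXACTLY_ONCE_V2`) so the snippet compiles on its own. Older releases accepted additional exactly-once values that this sketch does not handle.

```java
import java.util.Map;

// Hypothetical helper, for illustration only.
final class ProcessingModeCheck {

    // Inlined literals; in Kafka Streams these would come from StreamsConfig.
    static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    static final String EXACTLY_ONCE_V2 = "exactly_once_v2";

    // True if the application configs request exactly-once processing.
    // A missing entry is treated as not-EOS in this sketch.
    static boolean eosEnabled(final Map<String, Object> appConfigs) {
        final Object guarantee = appConfigs.get(PROCESSING_GUARANTEE_CONFIG);
        return guarantee != null && EXACTLY_ONCE_V2.equals(guarantee.toString());
    }
}
```

The status check in `open` could then be skipped whenever `eosEnabled(...)` returns false.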
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -245,7 +245,11 @@ void openDB(final Map<String, Object> configs, final File
stateDir) {
setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
- open = true;
+ try {
+ cfAccessor.open(dbAccessor);
+ } catch (final Throwable fatal) {
+ throw new ProcessorStateException(fatal);
Review Comment:
Should we perhaps provide an error message with a bit more context here,
like the `name()` of this StateStore?
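For example, roughly like this. It is a sketch only: `StoreOpenException` is a stand-in for `ProcessorStateException` so the snippet compiles without Kafka on the classpath, and the exact wording is just a suggestion.

```java
// Illustrative only: wraps the failure with the store name for context.
final class OpenFailureSketch {

    // Stand-in for ProcessorStateException in this self-contained example.
    static final class StoreOpenException extends RuntimeException {
        StoreOpenException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static RuntimeException wrap(final String storeName, final Throwable cause) {
        return new StoreOpenException(
            "Error opening store " + storeName + ": " + cause.getMessage(), cause);
    }
}
```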