nicktelford commented on code in PR #21554:
URL: https://github.com/apache/kafka/pull/21554#discussion_r2871190828


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java:
##########
@@ -0,0 +1,218 @@
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
+
+public class LegacyCheckpointingStateStore<S extends StateStore, K, V> extends 
WrappedStateStore<S, K, V> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LegacyCheckpointingStateStore.class);
+
+    static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
+
+    private final boolean eosEnabled;
+    private final Set<TopicPartition> changelogPartitions;
+    private final StateDirectory stateDirectory;
+    private final TaskId taskId;
+    private final OffsetCheckpoint checkpointFile;
+    private final String logPrefix;
+
+    private final Map<TopicPartition, Long> offsets = new HashMap<>();
+    private Map<TopicPartition, Long> checkpointedOffsets;
+
+    /**
+     * Wraps the given {@link StateStore} as a {@code 
LegacyCheckpointingStateStore}, only if it is both
+     * {@link StateStore#persistent() persistent}, and it does not {@link 
StateStore#managesOffsets() manage its own offsets}.
+     */
+    @SuppressWarnings("deprecation")
+    public static <S extends StateStore, K, V> StateStore maybeWrapStore(final 
S wrapped,
+                                                                         final 
boolean eosEnabled,
+                                                                         final 
Set<TopicPartition> changelogPartitions,
+                                                                         final 
StateDirectory stateDirectory,
+                                                                         final 
TaskId taskId,
+                                                                         final 
String logPrefix) {
+        return wrapped.persistent() && !wrapped.managesOffsets()
+                ? new LegacyCheckpointingStateStore<>(wrapped, eosEnabled, 
changelogPartitions, stateDirectory, taskId, logPrefix)
+                : wrapped;
+    }
+
+    /**
+     * Unwraps the given store, only if it is a {@code 
LegacyCheckpointingStateStore}.
+     */
+    public static StateStore maybeUnwrapStore(final StateStore store) {
+        return (store instanceof LegacyCheckpointingStateStore<?, ?, ?>)
+                ? ((LegacyCheckpointingStateStore<?, ?, ?>) store).wrapped()
+                : store;
+    }
+
+    /**
+     * Runs post-initialization for {@code LegacyCheckpointingStore}, only if 
the {@code store} is one.
+     *
+     * This must be run after <em>ALL</em> stores have been initialized, as 
it's possible it may delete a shared
+     * checkpoint file, which is needed during initialization.
+     */
+    public static void maybeCleanupCheckpointFile(final Iterable<StateStore> 
stores) {
+        for (final StateStore store : stores) {
+            if (store instanceof LegacyCheckpointingStateStore) {
+                final LegacyCheckpointingStateStore<?, ?, ?> wrappedStore = 
((LegacyCheckpointingStateStore<?, ?, ?>) store);
+                try {
+                    if (wrappedStore.eosEnabled) {
+                        wrappedStore.checkpointFile.delete();
+                    }
+                } catch (final IOException e) {
+                    throw new ProcessorStateException(String.format("%sError 
deleting checkpoint file when creating StateStore '%s'", 
wrappedStore.logPrefix, store.name()), e);
+                }
+            }
+        }
+    }
+
+    LegacyCheckpointingStateStore(final S wrapped,
+                                  final boolean eosEnabled,
+                                  final Set<TopicPartition> 
changelogPartitions,
+                                  final StateDirectory stateDirectory,
+                                  final TaskId taskId,
+                                  final String logPrefix) {
+        super(wrapped);
+        this.eosEnabled = eosEnabled;
+        this.changelogPartitions = changelogPartitions;
+        this.stateDirectory = stateDirectory;
+        this.taskId = taskId;
+        this.checkpointFile = new OffsetCheckpoint(checkpointFileFor(taskId));
+        this.logPrefix = logPrefix;
+
+        // fail-crash; in this case we would not need to immediately close the 
state store before throwing
+        if (CHECKPOINT_FILE_NAME.equals(wrapped.name())) {
+            wrapped.close();
+            throw new IllegalArgumentException(String.format("%sIllegal store 
name: %s, which collides with the pre-defined " +
+                    "checkpoint file name", logPrefix, wrapped.name()));
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        // load store offsets from checkpoint file
+        try {
+            final Map<TopicPartition, Long> allOffsets = checkpointFile.read();
+            for (final Map.Entry<TopicPartition, Long> entry : 
allOffsets.entrySet()) {
+                if (changelogPartitions.contains(entry.getKey())) {
+                    offsets.put(entry.getKey(), 
changelogOffsetFromCheckpointedOffset(entry.getValue()));
+                }
+            }
+            checkpointedOffsets = new HashMap<>(offsets);
+        } catch (final IOException e) {
+            throw new ProcessorStateException(String.format("%sError loading 
checkpoint file when creating StateStore '%s'", logPrefix, name()), e);
+        }
+
+        // initialize the actual store
+        super.init(stateStoreContext, root);
+    }
+
+    @Override
+    @Deprecated
+    public boolean managesOffsets() {
+        return true;

Review Comment:
   @bbejeck The idea is that this implementation is used to take any 
"non-managing" StateStore (i.e. one where `managesOffsets() == false`) and turn 
it into a "managing" StateStore (i.e. one where `managesOffsets() == true`).
   
   It does this by handling offset management via `.checkpoint` files. 
Consequently, since any store wrapped in this implementation knows how to 
manage its offsets, it's semantically correct for `managesOffsets()` to return 
`true`.
   
   The `managesOffsets()` method is only ever called by the `StateManager`s to 
determine whether to wrap the implementation in a 
`LegacyCheckpointingStateStore` anyway, so returning `true` here functions 
mostly as a safety mechanism, to prevent stores from being double-wrapped 
(although that's probably impossible anyway).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to