ableegoldman commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1788533234


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +196,105 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata,
+                                             final StreamsMetricsImpl streamsMetrics,
+                                             final LogContext logContext) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
+
+                // we still check if the task's sub-topology is stateful, even though we know its directory contains state,
+                // because it's possible that the topology has changed since that data was written, and is now stateless

Review Comment:
   I kind of feel like we should log a warning if we detect this. A stateful 
task becoming stateless is not in itself a problem, but it's probably rare that 
someone literally just removed the state from a subtopology; more likely they 
modified the topology in a way that shuffled task ids, which can be dangerous.
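
   A minimal sketch of the kind of check this could be, not the PR's actual 
code. It assumes task directory names of the form `<subtopologyId>_<partition>` 
and ignores named topologies. `StatelessTaskDirectoryCheck`, 
`findOrphanedStateDirs`, and the `subtopologyHasState` map are hypothetical 
stand-ins for the real `TaskDirectory`/`ProcessorTopology` lookups, and 
`java.util.logging` stands in for the class's own logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper, for illustration only.
final class StatelessTaskDirectoryCheck {
    private static final Logger LOG =
        Logger.getLogger(StatelessTaskDirectoryCheck.class.getName());

    /**
     * @param taskDirNames        names of non-empty task directories, e.g. "0_1"
     * @param subtopologyHasState assumed lookup: subtopology id -> currently stateful?
     * @return directories that contain state but map to a stateless or unknown subtopology
     */
    static List<String> findOrphanedStateDirs(final List<String> taskDirNames,
                                              final Map<Integer, Boolean> subtopologyHasState) {
        final List<String> orphaned = new ArrayList<>();
        for (final String dirName : taskDirNames) {
            final int sep = dirName.indexOf('_');
            if (sep <= 0) {
                continue; // not a "<subtopologyId>_<partition>" name, skip it
            }
            // Assumes the prefix is numeric, as it is for plain task ids.
            final int subtopologyId = Integer.parseInt(dirName.substring(0, sep));
            if (!subtopologyHasState.getOrDefault(subtopologyId, false)) {
                LOG.warning("Task directory " + dirName + " contains state, but subtopology "
                    + subtopologyId + " is stateless in the current topology. "
                    + "The topology may have changed in a way that shuffled task ids.");
                orphaned.add(dirName);
            }
        }
        return orphaned;
    }
}
```

   Returning the affected directories, rather than only logging, would let the 
caller decide whether to skip them.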



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -222,6 +223,39 @@ public ProcessorStateManager(final TaskId taskId,
         log.debug("Created state store manager for task {}", taskId);
     }
 
+    /**
+     * Special constructor used by {@link StateDirectory} to partially initialize "pending" tasks for local state, before

Review Comment:
   nit: can we make it more obvious that this is a "special constructor" by 
using a static factory method called `#createUninitializedTaskStateManager` or 
`#createUnassignedTaskStateManager` (something to that effect)? I always worry 
about leaving essential context in comments/javadocs, since they so easily end 
up out of date and incorrect.
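
   Roughly what I have in mind, as a sketch only. `SketchStateManager` is a 
simplified, hypothetical stand-in for ProcessorStateManager; the `Object` 
changelog reader and the `createAssignedTaskStateManager`/`isAssigned` names 
are assumptions for illustration.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for ProcessorStateManager.
final class SketchStateManager {
    private final String taskId;
    private Object changelogReader; // stays null until the task is assigned to a thread

    // Private, so every caller has to go through a named factory method.
    private SketchStateManager(final String taskId, final Object changelogReader) {
        this.taskId = Objects.requireNonNull(taskId, "taskId");
        this.changelogReader = changelogReader;
    }

    /** Partially initialized manager for a task that no thread owns yet. */
    static SketchStateManager createUnassignedTaskStateManager(final String taskId) {
        return new SketchStateManager(taskId, null);
    }

    /** Fully initialized manager for a task that already has an owner. */
    static SketchStateManager createAssignedTaskStateManager(final String taskId,
                                                             final Object changelogReader) {
        return new SketchStateManager(
            taskId, Objects.requireNonNull(changelogReader, "changelogReader"));
    }

    /** Completes initialization once the task is handed to a thread. */
    void assignToStreamThread(final Object changelogReader) {
        this.changelogReader = Objects.requireNonNull(changelogReader, "changelogReader");
    }

    boolean isAssigned() {
        return changelogReader != null;
    }

    String taskId() {
        return taskId;
    }
}
```

   The factory name shows up at every call site, so the "not fully 
initialized" context travels with the code instead of living in a javadoc.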



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -314,7 +348,7 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
     }
 
     private void maybeRegisterStoreWithChangelogReader(final String storeName) {
-        if (isLoggingEnabled(storeName)) {
+        if (isLoggingEnabled(storeName) && changelogReader != null) {

Review Comment:
   This sort of thing makes me a bit nervous. At the very least, I think we 
should throw an exception if any of these methods are invoked on a 
ProcessorStateManager for an unassigned task with uninitialized fields.
   
   To be honest, the safest thing to do would probably be to split up the 
ProcessorStateManager and extract everything from before/after assignment into 
separate classes (or subclasses). I won't demand this if it's going to be a 
huge refactor, but please give it some thought.
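
   For the "throw an exception" option, a minimal sketch of the guard, 
assuming a null changelog reader is what marks a manager as unassigned. 
`GuardedStateManager`, `ensureAssigned`, and `registeredStores` are 
hypothetical names, and the class is a simplified stand-in rather than the 
real ProcessorStateManager.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for ProcessorStateManager.
final class GuardedStateManager {
    private final String taskId;
    private final List<String> registeredStores = new ArrayList<>();
    private Object changelogReader; // null while the task is unassigned

    GuardedStateManager(final String taskId) {
        this.taskId = taskId;
    }

    void assignToStreamThread(final Object changelogReader) {
        if (changelogReader == null) {
            throw new IllegalArgumentException("changelogReader must not be null");
        }
        this.changelogReader = changelogReader;
    }

    void registerStoreWithChangelogReader(final String storeName) {
        // Fail loudly instead of silently skipping when the reader is missing.
        ensureAssigned("registerStoreWithChangelogReader");
        registeredStores.add(storeName);
    }

    List<String> registeredStores() {
        return List.copyOf(registeredStores);
    }

    private void ensureAssigned(final String operation) {
        if (changelogReader == null) {
            throw new IllegalStateException("Cannot call " + operation
                + " on the state manager of unassigned task " + taskId);
        }
    }
}
```

   Compared with the `changelogReader != null` check in the diff, a misuse 
before assignment surfaces as an exception rather than a store that quietly 
never gets registered.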



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +196,105 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata,

Review Comment:
   nit: let's pick a term for this new task type and use it in these method 
names so the specific functionality they're intended for is clear. I noticed 
you've used "pending task" so far, but while digging through the state updater 
code recently I saw that it already uses this term pretty extensively to mean 
something different. Maybe "unassigned task", to match the terminology you've 
already used elsewhere?
   
   Then name this accordingly, e.g. `#initializeStateForUnassignedTasks`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -222,6 +223,39 @@ public ProcessorStateManager(final TaskId taskId,
         log.debug("Created state store manager for task {}", taskId);
     }
 
+    /**
+     * Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
+     * they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
+     * completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
+     */

Review Comment:
   Instead of adding a new constructor like this, can we use a static factory 
method named something like `#createUnassignedTaskStateManager` to drive home 
that it won't yet be a "real"/"full" ProcessorStateManager?



##########
checkstyle/checkstyle.xml:
##########
@@ -133,7 +133,7 @@
 
     <module name="ClassFanOutComplexity">
       <!-- default is 20 -->
-      <property name="max" value="52"/>
+      <property name="max" value="53"/>

Review Comment:
   Hm... is this bumping up the limit for the whole Kafka project? While it's 
only an increase of 1, I'd rather just add an exception for the offending class 
in the `suppressions` file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
