nicktelford commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1818845973


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -222,6 +223,39 @@ public ProcessorStateManager(final TaskId taskId,
         log.debug("Created state store manager for task {}", taskId);
     }
 
+    /**
+     * Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
+     * they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
+     * completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
+     */

Review Comment:
   Done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +196,105 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
