nicktelford commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1732603116


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {

Review Comment:
   If the topology has changed since the on-disk state was written, we may find 
stale state on disk for a Task that is now stateless. This can happen, for 
example, when sub-topologies are re-ordered, so that the on-disk state ends up 
in the wrong directory.
   
   This check therefore guards against creating a Task we don't need just 
because we found stale on-disk state for a Task that is now stateless.
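   
   To illustrate the idea in isolation, here is a minimal, self-contained 
sketch. It is not the code in this PR: `StaleStateGuardSketch`, 
`tasksToInitialize` and the `Map<Integer, Boolean>` standing in for 
`topology.hasStateWithChangelogs()` are hypothetical names, and the directory 
name parsing is simplified to `<subtopology>_<partition>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; none of these names are Kafka Streams APIs.
final class StaleStateGuardSketch {

    // Returns the task directory names ("<subtopology>_<partition>") for which a
    // task should be created: only those whose sub-topology, in the CURRENT
    // topology, still has changelogged state.
    static List<String> tasksToInitialize(final List<String> nonEmptyTaskDirs,
                                          final Map<Integer, Boolean> hasStateWithChangelogs) {
        final List<String> result = new ArrayList<>();
        for (final String dirName : nonEmptyTaskDirs) {
            final int subtopology = Integer.parseInt(dirName.substring(0, dirName.indexOf('_')));
            // The guard: finding state on disk is not enough to justify a task.
            if (hasStateWithChangelogs.getOrDefault(subtopology, false)) {
                result.add(dirName);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Assumed scenario: sub-topology 1 was stateful when its directory was
        // written, but is stateless after the sub-topologies were re-ordered.
        final Map<Integer, Boolean> current = Map.of(0, true, 1, false);
        System.out.println(tasksToInitialize(List.of("0_0", "0_1", "1_0"), current));
    }
}
```

   In this sketch the stale `1_0` directory is skipped, while the directories 
for the still-stateful sub-topology 0 are kept.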



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

