mjsax commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1730446985


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {

Review Comment:
   Why do we need this additional check? -- Given that we have a non-empty task directory, it seems redundant?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {

Review Comment:
   Should this be called `hasInitializedTasks()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {
+        return !tasksForLocalState.isEmpty();
+    }
+
+    public Task assignInitialTask(final TaskId taskId) {
+        final Task task = tasksForLocalState.remove(taskId);
+        if (task != null) {
+            lockedTasksToOwner.replace(taskId, Thread.currentThread());
+        }
+        return task;
+    }
+
+    public void closeInitialTasksIfLastAssginedThread() {
+        if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >= config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {
+            // we need to be careful here, because other StreamThreads may still be assigning tasks (via assignInitialTask)
+            // so we first "drain" our Map of all remaining Tasks, and then close all the Tasks we successfully claimed from the Map
+            final Set<Task> tasksToClose = drainInitialTasks();
+            for (final Task task : tasksToClose) {
+                task.closeClean();
+            }
+        }
+    }
+
+    private void closeRemainingInitialTasks() {
+        closeRemainingInitialTasks(t -> true);
+    }
+
+    private void closeRemainingInitialTasks(final Predicate<Task> predicate) {
+        final Set<Task> drainedTasks = drainInitialTasks(predicate);
+        for (final Task task : drainedTasks) {
+            task.closeClean();
+        }
+    }
+
+    private Set<Task> drainInitialTasks() {
+        return drainInitialTasks(t -> true);
+    }
+
+    private Set<Task> drainInitialTasks(final Predicate<Task> predicate) {
+        final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
+        for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
+            if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {

Review Comment:
   How could it be `null` ?
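   If `tasksForLocalState` is a concurrent map shared across StreamThreads (an assumption here; the field's declaration is not shown in this hunk), `remove` can return `null` when another thread claims the same entry between this thread's iterator observing it and its own `remove` call. A minimal, hypothetical sketch of that outcome (names are illustrative, not from the PR):

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class RemoveRaceSketch {
       public static void main(final String[] args) {
           // Hypothetical stand-in for tasksForLocalState; keys/values simplified.
           final Map<String, String> tasks = new ConcurrentHashMap<>();
           tasks.put("0_0", "standby-task");

           // Thread A (e.g. a StreamThread calling assignInitialTask) wins the race
           // and removes the entry first.
           final String claimedByA = tasks.remove("0_0");

           // Thread B, iterating inside drainInitialTasks, saw the entry earlier,
           // but its own remove now returns null because the entry is already gone.
           final String claimedByB = tasks.remove("0_0");

           System.out.println(claimedByA); // standby-task
           System.out.println(claimedByB); // null
       }
   }
   ```

   If that is the intent, the `!= null` check would be what makes the claim exclusive when a drain races with `assignInitialTask`.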



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -454,6 +479,24 @@ private void handleTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition
                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
                                                final Set<Task> tasksToCloseClean) {
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);

Review Comment:
   Should we pass in `standbyTasksToCreate` instead of `activeTasksToCreate`? (If yes, seems we would need better testing to cover this case?)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {
+        return !tasksForLocalState.isEmpty();
+    }
+
+    public Task assignInitialTask(final TaskId taskId) {
+        final Task task = tasksForLocalState.remove(taskId);
+        if (task != null) {
+            lockedTasksToOwner.replace(taskId, Thread.currentThread());
+        }
+        return task;
+    }
+
+    public void closeInitialTasksIfLastAssginedThread() {

Review Comment:
   `closeInitial[ized]TasksIfLastAssginedThread`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState

Review Comment:
   The comment does not provide any value; it just describes what is obvious from the next line. Let's remove the comment.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {
+        return !tasksForLocalState.isEmpty();
+    }
+
+    public Task assignInitialTask(final TaskId taskId) {

Review Comment:
   Should this be called `assignInitializedTask`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one

Review Comment:
   Not sure if this comment provides much value?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {
+        return !tasksForLocalState.isEmpty();
+    }
+
+    public Task assignInitialTask(final TaskId taskId) {
+        final Task task = tasksForLocalState.remove(taskId);
+        if (task != null) {
+            lockedTasksToOwner.replace(taskId, Thread.currentThread());
+        }
+        return task;
+    }
+
+    public void closeInitialTasksIfLastAssginedThread() {
+        if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >= config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {
+            // we need to be careful here, because other StreamThreads may still be assigning tasks (via assignInitialTask)
+            // so we first "drain" our Map of all remaining Tasks, and then close all the Tasks we successfully claimed from the Map
+            final Set<Task> tasksToClose = drainInitialTasks();
+            for (final Task task : tasksToClose) {
+                task.closeClean();
+            }
+        }
+    }
+
+    private void closeRemainingInitialTasks() {
+        closeRemainingInitialTasks(t -> true);
+    }
+
+    private void closeRemainingInitialTasks(final Predicate<Task> predicate) {
+        final Set<Task> drainedTasks = drainInitialTasks(predicate);
+        for (final Task task : drainedTasks) {
+            task.closeClean();
+        }
+    }
+
+    private Set<Task> drainInitialTasks() {

Review Comment:
   `drainInitial[ized]Tasks`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata 
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = 
listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, 
streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = 
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {
+        return !tasksForLocalState.isEmpty();
+    }
+
+    public Task assignInitialTask(final TaskId taskId) {
+        final Task task = tasksForLocalState.remove(taskId);
+        if (task != null) {
+            lockedTasksToOwner.replace(taskId, Thread.currentThread());
+        }
+        return task;
+    }
+
+    public void closeInitialTasksIfLastAssginedThread() {
+        if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >= config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {
+            // we need to be careful here, because other StreamThreads may still be assigning tasks (via assignInitialTask)
+            // so we first "drain" our Map of all remaining Tasks, and then close all the Tasks we successfully claimed from the Map
+            final Set<Task> tasksToClose = drainInitialTasks();
+            for (final Task task : tasksToClose) {
+                task.closeClean();
+            }
+        }
+    }
+
+    private void closeRemainingInitialTasks() {

Review Comment:
   `closeInitializedTasks`
   
   (`Remaining` sounds redundant anyway?)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -325,6 +325,31 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private Map<Task, Set<TopicPartition>> assignInitialTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign,
+                                                              final String threadLogPrefix,
+                                                              final TopologyMetadata topologyMetadata,
+                                                              final ChangelogRegister changelogReader) {
+        if (stateDirectory.hasInitialTasks()) {
+            final Map<Task, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
+            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
+                final TaskId taskId = entry.getKey();
+                final Set<TopicPartition> inputPartitions = entry.getValue();
+                final Task task = stateDirectory.assignInitialTask(taskId);

Review Comment:
   Seems `assignInitialTask` does not really do the assignment; we do the actual assignment here. Should we rename the method to `removeInitializedTask()` instead?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -454,6 +479,24 @@ private void handleTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition
                                                 final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> tasksToCloseClean) {
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Set<TaskId> recycledInitialTasks = new HashSet<>(initialStandbyTasksToRecycle.size() + initialStandbyTasksToUse.size());
+
+        // if this was the last local thread to receive its assignment, close all the remaining Tasks, as they are not needed
+        stateDirectory.closeInitialTasksIfLastAssginedThread();
+
+        // recycle the initial standbys to active, and remove them from the set of actives that need to be created
+        tasksToRecycle.putAll(initialStandbyTasksToRecycle);

Review Comment:
   Would it be easier to just add these tasks to the `TaskRegistry` (`this.tasks`), so that everything else falls into place automatically via the existing code?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {

Review Comment:
   This and the other methods below are called from different `StreamThreads`, right? Seems we would need to make them `synchronized`?
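To make the concern concrete, here is a minimal stand-alone sketch (hypothetical class name, `Task` simplified to `Object`; not the actual `StateDirectory`) of what `synchronized` accessors over the shared map could look like, so that the check-then-act sequences stay atomic across StreamThreads:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a shared registry of pre-initialized tasks that
// several threads may query and drain concurrently.
public class InitialTaskHolder {
    private final Map<String, Object> tasksForLocalState = new HashMap<>();

    public synchronized void put(final String taskId, final Object task) {
        tasksForLocalState.put(taskId, task);
    }

    public synchronized boolean hasInitialTasks() {
        return !tasksForLocalState.isEmpty();
    }

    // remove() plus any owner bookkeeping must run under the same lock,
    // otherwise two threads could interleave between the two steps
    public synchronized Object assignInitialTask(final String taskId) {
        return tasksForLocalState.remove(taskId);
    }
}
```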



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // create a StandbyTask for each one
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks
+                    try {
+                        task.initializeIfNeeded();
+                        task.suspend();
+
+                        // add new Tasks to tasksForLocalState
+                        tasksForLocalState.put(id, task);
+                    } catch (final TaskCorruptedException e) {
+                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
+                        task.suspend();
+                        task.closeDirty();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean hasInitialTasks() {
+        return !tasksForLocalState.isEmpty();
+    }
+
+    public Task assignInitialTask(final TaskId taskId) {
+        final Task task = tasksForLocalState.remove(taskId);
+        if (task != null) {
+            lockedTasksToOwner.replace(taskId, Thread.currentThread());
+        }
+        return task;
+    }
+
+    public void closeInitialTasksIfLastAssginedThread() {
+        if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >= config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {

Review Comment:
   Given that threads can be added/removed programmatically, this guard might not be good enough? It seems we would need to rely on `KafkaStreams#threads` (and/or use `numLiveStreamThreads()`) instead of the config?
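For illustration, a minimal sketch of the suggested guard (hypothetical class, not the actual Kafka code): the counter is compared against a dynamically supplied live-thread count, as `numLiveStreamThreads()` would provide, rather than the static `num.stream.threads` config value:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: decide whether the calling thread is the last one
// to receive its assignment, against a count that may change at runtime.
public class LastThreadGuard {
    private final AtomicInteger threadsWithAssignment = new AtomicInteger(0);

    // returns true only once the number of assigned threads reaches the
    // current number of live StreamThreads
    public boolean isLastAssignedThread(final int numLiveStreamThreads) {
        return threadsWithAssignment.incrementAndGet() >= numLiveStreamThreads;
    }
}
```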



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -519,6 +563,33 @@ private void handleTasksPendingInitialization() {
         }
     }
 
+    private void handleInitialTaskReuse(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                        final Map<TaskId, RuntimeException> failedTasks) {
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse = assignInitialTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+
+        // if this was the last local thread to receive its assignment, close all the remaining Tasks, as they are not needed
+        stateDirectory.closeInitialTasksIfLastAssginedThread();
+
+        // recycle the initial standbys to active, and remove them from the set of actives that need to be created
+        if (!initialStandbyTasksToRecycle.isEmpty()) {

Review Comment:
   Do we need this? The loop would not execute anyway if empty...
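A tiny stand-alone demonstration of the point (hypothetical helper, not Kafka code): a for-each over an empty map's `entrySet()` executes zero iterations, so a preceding `isEmpty()` check is purely redundant.

```java
// Counts how many times the loop body runs for a given map;
// for an empty map this is 0 without any explicit isEmpty() guard.
public class EmptyLoopDemo {
    public static int countIterations(final java.util.Map<String, Integer> m) {
        int iterations = 0;
        for (final java.util.Map.Entry<String, Integer> entry : m.entrySet()) {
            iterations++;
        }
        return iterations;
    }
}
```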



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -519,6 +563,33 @@ private void handleTasksPendingInitialization() {
         }
     }
 
+    private void handleInitialTaskReuse(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                        final Map<TaskId, RuntimeException> failedTasks) {
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse = assignInitialTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+
+        // if this was the last local thread to receive its assignment, close all the remaining Tasks, as they are not needed
+        stateDirectory.closeInitialTasksIfLastAssginedThread();
+
+        // recycle the initial standbys to active, and remove them from the set of actives that need to be created
+        if (!initialStandbyTasksToRecycle.isEmpty()) {
+            final Set<Task> tasksToCloseDirty = new HashSet<>();
+            for (final Map.Entry<Task, Set<TopicPartition>> entry : initialStandbyTasksToRecycle.entrySet()) {
+                final Task task = entry.getKey();
+                final Set<TopicPartition> inputPartitions = entry.getValue();
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, failedTasks);
+                activeTasksToCreate.remove(task.id());
+            }
+        }
+
+        // use initial Standbys as real Standby tasks
+        if (!initialStandbyTasksToUse.isEmpty()) {

Review Comment:
   Same as above? If empty, nothing will happen anyway as `keySet()` will be 
empty?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -454,6 +479,24 @@ private void handleTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition
                                                 final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> tasksToCloseClean) {
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Set<TaskId> recycledInitialTasks = new HashSet<>(initialStandbyTasksToRecycle.size() + initialStandbyTasksToUse.size());
+
+        // if this was the last local thread to receive its assignment, close all the remaining Tasks, as they are not needed
+        stateDirectory.closeInitialTasksIfLastAssginedThread();
+
+        // recycle the initial standbys to active, and remove them from the set of actives that need to be created
+        tasksToRecycle.putAll(initialStandbyTasksToRecycle);
+        tasks.addStandbyTasks(initialStandbyTasksToRecycle.keySet());
+        initialStandbyTasksToRecycle.keySet().forEach(task -> activeTasksToCreate.remove(task.id()));
+        recycledInitialTasks.addAll(initialStandbyTasksToRecycle.keySet().stream().map(Task::id).collect(Collectors.toSet()));
+
+        // use initial Standbys as real Standby tasks
+        tasks.addStandbyTasks(initialStandbyTasksToUse.keySet());

Review Comment:
   Same question as above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -475,7 +518,7 @@ private void handleTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition
                     tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 }
                 standbyTasksToCreate.remove(taskId);
-            } else {
+            } else if (!recycledInitialTasks.contains(taskId)) {

Review Comment:
   Given the current code, `tasks.allTasks()` should not contain any "initialized tasks", so it seems we don't need this guard, as it would always be true (though this might change based on the comment above)?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -519,6 +563,33 @@ private void handleTasksPendingInitialization() {
         }
     }
 
+    private void handleInitialTaskReuse(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                        final Map<TaskId, RuntimeException> failedTasks) {
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle = assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse = assignInitialTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+
+        // if this was the last local thread to receive its assignment, close all the remaining Tasks, as they are not needed
+        stateDirectory.closeInitialTasksIfLastAssginedThread();
+
+        // recycle the initial standbys to active, and remove them from the set of actives that need to be created
+        if (!initialStandbyTasksToRecycle.isEmpty()) {
+            final Set<Task> tasksToCloseDirty = new HashSet<>();

Review Comment:
   What is the purpose of this one? We pass it into 
`recycleTaskFromStateUpdater` but we never read it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
