mjsax commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1730446985
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
Review Comment:
Why do we need this additional check? -- Given that we have a non-empty task
directory, it seems redundant?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
Review Comment:
Should this be called `hasInitializedTasks()`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
+ return !tasksForLocalState.isEmpty();
+ }
+
+ public Task assignInitialTask(final TaskId taskId) {
+ final Task task = tasksForLocalState.remove(taskId);
+ if (task != null) {
+ lockedTasksToOwner.replace(taskId, Thread.currentThread());
+ }
+ return task;
+ }
+
+ public void closeInitialTasksIfLastAssginedThread() {
+ if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >=
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {
+ // we need to be careful here, because other StreamThreads may
still be assigning tasks (via assignInitialTask)
+ // so we first "drain" our Map of all remaining Tasks, and then
close all the Tasks we successfully claimed from the Map
+ final Set<Task> tasksToClose = drainInitialTasks();
+ for (final Task task : tasksToClose) {
+ task.closeClean();
+ }
+ }
+ }
+
+ private void closeRemainingInitialTasks() {
+ closeRemainingInitialTasks(t -> true);
+ }
+
+ private void closeRemainingInitialTasks(final Predicate<Task> predicate) {
+ final Set<Task> drainedTasks = drainInitialTasks(predicate);
+ for (final Task task : drainedTasks) {
+ task.closeClean();
+ }
+ }
+
+ private Set<Task> drainInitialTasks() {
+ return drainInitialTasks(t -> true);
+ }
+
+ private Set<Task> drainInitialTasks(final Predicate<Task> predicate) {
+ final Set<Task> drainedTasks = new
HashSet<>(tasksForLocalState.size());
+ for (final Map.Entry<TaskId, Task> entry :
tasksForLocalState.entrySet()) {
+ if (predicate.test(entry.getValue()) &&
tasksForLocalState.remove(entry.getKey()) != null) {
Review Comment:
How could it be `null`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -454,6 +479,24 @@ private void handleTasksWithoutStateUpdater(final
Map<TaskId, Set<TopicPartition
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
Review Comment:
Should we pass in `standbyTasksToCreate` instead of `activeTasksToCreate`?
(If yes, seems we would need better testing to cover this case?)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
+ return !tasksForLocalState.isEmpty();
+ }
+
+ public Task assignInitialTask(final TaskId taskId) {
+ final Task task = tasksForLocalState.remove(taskId);
+ if (task != null) {
+ lockedTasksToOwner.replace(taskId, Thread.currentThread());
+ }
+ return task;
+ }
+
+ public void closeInitialTasksIfLastAssginedThread() {
Review Comment:
`closeInitial[ized]TasksIfLastAssignedThread`? (Also note the typo: "Assgined" should be "Assigned".)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
Review Comment:
Comment does not provide any value; it just describes what is obvious from
the next line itself. Let's remove the comment.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
+ return !tasksForLocalState.isEmpty();
+ }
+
+ public Task assignInitialTask(final TaskId taskId) {
Review Comment:
Should this be called `assignInitializedTask`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
Review Comment:
Not sure if this comment provides much value?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
+ return !tasksForLocalState.isEmpty();
+ }
+
+ public Task assignInitialTask(final TaskId taskId) {
+ final Task task = tasksForLocalState.remove(taskId);
+ if (task != null) {
+ lockedTasksToOwner.replace(taskId, Thread.currentThread());
+ }
+ return task;
+ }
+
+ public void closeInitialTasksIfLastAssginedThread() {
+ if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >=
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {
+ // we need to be careful here, because other StreamThreads may
still be assigning tasks (via assignInitialTask)
+ // so we first "drain" our Map of all remaining Tasks, and then
close all the Tasks we successfully claimed from the Map
+ final Set<Task> tasksToClose = drainInitialTasks();
+ for (final Task task : tasksToClose) {
+ task.closeClean();
+ }
+ }
+ }
+
+ private void closeRemainingInitialTasks() {
+ closeRemainingInitialTasks(t -> true);
+ }
+
+ private void closeRemainingInitialTasks(final Predicate<Task> predicate) {
+ final Set<Task> drainedTasks = drainInitialTasks(predicate);
+ for (final Task task : drainedTasks) {
+ task.closeClean();
+ }
+ }
+
+ private Set<Task> drainInitialTasks() {
Review Comment:
`drainInitial[ized]Tasks`
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
+ return !tasksForLocalState.isEmpty();
+ }
+
+ public Task assignInitialTask(final TaskId taskId) {
+ final Task task = tasksForLocalState.remove(taskId);
+ if (task != null) {
+ lockedTasksToOwner.replace(taskId, Thread.currentThread());
+ }
+ return task;
+ }
+
+ public void closeInitialTasksIfLastAssginedThread() {
+ if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >=
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {
+ // we need to be careful here, because other StreamThreads may
still be assigning tasks (via assignInitialTask)
+ // so we first "drain" our Map of all remaining Tasks, and then
close all the Tasks we successfully claimed from the Map
+ final Set<Task> tasksToClose = drainInitialTasks();
+ for (final Task task : tasksToClose) {
+ task.closeClean();
+ }
+ }
+ }
+
+ private void closeRemainingInitialTasks() {
Review Comment:
`closeInitializedTasks`
(`Remaining` sounds redundant anyway?)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -325,6 +325,31 @@ private void closeDirtyAndRevive(final Collection<Task>
taskWithChangelogs, fina
}
}
+ private Map<Task, Set<TopicPartition>> assignInitialTasks(final
Map<TaskId, Set<TopicPartition>> tasksToAssign,
+ final String
threadLogPrefix,
+ final
TopologyMetadata topologyMetadata,
+ final
ChangelogRegister changelogReader) {
+ if (stateDirectory.hasInitialTasks()) {
+ final Map<Task, Set<TopicPartition>> assignedTasks = new
HashMap<>(tasksToAssign.size());
+ for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
tasksToAssign.entrySet()) {
+ final TaskId taskId = entry.getKey();
+ final Set<TopicPartition> inputPartitions = entry.getValue();
+ final Task task = stateDirectory.assignInitialTask(taskId);
Review Comment:
Seems `assignInitialTask` does not really do the assignment, but we do the
actual assignment here? Should we rename the method to
`removeInitializedTask()` instead?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -454,6 +479,24 @@ private void handleTasksWithoutStateUpdater(final
Map<TaskId, Set<TopicPartition
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Set<TaskId> recycledInitialTasks = new
HashSet<>(initialStandbyTasksToRecycle.size() +
initialStandbyTasksToUse.size());
+
+ // if this was the last local thread to receive its assignment, close
all the remaining Tasks, as they are not needed
+ stateDirectory.closeInitialTasksIfLastAssginedThread();
+
+ // recycle the initial standbys to active, and remove them from the
set of actives that need to be created
+ tasksToRecycle.putAll(initialStandbyTasksToRecycle);
Review Comment:
Would it be easier to just add these tasks to `TaskRegistry` `this.tasks`
and everything else falls into place automatically by the existing code?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
Review Comment:
This and other methods below are called from different `StreamThreads`,
right? Seems we would need to make them `synchronized`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
return stateDirLock != null;
}
+ public void initializeTasksForLocalState(final TopologyMetadata
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+ final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
+ if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+ final LogContext logContext = new LogContext("main-thread ");
+ final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
+ final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+ // discover all non-empty task directories in StateDirectory
+ for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+ final String dirName = taskDirectory.file().getName();
+ final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
+ final ProcessorTopology topology =
topologyMetadata.buildSubtopology(id);
+ final Set<TopicPartition> inputPartitions =
topology.sourceTopics().stream().map(topic -> new TopicPartition(topic,
id.partition())).collect(Collectors.toSet());
+
+ // create a StandbyTask for each one
+ if (topology.hasStateWithChangelogs()) {
+ final ProcessorStateManager stateManager = new
ProcessorStateManager(
+ id,
+ Task.TaskType.STANDBY,
+ eosEnabled,
+ logContext,
+ this,
+ null,
+ topology.storeToChangelogTopic(),
+ inputPartitions,
+ stateUpdaterEnabled
+ );
+
+ final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
+ id,
+ config,
+ stateManager,
+ streamsMetrics,
+ dummyCache
+ );
+
+ final Task task = new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ topologyMetadata.taskConfig(id),
+ streamsMetrics,
+ stateManager,
+ this,
+ dummyCache,
+ context
+ );
+
+ // initialize and suspend new Tasks
+ try {
+ task.initializeIfNeeded();
+ task.suspend();
+
+ // add new Tasks to tasksForLocalState
+ tasksForLocalState.put(id, task);
+ } catch (final TaskCorruptedException e) {
+ // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
+ task.suspend();
+ task.closeDirty();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean hasInitialTasks() {
+ return !tasksForLocalState.isEmpty();
+ }
+
+ public Task assignInitialTask(final TaskId taskId) {
+ final Task task = tasksForLocalState.remove(taskId);
+ if (task != null) {
+ lockedTasksToOwner.replace(taskId, Thread.currentThread());
+ }
+ return task;
+ }
+
+ public void closeInitialTasksIfLastAssginedThread() {
+ if (hasInitialTasks() && threadsWithAssignment.incrementAndGet() >=
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)) {
Review Comment:
Given that threads can be added/removed programmatically, this guard might
not be good enough? It seems we would need to rely on `KafkaStreams#threads`
(and/or use `numLiveStreamThreads()`) instead of the config?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -519,6 +563,33 @@ private void handleTasksPendingInitialization() {
}
}
+ private void handleInitialTaskReuse(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
+ final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate,
+ final Map<TaskId, RuntimeException>
failedTasks) {
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse =
assignInitialTasks(standbyTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+
+ // if this was the last local thread to receive its assignment, close
all the remaining Tasks, as they are not needed
stateDirectory.closeInitialTasksIfLastAssignedThread();
+
+ // recycle the initial standbys to active, and remove them from the
set of actives that need to be created
+ if (!initialStandbyTasksToRecycle.isEmpty()) {
Review Comment:
Do we need this? The loop would not execute anyway if empty...
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -519,6 +563,33 @@ private void handleTasksPendingInitialization() {
}
}
+ private void handleInitialTaskReuse(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
+ final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate,
+ final Map<TaskId, RuntimeException>
failedTasks) {
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse =
assignInitialTasks(standbyTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+
+ // if this was the last local thread to receive its assignment, close
all the remaining Tasks, as they are not needed
+ stateDirectory.closeInitialTasksIfLastAssignedThread();
+
+ // recycle the initial standbys to active, and remove them from the
set of actives that need to be created
+ if (!initialStandbyTasksToRecycle.isEmpty()) {
+ final Set<Task> tasksToCloseDirty = new HashSet<>();
+ for (final Map.Entry<Task, Set<TopicPartition>> entry :
initialStandbyTasksToRecycle.entrySet()) {
+ final Task task = entry.getKey();
+ final Set<TopicPartition> inputPartitions = entry.getValue();
+ recycleTaskFromStateUpdater(task, inputPartitions,
tasksToCloseDirty, failedTasks);
+ activeTasksToCreate.remove(task.id());
+ }
+ }
+
+ // use initial Standbys as real Standby tasks
+ if (!initialStandbyTasksToUse.isEmpty()) {
Review Comment:
Same as above? If empty, nothing will happen anyway as `keySet()` will be
empty?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -454,6 +479,24 @@ private void handleTasksWithoutStateUpdater(final
Map<TaskId, Set<TopicPartition
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Set<TaskId> recycledInitialTasks = new
HashSet<>(initialStandbyTasksToRecycle.size() +
initialStandbyTasksToUse.size());
+
+ // if this was the last local thread to receive its assignment, close
all the remaining Tasks, as they are not needed
+ stateDirectory.closeInitialTasksIfLastAssignedThread();
+
+ // recycle the initial standbys to active, and remove them from the
set of actives that need to be created
+ tasksToRecycle.putAll(initialStandbyTasksToRecycle);
+ tasks.addStandbyTasks(initialStandbyTasksToRecycle.keySet());
+ initialStandbyTasksToRecycle.keySet().forEach(task ->
activeTasksToCreate.remove(task.id()));
+
recycledInitialTasks.addAll(initialStandbyTasksToRecycle.keySet().stream().map(Task::id).collect(Collectors.toSet()));
+
+ // use initial Standbys as real Standby tasks
+ tasks.addStandbyTasks(initialStandbyTasksToUse.keySet());
Review Comment:
Same question as above.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -475,7 +518,7 @@ private void handleTasksWithoutStateUpdater(final
Map<TaskId, Set<TopicPartition
tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
}
standbyTasksToCreate.remove(taskId);
- } else {
+ } else if (!recycledInitialTasks.contains(taskId)) {
Review Comment:
Given the current code, `tasks.allTasks()` should not contain any
"initialized tasks"; thus it seems we don't need this guard as it would always
be true (this might change based on the comment above?)?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -519,6 +563,33 @@ private void handleTasksPendingInitialization() {
}
}
+ private void handleInitialTaskReuse(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
+ final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate,
+ final Map<TaskId, RuntimeException>
failedTasks) {
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToRecycle =
assignInitialTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Map<Task, Set<TopicPartition>> initialStandbyTasksToUse =
assignInitialTasks(standbyTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+
+ // if this was the last local thread to receive its assignment, close
all the remaining Tasks, as they are not needed
+ stateDirectory.closeInitialTasksIfLastAssignedThread();
+
+ // recycle the initial standbys to active, and remove them from the
set of actives that need to be created
+ if (!initialStandbyTasksToRecycle.isEmpty()) {
+ final Set<Task> tasksToCloseDirty = new HashSet<>();
Review Comment:
What is the purpose of this one? We pass it into
`recycleTaskFromStateUpdater` but we never read it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]