apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624682684
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
             final int maxPairs = taskCount * (taskCount - 1) / 2;
             this.taskPairs = new TaskPairs(maxPairs);
-            this.newTaskLocations = new HashMap<>();
-            this.newAssignments = new HashMap<>();
+            this.newTaskLocations = previousActiveAssignment.keySet().stream()
+                .collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+            this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+                KafkaStreamsState::processId,
+                state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+            ));
         }
         public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {
-            newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client)
-                .stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+            final Set<TaskId> newAssignmentsForClient = newAssignments.get(client).tasks().keySet();
             taskPairs.addPairs(taskId, newAssignmentsForClient);
-            newAssignments.get(client).add(new AssignedTask(taskId, type));
-            newTaskLocations.get(taskId).add(client);
+
+            newAssignments.get(client).assignTask(new AssignedTask(taskId, type));
+            newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
         }
-        public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() {
-            final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>();
-            for (final Map.Entry<ProcessId, Set<AssignedTask>> entry : newAssignments.entrySet()) {
-                final ProcessId processId = entry.getKey();
-                final Set<AssignedTask> assignedTasks = newAssignments.get(processId);
-                final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks);
-                kafkaStreamsAssignments.put(processId, assignment);
-            }
-            return kafkaStreamsAssignments;
+        public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {
+            return newAssignments;
         }
         public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments) {
Review Comment:
> now that `#processOptimizedAssignments` is only updating the
> `newTaskLocations`, can we simplify things further by just keeping the
> `newTaskLocations` map up to date as we move tasks around? That way we can get
> rid of `processOptimizedAssignments` altogether

Since the task assignments are mutated in place rather than "moved" around, and
that mutation often happens outside this class (in `TaskAssignmentUtils`, for
instance), we can't easily guarantee that `newAssignments` and
`newTaskLocations` change together. We would still need the
`processOptimizedAssignments` call to bring `newTaskLocations` back in sync.
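
To illustrate the concern, here is a minimal, self-contained sketch. It does not use the real Kafka Streams types: `TaskLocationSync`, `rebuildTaskLocations`, and the plain `String` ids are hypothetical stand-ins for `AssignmentState`, `processOptimizedAssignments`, and `ProcessId`/`TaskId`. It only shows why an index derived from the assignments goes stale when the assignments are mutated in place by code that does not know about the index.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TaskLocationSync {

    // Hypothetical stand-in for processOptimizedAssignments: rebuilds the
    // task -> clients index from the per-client assignments (client -> tasks).
    static Map<String, Set<String>> rebuildTaskLocations(final Map<String, Set<String>> assignments) {
        final Map<String, Set<String>> taskLocations = new HashMap<>();
        for (final Map.Entry<String, Set<String>> entry : assignments.entrySet()) {
            for (final String taskId : entry.getValue()) {
                taskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(entry.getKey());
            }
        }
        return taskLocations;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> assignments = new HashMap<>();
        assignments.put("client-A", new HashSet<>(Arrays.asList("0_0", "0_1")));
        assignments.put("client-B", new HashSet<>());

        Map<String, Set<String>> taskLocations = rebuildTaskLocations(assignments);

        // An external helper (think TaskAssignmentUtils) mutates the assignments
        // in place; it has no reference to taskLocations, so the index is not updated.
        assignments.get("client-A").remove("0_1");
        assignments.get("client-B").add("0_1");

        System.out.println("stale:  " + taskLocations.get("0_1"));

        // Re-deriving the index afterwards restores consistency.
        taskLocations = rebuildTaskLocations(assignments);
        System.out.println("synced: " + taskLocations.get("0_1"));
    }
}
```

Running `main` prints `stale:  [client-A]` followed by `synced: [client-B]`. Keeping the index current on every mutation would require every external mutator to go through this class, which is the coupling I'd rather avoid here.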