apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624657249
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##########
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState
applicationState,
final int maxPairs = taskCount * (taskCount - 1) / 2;
this.taskPairs = new TaskPairs(maxPairs);
- this.newTaskLocations = new HashMap<>();
- this.newAssignments = new HashMap<>();
+ this.newTaskLocations = previousActiveAssignment.keySet().stream()
+ .collect(Collectors.toMap(Function.identity(), taskId -> new
HashSet<>()));
+ this.newAssignments =
clients.values().stream().collect(Collectors.toMap(
+ KafkaStreamsState::processId,
+ state -> KafkaStreamsAssignment.of(state.processId(), new
HashSet<>())
+ ));
}
public void finalizeAssignment(final TaskId taskId, final ProcessId
client, final AssignedTask.Type type) {
- newAssignments.computeIfAbsent(client, k -> new HashSet<>());
- newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
- final Set<TaskId> newAssignmentsForClient =
newAssignments.get(client)
- .stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+ final Set<TaskId> newAssignmentsForClient =
newAssignments.get(client).tasks().keySet();
taskPairs.addPairs(taskId, newAssignmentsForClient);
- newAssignments.get(client).add(new AssignedTask(taskId, type));
- newTaskLocations.get(taskId).add(client);
+
+ newAssignments.get(client).assignTask(new AssignedTask(taskId,
type));
+ newTaskLocations.computeIfAbsent(taskId, k -> new
HashSet<>()).add(client);
}
- public Map<ProcessId, KafkaStreamsAssignment>
buildKafkaStreamsAssignments() {
- final Map<ProcessId, KafkaStreamsAssignment>
kafkaStreamsAssignments = new HashMap<>();
- for (final Map.Entry<ProcessId, Set<AssignedTask>> entry :
newAssignments.entrySet()) {
- final ProcessId processId = entry.getKey();
- final Set<AssignedTask> assignedTasks =
newAssignments.get(processId);
- final KafkaStreamsAssignment assignment =
KafkaStreamsAssignment.of(processId, assignedTasks);
- kafkaStreamsAssignments.put(processId, assignment);
- }
- return kafkaStreamsAssignments;
+ public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {
+ return newAssignments;
}
public void processOptimizedAssignments(final Map<ProcessId,
KafkaStreamsAssignment> optimizedAssignments) {
Review Comment:
This would involve changing `TaskAssignmentUtils`. Currently it's not
obvious that those methods mutate their input (and until recently, they didn't).
The public methods all return `Map<ProcessId, KafkaStreamsAssignment>`, so I
suggest we change them to return void and assume in-place mutation everywhere
up the call chain from those methods.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]