ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605656530
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata,
final GroupSubscription gr
*
* @param clientMetadataMap the map of process id to client metadata used
to build an immutable
* {@code ApplicationState}
- * @param statefulTasks the set of {@code TaskId} that correspond to
all the stateful
- * tasks that need to be reassigned.
* @return The {@code ApplicationState} needed by the TaskAssigner to
compute new task
* assignments.
*/
- private ApplicationState buildApplicationState(final Map<UUID,
ClientMetadata> clientMetadataMap,
- final Set<TaskId>
statefulTasks) {
- final Set<TaskId> statelessTasks = new HashSet<>();
- for (final Map.Entry<UUID, ClientMetadata> clientEntry :
clientMetadataMap.entrySet()) {
- final ClientState clientState = clientEntry.getValue().state;
- statelessTasks.addAll(clientState.statelessActiveTasks());
+ private ApplicationState buildApplicationState(final TopologyMetadata
topologyMetadata,
+ final Map<UUID,
ClientMetadata> clientMetadataMap,
+ final Map<Subtopology,
TopicsInfo> topicGroups,
+ final Cluster cluster) {
+ final Map<Subtopology, Set<String>> sourceTopicsByGroup = new
HashMap<>();
+ final Map<Subtopology, Set<String>> changelogTopicsByGroup = new
HashMap<>();
+ for (final Map.Entry<Subtopology, TopicsInfo> entry :
topicGroups.entrySet()) {
+ final Set<String> sourceTopics = entry.getValue().sourceTopics;
+ final Set<String> changelogTopics =
entry.getValue().stateChangelogTopics()
+ .stream().map(t -> t.name).collect(Collectors.toSet());
+ sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+ changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
}
+ final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+ partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+ final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+ partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+ final Set<TaskId> logicalTaskIds = new HashSet<>();
+ final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+ sourcePartitionsForTask.forEach((taskId, partitions) -> {
+ logicalTaskIds.add(taskId);
+ sourceTopicPartitions.addAll(partitions);
+ });
+ final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+ changelogPartitionsForTask.forEach((taskId, partitions) -> {
+ logicalTaskIds.add(taskId);
+ changelogTopicPartitions.addAll(partitions);
+ });
+
+ final Map<TopicPartition, Set<String>> racksForSourcePartitions =
RackUtils.getRacksForTopicPartition(
+ cluster, internalTopicManager, sourceTopicPartitions, false);
+ final Map<TopicPartition, Set<String>> racksForChangelogPartitions =
RackUtils.getRacksForTopicPartition(
Review Comment:
Since the rack info is nontrivial to compute, always requires a remote call
(which can take a long time, or even time out or otherwise fail), and will not
be used by every assignor/app, I'm thinking we should try to
initialize it lazily, only if/when the user actually requests the rack info.
I'm totally happy to push that into a follow-up PR to keep the scope of this
one well-defined, so don't worry about it for now. We'd still need
everything you implemented here and would just be moving it around and/or
substituting function pointers for the data structures we currently pass
around directly, so it shouldn't have any impact on how this PR goes. Just wanted to
make a note so I don't forget.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]