ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605660608
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      *         assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
Review Comment:
Sorry for the wall of text 😅 It might not seem like a huge deal, but if it's
an app with only source-changelog partitions, then doing this will save the
assignor from having to make any DescribeTopics request at all, since there
are no non-source changelogs.
And yes, apps with only source changelogs do exist; they're pretty common
for certain kinds of table-based processing (and especially apps that make
heavy use of IQ). And saving a remote fetch is actually a pretty big deal:
doing one in the middle of an assignment makes the rebalance vulnerable to
timing out, especially when brokers are under heavy load or the app is
experiencing rebalancing issues to begin with.
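To make the point concrete, the split I'm suggesting can be sketched roughly like this (plain-Java sketch with `String` topic names and an illustrative helper name, not the actual assignor code): only changelogs that are *not* also source topics would need a DescribeTopics lookup, so an app whose changelogs are all source topics needs zero remote fetches:

```java
import java.util.HashSet;
import java.util.Set;

public class ChangelogFilterSketch {
    // Hypothetical helper: partition info for source changelogs is already known
    // from the subscription/cluster metadata, so only the non-source changelogs
    // would require a remote DescribeTopics request to resolve.
    static Set<String> changelogsNeedingDescribe(final Set<String> sourceTopics,
                                                 final Set<String> changelogTopics) {
        final Set<String> nonSourceChangelogs = new HashSet<>(changelogTopics);
        nonSourceChangelogs.removeAll(sourceTopics);
        return nonSourceChangelogs;
    }

    public static void main(final String[] args) {
        // An app with only source changelogs: every changelog is also a source
        // topic, so the set of topics to describe is empty.
        final Set<String> sources = Set.of("table-input", "stream-input");
        final Set<String> changelogs = Set.of("table-input");
        System.out.println(changelogsNeedingDescribe(sources, changelogs).isEmpty()); // true
    }
}
```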
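And for anyone following along, the shape of the `partitionGroups` call used in the hunk above is roughly this (an illustrative plain-Java stand-in with `String` task ids and `"topic-partition"` strings in place of `TaskId`/`TopicPartition`, not the real `PartitionGrouper` implementation):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionGroupsSketch {
    // Simplified stand-in: for each subtopology, create one task per partition,
    // keyed by "(groupId)_(partition)", containing that partition of every
    // topic in the group that actually has it.
    static Map<String, Set<String>> partitionGroups(final Map<Integer, Set<String>> topicsByGroup,
                                                    final Map<String, Integer> partitionCounts) {
        final Map<String, Set<String>> tasks = new HashMap<>();
        for (final Map.Entry<Integer, Set<String>> entry : topicsByGroup.entrySet()) {
            // a group gets as many tasks as its widest topic has partitions
            int maxPartitions = 0;
            for (final String topic : entry.getValue()) {
                maxPartitions = Math.max(maxPartitions, partitionCounts.getOrDefault(topic, 0));
            }
            for (int partition = 0; partition < maxPartitions; partition++) {
                final Set<String> topicPartitions = new HashSet<>();
                for (final String topic : entry.getValue()) {
                    if (partition < partitionCounts.getOrDefault(topic, 0)) {
                        topicPartitions.add(topic + "-" + partition);
                    }
                }
                tasks.put(entry.getKey() + "_" + partition, topicPartitions);
            }
        }
        return tasks;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> tasks =
            partitionGroups(Map.of(0, Set.of("input")), Map.of("input", 2));
        System.out.println(tasks.size()); // 2 tasks: "0_0" and "0_1"
    }
}
```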
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]