ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605639072


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      *         assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
 
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);

Review Comment:
   I suppose this doesn't hurt anything since `logicalTaskIds` is a Set, but the 
taskIds returned by the partition grouper should be the same for the source and 
changelog topics. So you can remove this line.
   
   (Alternatively, you can create the `logicalTaskIds` set up front by copying 
the keyset of one of the partitionsForTask maps, but that's just an 
implementation detail, up to you.) However, I would probably consider adding a 
check to make sure these two maps return the same set of tasks. It doesn't need 
to scan the entire thing, maybe just a simple size comparison:
   
   ```
   if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
     log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
               sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
     throw new TaskAssignmentException(/* error msg */);
   }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple container class corresponding to a given {@link TaskId}.
+ * Includes metadata such as whether it's stateful and the names of all state stores
+ * belonging to this task, the set of input topic partitions and changelog topic partitions
+ * for all logged state stores, and the rack ids of all replicas of each topic partition
+ * in the task.
+ */
+public interface TaskInfo {
+    TaskId id();
+
+    boolean isStateful();
+
+    Set<String> stateStoreNames();
+
+    Set<TopicPartition> sourceTopicPartitions();
+
+    Set<TopicPartition> changelogTopicPartitions();

Review Comment:
   We should add javadocs to these getters. It will be trivial for most of 
them, but it's important to note that this set (`changelogTopicPartitions`) 
contains partitions for source changelog topics as well, and that source 
changelog topic partitions can be identified by checking whether they exist in 
both this set and the source topic partitions set.
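   
   A minimal sketch of the check that javadoc would describe, assuming the 
two getters return plain sets. The class and method names are hypothetical, and 
the type parameter `P` stands in for `TopicPartition` so the snippet compiles 
without Kafka on the classpath:

```java
import java.util.HashSet;
import java.util.Set;

final class SourceChangelogSketch {
    private SourceChangelogSketch() { }

    /**
     * Returns the partitions that appear in both sets, i.e. the source changelog
     * partitions. P is a stand-in for Kafka's TopicPartition (assumption).
     */
    static <P> Set<P> sourceChangelogPartitions(final Set<P> sourceTopicPartitions,
                                                final Set<P> changelogTopicPartitions) {
        // Copy first so neither input set is mutated.
        final Set<P> result = new HashSet<>(changelogTopicPartitions);
        // Keep only the changelog partitions that are also source partitions.
        result.retainAll(sourceTopicPartitions);
        return result;
    }
}
```

   With a real `TaskInfo`, the two arguments would presumably be 
`sourceTopicPartitions()` and `changelogTopicPartitions()`.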



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);
+        freshTopicPartitionInfo.forEach((topic, partitionInfos) -> {
+            partitionInfos.forEach(partitionInfo -> {
+                final int partition = partitionInfo.partition();
+                final TopicPartition topicPartition = new TopicPartition(topic, partition);
+                final List<Node> replicas = partitionInfo.replicas();
+                if (replicas == null || replicas.isEmpty()) {
+                    LOG.error("No replicas found for topic partition {}: {}", topic, partition);
+                    return;
+                }
+
+                final Set<String> racks = replicas.stream().filter(Node::hasRack).map(Node::rack).collect(
+                    Collectors.toSet());
+                racksForTopicPartition.computeIfAbsent(topicPartition, k -> new HashSet<>());
+                racksForTopicPartition.get(topicPartition).addAll(racks);
+            });
+        });
+
+        return racksForTopicPartition;
+    }
+
+    public static Set<String> topicsWithStaleMetadata(final Cluster cluster, final Set<TopicPartition> topicPartitions) {
+        final Set<String> topicsWithStaleMetadata = new HashSet<>();
+        for (final TopicPartition topicPartition : topicPartitions) {
+            final PartitionInfo partitionInfo = cluster.partition(topicPartition);
+            if (partitionInfo == null) {
+                LOG.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                continue;

Review Comment:
   It's not an error for a topic to be missing from the `Cluster`. Even source 
topics might not exist here if they had to be created by the assignor during 
the rebalance, since the `Cluster` metadata represents the state of the cluster 
when this rebalance/assignment first began.
   
   Since the point of this method seems to be to collect topics with missing 
metadata that we'll need to look up via a DescribeTopics request, the ones for 
which `cluster.partition(topicPartition)` returns `null` are exactly the ones 
that should be returned by this method.
   
   In fact, I'd go ahead and remove everything past this line as well; this 
method should focus only on differentiating topics with missing metadata from 
ones we already have the info for. If the `Cluster` has metadata for this 
partition but the `replicas` set is missing/empty, then there's something wrong 
with this partition, and calling DescribeTopics probably won't help.
   
   Let's rename this to `#topicsWithMissingMetadata` while we're at it.
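   
   A rough sketch of the narrowed method, not the actual patch. To keep it 
runnable without Kafka, `Partition` is a hypothetical stand-in for 
`TopicPartition` and the `hasMetadata` predicate stands in for 
`cluster.partition(topicPartition) != null`:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

final class MissingMetadataSketch {
    private MissingMetadataSketch() { }

    /** Hypothetical stand-in for Kafka's TopicPartition. */
    static final class Partition {
        final String topic;
        final int partition;

        Partition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /**
     * Returns the topics that have at least one partition unknown to the metadata
     * snapshot. A missing partition is not treated as an error; the topic is simply
     * collected so it can be looked up later.
     */
    static Set<String> topicsWithMissingMetadata(final Predicate<Partition> hasMetadata,
                                                 final Set<Partition> topicPartitions) {
        final Set<String> missing = new HashSet<>();
        for (final Partition topicPartition : topicPartitions) {
            if (!hasMetadata.test(topicPartition)) {
                missing.add(topicPartition.topic);
            }
        }
        return missing;
    }
}
```

   Note that there is no replica check and no error log here: the method only 
separates "known" from "unknown" topics.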



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);
+        freshTopicPartitionInfo.forEach((topic, partitionInfos) -> {
+            partitionInfos.forEach(partitionInfo -> {
+                final int partition = partitionInfo.partition();
+                final TopicPartition topicPartition = new TopicPartition(topic, partition);
+                final List<Node> replicas = partitionInfo.replicas();
+                if (replicas == null || replicas.isEmpty()) {
+                    LOG.error("No replicas found for topic partition {}: {}", topic, partition);
+                    return;

Review Comment:
   nit: can you factor the lambda out into a separate method? I was really 
confused by this empty return for a while until I realized it wasn't returning 
from the `getRacksForTopicPartition` method, just the lambda inside this loop
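   
   One way the extraction could look, sketched with plain collections. All 
names here are hypothetical, and `String` keys and rack lists stand in for 
`TopicPartition` and the replica `Node`s:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RackCollectSketch {
    private RackCollectSketch() { }

    /** Builds partition -> racks, delegating the per-partition work to a named helper. */
    static Map<String, Set<String>> racksByPartition(final Map<String, List<String>> replicaRacksByPartition) {
        final Map<String, Set<String>> result = new HashMap<>();
        replicaRacksByPartition.forEach((partition, racks) -> addRacks(result, partition, racks));
        return result;
    }

    /** The early return now visibly exits this helper only, not the outer method. */
    private static void addRacks(final Map<String, Set<String>> result,
                                 final String partition,
                                 final List<String> replicaRacks) {
        if (replicaRacks == null || replicaRacks.isEmpty()) {
            return; // skip just this partition
        }
        result.computeIfAbsent(partition, k -> new HashSet<>()).addAll(replicaRacks);
    }
}
```

   With the body in its own method, the `return` reads as "skip this 
partition" rather than as an exit from the whole lookup.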



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1974,6 +1974,15 @@ public Set<InternalTopicConfig> nonSourceChangelogTopics() {
             return topicConfigs;
         }
 
+        /**
+         *
+         * @return the set of changelog topics, which includes both source changelog topics and non
+         * source changelog topics.
+         */
+        public Set<InternalTopicConfig> stateChangelogTopics() {

Review Comment:
   nit: I'd just call this `changelogTopics`, which I think helps make it 
obvious that this is the superset of the `#sourceChangelogTopics` and 
`#nonSourceChangelogTopics` APIs
   
   (you can rename the field itself as well but don't have to, that's up to you)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);

Review Comment:
   It's not a huge deal, but if we have time left at the end it might make 
sense to condense this into a single call where we describe all the topics in 
one go rather than making a separate request for the source topics and 
changelogs.
   
   But it really isn't a big deal because in general, after the first 
rebalance, all the source topics should have been created and we really will do 
only one call since only the changelogs will be unknown.
   
   On that note, can you check to make sure this skips the actual 
DescribeTopics request if the set of topics to describe is empty? Like, does it 
end up making a call with the admin client? If it doesn't skip the request, 
then we should guard this with an `if (!topicsToDescribe.isEmpty())` (or we can 
just add this check anyways to be safe).
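   
   For illustration, the guard could be as small as the sketch below. The 
class and method names are hypothetical, and the `describeTopics` function 
parameter stands in for whatever actually issues the DescribeTopics request:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class DescribeGuardSketch {
    private DescribeGuardSketch() { }

    /**
     * Invokes describeTopics only when there is something to describe; an empty
     * input short-circuits to an empty result without any remote call.
     */
    static <V> Map<String, V> describeIfNeeded(final Set<String> topicsToDescribe,
                                               final Function<Set<String>, Map<String, V>> describeTopics) {
        if (topicsToDescribe.isEmpty()) {
            return Collections.emptyMap();
        }
        return describeTopics.apply(topicsToDescribe);
    }
}
```

   After the first rebalance this would mean the source-topic path makes no 
request at all whenever every source topic is already in the `Cluster` 
metadata.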



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {

Review Comment:
   nit: make this final and add a private constructor so it's clear this is 
just a static utils class
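   
   The usual shape of such a class, as a generic sketch. `StaticUtilsSketch` 
and `emptyIfNull` are hypothetical names, not part of `RackUtils`:

```java
import java.util.Collections;
import java.util.Set;

// final: the class cannot be subclassed.
final class StaticUtilsSketch {
    // Private constructor: the class cannot be instantiated from outside,
    // which signals that it only hosts static helpers.
    private StaticUtilsSketch() { }

    /** Example static helper: returns an empty set in place of null. */
    static <T> Set<T> emptyIfNull(final Set<T> set) {
        return set == null ? Collections.<T>emptySet() : set;
    }
}
```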



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      *         assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());

Review Comment:
   If all we need is the topic names from the `#stateChangelogTopics` API, then 
let's just have it return that directly. You should be able to just return the 
`#keySet` of that `stateChangelogTopics` map to get a `Set<String>` with the 
topic names right away.
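   
   A small sketch of that idea, assuming (as the comment implies) that the 
`stateChangelogTopics` map is keyed by topic name. The class name, method name, 
and the generic value type `C` (standing in for the topic config type) are 
hypothetical:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

final class ChangelogNamesSketch {
    private ChangelogNamesSketch() { }

    /**
     * Returns the changelog topic names straight from the map's keys, with no
     * stream/map/collect step. The result is a read-only view of the key set.
     */
    static <C> Set<String> changelogTopicNames(final Map<String, C> stateChangelogTopics) {
        return Collections.unmodifiableSet(stateChangelogTopics.keySet());
    }
}
```

   Because `keySet()` is a live view of the map, the sketch wraps it as 
unmodifiable so callers cannot remove entries through it; copying into a new 
set would be the alternative if a snapshot is wanted.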


