ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1605639072
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      *         assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
+            sourceTopicsByGroup.put(entry.getKey(), sourceTopics);
+            changelogTopicsByGroup.put(entry.getKey(), changelogTopics);
         }
+        final Map<TaskId, Set<TopicPartition>> sourcePartitionsForTask =
+            partitionGrouper.partitionGroups(sourceTopicsByGroup, cluster);
+        final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask =
+            partitionGrouper.partitionGroups(changelogTopicsByGroup, cluster);
+
+        final Set<TaskId> logicalTaskIds = new HashSet<>();
+        final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
+        sourcePartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
+            sourceTopicPartitions.addAll(partitions);
+        });
+        final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
+        changelogPartitionsForTask.forEach((taskId, partitions) -> {
+            logicalTaskIds.add(taskId);
Review Comment:
I suppose this doesn't hurt anything since `logicalTaskIds` is a Set, but the taskIds returned by the partition grouper should be the same for the source and changelog topics, so you can remove this line.
(Alternatively, you can create the `logicalTaskIds` set up front by copying the keyset of one of the partitionsForTask maps, but that's just an implementation detail, up to you.) However, I would probably consider adding a check to make sure these two maps return the same set of tasks. It doesn't need to scan the entire thing, maybe just a simple
```
if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
    log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
              sourcePartitionsForTask.size(),
              changelogPartitionsForTask.size());
    throw new TaskAssignmentException(/* error msg */);
}
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment;
+
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * A simple container class corresponding to a given {@link TaskId}.
+ * Includes metadata such as whether it's stateful and the names of all state stores
+ * belonging to this task, the set of input topic partitions and changelog topic partitions
+ * for all logged state stores, and the rack ids of all replicas of each topic partition
+ * in the task.
+ */
+public interface TaskInfo {
+ TaskId id();
+
+ boolean isStateful();
+
+ Set<String> stateStoreNames();
+
+ Set<TopicPartition> sourceTopicPartitions();
+
+ Set<TopicPartition> changelogTopicPartitions();
Review Comment:
We should add javadocs to these getters. It will be trivial for most of them, but it's important to note that this set contains partitions for source changelog topics as well, and that source changelog topic partitions can be identified by checking whether they exist in both this set and the source topic partitions set.
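A hedged sketch of the relationship that javadoc should spell out, using plain strings in place of `TopicPartition` (the class and method names here are illustrative, not from the PR):

```java
import java.util.HashSet;
import java.util.Set;

public class SourceChangelogExample {
    // Source changelog partitions appear in BOTH the source set and the
    // changelog set, so the intersection identifies them.
    public static Set<String> sourceChangelogPartitions(final Set<String> sourcePartitions,
                                                        final Set<String> changelogPartitions) {
        final Set<String> intersection = new HashSet<>(changelogPartitions);
        intersection.retainAll(sourcePartitions);
        return intersection;
    }

    public static void main(final String[] args) {
        final Set<String> source = Set.of("input-topic-0", "source-table-0");
        final Set<String> changelog = Set.of("source-table-0", "store-changelog-0");
        // source-table-0 doubles as a source changelog topic partition
        System.out.println(sourceChangelogPartitions(source, changelog)); // prints [source-table-0]
    }
}
```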
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);
+        freshTopicPartitionInfo.forEach((topic, partitionInfos) -> {
+            partitionInfos.forEach(partitionInfo -> {
+                final int partition = partitionInfo.partition();
+                final TopicPartition topicPartition = new TopicPartition(topic, partition);
+                final List<Node> replicas = partitionInfo.replicas();
+                if (replicas == null || replicas.isEmpty()) {
+                    LOG.error("No replicas found for topic partition {}: {}", topic, partition);
+                    return;
+                }
+
+                final Set<String> racks = replicas.stream().filter(Node::hasRack).map(Node::rack).collect(
+                    Collectors.toSet());
+                racksForTopicPartition.computeIfAbsent(topicPartition, k -> new HashSet<>());
+                racksForTopicPartition.get(topicPartition).addAll(racks);
+            });
+        });
+
+        return racksForTopicPartition;
+    }
+
+    public static Set<String> topicsWithStaleMetadata(final Cluster cluster, final Set<TopicPartition> topicPartitions) {
+        final Set<String> topicsWithStaleMetadata = new HashSet<>();
+        for (final TopicPartition topicPartition : topicPartitions) {
+            final PartitionInfo partitionInfo = cluster.partition(topicPartition);
+            if (partitionInfo == null) {
+                LOG.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                continue;
Review Comment:
It's not an error for a topic to not be included in the `Cluster`; even source topics might not exist here if they had to be created by the assignor during the rebalance, since the `Cluster` metadata represents the state of the cluster when this rebalance/assignment first began.
Since the point of this method seems to be to collect topics with missing metadata that we'll need to look up via a DescribeTopics request, the ones for which `cluster.partition(topicPartition)` returns `null` are exactly the ones that should be returned by this method.
In fact, I'd go ahead and remove everything past this line as well; this method should focus only on differentiating topics with missing metadata from ones we already have the info for. If the `Cluster` has metadata for this partition but the `replicas` set is missing/empty, then there's something wrong with this partition, and calling DescribeTopics probably won't help.
Let's rename this to `#topicsWithMissingMetadata` while we're at it.
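Under that reading, the whole method reduces to a null-check filter. A hedged, stdlib-only sketch (a plain `Map` stands in for `Cluster#partition` lookups, and the types are simplified):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MissingMetadataExample {
    // Return exactly the topics whose partition-metadata lookup is null;
    // those are the ones a DescribeTopics request must resolve.
    public static Set<String> topicsWithMissingMetadata(final Map<String, String> partitionMetadata,
                                                        final Set<String> topics) {
        final Set<String> missing = new HashSet<>();
        for (final String topic : topics) {
            if (partitionMetadata.get(topic) == null) {
                missing.add(topic);
            }
        }
        return missing;
    }
}
```

Note the sketch deliberately does no replica validation: topics whose metadata exists but looks broken are a different failure mode, per the comment above.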
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);
+        freshTopicPartitionInfo.forEach((topic, partitionInfos) -> {
+            partitionInfos.forEach(partitionInfo -> {
+                final int partition = partitionInfo.partition();
+                final TopicPartition topicPartition = new TopicPartition(topic, partition);
+                final List<Node> replicas = partitionInfo.replicas();
+                if (replicas == null || replicas.isEmpty()) {
+                    LOG.error("No replicas found for topic partition {}: {}", topic, partition);
+                    return;
Review Comment:
nit: can you factor the lambda out into a separate method? I was really confused by this empty return for a while until I realized it wasn't returning from the `getRacksForTopicPartition` method, just from the lambda inside this loop.
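One hypothetical shape for the extracted helper, simplified to stdlib types (the real version would take a `TopicPartitionInfo`; names here are illustrative):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RackLambdaExample {
    // Extracted per-partition body: an early return here plainly exits
    // only this helper, never the enclosing loop or the outer method.
    public static void addRacksForPartition(final Map<String, Set<String>> racksForPartition,
                                            final String topicPartition,
                                            final List<String> replicaRacks) {
        if (replicaRacks == null || replicaRacks.isEmpty()) {
            return; // the real code logs an error before skipping
        }
        racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>())
                         .addAll(replicaRacks);
    }
}
```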
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1974,6 +1974,15 @@ public Set<InternalTopicConfig> nonSourceChangelogTopics() {
         return topicConfigs;
     }
+    /**
+     *
+     * @return the set of changelog topics, which includes both source changelog topics and non
+     *         source changelog topics.
+     */
+    public Set<InternalTopicConfig> stateChangelogTopics() {
Review Comment:
nit: I'd just call this `changelogTopics`, which I think helps make it obvious that this is the super-set of the #sourceChangelogTopics and #nonSourceChangelogTopics APIs.
(You can rename the field itself as well but don't have to; that's up to you.)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);
+
+    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
+                                                                             final InternalTopicManager internalTopicManager,
+                                                                             final Set<TopicPartition> topicPartitions,
+                                                                             final boolean isChangelog) {
+        final Set<String> topicsToDescribe = new HashSet<>();
+        if (isChangelog) {
+            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
+                Collectors.toSet()));
+        } else {
+            topicsToDescribe.addAll(topicsWithStaleMetadata(cluster, topicPartitions));
+        }
+
+        final Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream()
+            .filter(partition -> !topicsToDescribe.contains(partition.topic()))
+            .collect(Collectors.toSet());
+        final Map<TopicPartition, Set<String>> racksForTopicPartition = knownRacksForPartition(
+            cluster, topicsWithUpToDateMetadata);
+
+        final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
+            describeTopics(internalTopicManager, topicsToDescribe);
Review Comment:
It's not a huge deal, but if we have time left at the end it might make sense to condense this into a single call where we describe all the topics in one go, rather than making a separate request for the source topics and changelogs.
But it really isn't a big deal, because in general, after the first rebalance, all the source topics should have been created and we really will do only one call, since only the changelogs will be unknown.
On that note, can you check to make sure this skips the actual DescribeTopics request if the set of topics to describe is empty? Like, does it end up making a call with the admin client? If it doesn't skip the request, then we should guard this with an `if (!topicsToDescribe.isEmpty())` check (or we can just add this check anyway to be safe).
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackUtils {
Review Comment:
nit: make this final and add a private constructor so it's clear this is just a static utils class
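The pattern being suggested, sketched generically (class and method names are illustrative):

```java
// Final class + private constructor: the standard static-utility idiom.
// "final" forbids subclassing; the private constructor forbids instantiation.
public final class UtilsClassExample {
    private UtilsClassExample() {
        // no instances; all members are static
    }

    public static boolean isBlank(final String s) {
        return s == null || s.trim().isEmpty();
    }
}
```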
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -472,23 +482,74 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
      *
      * @param clientMetadataMap the map of process id to client metadata used to build an immutable
      *                          {@code ApplicationState}
-     * @param statefulTasks     the set of {@code TaskId} that correspond to all the stateful
-     *                          tasks that need to be reassigned.
      * @return The {@code ApplicationState} needed by the TaskAssigner to compute new task
      *         assignments.
      */
-    private ApplicationState buildApplicationState(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                   final Set<TaskId> statefulTasks) {
-        final Set<TaskId> statelessTasks = new HashSet<>();
-        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientMetadataMap.entrySet()) {
-            final ClientState clientState = clientEntry.getValue().state;
-            statelessTasks.addAll(clientState.statelessActiveTasks());
+    private ApplicationState buildApplicationState(final TopologyMetadata topologyMetadata,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final Map<Subtopology, TopicsInfo> topicGroups,
+                                                   final Cluster cluster) {
+        final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
+        final Map<Subtopology, Set<String>> changelogTopicsByGroup = new HashMap<>();
+        for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> sourceTopics = entry.getValue().sourceTopics;
+            final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
+                .stream().map(t -> t.name).collect(Collectors.toSet());
Review Comment:
if all we need is the topic names from the #stateChangelogTopics API, then let's just have it return that directly. You should be able to just return the #keySet of that `stateChangelogTopics` map to get a `Set<String>` with the topic names right away.
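That is, assuming the field really is a map keyed by topic name (an assumption; sketched here with stdlib types rather than `InternalTopicConfig`):

```java
import java.util.Map;
import java.util.Set;

public class ChangelogNamesExample {
    // Stand-in for a Map<String, InternalTopicConfig> field keyed by topic name.
    private final Map<String, Object> stateChangelogTopics;

    public ChangelogNamesExample(final Map<String, Object> stateChangelogTopics) {
        this.stateChangelogTopics = stateChangelogTopics;
    }

    // Returning the keySet yields the topic names directly, with no
    // per-config stream/map/collect pass at each call site.
    public Set<String> stateChangelogTopicNames() {
        return stateChangelogTopics.keySet();
    }
}
```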
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]