dajac commented on code in PR #19611:
URL: https://github.com/apache/kafka/pull/19611#discussion_r2094404832
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -79,49 +80,65 @@ public int numPartitions(Uuid topicId) {
*/
@Override
public Set<String> racksForPartition(Uuid topicId, int partition) {
+ TopicImage topic = metadataImage.topics().getTopic(topicId);
+ if (topic != null) {
+ PartitionRegistration partitionRegistration = topic.partitions().get(partition);
+ if (partitionRegistration != null) {
+ Set<String> racks = new HashSet<>();
+ for (int replica : partitionRegistration.replicas) {
+ // Only add the rack if it is available for the broker/replica.
+ metadataImage.cluster().broker(replica).rack().ifPresent(racks::add);
+ }
+ return racks;
+ }
+ }
return Set.of();
}
/**
- * Returns a set of assignable partitions from the topic metadata.
- * If the allowed partition map is null, all the partitions in the corresponding
- * topic metadata are returned for the argument topic id. If allowed map is empty,
+ * Returns a set of assignable partitions from the metadata image.
+ * If the allowed partition map is Optional.empty(), all the partitions in the corresponding
+ * topic image are returned for the argument topic id. If allowed map is empty,
* empty set is returned.
*
* @param topicId The uuid of the topic
* @return Set of integers if assignable partitions available, empty otherwise.
*/
@Override
public Set<Integer> assignablePartitions(Uuid topicId) {
- TopicMetadata topic = this.topicMetadata.get(topicId);
+ TopicImage topic = metadataImage.topics().getTopic(topicId);
if (topic == null) {
return Set.of();
}
- if (topicPartitionAllowedMap == null) {
- return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet());
+ if (topicPartitionAllowedMap.isEmpty()) {
+ return Set.copyOf(topic.partitions().keySet());
Review Comment:
Yeah, this is correct. However, the TopicImage is never mutated once built.
The metadata image is immutable.
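
To illustrate the trade-off being discussed, here is a minimal sketch using plain JDK collections. It does not use the Kafka classes: `KeySetSnapshotDemo`, `partitions()`, `view()` and `snapshot()` are hypothetical names, and the `HashMap` is only a stand-in for `topic.partitions()`. It shows that `Set.copyOf(map.keySet())` produces a snapshot that is independent of later changes to the map, whereas the bare `keySet()` is a live view. If the backing map is never mutated after construction, as stated above for the metadata image, the two are observably the same and the copy only buys defensive isolation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class KeySetSnapshotDemo {
    // Hypothetical stand-in for topic.partitions(); the values are placeholders.
    static Map<Integer, String> partitions() {
        Map<Integer, String> partitions = new HashMap<>();
        partitions.put(0, "p0");
        partitions.put(1, "p1");
        return partitions;
    }

    // Live view: reflects any later change to the backing map.
    static Set<Integer> view(Map<Integer, String> partitions) {
        return partitions.keySet();
    }

    // Unmodifiable snapshot: unaffected by later changes to the backing map.
    static Set<Integer> snapshot(Map<Integer, String> partitions) {
        return Set.copyOf(partitions.keySet());
    }

    public static void main(String[] args) {
        Map<Integer, String> partitions = partitions();
        Set<Integer> view = view(partitions);
        Set<Integer> snapshot = snapshot(partitions);

        // Mutate the map after both sets were obtained.
        partitions.put(2, "p2");

        System.out.println(view.contains(2));     // true: the view tracks the map
        System.out.println(snapshot.contains(2)); // false: the snapshot does not
    }
}
```

The difference only becomes visible when the map changes after the set is handed out, which is the case the review comment says cannot happen here.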