FrankYang0529 commented on code in PR #17444:
URL: https://github.com/apache/kafka/pull/17444#discussion_r2048415609
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -348,54 +376,58 @@ public Map<String, Assignment> targetAssignment() {
}
/**
- * @return An immutable Map of subscription metadata for
- * each topic that the consumer group is subscribed to.
- */
- public Map<String, TopicMetadata> subscriptionMetadata() {
- return Collections.unmodifiableMap(subscribedTopicMetadata);
- }
-
- /**
- * Updates the subscription metadata. This replaces the previous one.
+ * Compute metadata hash based on the current subscription info.
*
- * @param subscriptionMetadata The new subscription metadata.
+ * @param subscribedTopicNames Map of topic names to the number of
subscribers.
+ * @param metadataImage The current metadata for all available
topics.
+ * @param topicHashCache The cache of topic hashes.
*/
- public void setSubscriptionMetadata(
- Map<String, TopicMetadata> subscriptionMetadata
+ public long computeMetadataHash(
+ Map<String, SubscriptionCount> subscribedTopicNames,
+ MetadataImage metadataImage,
+ Map<String, Long> topicHashCache
) {
- this.subscribedTopicMetadata.clear();
- this.subscribedTopicMetadata.putAll(subscriptionMetadata);
+ TopicsImage topicsImage = metadataImage.topics();
+ List<HashCode> hashCodes = subscribedTopicNames.keySet().stream()
+ .filter(topicName -> topicsImage.getTopic(topicName) != null)
+ .map(topicName -> HashCode.fromLong(
+ topicHashCache.computeIfAbsent(
+ topicName,
+ key -> computeTopicHash(topicsImage.getTopic(topicName),
metadataImage.cluster())
+ )
+ ))
+ .toList();
+ return hashCodes.isEmpty() ? 0 : Hashing.combineUnordered(
+ hashCodes
+ ).asLong();
}
/**
- * Computes the subscription metadata based on the current subscription
info.
- *
- * @param subscribedTopicNames Map of topic names to the number of
subscribers.
- * @param topicsImage The current metadata for all available
topics.
- * @param clusterImage The current metadata for the Kafka
cluster.
+ * Computes the hash of the topic id, name, number of partitions, and
partition racks by Murmur3.
*
- * @return An immutable map of subscription metadata for each topic that
the consumer group is subscribed to.
- */
- public Map<String, TopicMetadata> computeSubscriptionMetadata(
- Map<String, SubscriptionCount> subscribedTopicNames,
- TopicsImage topicsImage,
- ClusterImage clusterImage
- ) {
- // Create the topic metadata for each subscribed topic.
- Map<String, TopicMetadata> newSubscriptionMetadata = new
HashMap<>(subscribedTopicNames.size());
-
- subscribedTopicNames.forEach((topicName, count) -> {
- TopicImage topicImage = topicsImage.getTopic(topicName);
- if (topicImage != null) {
- newSubscriptionMetadata.put(topicName, new TopicMetadata(
- topicImage.id(),
- topicImage.name(),
- topicImage.partitions().size()
- ));
- }
+ * @param topicImage The topic image.
+ * @param clusterImage The cluster image.
+ */
+ public static long computeTopicHash(TopicImage topicImage, ClusterImage
clusterImage) {
+ HashFunction hf = Hashing.murmur3_128();
+ Hasher topicHasher = hf.newHasher()
+ .putByte((byte) 0) // magic byte
+ .putLong(topicImage.id().hashCode()) // topic Id
+ .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+ .putLong(topicImage.partitions().size()); // number of partitions
+
+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
-> {
+ topicHasher.putInt(entry.getKey()); // partition id
+ Arrays.stream(entry.getValue().replicas)
+ .mapToObj(clusterImage::broker)
+ .filter(Objects::nonNull)
+ .map(BrokerRegistration::rack)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .sorted()
+ .forEach(rack -> topicHasher.putString(rack,
StandardCharsets.UTF_8)); // sorted racks
Review Comment:
@squah-confluent Good catch! Yes, like @dajac, we can use semicolon to
avoid the collision.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]