squah-confluent commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2069188025
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -209,4 +219,50 @@ void validateOffsetFetch(
default boolean shouldExpire() {
return true;
}
+
+ /**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+ static long computeGroupHash(Map<String, Long> topicHashes) {
+ return Hashing.combineOrdered(
+ topicHashes.entrySet()
+ .stream()
+ .sorted(Map.Entry.comparingByKey())
+ .map(e -> HashCode.fromLong(e.getValue()))
+ .toList()
+ ).asLong();
+ }
+
+ /**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
+ *
+ * @param topicImage The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+ static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+ HashFunction hf = Hashing.murmur3_128();
+ Hasher topicHasher = hf.newHasher()
+ .putByte((byte) 0) // magic byte
+ .putLong(topicImage.id().hashCode()) // topic Id
+ .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+ .putInt(topicImage.partitions().size()); // number of partitions
+
+ topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
+ topicHasher.putInt(entry.getKey()); // partition id
+ String racks = Arrays.stream(entry.getValue().replicas)
+ .mapToObj(clusterImage::broker)
+ .filter(Objects::nonNull)
+ .map(BrokerRegistration::rack)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .sorted()
+ .collect(Collectors.joining(";"));
Review Comment:
I think this is fine for preventing accidental collisions. It's still
possible to _intentionally_ come up with rack names that create collisions, but
I believe you'd only be impacting your own cluster.
To rule out any ambiguity, we could treat this as a serialization format
and either prefix strings with their length or null-terminate them. The same
goes for variable-length lists of strings: these can either be length-prefixed or
terminated with an invalid string that cannot occur (""? but not sure on this).
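
To illustrate the length-prefix option, here is a minimal sketch that uses only the JDK. The class and method names (`RackEncodingSketch`, `joinRacks`, `encodeRacks`) are hypothetical and not part of the PR. It assumes rack names may contain `;`, in which case the joined string cannot distinguish one rack named `a;b` from two racks `a` and `b`, while a count-plus-length-prefixed encoding can.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not code from the PR.
public class RackEncodingSketch {

    // Mirrors the joining(";") approach in the diff: ambiguous if a rack name contains ';'.
    static String joinRacks(List<String> racks) {
        return String.join(";", racks);
    }

    // Length-prefixed encoding: list size first, then each string as (byte length, UTF-8 bytes).
    static byte[] encodeRacks(List<String> racks) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(racks.size());          // number of strings in the list
            for (String rack : racks) {
                byte[] utf8 = rack.getBytes(StandardCharsets.UTF_8);
                out.writeInt(utf8.length);       // length prefix for this string
                out.write(utf8);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            // ByteArrayOutputStream does not throw in practice; rethrow unchecked to keep the API simple.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> oneRack = List.of("a;b");
        List<String> twoRacks = List.of("a", "b");

        // Joined strings are identical, so a hash over them would collide.
        System.out.println(joinRacks(oneRack).equals(joinRacks(twoRacks)));

        // Length-prefixed byte sequences differ, so the hash inputs differ.
        System.out.println(Arrays.equals(encodeRacks(oneRack), encodeRacks(twoRacks)));
    }
}
```

In the PR itself the same effect could presumably be had without an intermediate buffer by calling `putInt` for each count and length and `putBytes` for each string directly on the Guava `Hasher`.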