dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1585155736
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames(
}
}
+ /**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+ public Map<String, Integer> computeSubscribedTopicNames(
+ ConsumerGroupMember oldMember,
+ ConsumerGroupMember newMember
+ ) {
+ Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
+ maybeUpdateSubscribedTopicNames(
+ subscribedTopicNames,
+ oldMember,
+ newMember
+ );
+ return subscribedTopicNames;
+ }
+
+ /**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all the members are subscribed to the same set of topics, the type is homogeneous.
+ * Otherwise, it is heterogeneous.
+ *
+ * @param subscribedTopicNames A map of topic names to the count of members subscribed to each topic.
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are subscribed to exactly the same topics;
Review Comment:
nit: Let's add an empty line before this one in order to match the style in
the file.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
}
}
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ Map<String, Integer> subscribedTopicNamesMap = group.subscribedTopicNames();
Review Comment:
nit: Should we use `subscribedTopicNames` too?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1307,13 +1307,14 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
}
}
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
Review Comment:
nit: Could we move it to right before
`subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);`?
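For context, the copy-then-update behaviour described in the javadoc of `computeSubscribedTopicNames` could be sketched roughly as below. This is only an illustration under assumptions: `SubscriptionCountSketch` and `computeCounts` are hypothetical names, members are reduced to plain sets of topic names, and the body of `maybeUpdateSubscribedTopicNames` is not shown in this diff, so the decrement/increment logic here is a guess at its intent rather than the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for illustration; this is not the Kafka ConsumerGroup class.
class SubscriptionCountSketch {
    // Returns an updated copy of the per-topic subscriber counts.
    // The map passed in is never modified.
    static Map<String, Integer> computeCounts(
        Map<String, Integer> current,
        Set<String> oldTopics,  // topics of the old member (empty if the member is new)
        Set<String> newTopics   // topics of the new member (empty if the member left)
    ) {
        Map<String, Integer> copy = new HashMap<>(current);
        for (String topic : oldTopics) {
            // Decrement the count and drop the entry once no subscriber is left.
            copy.compute(topic, (k, v) -> (v == null || v <= 1) ? null : v - 1);
        }
        for (String topic : newTopics) {
            // Increment the count, starting at 1 for a topic seen for the first time.
            copy.merge(topic, 1, Integer::sum);
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Integer> current = Map.of("foo", 2, "bar", 1);
        // A member moves from {bar} to {foo, baz}.
        Map<String, Integer> updated = computeCounts(current, Set.of("bar"), Set.of("foo", "baz"));
        System.out.println(updated);
    }
}
```

Working on a copy keeps the group's own state untouched until the corresponding records are applied, which is presumably why the method returns a new map.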
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames(
}
}
+ /**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+ public Map<String, Integer> computeSubscribedTopicNames(
+ ConsumerGroupMember oldMember,
+ ConsumerGroupMember newMember
+ ) {
+ Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
+ maybeUpdateSubscribedTopicNames(
+ subscribedTopicNames,
+ oldMember,
+ newMember
+ );
+ return subscribedTopicNames;
+ }
+
+ /**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all the members are subscribed to the same set of topics, the type is homogeneous.
+ * Otherwise, it is heterogeneous.
Review Comment:
nit: We could remove this as it is already in the `@return`.
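To make the rule in that javadoc concrete, here is a minimal sketch of how the type could be derived from the subscriber counts. It assumes the member count is passed in explicitly and that an empty group counts as homogeneous; both are assumptions, as are the names `SubscriptionTypeSketch` and `numberOfMembers`. The method body in the PR is not visible in this hunk.

```java
import java.util.Map;

// Hypothetical stand-in for illustration; this is not the Kafka ConsumerGroup class.
class SubscriptionTypeSketch {
    enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

    // The group is homogeneous when every subscribed topic is subscribed to
    // by all members, i.e. each count equals the number of members.
    static SubscriptionType subscriptionType(
        Map<String, Integer> subscribedTopicNames,
        int numberOfMembers  // assumed parameter, not taken from the PR
    ) {
        for (int count : subscribedTopicNames.values()) {
            if (count != numberOfMembers) {
                return SubscriptionType.HETEROGENEOUS;
            }
        }
        // Also reached for an empty map (assumption: treated as homogeneous).
        return SubscriptionType.HOMOGENEOUS;
    }

    public static void main(String[] args) {
        System.out.println(subscriptionType(Map.of("foo", 2, "bar", 2), 2));
        System.out.println(subscriptionType(Map.of("foo", 2, "bar", 1), 2));
    }
}
```

The check relies only on the count map, so it can run on the copy returned by `computeSubscribedTopicNames` without touching the group's state.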