dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583740908
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1169,6 +1173,64 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
}
}
+ /**
+ * Validates if the received classic member protocols are supported by the group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId The joining member id.
+ * @param protocolType The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+ private void throwIfClassicProtocolIsNotSupported(
+ ConsumerGroup group,
+ String memberId,
+ String protocolType,
+ JoinGroupRequestProtocolCollection protocols
+ ) {
+ if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) {
+ throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported.");
+ }
+ }
+
+ /**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+ private static ConsumerPartitionAssignor.Subscription deserializeSubscription(
+ JoinGroupRequestProtocolCollection protocols
+ ) {
+ try {
+ return ConsumerProtocol.deserializeSubscription(
Review Comment:
Yeah, that makes sense. I can open another PR once this one is merged.