squah-confluent commented on code in PR #20055:
URL: https://github.com/apache/kafka/pull/20055#discussion_r2186077989
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2243,21 +2243,30 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
// epoch 0 and that it is fully initialized.
boolean bumpGroupEpoch = group.groupEpoch() == 0;
- bumpGroupEpoch |= hasMemberSubscriptionChanged(
+ boolean subscribedTopicNamesChanged = hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
-
- bumpGroupEpoch |= maybeUpdateRegularExpressions(
+ UpdateRegularExpressionsResult updateRegularExpressionsResult = maybeUpdateRegularExpressions(
context,
group,
member,
updatedMember,
records
);
+ // The subscription has changed when either the subscribed topic names or subscribed topic
+ // regex has changed.
+ boolean hasSubscriptionChanged = subscribedTopicNamesChanged || updateRegularExpressionsResult.subscribedTopicRegexChanged;
+ // Bumping the group epoch signals that the target assignment should be updated. We bump the
+ // group epoch when the member has changed its subscribed topic names or the member has
+ // changed its subscribed topic regex to a regex that is already resolved. We explicitly
+ // avoid bumping the group epoch when the new subscribed topic regex has not been resolved
Review Comment:
> the coord may briefly keep an outdated assignments (because it won't re-compute until regex resolved)
Yes, the target assignment will remain outdated until the regex is resolved.
> we'll solve this by not sending this outdated assignment back to the member while the regex is resolved.
Previously, we would not send any assignment until the regex was resolved,
unless the client asked for it (submitted a full request).
With this change, we won't send the outdated target assignment, but we will
immediately send a copy of it with all the regex topics filtered out.
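To illustrate the filtering described above, here is a minimal sketch. It assumes that "regex topics" means topics in the target assignment that are not among the member's explicitly subscribed topic names. The class `FilteredAssignmentSketch`, the method `withoutRegexTopics`, and the use of topic names as map keys are all hypothetical and are not taken from `GroupMetadataManager`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class FilteredAssignmentSketch {
    /**
     * Hypothetical helper (not the actual coordinator code): returns a copy of
     * the target assignment that keeps only topics the member subscribed to
     * by name. Topics that are present only because of a regex are dropped.
     */
    static Map<String, Set<Integer>> withoutRegexTopics(
        Map<String, Set<Integer>> targetAssignment,
        Set<String> subscribedTopicNames
    ) {
        Map<String, Set<Integer>> filtered = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> entry : targetAssignment.entrySet()) {
            // Assumption: a topic not subscribed by name is a "regex topic".
            if (subscribedTopicNames.contains(entry.getKey())) {
                filtered.put(entry.getKey(), Set.copyOf(entry.getValue()));
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        // Outdated target assignment: "logs-a" was matched by the old regex.
        Map<String, Set<Integer>> target = Map.of(
            "orders", Set.of(0, 1),
            "logs-a", Set.of(0)
        );
        // While the new regex is unresolved, only name-based topics are sent.
        System.out.println(withoutRegexTopics(target, Set.of("orders")));
    }
}
```

The stored target assignment is left untouched in this sketch. Only the copy returned to the member is filtered, which matches the point that the target assignment stays outdated until the regex is resolved.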