dajac commented on code in PR #20055:
URL: https://github.com/apache/kafka/pull/20055#discussion_r2174845778
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java:
##########
@@ -83,11 +125,35 @@ public ShareGroupMember build() {
// when the member is updated.
return new ShareGroupMember.Builder(member)
.setState(MemberState.STABLE)
- .setAssignedPartitions(targetAssignment.partitions())
+ .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames()))
.updateMemberEpoch(targetAssignmentEpoch)
.build();
+ } else if (hasSubscriptionChanged) {
+ return new ShareGroupMember.Builder(member)
+ .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames()))
+ .build();
+ } else {
+ return member;
}
+ }
- return member;
+ private Map<Uuid, Set<Integer>> filterAssignedPartitions(
+ Map<Uuid, Set<Integer>> partitions,
+ Set<String> subscribedTopicNames
+ ) {
+ TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
Review Comment:
Same question about the cache as on `CurrentAssignmentBuilder.subscribedTopicIds()`: do we need `CachedTopicResolver` here?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -20382,19 +20382,15 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List.of(
- new ConsumerGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(fooTopicId)
- .setPartitions(List.of(0, 1, 2, 3, 4, 5))
- ))
+ .setTopicPartitions(List.of())
),
- result.response()
+ result1.response()
);
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
- .setPreviousMemberEpoch(0)
+ .setPreviousMemberEpoch(10)
Review Comment:
Could you elaborate?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3056,6 +3064,22 @@ private boolean hasMemberSubscriptionChanged(
return false;
}
+ /**
+ * Check whether the member has updated its subscribed topic regular expression.
+ *
+ * @param member The old member.
+ * @param updatedMember The new member.
+ * @return A boolean indicating whether the subscribed topic regular expression has changed.
+ */
+ private boolean hasMemberRegularExpressionChanged(
Review Comment:
nit: static?
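A minimal sketch of what a static variant could look like, assuming the check only compares the old and new `subscribedTopicRegex()` values. The `Member` record is a hypothetical stand-in for `ConsumerGroupMember`; since the method reads no instance fields, nothing prevents declaring it `static`.

```java
import java.util.Objects;

class RegexChangeCheck {
    // Hypothetical stand-in for ConsumerGroupMember; only the regex matters here.
    record Member(String subscribedTopicRegex) { }

    /**
     * Returns true if the subscribed topic regex differs between the old and new member.
     * Static because it depends only on its arguments, not on any instance state.
     */
    static boolean hasMemberRegularExpressionChanged(Member member, Member updatedMember) {
        // Objects.equals treats two nulls as equal, so "no regex" -> "no regex" is not a change.
        return !Objects.equals(member.subscribedTopicRegex(), updatedMember.subscribedTopicRegex());
    }
}
```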
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -317,4 +445,30 @@ private ConsumerGroupMember computeNextAssignment(
.build();
}
}
+
+ /**
+ * Gets the set of topic IDs that the member is subscribed to.
+ *
+ * @return The set of topic IDs that the member is subscribed to.
+ */
+ private Set<Uuid> subscribedTopicIds() {
+ Set<String> subscriptions = member.subscribedTopicNames();
+ String subscribedTopicRegex = member.subscribedTopicRegex();
+ if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) {
+ ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex);
+ if (resolvedRegularExpression != null) {
+ if (subscriptions.isEmpty()) {
+ subscriptions = resolvedRegularExpression.topics;
+ } else if (!resolvedRegularExpression.topics.isEmpty()) {
+ subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics);
+ }
+ } else {
+ // Treat an unresolved regex as matching no topics, to be conservative.
+ }
+ }
+
+ TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
Review Comment:
Do we need the cache in this case? It seems that we will check every topic
id once so the cache does not bring any benefits, does it?
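To illustrate the point about the cache: in a sketch like the one below, each subscribed topic name is resolved exactly once, so memoizing the name-to-ID lookups would not save any work. The `topicIdsByName` map stands in for the metadata image, `java.util.UUID` stands in for Kafka's `Uuid`, and the method and parameter names are hypothetical. Unlike the PR, which avoids a copy with `UnionSet`, this sketch copies the names into a `HashSet` for simplicity.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

class SubscribedTopicIds {
    /**
     * Resolves the member's subscribed topic names to topic IDs with one direct lookup per name.
     * topicIdsByName is a hypothetical stand-in for the metadata image; names missing from it
     * (for example deleted topics) are skipped.
     */
    static Set<UUID> subscribedTopicIds(Set<String> explicitTopics,
                                        Set<String> regexTopics, // null when the regex is unresolved
                                        Map<String, UUID> topicIdsByName) {
        Set<String> names = new HashSet<>(explicitTopics);
        if (regexTopics != null) {
            // An unresolved regex (null) contributes no topics, mirroring the conservative choice above.
            names.addAll(regexTopics);
        }
        Set<UUID> ids = new HashSet<>();
        for (String name : names) {
            // Each distinct name is looked up exactly once, so a cache would never get a hit.
            UUID id = topicIdsByName.get(name);
            if (id != null) {
                ids.add(id);
            }
        }
        return ids;
    }
}
```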
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -151,14 +210,18 @@ public ConsumerGroupMember build() {
// If the member provides its owned partitions. We verify if it still
// owns any of the revoked partitions. If it does, we cannot progress.
if (ownsRevokedPartitions(member.partitionsPendingRevocation())) {
- return member;
+ if (hasSubscriptionChanged) {
+ return updateCurrentAssignment(member.assignedPartitions());
+ } else {
+ return member;
+ }
}
// When the member has revoked all the pending partitions, it can
// transition to the next epoch (current + 1) and we can reconcile
// its state towards the latest target assignment.
return computeNextAssignment(
- member.memberEpoch() + 1,
+ Math.min(member.memberEpoch() + 1, targetAssignmentEpoch),
Review Comment:
Why do we need this change? Is it to be more defensive?
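In isolation, the `Math.min` change caps the next epoch at the target assignment epoch. The sketch below uses a hypothetical `nextEpoch` helper to show the difference. It assumes (not confirmed in this thread) that after this PR a member can reach this branch while already at `targetAssignmentEpoch`, for example after a subscription-only update; in that case `memberEpoch + 1` alone would move it past the target epoch.

```java
class EpochClamp {
    /**
     * Epoch a member moves to once it has revoked all pending partitions.
     * Clamped so that it never exceeds the target assignment epoch.
     */
    static int nextEpoch(int memberEpoch, int targetAssignmentEpoch) {
        return Math.min(memberEpoch + 1, targetAssignmentEpoch);
    }
}
```

With `memberEpoch = 12` and `targetAssignmentEpoch = 12`, the clamped result is 12, whereas the unclamped expression would give 13.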
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2305,6 +2305,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
+ group.resolvedRegularExpressions(),
+ // Force consistency with the subscription when the subscription has changed.
+ bumpGroupEpoch || hasMemberRegularExpressionChanged(member, updatedMember),
Review Comment:
I am not really happy with this. Could we somehow make
hasMemberRegularExpressionChanged part of the checks that we already do in the
beginning of this method? We may be able to refactor it.
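One possible direction for the refactoring, sketched under assumptions: compute a single `hasSubscriptionChanged` flag up front that covers both the explicit topic names and the regex, and pass that flag to the builder. The `Member` record and the method are hypothetical; the real `hasMemberSubscriptionChanged` may check more than this, and whether `bumpGroupEpoch` could then be dropped from the argument depends on what else sets it.

```java
import java.util.Objects;
import java.util.Set;

class SubscriptionChangeCheck {
    // Hypothetical stand-in for the parts of ConsumerGroupMember compared here.
    record Member(Set<String> subscribedTopicNames, String subscribedTopicRegex) { }

    /**
     * Returns true if either the explicit topic names or the topic regex changed.
     * Hypothetical helper: a single flag like this might stand in for
     * "bumpGroupEpoch || hasMemberRegularExpressionChanged(member, updatedMember)".
     */
    static boolean hasSubscriptionChanged(Member member, Member updatedMember) {
        boolean namesChanged = !Objects.equals(
            member.subscribedTopicNames(), updatedMember.subscribedTopicNames());
        boolean regexChanged = !Objects.equals(
            member.subscribedTopicRegex(), updatedMember.subscribedTopicRegex());
        return namesChanged || regexChanged;
    }
}
```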
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -138,6 +195,8 @@ public ConsumerGroupMember build() {
member.memberEpoch(),
member.assignedPartitions()
);
+ } else if (hasSubscriptionChanged) {
+ return updateCurrentAssignment(member.assignedPartitions());
Review Comment:
nit: Should we update the top level comment of the states?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupAssignmentBuilder.java:
##########
@@ -83,11 +125,35 @@ public ShareGroupMember build() {
// when the member is updated.
return new ShareGroupMember.Builder(member)
.setState(MemberState.STABLE)
- .setAssignedPartitions(targetAssignment.partitions())
+ .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames()))
Review Comment:
Do we really need filterAssignedPartitions here?
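For reference, the filtering step amounts to keeping only the partitions of subscribed topics, as in the sketch below. `topicNamesById` is a hypothetical stand-in for the metadata image and `UUID` for Kafka's `Uuid`. If the target assignment was computed from the member's current subscription, every topic in it passes the filter and the result equals the input, which is the case where the call would be redundant.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

class AssignmentFilter {
    /**
     * Keeps only the partitions of topics whose names are in subscribedTopicNames.
     * topicNamesById is a hypothetical stand-in for the metadata image lookup.
     */
    static Map<UUID, Set<Integer>> filterAssignedPartitions(Map<UUID, Set<Integer>> partitions,
                                                            Set<String> subscribedTopicNames,
                                                            Map<UUID, String> topicNamesById) {
        Map<UUID, Set<Integer>> filtered = new HashMap<>();
        for (Map.Entry<UUID, Set<Integer>> entry : partitions.entrySet()) {
            String name = topicNamesById.get(entry.getKey());
            // Topics missing from the image or not subscribed to are dropped.
            if (name != null && subscribedTopicNames.contains(name)) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```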
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -215,6 +278,64 @@ private boolean ownsRevokedPartitions(
return false;
}
+ /**
+ * Updates the current assignment, removing any partitions that are not part of the subscribed topics.
+ * This method is a lot faster than running the full reconciliation logic in computeNextAssignment.
+ *
+ * @param memberAssignedPartitions The assigned partitions of the member to use.
+ * @return A new ConsumerGroupMember.
+ */
+ private ConsumerGroupMember updateCurrentAssignment(
+ Map<Uuid, Set<Integer>> memberAssignedPartitions
+ ) {
+ Set<Uuid> subscribedTopicIds = subscribedTopicIds();
+
+ // Reuse the original map if no topics need to be removed.
+ Map<Uuid, Set<Integer>> newAssignedPartitions = memberAssignedPartitions;
+ Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
+ for (Map.Entry<Uuid, Set<Integer>> entry : memberAssignedPartitions.entrySet()) {
+ if (!subscribedTopicIds.contains(entry.getKey())) {
+ if (newAssignedPartitions == memberAssignedPartitions) {
+ newAssignedPartitions = new HashMap<>(memberAssignedPartitions);
+ newPartitionsPendingRevocation = new HashMap<>(member.partitionsPendingRevocation());
+ }
+ newAssignedPartitions.remove(entry.getKey());
+ newPartitionsPendingRevocation.merge(
+ entry.getKey(),
+ entry.getValue(),
+ (existing, additional) -> {
+ existing = new HashSet<>(existing);
+ existing.addAll(additional);
+ return existing;
+ }
+ );
+ }
+ }
+
+ if (newAssignedPartitions == memberAssignedPartitions) {
+ // If no partitions were removed, we can return the member as is.
+ return member;
+ }
+
+ if (ownsRevokedPartitions(newPartitionsPendingRevocation)) {
Review Comment:
I wonder if we really need this check here or if we could just transition to
UNREVOKED_PARTITIONS if we revoked at least one partition above. I may be
missing something though.
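A sketch of the simpler transition suggested here: move the partitions of unsubscribed topics to pending revocation and, if at least one partition moved, go straight to `UNREVOKED_PARTITIONS` without calling `ownsRevokedPartitions`. The `State` enum, `Result` record, and `revokeUnsubscribed` method are hypothetical reductions of the real `ConsumerGroupMember` and `MemberState` types; whether skipping the check is safe depends on how the client-reported owned partitions are used elsewhere.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

class SubscriptionRevocation {
    // Hypothetical subset of MemberState.
    enum State { STABLE, UNREVOKED_PARTITIONS }

    // Hypothetical snapshot of the member fields touched here.
    record Result(State state, Map<UUID, Set<Integer>> assigned, Map<UUID, Set<Integer>> pendingRevocation) { }

    static Result revokeUnsubscribed(State currentState,
                                     Map<UUID, Set<Integer>> assigned,
                                     Map<UUID, Set<Integer>> pendingRevocation,
                                     Set<UUID> subscribedTopicIds) {
        Map<UUID, Set<Integer>> newAssigned = new HashMap<>();
        Map<UUID, Set<Integer>> newPending = new HashMap<>(pendingRevocation);
        boolean revokedAny = false;
        for (Map.Entry<UUID, Set<Integer>> entry : assigned.entrySet()) {
            if (subscribedTopicIds.contains(entry.getKey())) {
                newAssigned.put(entry.getKey(), entry.getValue());
            } else {
                revokedAny = true;
                // Union with any partitions of this topic that were already pending revocation.
                newPending.merge(entry.getKey(), entry.getValue(), (existing, added) -> {
                    Set<Integer> union = new HashSet<>(existing);
                    union.addAll(added);
                    return union;
                });
            }
        }
        if (!revokedAny) {
            // Nothing to revoke: keep the member as it is.
            return new Result(currentState, assigned, pendingRevocation);
        }
        // Suggested simplification: any revocation moves the member to UNREVOKED_PARTITIONS.
        return new Result(State.UNREVOKED_PARTITIONS, newAssigned, newPending);
    }
}
```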