dajac commented on code in PR #14327:
URL: https://github.com/apache/kafka/pull/14327#discussion_r1469244568
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1040,6 +1051,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+ .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
Review Comment:
I think that we are missing a few fundamental things in `consumerGroupHeartbeat`.
Let me explain. The `consumerGroupHeartbeat` is structured in 3 parts.
1) We update the member and its subscriptions.
2) We compute the new target assignment if needed.
3) We reconcile the member.
I think that in 1), we need to update the subscription for the member as you
do here. However, we also need to verify it before storing it, and we need to
update the subscription metadata if the regex has changed. See L1080. In
step 2), we also need to change the logic to include the topics matching the
regex. See L1115. Step 3) is fine as it is.
We also need a mechanism to periodically refresh the regexes in order to
catch new or deleted topics. What was your plan for this? I thought that we
could piggyback on the mechanism that refreshes the subscription metadata
(L1076) to also refresh the regexes.
I think that we also need to store the resolved regular expressions somehow,
that is, a mapping from the regex (as a string) to the matching topics, because
we need this for step 2). For this, I was considering whether we could just use
an LRU cache. What do you think?
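To make the LRU idea concrete, here is a minimal sketch of a bounded mapping from a regex string to the topic names it matches. The class `ResolvedRegexCache` and its methods are hypothetical names used only for illustration; they are not part of this PR or of Kafka, and the sketch ignores the timeline/snapshot requirements of the coordinator state.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not an existing Kafka class.
final class ResolvedRegexCache {
    private final LinkedHashMap<String, Set<String>> cache;

    ResolvedRegexCache(final int maxEntries) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.cache = new LinkedHashMap<String, Set<String>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Set<String>> eldest) {
                // Evict the least recently used regex once the bound is exceeded.
                return size() > maxEntries;
            }
        };
    }

    // Returns the topics matching the regex, resolving and caching them on a miss.
    Set<String> resolve(String regex, Set<String> allTopicNames) {
        Set<String> cached = cache.get(regex); // get() also marks the entry as recently used
        if (cached != null) {
            return cached;
        }
        Pattern pattern = Pattern.compile(regex);
        Set<String> matched = new TreeSet<>();
        for (String topicName : allTopicNames) {
            if (pattern.matcher(topicName).matches()) {
                matched.add(topicName);
            }
        }
        cache.put(regex, matched);
        return matched;
    }

    // Drops all resolved entries, e.g. after the set of topics has changed.
    void invalidateAll() {
        cache.clear();
    }

    boolean contains(String regex) {
        return cache.containsKey(regex);
    }
}
```

With such a cache, step 2) could look up the resolved topics by regex string instead of recompiling and rematching on every heartbeat. Cached entries go stale when topics are created or deleted, so this would still need to be paired with the periodic refresh mentioned above.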
##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##########
@@ -35,6 +35,8 @@
"about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
"about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
+ { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+ "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
Review Comment:
We must bump the version of the API to version 1 and use versions `1+` for
this field in order to keep it backward compatible.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1573,6 +1589,33 @@ private void updateGroupsByTopics(
}
});
}
+ if (!oldSubscribedTopicRegex.isEmpty()) {
+     oldSubscribedTopicRegex.forEach(regex -> {
+         groupsByRegex.computeIfPresent(regex, (__, groupIds) -> {
+             groupIds.remove(groupId);
+             return groupIds.isEmpty() ? null : groupIds;
+         });
+         Pattern pattern = Pattern.compile(regex);
+         for (String topicName : metadataImage.topics().topicsByName().keySet()) {
+             if (pattern.matcher(topicName).matches()) {
+                 unsubscribeGroupFromTopic(groupId, topicName);
+             }
+         }
+     });
+ }
+ if (!newSubscribedTopicRegex.isEmpty()) {
+     newSubscribedTopicRegex.forEach(regex -> {
+         groupsByRegex
+             .computeIfAbsent(regex, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+             .add(groupId);
+         Pattern pattern = Pattern.compile(regex);
+         for (String topicName : metadataImage.topics().topicsByName().keySet()) {
+             if (pattern.matcher(topicName).matches()) {
+                 subscribeGroupToTopic(groupId, topicName);
+             }
+         }
Review Comment:
I am not sure about this for two reasons: (1) computing the list of topics
is quite expensive, so this may not be the best place to do it; and (2) it would
not catch topic changes, because the matching is applied only when the regex is
stored. I think that we need to discuss the high-level approach. Take a look at
my previous comment.
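To illustrate reason (2), here is a rough sketch of a refresh pass that re-resolves every stored regex against the current topic names and reports which regexes now match a different set of topics. `RegexRefreshSketch` and `refresh` are hypothetical names, and the plain `Map`/`Set` types stand in for whatever state the coordinator would actually keep; this is only one possible shape, not the approach the PR has to take.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch for illustration only; not an existing Kafka class.
final class RegexRefreshSketch {
    // Re-resolves each regex against the current topic names, updates the
    // stored match sets in place, and returns the regexes whose matches changed.
    static Set<String> refresh(Map<String, Set<String>> resolved, Set<String> currentTopicNames) {
        Set<String> changed = new HashSet<>();
        for (Map.Entry<String, Set<String>> entry : resolved.entrySet()) {
            Pattern pattern = Pattern.compile(entry.getKey());
            Set<String> matched = new HashSet<>();
            for (String topicName : currentTopicNames) {
                if (pattern.matcher(topicName).matches()) {
                    matched.add(topicName);
                }
            }
            // A created or deleted topic shows up as a difference in the match set.
            if (!matched.equals(entry.getValue())) {
                entry.setValue(matched);
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

A pass like this could run when the subscription metadata is refreshed, so that only the groups subscribed to a changed regex need a new target assignment.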