dajac commented on code in PR #19814:
URL: https://github.com/apache/kafka/pull/19814#discussion_r2131673167
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -91,6 +92,13 @@ private enum SubscriptionType {
/* the list of topics the user has requested */
private Set<String> subscription;
+ /**
+ * Topic IDs received in an assignment from the coordinator when using the Consumer rebalance protocol.
+ * This will be used to include assigned topic IDs in metadata requests when the consumer
+ * does not know the topic names (ex. when the user subscribes to a RE2J regex computed on the broker)
+ */
+ private Set<Uuid> assignedTopicIds;
+
Review Comment:
I think this is fine for addressing the bug. However, it would be great
if we could consolidate it with the assignment in the future. I haven't
looked into it, but it seems feasible to consolidate the various assignment
states that we have (here and in the membership manager) into a single view.
Have you considered it?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -490,18 +500,16 @@ public synchronized boolean hasAutoAssignedPartitions() {
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE
|| this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
}
- public synchronized boolean isAssignedFromRe2j(String topic) {
- if (!hasRe2JPatternSubscription()) {
+ /**
+ * Check if the topic ID has been received in an assignment
+ * from the coordinator after subscribing to a broker-side regex.
+ */
+ public synchronized boolean isAssignedFromRe2j(Uuid topic) {
Review Comment:
nit: rename the parameter to `topicId`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java:
##########
@@ -66,14 +67,24 @@ public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}
+ /**
+ * Constructs a metadata request builder for fetching cluster metadata for the topics the consumer needs.
+ * This will include:
+ * <ul>
+ * <li>topics the consumer is subscribed to using topic names (calls to subscribe with topic name list or client-side regex)</li>
+ * <li>topics the consumer is subscribed to using topic IDs (calls to subscribe with broker-side regex RE2J)</li>
+ * <li>topics involved in calls for fetching offsets (transient topics)</li>
+ * </ul>
+ * Note that this will generate a request for all topics in the cluster only when the consumer is subscribed to a client-side regex.
+ */
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
- if (subscription.hasPatternSubscription() || subscription.hasRe2JPatternSubscription())
+ if (subscription.hasPatternSubscription())
return MetadataRequest.Builder.allTopics();
List<String> topics = new ArrayList<>();
topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
- return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
+ return new MetadataRequest.Builder(topics, subscription.assignedTopicIds(), allowAutoTopicCreation);
Review Comment:
I am a bit confused here. The javadoc says that the metadata request will
include all of these topics, but the builder does not combine them internally.
Why is that? I suppose we could actually have both transient topics and topic
IDs at the same time, no?
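To make the question concrete, here is a minimal sketch of the combination I
would expect from reading the javadoc. It is only an illustration under stated
assumptions: `MetadataTargets` is a hypothetical holder, not the real
`MetadataRequest.Builder`, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for the topics a metadata request would target.
// This is NOT the real MetadataRequest.Builder; it only models the javadoc's list.
final class MetadataTargets {
    final List<String> topicNames; // subscribed topic names plus transient topics
    final List<UUID> topicIds;     // topic IDs assigned for a broker-side regex
    final boolean allTopics;       // true only for a client-side regex subscription

    private MetadataTargets(List<String> topicNames, List<UUID> topicIds, boolean allTopics) {
        this.topicNames = List.copyOf(topicNames);
        this.topicIds = List.copyOf(topicIds);
        this.allTopics = allTopics;
    }

    static MetadataTargets of(Set<String> subscribedNames,
                              Set<String> transientTopics,
                              Set<UUID> assignedTopicIds,
                              boolean clientSideRegex) {
        if (clientSideRegex) {
            // A client-side regex needs the full topic list to match against.
            return new MetadataTargets(List.of(), List.of(), true);
        }
        // Names and IDs are carried side by side, so transient topics and
        // assigned topic IDs can both be present in the same request.
        Set<String> names = new LinkedHashSet<>(subscribedNames);
        names.addAll(transientTopics);
        return new MetadataTargets(new ArrayList<>(names), new ArrayList<>(assignedTopicIds), false);
    }
}
```

In this sketch, a consumer with a broker-side regex assignment and a pending
offsets lookup ends up with one transient topic name and one assigned topic ID
in the same set of targets, which is the case I am asking about.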