dajac commented on code in PR #19814:
URL: https://github.com/apache/kafka/pull/19814#discussion_r2131673167


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -91,6 +92,13 @@ private enum SubscriptionType {
     /* the list of topics the user has requested */
     private Set<String> subscription;
 
+    /**
+     * Topic IDs received in an assignment from the coordinator when using the 
Consumer rebalance protocol.
+     * This will be used to include assigned topic IDs in metadata requests 
when the consumer
+     * does not know the topic names (ex. when the user subscribes to a RE2J 
regex computed on the broker)
+     */
+    private Set<Uuid> assignedTopicIds;
+

Review Comment:
   I think that this is fine for addressing the bug. However, it would be great 
if we could try to consolidate it with the assignment in the future. I haven't 
look into it but it seems feasible to consolidate all the various assignment 
states that we have (here and in the membership manager) into a single view. 
Have you considered it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -490,18 +500,16 @@ public synchronized boolean hasAutoAssignedPartitions() {
                 || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE 
|| this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
     }
 
-    public synchronized boolean isAssignedFromRe2j(String topic) {
-        if (!hasRe2JPatternSubscription()) {
+    /**
+     * Check if the topic ID has been received in an assignment
+     * from the coordinator after subscribing to a broker-side regex.
+     */
+    public synchronized boolean isAssignedFromRe2j(Uuid topic) {

Review Comment:
   nit: topicId?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java:
##########
@@ -66,14 +67,24 @@ public boolean allowAutoTopicCreation() {
         return allowAutoTopicCreation;
     }
 
+    /**
+     * Constructs a metadata request builder for fetching cluster metadata for 
the topics the consumer needs.
+     * This will include:
+     * <ul>
+     *     <li>topics the consumer is subscribed to using topic names (calls 
to subscribe with topic name list or client-side regex)</li>
+     *     <li>topics the consumer is subscribed to using topic IDs (calls to 
subscribe with broker-side regex RE2J)</li>
+     *     <li>topics involved in calls for fetching offsets (transient 
topics)</li>
+     * </ul>
+     * Note that this will generate a request for all topics in the cluster 
only when the consumer is subscribed to a client-side regex.
+     */
     @Override
     public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
-        if (subscription.hasPatternSubscription() || 
subscription.hasRe2JPatternSubscription())
+        if (subscription.hasPatternSubscription())
             return MetadataRequest.Builder.allTopics();
         List<String> topics = new ArrayList<>();
         topics.addAll(subscription.metadataTopics());
         topics.addAll(transientTopics);
-        return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
+        return new MetadataRequest.Builder(topics, 
subscription.assignedTopicIds(), allowAutoTopicCreation);

Review Comment:
   I am a bit confused here. The javadoc says that the metadata request will 
include all the topics. However, the builder does not do it internally. Why 
that? I suppose that we could actually have both transient topics and topic ids 
at the same time, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to