lianetm commented on code in PR #19814:
URL: https://github.com/apache/kafka/pull/19814#discussion_r2138798855
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java:
##########
@@ -66,14 +67,24 @@ public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}
+ /**
+ * Constructs a metadata request builder for fetching cluster metadata for the topics the consumer needs.
+ * This will include:
+ * <ul>
+ * <li>topics the consumer is subscribed to using topic names (calls to subscribe with topic name list or client-side regex)</li>
+ * <li>topics the consumer is subscribed to using topic IDs (calls to subscribe with broker-side regex RE2J)</li>
+ * <li>topics involved in calls for fetching offsets (transient topics)</li>
+ * </ul>
+ * Note that this will generate a request for all topics in the cluster only when the consumer is subscribed to a client-side regex.
+ */
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
- if (subscription.hasPatternSubscription() || subscription.hasRe2JPatternSubscription())
+ if (subscription.hasPatternSubscription())
return MetadataRequest.Builder.allTopics();
List<String> topics = new ArrayList<>();
topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
- return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
+ return new MetadataRequest.Builder(topics, subscription.assignedTopicIds(), allowAutoTopicCreation);
Review Comment:
Thinking more about this, I simplified it a bit further around the case where
the consumer may be using a broker-side regex and transient topics. The
consumer would need metadata for topic IDs and topic names at the same time
only if all of the following happen together:
- a call to an offsets-related API (beginningOffsets/endOffsets or
offsetsForTimes)
- for a topic that is not in the metadata cache
- a new partition is assigned using RE2J (received in the background in a
heartbeat), for a topic that is not in the metadata cache
This is probably an edge case, but I went ahead with the change to simply
"pipeline" the requests, resolving the reconciliation first. It seemed sensible
to prioritize not blocking reconciliation/poll over a single API call to
beginningOffsets/endOffsets/offsetsForTimes. Thoughts?
So with the current change, a consumer that needs metadata for a topic ID
from RE2J and for transient topics will first send a request for the assigned
topic ID metadata. Once that is resolved, it will send the request for the
transient topics, if any, on a subsequent background poll iteration
(background poll -> network client poll -> request metadata).
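To make the ordering concrete, here is a minimal, self-contained sketch of the pipelining described above. It is not the code in this PR: `MetadataPipelineSketch`, `Request` and `nextRequest` are hypothetical names, topic IDs are modeled as plain strings rather than Kafka's `Uuid`, and the real builder and subscription state are left out. It only shows the intended priority, assuming one request is built per background poll iteration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical, simplified model of the pipelined metadata requests (not the PR code). */
public class MetadataPipelineSketch {

    /** Simplified stand-in for a metadata request: topic names and topic IDs (as strings). */
    public static final class Request {
        public final List<String> topicNames;
        public final List<String> topicIds;

        Request(List<String> topicNames, List<String> topicIds) {
            this.topicNames = topicNames;
            this.topicIds = topicIds;
        }
    }

    /**
     * Picks the request to build on one background poll iteration.
     * Assigned topic IDs that are still unresolved go first, so reconciliation
     * is not blocked; name-based topics (including transient ones) wait for a
     * later iteration.
     */
    public static Request nextRequest(Collection<String> unresolvedAssignedTopicIds,
                                      Collection<String> subscribedTopicNames,
                                      Collection<String> transientTopics) {
        if (!unresolvedAssignedTopicIds.isEmpty()) {
            // First stage: only the topic IDs received through the RE2J assignment.
            return new Request(List.of(), new ArrayList<>(unresolvedAssignedTopicIds));
        }
        // Second stage: subscribed topic names plus transient topics, if any.
        List<String> names = new ArrayList<>(subscribedTopicNames);
        names.addAll(transientTopics);
        return new Request(names, List.of());
    }

    public static void main(String[] args) {
        // Iteration 1: an assigned topic ID is unresolved, so the transient topic waits.
        Request first = nextRequest(List.of("id-1"), List.of(), List.of("tmp-topic"));
        System.out.println(first.topicIds + " " + first.topicNames);

        // Iteration 2: the topic ID is resolved, so the transient topic is requested.
        Request second = nextRequest(List.of(), List.of(), List.of("tmp-topic"));
        System.out.println(second.topicIds + " " + second.topicNames);
    }
}
```

In this sketch the offsets-related call waits one extra iteration in the edge case, which is the trade-off being proposed.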
I'm adding more tests but pushed the change to align