kirktrue commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1539435022
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData
buildRequestData() {
} else {
// SubscribedTopicRegex - only sent if has changed since the
last heartbeat
// - not supported yet
Review Comment:
This comment would go away, if we do end up keeping a separate `else` block.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -550,6 +550,11 @@ public ConsumerGroupHeartbeatRequestData
buildRequestData() {
} else {
// SubscribedTopicRegex - only sent if has changed since the
last heartbeat
// - not supported yet
+ TreeSet<String> subscribedTopicNames = new
TreeSet<>(this.subscriptions.subscription());
+ if (sendAllFields ||
!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+ data.setSubscribedTopicNames(new
ArrayList<>(this.subscriptions.subscription()));
+ sentFields.subscribedTopicNames = subscribedTopicNames;
+ }
Review Comment:
Thanks for catching this, @lianetm. I agree that if the logic is identical,
there's no need for duplication. Any speculation as to the reason the `if`
statement existed in the first place?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1667,6 +1667,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition
topicPartition, OffsetAnd
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
maybeThrowFencedInstanceException();
maybeInvokeCommitCallbacks();
+ if (subscriptions.hasPatternSubscription()) {
+ updatePatternSubscription(metadata.fetch());
+ }
Review Comment:
Good call, @lianetm.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern,
Optional<ConsumerRebalanceListen
throwIfNoAssignorsConfigured();
log.info("Subscribed to pattern: '{}'", pattern);
subscriptions.subscribe(pattern, listener);
- updatePatternSubscription(metadata.fetch());
metadata.requestUpdateForNewTopics();
+ Cluster cache = metadata.fetch();
+
+ while (cache == metadata.fetch()) {
+ log.info("Waiting for new metadata update");
+ }
Review Comment:
@Phuc-Hong-Tran—why was this `while` loop added? Did you see issues in your
testing, or were you preemptively wanting to make the code more robust?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern,
Optional<ConsumerRebalanceListen
throwIfNoAssignorsConfigured();
log.info("Subscribed to pattern: '{}'", pattern);
subscriptions.subscribe(pattern, listener);
- updatePatternSubscription(metadata.fetch());
metadata.requestUpdateForNewTopics();
+ Cluster cache = metadata.fetch();
+
+ while (cache == metadata.fetch()) {
+ log.info("Waiting for new metadata update");
+ }
Review Comment:
I'm wondering why we call `metadata.requestUpdateForNewTopics` here when
that's already in `updatePatternSubscription`? 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]