lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1464026216


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1134,9 +1134,38 @@ private CompletableFuture<Void> assignPartitions(
         // Make assignment effective on the client by updating the subscription state.
         updateSubscription(assignedPartitions, false);
 
+        // Mark assigned partitions as pendingOnAssignedCallback to temporarily stop fetching or
+        // initializing positions for them. Passing the full set of assigned partitions
+        // (previously owned and newly added), given that they are all provided to the user in the
+        // callback, so we could expect offsets updates for any of them.
+        Set<TopicPartition> assignedTopicPartition = assignedPartitions.stream().map(tIdp -> tIdp.topicPartition()).collect(Collectors.toSet());
+        subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, true);

Review Comment:
   Totally, that was definitely the case. I changed it so that all
`assignedPartitions` are updated in the subscription state and only the
`addedPartitions` are marked as awaiting the callback, in a single
thread-safe operation on the subscription state.
   This is, by the way, in line with your comment about not touching the
previously owned partitions and blocking only the added ones until the
callback completes. Totally agree, fixed. Both the legacy logic and this new
logic have always passed only the added partitions to `onPartitionsAssigned`,
so it was me getting mixed up before. Sorry about the confusion.
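
   To illustrate the idea (not the actual change), here is a minimal sketch of
updating the full assignment and blocking only the added partitions under one
lock. Everything in it is an assumption for illustration:
`SketchSubscriptionState`, `assignAndMarkAwaitingCallback`,
`enableAfterCallback` and `isFetchable` are hypothetical names, partitions are
plain strings, and none of this is the real `SubscriptionState` API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the consumer's subscription state (illustration only).
final class SketchSubscriptionState {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> awaitingCallback = new HashSet<>();

    // Replaces the assignment and blocks only the newly added partitions.
    // Both steps happen under the same lock, so no other thread can see an
    // added partition as fetchable before the callback has run.
    synchronized void assignAndMarkAwaitingCallback(Set<String> fullAssignment, Set<String> added) {
        assigned.clear();
        assigned.addAll(fullAssignment);
        // Drop markers for partitions that are no longer assigned.
        awaitingCallback.retainAll(fullAssignment);
        for (String partition : added) {
            if (fullAssignment.contains(partition)) {
                awaitingCallback.add(partition);
            }
        }
    }

    // Intended to be called once the onPartitionsAssigned callback has completed.
    synchronized void enableAfterCallback(Set<String> added) {
        awaitingCallback.removeAll(added);
    }

    // A partition is fetchable when it is assigned and not waiting on the callback.
    synchronized boolean isFetchable(String partition) {
        return assigned.contains(partition) && !awaitingCallback.contains(partition);
    }
}
```

   With this shape, previously owned partitions stay fetchable across the
reassignment, and the added ones become fetchable only after
`enableAfterCallback` is called for them.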



