dajac commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1457451148


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1134,9 +1134,22 @@ private CompletableFuture<Void> assignPartitions(
         // Make assignment effective on the client by updating the subscription state.
         updateSubscription(assignedPartitions, false);
 
+        // Pause partitions to ensure that fetch does not start until the callback completes.
+        assignedPartitions.forEach(tp -> subscriptions.pause(tp.topicPartition()));

Review Comment:
   Shouldn't we pause only the `addedPartitions`?
   
   Moreover, I wonder if using `pause` is the correct approach. Pausing the 
newly added partitions will ensure that the consumer does not fetch. However, 
for the newly added partitions, the consumer may also try to initialize their 
offsets. See `initWithCommittedOffsetsIfNeeded`. I think that we must prevent 
this too. Otherwise, there is a race between it and the assigned callback for 
initializing the offsets. Do you see what I mean?
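
A minimal sketch of the two points above, assuming partitions are modelled as plain strings. Every name here (`PendingCallbackSketch`, `addedPartitions`, `markAwaitingCallback`, `canFetch`, `canInitializeOffsets`) is hypothetical and is not taken from `MembershipManagerImpl` or `SubscriptionState`. It only illustrates restricting the block to newly added partitions and gating both fetching and offset initialization on the same "callback still pending" state, rather than relying on `pause` alone.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; none of these names come from the Kafka code base.
public class PendingCallbackSketch {
    // Partitions whose assigned callback has not completed yet.
    private final Set<String> awaitingCallback = new HashSet<>();

    // Newly added partitions = new assignment minus what was already owned.
    static Set<String> addedPartitions(Set<String> assigned, Set<String> owned) {
        Set<String> added = new HashSet<>(assigned);
        added.removeAll(owned);
        return added;
    }

    // Called before the assigned callback is invoked.
    void markAwaitingCallback(Set<String> added) {
        awaitingCallback.addAll(added);
    }

    // Called once the assigned callback has completed.
    void callbackCompleted(Set<String> added) {
        awaitingCallback.removeAll(added);
    }

    // Both checks consult the same state, so neither fetching nor offset
    // initialization can race with the callback.
    boolean canFetch(String partition) {
        return !awaitingCallback.contains(partition);
    }

    boolean canInitializeOffsets(String partition) {
        return !awaitingCallback.contains(partition);
    }

    public static void main(String[] args) {
        PendingCallbackSketch state = new PendingCallbackSketch();
        Set<String> owned = Set.of("topic-0", "topic-1");
        Set<String> assigned = Set.of("topic-1", "topic-2");

        Set<String> added = addedPartitions(assigned, owned);
        state.markAwaitingCallback(added);
        System.out.println("added = " + added);
        System.out.println("topic-2 offsets before callback: " + state.canInitializeOffsets("topic-2"));

        state.callbackCompleted(added);
        System.out.println("topic-2 offsets after callback: " + state.canInitializeOffsets("topic-2"));
    }
}
```

In this sketch the already owned partition (`topic-1`) is never blocked, which is the effect of limiting the change to the added partitions only.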


