ableegoldman commented on code in PR #13965:
URL: https://github.com/apache/kafka/pull/13965#discussion_r1599471029
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -739,8 +750,13 @@ private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions
int firstIndex = rackInfo.nextRackConsumer(unassignedPartition, unfilledMembersWithExactlyMinQuotaPartitions, 0);
if (firstIndex >= 0) {
consumer = unfilledMembersWithExactlyMinQuotaPartitions.get(firstIndex);
- if (assignment.get(consumer).size() + 1 == maxQuota)
+ if (assignment.get(consumer).size() + 1 == maxQuota) {
unfilledMembersWithExactlyMinQuotaPartitions.remove(firstIndex);
+ currentNumMembersWithOverMinQuotaPartitions++;
+ if (currentNumMembersWithOverMinQuotaPartitions == expectedNumMembersWithOverMinQuotaPartitions) {
Review Comment:
Let's add a comment above this line, too. Something like
```
// Clear this once the current num consumers over minQuota reaches the expected number since this
// means all consumers at minQuota are now considered filled
```
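To make the intent of that comment concrete, here is a minimal standalone sketch of the bookkeeping, not the actual assignor code. The class `ClearAtExpectedSketch`, its factory `of`, and the method `onConsumerReachedMaxQuota` are hypothetical names for illustration only, and it assumes maxQuota is exactly minQuota + 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; names do not exist in AbstractStickyAssignor.
final class ClearAtExpectedSketch {
    int currentOverMin;                    // consumers already holding more than minQuota
    final int expectedOverMin;             // how many consumers may end up over minQuota
    final List<String> unfilledAtMinQuota; // consumers at minQuota that could still take one more

    private ClearAtExpectedSketch(int expectedOverMin, List<String> unfilledAtMinQuota) {
        this.expectedOverMin = expectedOverMin;
        this.unfilledAtMinQuota = unfilledAtMinQuota;
    }

    static ClearAtExpectedSketch of(int expectedOverMin, String... consumersAtMinQuota) {
        return new ClearAtExpectedSketch(expectedOverMin,
                new ArrayList<>(Arrays.asList(consumersAtMinQuota)));
    }

    // Called when a consumer at minQuota receives one more partition and so reaches maxQuota.
    void onConsumerReachedMaxQuota(String consumer) {
        unfilledAtMinQuota.remove(consumer);
        currentOverMin++;
        // Once the expected number of consumers is over minQuota, nobody else may go over,
        // so every remaining consumer at minQuota is now considered filled.
        if (currentOverMin == expectedOverMin) {
            unfilledAtMinQuota.clear();
        }
    }
}
```

With one expected over-quota consumer and three consumers at minQuota, the first promotion empties the list, so the other two are no longer candidates for an extra partition.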
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -729,8 +739,9 @@ private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions
int assignmentCount = assignment.get(consumer).size() + 1;
if (assignmentCount >= minQuota) {
unfilledMembersWithUnderMinQuotaPartitions.remove(consumer);
- if (assignmentCount < maxQuota)
+ if (assignmentCount < maxQuota && (currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions)) {
Review Comment:
This fix makes sense, good find. But let's add a comment because obviously
this is difficult to understand if the bug slipped in.
Something like:
```
// Only add this consumer if the current num members at maxQuota is less than the expected number
// since a consumer at minQuota can only be considered unfilled if it's possible to add another partition,
// which would bump it to maxQuota and exceed the expectedNumMembersWithOverMinQuotaPartitions
```
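For reference, a minimal sketch of the condition being described, written as a standalone predicate rather than the real assignor code. `MinQuotaGuardSketch` and `stillUnfilledAtMinQuota` are hypothetical names, and the quota arithmetic in `main` (floor, ceiling, and remainder of partitions over consumers) is an assumption about how these values are derived.

```java
// Hypothetical illustration; names do not exist in AbstractStickyAssignor.
final class MinQuotaGuardSketch {
    // Should a consumer that now holds `assignmentCount` partitions still be tracked as
    // "unfilled at exactly minQuota", i.e. eligible for one more partition?
    static boolean stillUnfilledAtMinQuota(int assignmentCount, int minQuota, int maxQuota,
                                           int currentOverMin, int expectedOverMin) {
        return assignmentCount >= minQuota
                && assignmentCount < maxQuota
                // The added guard: if enough consumers are already over minQuota,
                // bumping this one to maxQuota would exceed the expected count.
                && currentOverMin < expectedOverMin;
    }

    public static void main(String[] args) {
        int totalPartitions = 7;
        int numConsumers = 3;
        int minQuota = totalPartitions / numConsumers;                      // 2
        int maxQuota = (totalPartitions + numConsumers - 1) / numConsumers; // 3
        int expectedOverMin = totalPartitions % numConsumers;               // 1

        // No consumer is over minQuota yet, so this one can still take another partition.
        System.out.println(stillUnfilledAtMinQuota(2, minQuota, maxQuota, 0, expectedOverMin)); // true
        // One consumer is already over minQuota, so this one must be treated as filled.
        System.out.println(stillUnfilledAtMinQuota(2, minQuota, maxQuota, 1, expectedOverMin)); // false
    }
}
```

Without the third clause, the second call would return true and the consumer could later be pushed to maxQuota even though the expected number of over-quota members was already reached.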
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -579,7 +579,7 @@ protected List<TopicPartition> getAllTopicPartitions(List<String> sortedAllTopic
private class ConstrainedAssignmentBuilder extends AbstractAssignmentBuilder {
private final Set<TopicPartition> partitionsWithMultiplePreviousOwners;
- private final Set<TopicPartition> allRevokedPartitions;
+ private final Map<TopicPartition, String> mayRevokedPartitions;
Review Comment:
nit: this name is a bit confusing, how about `maybeRevokedPartitions`?