dajac commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1622067615


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -806,7 +809,29 @@ public void validateOffsetCommit(
         if (memberEpoch < 0 && members().isEmpty()) return;
 
         final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, 
false);
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+        if (member.useClassicProtocol()) {
+            validateMemberInstanceId(member, groupInstanceId);

Review Comment:
   When a member is fenced, it is removed from the group. So in this particular 
case, `getOrMaybeCreateMember` will throw a `UnknownMemberIdException` 
exception before we have a change to validate the instance id here. If we want 
to validate it, it should be done before, I think.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -806,7 +809,29 @@ public void validateOffsetCommit(
         if (memberEpoch < 0 && members().isEmpty()) return;
 
         final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, 
false);
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+        if (member.useClassicProtocol()) {
+            validateMemberInstanceId(member, groupInstanceId);
+
+            try {
+                validateMemberEpoch(memberEpoch, member.memberEpoch());
+            } catch (StaleMemberEpochException ex) {
+                // StaleMemberEpochException is not supported in the classic 
protocol. We throw
+                // IllegalGenerationException instead for compatibility.
+                throw new IllegalGenerationException(String.format("Invalid 
offset commit because the "
+                    + "received generation id %d does not match the expected 
member epoch %d.",
+                    memberEpoch, member.memberEpoch()));
+            }
+
+            if (member.memberEpoch() < groupEpoch() ||
+                member.state() == MemberState.UNREVOKED_PARTITIONS ||
+                (member.state() == MemberState.UNRELEASED_PARTITIONS && 
!waitingOnUnreleasedPartition(member))) {
+                throw new RebalanceInProgressException(String.format("Invalid 
offset commit because" +
+                    " a new rebalance has been triggered in group %s and 
member %s should rejoin to catch up.",
+                    groupId(), member.memberId()));
+            }

Review Comment:
   I am not sure about this logic here. Let's imagine that a member must 
rebalance and it got the signal via the heartbeat request. The first thing that 
it will do is committing its offsets. With this logic, the commits will be 
rejected. Therefore, I wonder if we should remove it and rely on the other APIs 
to trigger rebalances. What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -857,6 +857,7 @@ public void validateOffsetCommit(
             throw Errors.UNKNOWN_MEMBER_ID.exception();
         }
 
+        // TODO: A temp marker. Will remove it when the pr is open.
         if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
   The member is allowed to commit while in `PREPARING_REBALANCE` state. When a 
rebalance is triggered, the member must commit its offsets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to