dajac commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1626453679
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -100,12 +100,15 @@ public static GroupType parse(String name) {
* @param generationIdOrMemberEpoch The generation id for genetic groups
or the member epoch
* for consumer groups.
* @param isTransactional Whether the offset commit is
transactional or not.
+ * @param version The reqyest context api version.
Review Comment:
nit: `The api version`.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -1016,32 +1017,86 @@ public void testMetadataRefreshDeadline() {
assertEquals(0, group.metadataRefreshDeadline().epoch);
}
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testValidateOffsetCommit(boolean isTransactional) {
- ConsumerGroup group = createConsumerGroup("group-foo");
-
- // Simulate a call from the admin client without member id and member
epoch.
- // This should pass only if the group is empty.
- group.validateOffsetCommit("", "", -1, isTransactional);
-
- // The member does not exist.
- assertThrows(UnknownMemberIdException.class, () ->
- group.validateOffsetCommit("member-id", null, 0, isTransactional));
-
- // Create a member.
- group.updateMember(new
ConsumerGroupMember.Builder("member-id").build());
-
- // A call from the admin client should fail as the group is not empty.
- assertThrows(UnknownMemberIdException.class, () ->
- group.validateOffsetCommit("", "", -1, isTransactional));
-
- // The member epoch is stale.
- assertThrows(StaleMemberEpochException.class, () ->
- group.validateOffsetCommit("member-id", "", 10, isTransactional));
+ @Test
+ public void testValidateTransactionalOffsetCommit() {
+ boolean isTransactional = true;
+ for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <=
ApiKeys.OFFSET_COMMIT.latestVersion(); v++) {
+ final short version = v;
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ // Simulate a call from the admin client without member id and
member epoch.
+ // This should pass only if the group is empty.
+ group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("member-id", null, 0,
isTransactional, version));
+
+ // Create a member.
+ group.updateMember(new
ConsumerGroupMember.Builder("member-id").build());
+
+ // A call from the admin client should fail as the group is not
empty.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetCommit("", "", -1, isTransactional,
version));
+
+ // The member epoch is stale.
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10,
isTransactional, version));
+
+ // This should succeed.
+ group.validateOffsetCommit("member-id", "", 0, isTransactional,
version);
+ }
+ }
- // This should succeed.
- group.validateOffsetCommit("member-id", "", 0, isTransactional);
+ @Test
+ public void testNonTransactionalValidateOffsetCommit() {
Review Comment:
nit: Rename this to `testValidateOffsetCommit`. If the name does not mention
transactional, the test is implicitly non-transactional.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or
not. It has no
* impact when a consumer group is used.
+ * @param version The request context api version.
Review Comment:
nit: same here, `The api version`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or
not. It has no
* impact when a consumer group is used.
+ * @param version The request context api version.
+ * @throws UnknownMemberIdException If the member is not found.
+ * @throws StaleMemberEpochException If the member uses the consumer
protocol and the provided
+ * member epoch doesn't match the
actual member epoch.
+ * @throws IllegalGenerationException If the member uses the classic
protocol and the provided
+ * generation id is not equal to the
member epoch.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
- boolean isTransactional
- ) throws UnknownMemberIdException, StaleMemberEpochException {
+ boolean isTransactional,
+ short version
+ ) throws UnknownMemberIdException, StaleMemberEpochException,
IllegalGenerationException {
// When the member epoch is -1, the request comes from either the
admin client
// or a consumer which does not use the group management facility. In
this case,
// the request can commit offsets if the group is empty.
if (memberEpoch < 0 && members().isEmpty()) return;
final ConsumerGroupMember member = getOrMaybeCreateMember(memberId,
false);
- validateMemberEpoch(memberEpoch, member.memberEpoch());
+
+ // If the commit is not transactional and the member uses the new
consumer protocol (KIP-848),
+ // the member should be using the OffsetCommit API version >= 9.
+ if (!isTransactional && !member.useClassicProtocol() && version < 9) {
+ throw new UnsupportedVersionException(String.format("The
OffsetCommit API version %d " +
+ "is smaller than the lowest version supporting new consumer
protocol 9.", version));
Review Comment:
nit: `OffsetCommit version 9 or above must be used by members using the
consumer group protocol`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -100,12 +100,15 @@ public static GroupType parse(String name) {
* @param generationIdOrMemberEpoch The generation id for genetic groups
or the member epoch
* for consumer groups.
* @param isTransactional Whether the offset commit is
transactional or not.
+ * @param version The reqyest context api version.
*/
void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationIdOrMemberEpoch,
- boolean isTransactional
+ boolean isTransactional,
+ short version
Review Comment:
nit: `apiVersion`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java:
##########
@@ -989,71 +990,76 @@ public void testMaybeElectNewJoinedLeaderChooseExisting()
{
@Test
public void testValidateOffsetCommit() {
Review Comment:
nit: Let's parameterize this test over the OffsetCommit API versions:
```
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or
not. It has no
* impact when a consumer group is used.
+ * @param version The request context api version.
+ * @throws UnknownMemberIdException If the member is not found.
+ * @throws StaleMemberEpochException If the member uses the consumer
protocol and the provided
+ * member epoch doesn't match the
actual member epoch.
+ * @throws IllegalGenerationException If the member uses the classic
protocol and the provided
+ * generation id is not equal to the
member epoch.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
- boolean isTransactional
- ) throws UnknownMemberIdException, StaleMemberEpochException {
+ boolean isTransactional,
+ short version
+ ) throws UnknownMemberIdException, StaleMemberEpochException,
IllegalGenerationException {
// When the member epoch is -1, the request comes from either the
admin client
// or a consumer which does not use the group management facility. In
this case,
// the request can commit offsets if the group is empty.
if (memberEpoch < 0 && members().isEmpty()) return;
final ConsumerGroupMember member = getOrMaybeCreateMember(memberId,
false);
- validateMemberEpoch(memberEpoch, member.memberEpoch());
+
+ // If the commit is not transactional and the member uses the new
consumer protocol (KIP-848),
+ // the member should be using the OffsetCommit API version >= 9.
+ if (!isTransactional && !member.useClassicProtocol() && version < 9) {
+ throw new UnsupportedVersionException(String.format("The
OffsetCommit API version %d " +
+ "is smaller than the lowest version supporting new consumer
protocol 9.", version));
+ }
+ validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
Review Comment:
nit: Let's add an empty line before this one.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or
not. It has no
* impact when a consumer group is used.
+ * @param version The request context api version.
+ * @throws UnknownMemberIdException If the member is not found.
+ * @throws StaleMemberEpochException If the member uses the consumer
protocol and the provided
+ * member epoch doesn't match the
actual member epoch.
+ * @throws IllegalGenerationException If the member uses the classic
protocol and the provided
+ * generation id is not equal to the
member epoch.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
- boolean isTransactional
- ) throws UnknownMemberIdException, StaleMemberEpochException {
+ boolean isTransactional,
+ short version
Review Comment:
nit: same here, `apiVersion`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -1016,32 +1017,86 @@ public void testMetadataRefreshDeadline() {
assertEquals(0, group.metadataRefreshDeadline().epoch);
}
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testValidateOffsetCommit(boolean isTransactional) {
- ConsumerGroup group = createConsumerGroup("group-foo");
-
- // Simulate a call from the admin client without member id and member
epoch.
- // This should pass only if the group is empty.
- group.validateOffsetCommit("", "", -1, isTransactional);
-
- // The member does not exist.
- assertThrows(UnknownMemberIdException.class, () ->
- group.validateOffsetCommit("member-id", null, 0, isTransactional));
-
- // Create a member.
- group.updateMember(new
ConsumerGroupMember.Builder("member-id").build());
-
- // A call from the admin client should fail as the group is not empty.
- assertThrows(UnknownMemberIdException.class, () ->
- group.validateOffsetCommit("", "", -1, isTransactional));
-
- // The member epoch is stale.
- assertThrows(StaleMemberEpochException.class, () ->
- group.validateOffsetCommit("member-id", "", 10, isTransactional));
+ @Test
Review Comment:
Let's also use `@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]