This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a80dedab971 KAFKA-20066: Implement KIP-1251: Assignment epochs for 
consumer groups [3/N] (#21692)
a80dedab971 is described below

commit a80dedab9714ad26acc3d17e5a242ceb5b2c2207
Author: Lucy Liu <[email protected]>
AuthorDate: Wed Mar 18 07:50:51 2026 -0500

    KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups 
[3/N] (#21692)
    
    # Summary
    
    This PR changes the offset commit fencing logic. It is a follow-up to
    https://github.com/apache/kafka/pull/21558, which changed the assignment
    structure to include epoch information.
    
    It introduces per-partition assignment epochs to relax the strict member
    epoch validation for consumer group offset commits. When receiving an
    offset commit request that includes the client-side member epoch and a
    member ID, we previously required checking
    ```
    clientEpoch == brokerEpoch
    ```
    for a valid offset commit, which could lead to false fencing.
    
    We now allow a relaxed offset commit check using an assignment epoch for
    each assigned partition and each member:
    
    ```
    assignmentEpoch <= clientEpoch <= brokerEpoch
    ```
    This prevents false rejections of legitimate offset commits when a
    member's epoch is bumped but the client hasn't received the update yet.
    
    Reviewers: Lucas Brutschy <[email protected]>, Sean Quah
     <[email protected]>, Dongnuo Lyu <[email protected]>, David Jacot
     <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  58 +++---
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  12 +-
 .../coordinator/group/GroupCoordinatorShard.java   |   2 +
 .../coordinator/group/OffsetMetadataManager.java   |  41 +++-
 .../group/modern/consumer/ConsumerGroup.java       |  70 ++++++-
 .../group/GroupCoordinatorShardTest.java           |  35 ++++
 .../group/OffsetMetadataManagerTest.java           | 222 ++++++++++++++++++++-
 .../modern/consumer/ConsumerGroupMemberTest.java   |  33 +++
 .../group/modern/consumer/ConsumerGroupTest.java   | 187 ++++++++++++++++-
 .../TransactionalOffsetFetchBenchmark.java         |   3 +-
 10 files changed, 611 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 555469599b3..54c6395b5f1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -285,9 +285,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       if (useTopicIds) {
         offsetCommitRequest.data.topics.forEach { topic =>
-          if (topic.topicId != Uuid.ZERO_UUID) {
-            metadataCache.getTopicName(topic.topicId).ifPresent(name => 
topic.setName(name))
-          }
+          metadataCache.getTopicName(topic.topicId).ifPresent(name => 
topic.setName(name))
         }
       }
 
@@ -311,32 +309,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           // to the response with TOPIC_AUTHORIZATION_FAILED.
           
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
             topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
-        } else if (!metadataCache.contains(topic.name)) {
-          // If the topic is unknown, we add the topic and all its partitions
-          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
-          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
         } else {
-          // Otherwise, we check all partitions to ensure that they all exist.
-          val topicWithValidPartitions = new 
OffsetCommitRequestData.OffsetCommitRequestTopic()
-            .setTopicId(topic.topicId)
-            .setName(topic.name)
-
-          topic.partitions.forEach { partition =>
-            if (metadataCache.getLeaderAndIsr(topic.name, 
partition.partitionIndex).isPresent) {
-              topicWithValidPartitions.partitions.add(partition)
-            } else {
-              responseBuilder.addPartition(
-                topic.topicId,
-                topic.name,
-                partition.partitionIndex,
-                Errors.UNKNOWN_TOPIC_OR_PARTITION
-              )
+          // For lower API versions, the topic id may not be included in the 
request.
+          // In this case, we resolve the topic id from metadata cache to 
ensure that the topic exists.
+          // If the topic doesn't exist, the currentTopicId will fallback to 
ZERO_UUID.
+          val currentTopicId = metadataCache.getTopicId(topic.name)
+          topic.setTopicId(currentTopicId)
+
+          if (currentTopicId == Uuid.ZERO_UUID) {
+            // If the topic is unknown, we add the topic and all its partitions
+            // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+            
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+              Uuid.ZERO_UUID, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          } else {
+            // Otherwise, we check all partitions to ensure that they all 
exist.
+            val topicWithValidPartitions = new 
OffsetCommitRequestData.OffsetCommitRequestTopic()
+              .setTopicId(topic.topicId())
+              .setName(topic.name)
+
+            topic.partitions.forEach { partition =>
+              if (metadataCache.getLeaderAndIsr(topic.name, 
partition.partitionIndex).isPresent) {
+                topicWithValidPartitions.partitions.add(partition)
+              } else {
+                responseBuilder.addPartition(
+                  topic.topicId(),
+                  topic.name,
+                  partition.partitionIndex,
+                  Errors.UNKNOWN_TOPIC_OR_PARTITION
+                )
+              }
             }
-          }
 
-          if (!topicWithValidPartitions.partitions.isEmpty) {
-            authorizedTopicsRequest += topicWithValidPartitions
+            if (!topicWithValidPartitions.partitions.isEmpty) {
+              authorizedTopicsRequest += topicWithValidPartitions
+            }
           }
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 069fe8645a7..663aaefe839 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1050,7 +1050,7 @@ class KafkaApisTest extends Logging {
       .setMemberId("member")
       .setTopics(util.List.of(
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setTopicId(topicId)
           .setName(topicName)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -1111,7 +1111,7 @@ class KafkaApisTest extends Logging {
       .setMemberId("member")
       .setTopics(util.List.of(
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+          .setTopicId(topicId)
           .setName(topicName)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -1305,8 +1305,10 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
-    addTopicToMetadataCache("foo", numPartitions = 2)
-    addTopicToMetadataCache("bar", numPartitions = 2)
+    val fooId = Uuid.randomUuid()
+    val barId = Uuid.randomUuid()
+    addTopicToMetadataCache("foo", numPartitions = 2, topicId = fooId)
+    addTopicToMetadataCache("bar", numPartitions = 2, topicId = barId)
 
     val offsetCommitRequest = new OffsetCommitRequestData()
       .setGroupId("group")
@@ -1356,6 +1358,7 @@ class KafkaApisTest extends Logging {
         // foo exists but only has 2 partitions.
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
           .setName("foo")
+          .setTopicId(fooId)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
@@ -1365,6 +1368,7 @@ class KafkaApisTest extends Logging {
               .setCommittedOffset(20))),
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
           .setName("bar")
+          .setTopicId(barId)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 8937df57f32..9fc202ebae7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -1081,6 +1081,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     public void onLoaded(CoordinatorMetadataImage newImage) {
         CoordinatorMetadataDelta emptyDelta = newImage.emptyDelta();
         groupMetadataManager.onMetadataUpdate(emptyDelta, newImage);
+        offsetMetadataManager.onMetadataUpdate(emptyDelta, newImage);
         coordinatorMetrics.activateMetricsShard(metricsShard);
 
         groupMetadataManager.onLoaded();
@@ -1105,6 +1106,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     @Override
     public void onMetadataUpdate(CoordinatorMetadataDelta delta, 
CoordinatorMetadataImage newImage) {
         groupMetadataManager.onMetadataUpdate(delta, newImage);
+        offsetMetadataManager.onMetadataUpdate(delta, newImage);
     }
 
     private static OffsetCommitKey convertLegacyOffsetCommitKey(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index d1ce206c844..10ff727e637 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
 import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic;
@@ -47,7 +49,6 @@ import 
org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
-import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -84,7 +85,7 @@ public class OffsetMetadataManager {
         private SnapshotRegistry snapshotRegistry = null;
         private Time time = null;
         private GroupMetadataManager groupMetadataManager = null;
-        private MetadataImage metadataImage = null;
+        private CoordinatorMetadataImage metadataImage = null;
         private GroupCoordinatorConfig config = null;
         private GroupCoordinatorMetricsShard metrics = null;
 
@@ -113,7 +114,7 @@ public class OffsetMetadataManager {
             return this;
         }
 
-        public Builder withMetadataImage(MetadataImage metadataImage) {
+        public Builder withMetadataImage(CoordinatorMetadataImage 
metadataImage) {
             this.metadataImage = metadataImage;
             return this;
         }
@@ -126,7 +127,7 @@ public class OffsetMetadataManager {
         public OffsetMetadataManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
-            if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+            if (metadataImage == null) metadataImage = 
CoordinatorMetadataImage.EMPTY;
             if (time == null) time = Time.SYSTEM;
 
             if (groupMetadataManager == null) {
@@ -164,6 +165,11 @@ public class OffsetMetadataManager {
      */
     private final Time time;
 
+    /**
+     * The metadata image.
+     */
+    private CoordinatorMetadataImage metadataImage;
+
     /**
      * The group metadata manager.
      */
@@ -425,7 +431,7 @@ public class OffsetMetadataManager {
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         Time time,
-        MetadataImage metadataImage,
+        CoordinatorMetadataImage metadataImage,
         GroupMetadataManager groupMetadataManager,
         GroupCoordinatorConfig config,
         GroupCoordinatorMetricsShard metrics
@@ -433,6 +439,7 @@ public class OffsetMetadataManager {
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(OffsetMetadataManager.class);
         this.time = time;
+        this.metadataImage = metadataImage;
         this.groupMetadataManager = groupMetadataManager;
         this.config = config;
         this.metrics = metrics;
@@ -692,6 +699,18 @@ public class OffsetMetadataManager {
             final TxnOffsetCommitResponseTopic topicResponse = new 
TxnOffsetCommitResponseTopic().setName(topic.name());
             response.topics().add(topicResponse);
 
+            // Resolve topicId from the metadata image.
+            final Uuid resolvedTopicId = metadataImage
+                .topicMetadata(topic.name())
+                .map(CoordinatorMetadataImage.TopicMetadata::id)
+                .orElse(Uuid.ZERO_UUID);
+
+            // If the topic doesn't exist in metadata, and we need to validate 
the member's assignment,
+            // throw ILLEGAL_GENERATION.
+            if (resolvedTopicId.equals(Uuid.ZERO_UUID) && validator != 
CommitPartitionValidator.NO_OP) {
+                throw Errors.ILLEGAL_GENERATION.exception();
+            }
+
             topic.partitions().forEach(partition -> {
                 if (isMetadataInvalid(partition.committedMetadata())) {
                     topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
@@ -702,7 +721,7 @@ public class OffsetMetadataManager {
                     try {
                         validator.validate(
                             topic.name(),
-                            org.apache.kafka.common.Uuid.ZERO_UUID,
+                            resolvedTopicId,
                             partition.partitionIndex()
                         );
                     } catch (StaleMemberEpochException ex) {
@@ -1273,6 +1292,16 @@ public class OffsetMetadataManager {
         }
     }
 
+    /**
+     * A new metadata image is available.
+     *
+     * @param delta    The delta image.
+     * @param newImage The new metadata image.
+     */
+    public void onMetadataUpdate(CoordinatorMetadataDelta delta, 
CoordinatorMetadataImage newImage) {
+        this.metadataImage = newImage;
+    }
+
     /**
      * @return The offset for the provided groupId and topic partition or null
      * if it does not exist.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index c869477190d..3cdc85b62c8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -670,11 +670,27 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         // the member should be using the OffsetCommit API version >= 9.
         if (!isTransactional && !member.useClassicProtocol() && apiVersion < 
9) {
             throw new UnsupportedVersionException("OffsetCommit version 9 or 
above must be used " +
-                "by members using the modern group protocol");
+                "by members using the consumer group protocol");
         }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), 
member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For members in a consumer group, the epoch must either match the 
last epoch sent
+        // in a heartbeat or be greater than or equal to the partition's 
assignment epoch.
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+
+        if (memberEpoch > member.memberEpoch()) {
+            if (member.useClassicProtocol()) {
+                throw new IllegalGenerationException(String.format("Received 
generation id %d is newer than "
+                    + "current member epoch %d.", memberEpoch, 
member.memberEpoch()));
+            } else {
+                throw new StaleMemberEpochException(String.format("Received 
member epoch %d is newer than "
+                    + "current member epoch %d.", memberEpoch, 
member.memberEpoch()));
+            }
+        }
+
+        // Member epoch is older; validate against per-partition assignment 
epochs.
+        return createAssignmentEpochValidator(member, memberEpoch);
     }
 
     /**
@@ -837,6 +853,54 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         }
     }
 
+    /**
+     * Creates a validator that checks if the received member epoch is valid 
for each partition's assignment epoch.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's 
assignment epoch (KIP-1251).
+     *
+     * @param member              The consumer whose assignments are being 
validated.
+     * @param receivedMemberEpoch The received member epoch.
+     * @return A validator for per-partition validation.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        return (topicName, topicId, partitionId) -> {
+            // Search for the partition in the assigned partitions, then in 
partitions pending revocation.
+            Integer assignmentEpoch = member.assignmentEpoch(topicId, 
partitionId);
+            if (assignmentEpoch == null) {
+                assignmentEpoch = member.pendingRevocationEpoch(topicId, 
partitionId);
+            }
+
+            if (assignmentEpoch == null) {
+                if (member.useClassicProtocol()) {
+                    throw new IllegalGenerationException(String.format(
+                        "Partition %s-%d is not assigned or pending revocation 
for member.",
+                        topicName, partitionId));
+                } else {
+                    throw new StaleMemberEpochException(String.format(
+                        "Partition %s-%d is not assigned or pending revocation 
for member.",
+                        topicName, partitionId));
+                }
+            }
+
+            if (receivedMemberEpoch < assignmentEpoch) {
+                if (member.useClassicProtocol()) {
+                    throw new IllegalGenerationException(String.format(
+                        "Received generation id %d is older than assignment 
epoch %d for partition %s-%d.",
+                        receivedMemberEpoch, assignmentEpoch, topicName, 
partitionId)
+                    );
+                } else {
+                    throw new StaleMemberEpochException(String.format(
+                        "Received member epoch %d is older than assignment 
epoch %d for partition %s-%d.",
+                        receivedMemberEpoch, assignmentEpoch, topicName, 
partitionId)
+                    );
+                }
+            }
+        };
+    }
+
     /**
      * Computes the subscription type based on the provided information.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 065e9adaf12..3a31f403873 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
@@ -1272,9 +1273,43 @@ public class GroupCoordinatorShardTest {
             any(), eq(image)
         );
 
+        verify(offsetMetadataManager, times(1)).onMetadataUpdate(
+            any(), eq(image)
+        );
+
         verify(groupMetadataManager, times(1)).onLoaded();
     }
 
+    @Test
+    public void testOnMetadataUpdate() {
+        CoordinatorMetadataImage image = CoordinatorMetadataImage.EMPTY;
+        CoordinatorMetadataDelta delta = CoordinatorMetadataDelta.EMPTY;
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = 
mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        coordinator.onMetadataUpdate(delta, image);
+
+        verify(groupMetadataManager, times(1)).onMetadataUpdate(
+            eq(delta), eq(image)
+        );
+
+        verify(offsetMetadataManager, times(1)).onMetadataUpdate(
+            eq(delta), eq(image)
+        );
+    }
+
     @Test
     public void testReplayGroupMetadata() {
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 9c48b15103f..052043b4141 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -46,9 +46,11 @@ import 
org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
 import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
 import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
 import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic;
@@ -90,9 +92,12 @@ import java.util.Set;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignmentWithEpochs;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignmentWithEpochs;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_DELETIONS_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -115,7 +120,7 @@ public class OffsetMetadataManagerTest {
             private final GroupCoordinatorMetricsShard metrics = 
mock(GroupCoordinatorMetricsShard.class);
             private final GroupConfigManager configManager = 
mock(GroupConfigManager.class);
             private GroupMetadataManager groupMetadataManager = null;
-            private MetadataImage metadataImage = null;
+            private CoordinatorMetadataImage metadataImage = null;
             private GroupCoordinatorConfig config = null;
 
             Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
@@ -133,8 +138,13 @@ public class OffsetMetadataManagerTest {
                 return this;
             }
 
+            Builder withMetadataImage(CoordinatorMetadataImage metadataImage) {
+                this.metadataImage = metadataImage;
+                return this;
+            }
+
             OffsetMetadataManagerTestContext build() {
-                if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+                if (metadataImage == null) metadataImage = 
CoordinatorMetadataImage.EMPTY;
                 if (config == null) {
                     config = 
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24);
                 }
@@ -146,7 +156,7 @@ public class OffsetMetadataManagerTest {
                         .withExecutor(executor)
                         .withSnapshotRegistry(snapshotRegistry)
                         .withLogContext(logContext)
-                        .withMetadataImage(new 
KRaftCoordinatorMetadataImage(metadataImage))
+                        .withMetadataImage(metadataImage)
                         .withGroupCoordinatorMetricsShard(metrics)
                         .withGroupConfigManager(configManager)
                         .withConfig(GroupCoordinatorConfig.fromProps(Map.of()))
@@ -1252,6 +1262,109 @@ public class OffsetMetadataManagerTest {
         assertThrows(IllegalGenerationException.class, () -> 
context.commitOffset(request));
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithZeroTopicId() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        Uuid topicId = Uuid.randomUuid();
+
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 5, 0)))
+            .build()
+        );
+
+        // When topicId is ZERO_UUID, since NO_OP validator is not used,
+        // STALE_EPOCH_EXCEPTION is thrown.
+        assertThrows(StaleMemberEpochException.class, () -> 
context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(7)
+                .setTopics(List.of(
+                    new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(List.of(
+                            new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                        ))
+                ))
+        ));
+    }
+
+    @Test
+    public void testConsumerGroupOffsetCommitResolvesTopicId() {
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withMetadataImage(new 
KRaftCoordinatorMetadataImage(metadataImage))
+            .build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+            .build()
+        );
+
+        OffsetCommitRequestData request = new OffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setTopics(List.of(
+                new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setTopicId(barTopicId)
+                    .setName(barTopicName)
+                    .setPartitions(List.of(
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                    ))
+            ));
+
+        // When client epoch (3) < assignment epoch (5), exception should be 
thrown.
+        request.setGenerationIdOrMemberEpoch(3);
+        assertThrows(StaleMemberEpochException.class, () -> 
context.commitOffset(request));
+
+        // When client epoch (5) >= assignment epoch (5), commit should 
succeed.
+        request.setGenerationIdOrMemberEpoch(5);
+        assertDoesNotThrow(() -> context.commitOffset(request));
+
+        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result 
= context.commitOffset(request);
+        assertEquals(
+            new OffsetCommitResponseData()
+                .setTopics(List.of(
+                    new OffsetCommitResponseData.OffsetCommitResponseTopic()
+                        .setTopicId(barTopicId)
+                        .setName(barTopicName)
+                        .setPartitions(List.of(
+                            new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                        ))
+                )),
+            result.response()
+        );
+    }
+
     @Test
     public void testConsumerGroupOffsetCommitFromAdminClient() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
@@ -1541,6 +1654,70 @@ public class OffsetMetadataManagerTest {
         verifyTransactionalOffsetCommit(context);
     }
 
+    @Test
+    public void testConsumerGroupTransactionalOffsetCommitResolvesTopicId() {
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withMetadataImage(new 
KRaftCoordinatorMetadataImage(metadataImage))
+            .build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+            .build()
+        );
+
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                    .setName(barTopicName)
+                    .setPartitions(List.of(
+                        new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                    ))
+            ));
+
+        // When client epoch (3) < assignment epoch (5), exception should be 
thrown.
+        request.setGenerationId(3);
+        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(request));
+
+        // When client epoch (5) >= assignment epoch (5), commit should 
succeed.
+        request.setGenerationId(5);
+        assertDoesNotThrow(() -> context.commitTransactionalOffset(request));
+
+        CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> 
result = context.commitTransactionalOffset(request);
+        assertEquals(
+            new TxnOffsetCommitResponseData()
+                .setTopics(List.of(
+                    new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                        .setName(barTopicName)
+                        .setPartitions(List.of(
+                            new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                        ))
+                )),
+            result.response()
+        );
+    }
+
     @Test
     public void testStreamsGroupTransactionalOffsetCommit() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
@@ -1700,6 +1877,43 @@ public class OffsetMetadataManagerTest {
         verifyTransactionalOffsetCommitWithStaleMemberEpoch(context);
     }
 
+    @Test
+    public void 
testConsumerGroupTransactionalOffsetCommitWithUnresolvedTopicId() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        Uuid topicId = Uuid.randomUuid();
+
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 5, 0)))
+            .build()
+        );
+
+        // When the topicId cannot be resolved, it falls back to ZERO_UUID; 
since the NO_OP validator is not used,
+        // ILLEGAL_GENERATION is thrown.
+        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
+            new TxnOffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationId(7)
+                .setTopics(List.of(
+                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(List.of(
+                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                        ))
+                ))
+        ));
+    }
+
     @Test
     public void 
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
@@ -3728,7 +3942,7 @@ public class OffsetMetadataManagerTest {
             .setSubtopologyId("0")
             .setSourceTopics(List.of("bar")))));
 
-        // Member at epoch 10, with partitions assigned at epoch 4 and 5 
respsectively.
+        // Member at epoch 10, with partitions assigned at epoch 4 and 5 
respectively.
         group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
             .setMemberEpoch(10)
             .setAssignedTasks(new TasksTupleWithEpochs(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
index e449db9a639..202325d2327 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
@@ -48,6 +48,7 @@ import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssig
 import static org.apache.kafka.coordinator.group.Utils.toAssignmentWithEpochs;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class ConsumerGroupMemberTest {
     private static final Logger LOG = 
LoggerFactory.getLogger(ConsumerGroupMemberTest.class);
@@ -413,6 +414,38 @@ public class ConsumerGroupMemberTest {
         );
     }
 
+    @Test
+    public void testAssignedAndPendingRevocationEpoch() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+        Uuid unassignedTopicId = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3)), 10))
+            
.setPartitionsPendingRevocation(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(topicId2, 4, 5, 6)), 9))
+            .build();
+
+        assertEquals(10, member.assignmentEpoch(topicId1, 1));
+        assertEquals(10, member.assignmentEpoch(topicId1, 2));
+        assertEquals(10, member.assignmentEpoch(topicId1, 3));
+        assertNull(member.pendingRevocationEpoch(topicId1, 1));
+        assertNull(member.pendingRevocationEpoch(topicId1, 2));
+        assertNull(member.pendingRevocationEpoch(topicId1, 3));
+
+        assertEquals(9, member.pendingRevocationEpoch(topicId2, 4));
+        assertEquals(9, member.pendingRevocationEpoch(topicId2, 5));
+        assertEquals(9, member.pendingRevocationEpoch(topicId2, 6));
+        assertNull(member.assignmentEpoch(topicId2, 4));
+        assertNull(member.assignmentEpoch(topicId2, 5));
+
+        assertNull(member.assignmentEpoch(topicId1, 10));
+        assertNull(member.pendingRevocationEpoch(topicId2, 10));
+        assertNull(member.assignmentEpoch(unassignedTopicId, 0));
+        assertNull(member.pendingRevocationEpoch(unassignedTopicId, 0));
+    }
+
     private List<ConsumerGroupMemberMetadataValue.ClassicProtocol> 
toClassicProtocolCollection(String name) {
         List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new 
ArrayList<>();
         protocols.add(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 4570315a00d..73bf9655a76 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -36,6 +36,7 @@ import 
org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.coordinator.group.CommitPartitionValidator;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
@@ -52,7 +53,9 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,12 +67,15 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignmentWithEpochs;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignmentWithEpochs;
 import static org.apache.kafka.coordinator.group.Utils.computeGroupHash;
 import static org.apache.kafka.coordinator.group.Utils.computeTopicHash;
 import static org.apache.kafka.coordinator.group.Utils.toAssignmentWithEpochs;
@@ -914,24 +920,189 @@ public class ConsumerGroupTest {
         assertThrows(UnknownMemberIdException.class, () ->
             group.validateOffsetCommit("", null, -1, isTransactional, 
version));
 
-        // The member epoch is stale.
+        // This should succeed.
         if (version >= 9) {
+            group.validateOffsetCommit("new-protocol-member-id", "", 0, 
isTransactional, version);
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("new-protocol-member-id", "", 0, 
isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("offsetCommitEpochValidationParams")
+    public void testValidateOffsetCommitWithEpochValidation(boolean 
isTransactional, short version, PartitionAssignmentType assignmentType) {
+        Uuid topicId = Uuid.randomUuid();
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10);
+
+        if (assignmentType == PartitionAssignmentType.ASSIGNED) {
+            memberBuilder.setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 7, 0)));
+        } else {
+            
memberBuilder.setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 7, 0)));
+        }
+        group.updateMember(memberBuilder.build());
+
+        // When client epoch (11) > broker epoch (10), throw 
StaleMemberEpochException.
+        if (isTransactional || version >= 9) {
             assertThrows(StaleMemberEpochException.class, () ->
-                group.validateOffsetCommit("new-protocol-member-id", "", 10, 
isTransactional, version));
+                group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
         } else {
             assertThrows(UnsupportedVersionException.class, () ->
-                group.validateOffsetCommit("new-protocol-member-id", "", 10, 
isTransactional, version));
+                group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
         }
+
+        // When client epoch (10) == broker epoch (10), no exception thrown.
+        if (isTransactional || version >= 9) {
+            var validator = group.validateOffsetCommit("member-id", "", 10, 
isTransactional, version);
+            assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 10, 
isTransactional, version));
+        }
+
+        // When assignment epoch (7) <= client epoch (7) <= broker epoch (10), 
no exception thrown.
+        if (isTransactional || version >= 9) {
+            var validator = group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version);
+            assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
+        }
+
+        // When client epoch (6) != broker epoch (10) and client epoch (6) < 
assignment epoch (7),
+        // stale member epoch exception thrown from assignment epoch validator.
+        if (isTransactional || version >= 9) {
+            var validator = group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version);
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate("foo", topicId, 0));
+            assertEquals(
+                "Received member epoch 6 is older than assignment epoch 7 for 
partition foo-0.",
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version));
+        }
+    }
+
+    private enum PartitionAssignmentType {
+        ASSIGNED,
+        PENDING_REVOCATION
+    }
+
+    private static Stream<Arguments> offsetCommitEpochValidationParams() {
+        Stream.Builder<Arguments> builder = Stream.builder();
+
+        for (short version = ApiKeys.OFFSET_COMMIT.oldestVersion(); version <= 
ApiKeys.OFFSET_COMMIT.latestVersion(true); version++) {
+            for (PartitionAssignmentType type : 
PartitionAssignmentType.values()) {
+                builder.add(Arguments.of(false, version, type));
+            }
+        }
+        for (short version = ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(); 
version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true); version++) {
+            for (PartitionAssignmentType type : 
PartitionAssignmentType.values()) {
+                builder.add(Arguments.of(true, version, type));
+            }
+        }
+
+        return builder.build();
+    }
+
+    @ParameterizedTest
+    @MethodSource("offsetCommitVersionsAndTransactionalParams")
+    public void testValidateOffsetCommitWithClassicProtocolMember(boolean 
isTransactional, short version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+            .build());
+
+        // When client epoch (11) > broker epoch (10), throw 
IllegalGenerationException.
         assertThrows(IllegalGenerationException.class, () ->
-            group.validateOffsetCommit("old-protocol-member-id", "", 10, 
isTransactional, version));
+            group.validateOffsetCommit("member-id", "", 11, isTransactional, 
version)
+        );
 
-        // This should succeed.
-        if (version >= 9) {
-            group.validateOffsetCommit("new-protocol-member-id", "", 0, 
isTransactional, version);
+        // When client epoch (10) == broker epoch (10), no exception thrown 
and NO_OP validator returned.
+        var validator = group.validateOffsetCommit("member-id", "", 10, 
isTransactional, version);
+        assertEquals(CommitPartitionValidator.NO_OP, validator);
+
+        // When assignment epoch (7) <= client epoch (7) <= broker epoch (10), 
no exception thrown.
+        var newValidator = group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version);
+        assertDoesNotThrow(() -> newValidator.validate("foo", topicId, 0));
+
+        // When client epoch (6) != broker epoch (10) and client epoch (6) < 
assignment epoch (7),
+        // IllegalGenerationException thrown from assignment epoch validator.
+        var staleValidator = group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version);
+        assertThrows(IllegalGenerationException.class, () ->
+            staleValidator.validate("foo", topicId, 0));
+    }
+
+    @ParameterizedTest
+    @MethodSource("offsetCommitVersionsAndTransactionalParams")
+    public void testValidateOffsetCommitWithUnassignedTopicOrPartition(boolean 
isTransactional, short version) {
+        Uuid assignedTopicId = Uuid.randomUuid();
+        Uuid unassignedTopicId = Uuid.randomUuid();
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(assignedTopicId, 7, 0)))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(assignedTopicId, 7, 1)))
+            .build());
+
+        // Commit to an unassigned topic
+        if (isTransactional || version >= 9) {
+            var validator = group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version);
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate("bar", unassignedTopicId, 0));
+            assertEquals(
+                "Partition bar-0 is not assigned or pending revocation for 
member.",
+                ex.getMessage()
+            );
         } else {
             assertThrows(UnsupportedVersionException.class, () ->
-                group.validateOffsetCommit("new-protocol-member-id", "", 0, 
isTransactional, version));
+                group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
         }
+
+        // Commit to an unassigned partition of an existing topic
+        if (isTransactional || version >= 9) {
+            var validator = group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version);
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate("foo", assignedTopicId, 2));
+            assertEquals(
+                "Partition foo-2 is not assigned or pending revocation for 
member.",
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
+        }
+    }
+
+    private static Stream<Arguments> 
offsetCommitVersionsAndTransactionalParams() {
+        Stream.Builder<Arguments> builder = Stream.builder();
+
+        for (short version = ApiKeys.OFFSET_COMMIT.oldestVersion(); version <= 
ApiKeys.OFFSET_COMMIT.latestVersion(true); version++) {
+            builder.add(Arguments.of(false, version));
+        }
+        for (short version = ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(); 
version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true); version++) {
+            builder.add(Arguments.of(true, version));
+        }
+
+        return builder.build();
     }
 
     @Test
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
index b1986f639a7..8149dfedf2a 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.GroupMetadataManager;
@@ -100,7 +101,7 @@ public class TransactionalOffsetFetchBenchmark {
             .withTime(TIME)
             .withGroupMetadataManager(groupMetadataManager)
             .withGroupCoordinatorConfig(mock(GroupCoordinatorConfig.class))
-            .withMetadataImage(image)
+            .withMetadataImage(new KRaftCoordinatorMetadataImage(image))
             
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
             .build();
 

Reply via email to