This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a80dedab971 KAFKA-20066: Implement KIP-1251: Assignment epochs for
consumer groups [3/N] (#21692)
a80dedab971 is described below
commit a80dedab9714ad26acc3d17e5a242ceb5b2c2207
Author: Lucy Liu <[email protected]>
AuthorDate: Wed Mar 18 07:50:51 2026 -0500
KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups
[3/N] (#21692)
# Summary
This PR changes the offset commit fencing logic. It is a follow-up to
https://github.com/apache/kafka/pull/21558, which changed the assignment
structure to include epoch information.
It introduces per-partition assignment epochs to relax the strict member
epoch validation for consumer group offset commits. When receiving an
offset commit request that includes the client-side member epoch and a
member ID, we previously required
```
clientEpoch == brokerEpoch
```
for a valid offset commit, which could lead to false fencing.
We now allow a relaxed offset commit check that uses a per-member,
per-partition assignment epoch:
```
assignmentEpoch <= clientEpoch <= brokerEpoch
```
This prevents false rejections of legitimate offset commits when a
member's epoch is bumped but the client hasn't received the update yet.
Reviewers: Lucas Brutschy <[email protected]>, Sean Quah
<[email protected]>, Dongnuo Lyu <[email protected]>, David Jacot
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 58 +++---
.../scala/unit/kafka/server/KafkaApisTest.scala | 12 +-
.../coordinator/group/GroupCoordinatorShard.java | 2 +
.../coordinator/group/OffsetMetadataManager.java | 41 +++-
.../group/modern/consumer/ConsumerGroup.java | 70 ++++++-
.../group/GroupCoordinatorShardTest.java | 35 ++++
.../group/OffsetMetadataManagerTest.java | 222 ++++++++++++++++++++-
.../modern/consumer/ConsumerGroupMemberTest.java | 33 +++
.../group/modern/consumer/ConsumerGroupTest.java | 187 ++++++++++++++++-
.../TransactionalOffsetFetchBenchmark.java | 3 +-
10 files changed, 611 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 555469599b3..54c6395b5f1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -285,9 +285,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (useTopicIds) {
offsetCommitRequest.data.topics.forEach { topic =>
- if (topic.topicId != Uuid.ZERO_UUID) {
- metadataCache.getTopicName(topic.topicId).ifPresent(name =>
topic.setName(name))
- }
+ metadataCache.getTopicName(topic.topicId).ifPresent(name =>
topic.setName(name))
}
}
@@ -311,32 +309,40 @@ class KafkaApis(val requestChannel: RequestChannel,
// to the response with TOPIC_AUTHORIZATION_FAILED.
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
topic.topicId, topic.name, topic.partitions, _.partitionIndex,
Errors.TOPIC_AUTHORIZATION_FAILED)
- } else if (!metadataCache.contains(topic.name)) {
- // If the topic is unknown, we add the topic and all its partitions
- // to the response with UNKNOWN_TOPIC_OR_PARTITION.
-
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
- topic.topicId, topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
- // Otherwise, we check all partitions to ensure that they all exist.
- val topicWithValidPartitions = new
OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setTopicId(topic.topicId)
- .setName(topic.name)
-
- topic.partitions.forEach { partition =>
- if (metadataCache.getLeaderAndIsr(topic.name,
partition.partitionIndex).isPresent) {
- topicWithValidPartitions.partitions.add(partition)
- } else {
- responseBuilder.addPartition(
- topic.topicId,
- topic.name,
- partition.partitionIndex,
- Errors.UNKNOWN_TOPIC_OR_PARTITION
- )
+ // For lower API versions, the topic id may not be included in the
request.
+ // In this case, we resolve the topic id from metadata cache to
ensure that the topic exists.
+ // If the topic doesn't exist, the currentTopicId will fallback to
ZERO_UUID.
+ val currentTopicId = metadataCache.getTopicId(topic.name)
+ topic.setTopicId(currentTopicId)
+
+ if (currentTopicId == Uuid.ZERO_UUID) {
+ // If the topic is unknown, we add the topic and all its partitions
+ // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+ Uuid.ZERO_UUID, topic.name, topic.partitions, _.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ } else {
+ // Otherwise, we check all partitions to ensure that they all
exist.
+ val topicWithValidPartitions = new
OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setTopicId(topic.topicId())
+ .setName(topic.name)
+
+ topic.partitions.forEach { partition =>
+ if (metadataCache.getLeaderAndIsr(topic.name,
partition.partitionIndex).isPresent) {
+ topicWithValidPartitions.partitions.add(partition)
+ } else {
+ responseBuilder.addPartition(
+ topic.topicId(),
+ topic.name,
+ partition.partitionIndex,
+ Errors.UNKNOWN_TOPIC_OR_PARTITION
+ )
+ }
}
- }
- if (!topicWithValidPartitions.partitions.isEmpty) {
- authorizedTopicsRequest += topicWithValidPartitions
+ if (!topicWithValidPartitions.partitions.isEmpty) {
+ authorizedTopicsRequest += topicWithValidPartitions
+ }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 069fe8645a7..663aaefe839 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1050,7 +1050,7 @@ class KafkaApisTest extends Logging {
.setMemberId("member")
.setTopics(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+ .setTopicId(topicId)
.setName(topicName)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -1111,7 +1111,7 @@ class KafkaApisTest extends Logging {
.setMemberId("member")
.setTopics(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
+ .setTopicId(topicId)
.setName(topicName)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
@@ -1305,8 +1305,10 @@ class KafkaApisTest extends Logging {
@Test
def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
- addTopicToMetadataCache("foo", numPartitions = 2)
- addTopicToMetadataCache("bar", numPartitions = 2)
+ val fooId = Uuid.randomUuid()
+ val barId = Uuid.randomUuid()
+ addTopicToMetadataCache("foo", numPartitions = 2, topicId = fooId)
+ addTopicToMetadataCache("bar", numPartitions = 2, topicId = barId)
val offsetCommitRequest = new OffsetCommitRequestData()
.setGroupId("group")
@@ -1356,6 +1358,7 @@ class KafkaApisTest extends Logging {
// foo exists but only has 2 partitions.
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("foo")
+ .setTopicId(fooId)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1365,6 +1368,7 @@ class KafkaApisTest extends Logging {
.setCommittedOffset(20))),
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
+ .setTopicId(barId)
.setPartitions(util.List.of(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 8937df57f32..9fc202ebae7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -1081,6 +1081,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
public void onLoaded(CoordinatorMetadataImage newImage) {
CoordinatorMetadataDelta emptyDelta = newImage.emptyDelta();
groupMetadataManager.onMetadataUpdate(emptyDelta, newImage);
+ offsetMetadataManager.onMetadataUpdate(emptyDelta, newImage);
coordinatorMetrics.activateMetricsShard(metricsShard);
groupMetadataManager.onLoaded();
@@ -1105,6 +1106,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
@Override
public void onMetadataUpdate(CoordinatorMetadataDelta delta,
CoordinatorMetadataImage newImage) {
groupMetadataManager.onMetadataUpdate(delta, newImage);
+ offsetMetadataManager.onMetadataUpdate(delta, newImage);
}
private static OffsetCommitKey convertLegacyOffsetCommitKey(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index d1ce206c844..10ff727e637 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic;
@@ -47,7 +49,6 @@ import
org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
-import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@@ -84,7 +85,7 @@ public class OffsetMetadataManager {
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private GroupMetadataManager groupMetadataManager = null;
- private MetadataImage metadataImage = null;
+ private CoordinatorMetadataImage metadataImage = null;
private GroupCoordinatorConfig config = null;
private GroupCoordinatorMetricsShard metrics = null;
@@ -113,7 +114,7 @@ public class OffsetMetadataManager {
return this;
}
- public Builder withMetadataImage(MetadataImage metadataImage) {
+ public Builder withMetadataImage(CoordinatorMetadataImage
metadataImage) {
this.metadataImage = metadataImage;
return this;
}
@@ -126,7 +127,7 @@ public class OffsetMetadataManager {
public OffsetMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
- if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+ if (metadataImage == null) metadataImage =
CoordinatorMetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;
if (groupMetadataManager == null) {
@@ -164,6 +165,11 @@ public class OffsetMetadataManager {
*/
private final Time time;
+ /**
+ * The metadata image.
+ */
+ private CoordinatorMetadataImage metadataImage;
+
/**
* The group metadata manager.
*/
@@ -425,7 +431,7 @@ public class OffsetMetadataManager {
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Time time,
- MetadataImage metadataImage,
+ CoordinatorMetadataImage metadataImage,
GroupMetadataManager groupMetadataManager,
GroupCoordinatorConfig config,
GroupCoordinatorMetricsShard metrics
@@ -433,6 +439,7 @@ public class OffsetMetadataManager {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(OffsetMetadataManager.class);
this.time = time;
+ this.metadataImage = metadataImage;
this.groupMetadataManager = groupMetadataManager;
this.config = config;
this.metrics = metrics;
@@ -692,6 +699,18 @@ public class OffsetMetadataManager {
final TxnOffsetCommitResponseTopic topicResponse = new
TxnOffsetCommitResponseTopic().setName(topic.name());
response.topics().add(topicResponse);
+ // Resolve topicId from the metadata image.
+ final Uuid resolvedTopicId = metadataImage
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+
+ // If the topic doesn't exist in metadata, and we need to validate
the member's assignment,
+ // throw ILLEGAL_GENERATION.
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID) && validator !=
CommitPartitionValidator.NO_OP) {
+ throw Errors.ILLEGAL_GENERATION.exception();
+ }
+
topic.partitions().forEach(partition -> {
if (isMetadataInvalid(partition.committedMetadata())) {
topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
@@ -702,7 +721,7 @@ public class OffsetMetadataManager {
try {
validator.validate(
topic.name(),
- org.apache.kafka.common.Uuid.ZERO_UUID,
+ resolvedTopicId,
partition.partitionIndex()
);
} catch (StaleMemberEpochException ex) {
@@ -1273,6 +1292,16 @@ public class OffsetMetadataManager {
}
}
+ /**
+ * A new metadata image is available.
+ *
+ * @param delta The delta image.
+ * @param newImage The new metadata image.
+ */
+ public void onMetadataUpdate(CoordinatorMetadataDelta delta,
CoordinatorMetadataImage newImage) {
+ this.metadataImage = newImage;
+ }
+
/**
* @return The offset for the provided groupId and topic partition or null
* if it does not exist.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index c869477190d..3cdc85b62c8 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -670,11 +670,27 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
// the member should be using the OffsetCommit API version >= 9.
if (!isTransactional && !member.useClassicProtocol() && apiVersion <
9) {
throw new UnsupportedVersionException("OffsetCommit version 9 or
above must be used " +
- "by members using the modern group protocol");
+ "by members using the consumer group protocol");
}
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For members in a consumer group, the epoch must either match the
last epoch sent
+ // in a heartbeat or be greater than or equal to the partition's
assignment epoch.
+ if (memberEpoch == member.memberEpoch()) {
+ return CommitPartitionValidator.NO_OP;
+ }
+
+ if (memberEpoch > member.memberEpoch()) {
+ if (member.useClassicProtocol()) {
+ throw new IllegalGenerationException(String.format("Received
generation id %d is newer than "
+ + "current member epoch %d.", memberEpoch,
member.memberEpoch()));
+ } else {
+ throw new StaleMemberEpochException(String.format("Received
member epoch %d is newer than "
+ + "current member epoch %d.", memberEpoch,
member.memberEpoch()));
+ }
+ }
+
+ // Member epoch is older; validate against per-partition assignment
epochs.
+ return createAssignmentEpochValidator(member, memberEpoch);
}
/**
@@ -837,6 +853,54 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
}
}
+ /**
+ * Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch (KIP-1251).
+ *
+ * @param member The consumer whose assignments are being
validated.
+ * @param receivedMemberEpoch The received member epoch.
+ * @return A validator for per-partition validation.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ ConsumerGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ return (topicName, topicId, partitionId) -> {
+ // Search for the partition in the assigned partitions, then in
partitions pending revocation.
+ Integer assignmentEpoch = member.assignmentEpoch(topicId,
partitionId);
+ if (assignmentEpoch == null) {
+ assignmentEpoch = member.pendingRevocationEpoch(topicId,
partitionId);
+ }
+
+ if (assignmentEpoch == null) {
+ if (member.useClassicProtocol()) {
+ throw new IllegalGenerationException(String.format(
+ "Partition %s-%d is not assigned or pending revocation
for member.",
+ topicName, partitionId));
+ } else {
+ throw new StaleMemberEpochException(String.format(
+ "Partition %s-%d is not assigned or pending revocation
for member.",
+ topicName, partitionId));
+ }
+ }
+
+ if (receivedMemberEpoch < assignmentEpoch) {
+ if (member.useClassicProtocol()) {
+ throw new IllegalGenerationException(String.format(
+ "Received generation id %d is older than assignment
epoch %d for partition %s-%d.",
+ receivedMemberEpoch, assignmentEpoch, topicName,
partitionId)
+ );
+ } else {
+ throw new StaleMemberEpochException(String.format(
+ "Received member epoch %d is older than assignment
epoch %d for partition %s-%d.",
+ receivedMemberEpoch, assignmentEpoch, topicName,
partitionId)
+ );
+ }
+ }
+ };
+ }
+
/**
* Computes the subscription type based on the provided information.
*
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 065e9adaf12..3a31f403873 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
@@ -1272,9 +1273,43 @@ public class GroupCoordinatorShardTest {
any(), eq(image)
);
+ verify(offsetMetadataManager, times(1)).onMetadataUpdate(
+ any(), eq(image)
+ );
+
verify(groupMetadataManager, times(1)).onLoaded();
}
+ @Test
+ public void testOnMetadataUpdate() {
+ CoordinatorMetadataImage image = CoordinatorMetadataImage.EMPTY;
+ CoordinatorMetadataDelta delta = CoordinatorMetadataDelta.EMPTY;
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ coordinator.onMetadataUpdate(delta, image);
+
+ verify(groupMetadataManager, times(1)).onMetadataUpdate(
+ eq(delta), eq(image)
+ );
+
+ verify(offsetMetadataManager, times(1)).onMetadataUpdate(
+ eq(delta), eq(image)
+ );
+ }
+
@Test
public void testReplayGroupMetadata() {
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 9c48b15103f..052043b4141 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -46,9 +46,11 @@ import
org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
+import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic;
@@ -90,9 +92,12 @@ import java.util.Set;
import java.util.stream.Stream;
import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignmentWithEpochs;
+import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignmentWithEpochs;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_DELETIONS_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -115,7 +120,7 @@ public class OffsetMetadataManagerTest {
private final GroupCoordinatorMetricsShard metrics =
mock(GroupCoordinatorMetricsShard.class);
private final GroupConfigManager configManager =
mock(GroupConfigManager.class);
private GroupMetadataManager groupMetadataManager = null;
- private MetadataImage metadataImage = null;
+ private CoordinatorMetadataImage metadataImage = null;
private GroupCoordinatorConfig config = null;
Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
@@ -133,8 +138,13 @@ public class OffsetMetadataManagerTest {
return this;
}
+ Builder withMetadataImage(CoordinatorMetadataImage metadataImage) {
+ this.metadataImage = metadataImage;
+ return this;
+ }
+
OffsetMetadataManagerTestContext build() {
- if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+ if (metadataImage == null) metadataImage =
CoordinatorMetadataImage.EMPTY;
if (config == null) {
config =
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24);
}
@@ -146,7 +156,7 @@ public class OffsetMetadataManagerTest {
.withExecutor(executor)
.withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext)
- .withMetadataImage(new
KRaftCoordinatorMetadataImage(metadataImage))
+ .withMetadataImage(metadataImage)
.withGroupCoordinatorMetricsShard(metrics)
.withGroupConfigManager(configManager)
.withConfig(GroupCoordinatorConfig.fromProps(Map.of()))
@@ -1252,6 +1262,109 @@ public class OffsetMetadataManagerTest {
assertThrows(IllegalGenerationException.class, () ->
context.commitOffset(request));
}
+ @Test
+ public void testConsumerGroupOffsetCommitWithZeroTopicId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 5, 0)))
+ .build()
+ );
+
+ // When topicId is ZERO_UUID, since NO_OP validator is not used,
+ // STALE_EPOCH_EXCEPTION is thrown.
+ assertThrows(StaleMemberEpochException.class, () ->
context.commitOffset(
+ new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(7)
+ .setTopics(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ))
+ ));
+ }
+
+ @Test
+ public void testConsumerGroupOffsetCommitResolvesTopicId() {
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withMetadataImage(new
KRaftCoordinatorMetadataImage(metadataImage))
+ .build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+ .build()
+ );
+
+ OffsetCommitRequestData request = new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setTopics(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setTopicId(barTopicId)
+ .setName(barTopicName)
+ .setPartitions(List.of(
+ new
OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ));
+
+ // When client epoch (3) < assignment epoch (5), exception should be
thrown.
+ request.setGenerationIdOrMemberEpoch(3);
+ assertThrows(StaleMemberEpochException.class, () ->
context.commitOffset(request));
+
+ // When client epoch (5) >= assignment epoch (5), commit should
succeed.
+ request.setGenerationIdOrMemberEpoch(5);
+ assertDoesNotThrow(() -> context.commitOffset(request));
+
+ CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result
= context.commitOffset(request);
+ assertEquals(
+ new OffsetCommitResponseData()
+ .setTopics(List.of(
+ new OffsetCommitResponseData.OffsetCommitResponseTopic()
+ .setTopicId(barTopicId)
+ .setName(barTopicName)
+ .setPartitions(List.of(
+ new
OffsetCommitResponseData.OffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ ))
+ )),
+ result.response()
+ );
+ }
+
@Test
public void testConsumerGroupOffsetCommitFromAdminClient() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
@@ -1541,6 +1654,70 @@ public class OffsetMetadataManagerTest {
verifyTransactionalOffsetCommit(context);
}
+ @Test
+ public void testConsumerGroupTransactionalOffsetCommitResolvesTopicId() {
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(barTopicId, barTopicName, 3)
+ .build();
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withMetadataImage(new
KRaftCoordinatorMetadataImage(metadataImage))
+ .build();
+
+ // Create an empty group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+ .build()
+ );
+
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName(barTopicName)
+ .setPartitions(List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ));
+
+ // When client epoch (3) < assignment epoch (5), exception should be
thrown.
+ request.setGenerationId(3);
+ assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(request));
+
+ // When client epoch (5) >= assignment epoch (5), commit should
succeed.
+ request.setGenerationId(5);
+ assertDoesNotThrow(() -> context.commitTransactionalOffset(request));
+
+ CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord>
result = context.commitTransactionalOffset(request);
+ assertEquals(
+ new TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName(barTopicName)
+ .setPartitions(List.of(
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code())
+ ))
+ )),
+ result.response()
+ );
+ }
+
@Test
public void testStreamsGroupTransactionalOffsetCommit() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
@@ -1700,6 +1877,43 @@ public class OffsetMetadataManagerTest {
verifyTransactionalOffsetCommitWithStaleMemberEpoch(context);
}
+ @Test
+ public void
testConsumerGroupTransactionalOffsetCommitWithUnresolvedTopicId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+ "foo",
+ true
+ );
+
+ group.updateMember(new ConsumerGroupMember.Builder("member")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 5, 0)))
+ .build()
+ );
+
+ // When topicId couldn't be resolved and fall back to ZERO_UUID, since
NO_OP validator is not used,
+ // ILLEGAL_GENERATION is thrown.
+ assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
+ new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationId(7)
+ .setTopics(List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ ))
+ ))
+ ));
+ }
+
@Test
public void
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
@@ -3728,7 +3942,7 @@ public class OffsetMetadataManagerTest {
.setSubtopologyId("0")
.setSourceTopics(List.of("bar")))));
- // Member at epoch 10, with partitions assigned at epoch 4 and 5
respsectively.
+ // Member at epoch 10, with partitions assigned at epoch 4 and 5
respectively.
group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
.setMemberEpoch(10)
.setAssignedTasks(new TasksTupleWithEpochs(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
index e449db9a639..202325d2327 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
@@ -48,6 +48,7 @@ import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssig
import static org.apache.kafka.coordinator.group.Utils.toAssignmentWithEpochs;
import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
public class ConsumerGroupMemberTest {
private static final Logger LOG =
LoggerFactory.getLogger(ConsumerGroupMemberTest.class);
@@ -413,6 +414,38 @@ public class ConsumerGroupMemberTest {
);
}
+ @Test
+ public void testAssignedAndPendingRevocationEpoch() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+ Uuid unassignedTopicId = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new
ConsumerGroupMember.Builder("member-id")
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(topicId1, 1, 2, 3)), 10))
+
.setPartitionsPendingRevocation(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(topicId2, 4, 5, 6)), 9))
+ .build();
+
+ assertEquals(10, member.assignmentEpoch(topicId1, 1));
+ assertEquals(10, member.assignmentEpoch(topicId1, 2));
+ assertEquals(10, member.assignmentEpoch(topicId1, 3));
+ assertNull(member.pendingRevocationEpoch(topicId1, 1));
+ assertNull(member.pendingRevocationEpoch(topicId1, 2));
+ assertNull(member.pendingRevocationEpoch(topicId1, 3));
+
+ assertEquals(9, member.pendingRevocationEpoch(topicId2, 4));
+ assertEquals(9, member.pendingRevocationEpoch(topicId2, 5));
+ assertEquals(9, member.pendingRevocationEpoch(topicId2, 6));
+ assertNull(member.assignmentEpoch(topicId2, 4));
+ assertNull(member.assignmentEpoch(topicId2, 5));
+
+ assertNull(member.assignmentEpoch(topicId1, 10));
+ assertNull(member.pendingRevocationEpoch(topicId2, 10));
+ assertNull(member.assignmentEpoch(unassignedTopicId, 0));
+ assertNull(member.pendingRevocationEpoch(unassignedTopicId, 0));
+ }
+
private List<ConsumerGroupMemberMetadataValue.ClassicProtocol>
toClassicProtocolCollection(String name) {
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new
ArrayList<>();
protocols.add(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 4570315a00d..73bf9655a76 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -36,6 +36,7 @@ import
org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.coordinator.group.CommitPartitionValidator;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
@@ -52,7 +53,9 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,12 +67,15 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignmentWithEpochs;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignmentWithEpochs;
import static org.apache.kafka.coordinator.group.Utils.computeGroupHash;
import static org.apache.kafka.coordinator.group.Utils.computeTopicHash;
import static org.apache.kafka.coordinator.group.Utils.toAssignmentWithEpochs;
@@ -914,24 +920,189 @@ public class ConsumerGroupTest {
assertThrows(UnknownMemberIdException.class, () ->
group.validateOffsetCommit("", null, -1, isTransactional,
version));
- // The member epoch is stale.
+ // This should succeed.
if (version >= 9) {
+ group.validateOffsetCommit("new-protocol-member-id", "", 0,
isTransactional, version);
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("new-protocol-member-id", "", 0,
isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetCommitEpochValidationParams")
+ public void testValidateOffsetCommitWithEpochValidation(boolean
isTransactional, short version, PartitionAssignmentType assignmentType) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ ConsumerGroupMember.Builder memberBuilder = new
ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10);
+
+ if (assignmentType == PartitionAssignmentType.ASSIGNED) {
+ memberBuilder.setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)));
+ } else {
+
memberBuilder.setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)));
+ }
+ group.updateMember(memberBuilder.build());
+
+ // When client epoch (11) > broker epoch (10), throw
StaleMemberEpochException.
+ if (isTransactional || version >= 9) {
assertThrows(StaleMemberEpochException.class, () ->
- group.validateOffsetCommit("new-protocol-member-id", "", 10,
isTransactional, version));
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
} else {
assertThrows(UnsupportedVersionException.class, () ->
- group.validateOffsetCommit("new-protocol-member-id", "", 10,
isTransactional, version));
+ group.validateOffsetCommit("member-id", "", 11,
isTransactional, version));
}
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ var validator = group.validateOffsetCommit("member-id", "", 10,
isTransactional, version);
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10,
isTransactional, version));
+ }
+
+ // When assignment epoch (7) <= client epoch (7) <= broker epoch (10),
no exception thrown.
+ if (isTransactional || version >= 9) {
+ var validator = group.validateOffsetCommit("member-id", "", 7,
isTransactional, version);
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7),
+ // a StaleMemberEpochException is thrown by the assignment epoch validator.
+ if (isTransactional || version >= 9) {
+ var validator = group.validateOffsetCommit("member-id", "", 6,
isTransactional, version);
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7 for
partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6,
isTransactional, version));
+ }
+ }
+
+ private enum PartitionAssignmentType {
+ ASSIGNED,
+ PENDING_REVOCATION
+ }
+
+ private static Stream<Arguments> offsetCommitEpochValidationParams() {
+ Stream.Builder<Arguments> builder = Stream.builder();
+
+ for (short version = ApiKeys.OFFSET_COMMIT.oldestVersion(); version <=
ApiKeys.OFFSET_COMMIT.latestVersion(true); version++) {
+ for (PartitionAssignmentType type :
PartitionAssignmentType.values()) {
+ builder.add(Arguments.of(false, version, type));
+ }
+ }
+ for (short version = ApiKeys.TXN_OFFSET_COMMIT.oldestVersion();
version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true); version++) {
+ for (PartitionAssignmentType type :
PartitionAssignmentType.values()) {
+ builder.add(Arguments.of(true, version, type));
+ }
+ }
+
+ return builder.build();
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithClassicProtocolMember(boolean
isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setClassicMemberMetadata(new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (11) > broker epoch (10), throw
IllegalGenerationException.
assertThrows(IllegalGenerationException.class, () ->
- group.validateOffsetCommit("old-protocol-member-id", "", 10,
isTransactional, version));
+ group.validateOffsetCommit("member-id", "", 11, isTransactional,
version)
+ );
- // This should succeed.
- if (version >= 9) {
- group.validateOffsetCommit("new-protocol-member-id", "", 0,
isTransactional, version);
+ // When client epoch (10) == broker epoch (10), no exception thrown
and NO_OP validator returned.
+ var validator = group.validateOffsetCommit("member-id", "", 10,
isTransactional, version);
+ assertEquals(CommitPartitionValidator.NO_OP, validator);
+
+ // When assignment epoch (7) <= client epoch (7) <= broker epoch (10),
no exception thrown.
+ var newValidator = group.validateOffsetCommit("member-id", "", 7,
isTransactional, version);
+ assertDoesNotThrow(() -> newValidator.validate("foo", topicId, 0));
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6) <
assignment epoch (7),
+ // IllegalGenerationException thrown from assignment epoch validator.
+ var staleValidator = group.validateOffsetCommit("member-id", "", 6,
isTransactional, version);
+ assertThrows(IllegalGenerationException.class, () ->
+ staleValidator.validate("foo", topicId, 0));
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithUnassignedTopicOrPartition(boolean
isTransactional, short version) {
+ Uuid assignedTopicId = Uuid.randomUuid();
+ Uuid unassignedTopicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId, 7, 0)))
+ .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(assignedTopicId, 7, 1)))
+ .build());
+
+ // Commit to an unassigned topic
+ if (isTransactional || version >= 9) {
+ var validator = group.validateOffsetCommit("member-id", "", 7,
isTransactional, version);
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("bar", unassignedTopicId, 0));
+ assertEquals(
+ "Partition bar-0 is not assigned or pending revocation for
member.",
+ ex.getMessage()
+ );
} else {
assertThrows(UnsupportedVersionException.class, () ->
- group.validateOffsetCommit("new-protocol-member-id", "", 0,
isTransactional, version));
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
}
+
+ // Commit to an unassigned partition of an existing topic
+ if (isTransactional || version >= 9) {
+ var validator = group.validateOffsetCommit("member-id", "", 7,
isTransactional, version);
+ StaleMemberEpochException ex =
assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", assignedTopicId, 2));
+ assertEquals(
+ "Partition foo-2 is not assigned or pending revocation for
member.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7,
isTransactional, version));
+ }
+ }
+
+ private static Stream<Arguments>
offsetCommitVersionsAndTransactionalParams() {
+ Stream.Builder<Arguments> builder = Stream.builder();
+
+ for (short version = ApiKeys.OFFSET_COMMIT.oldestVersion(); version <=
ApiKeys.OFFSET_COMMIT.latestVersion(true); version++) {
+ builder.add(Arguments.of(false, version));
+ }
+ for (short version = ApiKeys.TXN_OFFSET_COMMIT.oldestVersion();
version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true); version++) {
+ builder.add(Arguments.of(true, version));
+ }
+
+ return builder.build();
}
@Test
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
index b1986f639a7..8149dfedf2a 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
@@ -100,7 +101,7 @@ public class TransactionalOffsetFetchBenchmark {
.withTime(TIME)
.withGroupMetadataManager(groupMetadataManager)
.withGroupCoordinatorConfig(mock(GroupCoordinatorConfig.class))
- .withMetadataImage(image)
+ .withMetadataImage(new KRaftCoordinatorMetadataImage(image))
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
.build();