This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 75620447819 KAFKA-20444: [1/N] Add TxnOffsetCommit v6 schema
(KIP-1319) (#22205)
75620447819 is described below
commit 756204478193b7aa5838bb5aaf049277b8798999
Author: David Jacot <[email protected]>
AuthorDate: Tue May 5 20:57:50 2026 +0200
KAFKA-20444: [1/N] Add TxnOffsetCommit v6 schema (KIP-1319) (#22205)
This patch introduces version 6 of the TxnOffsetCommit API:
* Adds a new `TopicId` field at `6+` and bounds the existing `Name`
field to `0-5` on both the request and the response.
* Renames the `GenerationId` field to `GenerationIdOrMemberEpoch` on the
request to reflect its dual semantics under the new consumer group
protocol (source-only rename; the wire format is positional and
unchanged).
* Ensures that the client cannot use version 6 yet by capping the latest
version allowed by the `TxnOffsetCommitRequest` builder at 5.
* Marks the new version as unstable by setting `latestVersionUnstable` in
the request schema.
Reviewers: Sean Quah <[email protected]>
---
.../producer/internals/TransactionManager.java | 2 +-
.../common/requests/TxnOffsetCommitRequest.java | 15 ++-
.../common/message/TxnOffsetCommitRequest.json | 13 ++-
.../common/message/TxnOffsetCommitResponse.json | 30 +++++-
.../kafka/clients/producer/KafkaProducerTest.java | 2 +-
.../producer/internals/TransactionManagerTest.java | 4 +-
.../apache/kafka/common/message/MessageTest.java | 120 +++++++--------------
.../kafka/common/requests/RequestResponseTest.java | 4 +-
.../requests/TxnOffsetCommitRequestTest.java | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../server/GroupCoordinatorBaseRequestTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../kafka/server/TxnOffsetCommitRequestTest.scala | 4 +-
.../kafka/server/WriteTxnMarkersRequestTest.scala | 2 +-
.../coordinator/group/OffsetMetadataManager.java | 4 +-
.../group/GroupCoordinatorServiceTest.java | 8 +-
.../group/OffsetMetadataManagerTest.java | 22 ++--
17 files changed, 116 insertions(+), 124 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index be4b1f8dfbc..50e35d32096 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1255,7 +1255,7 @@ public class TransactionManager {
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch)
.setMemberId(groupMetadata.memberId())
- .setGenerationId(groupMetadata.generationId())
+ .setGenerationIdOrMemberEpoch(groupMetadata.generationId())
.setGroupInstanceId(groupMetadata.groupInstanceId().orElse(null))
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
final TxnOffsetCommitRequest.Builder builder =
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index c94e4332537..30c40c29dab 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -49,9 +49,11 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
private Builder(
final TxnOffsetCommitRequestData data,
- final boolean isTransactionV2Enabled
+ final boolean isTransactionV2Enabled,
+ final short oldestAllowedVersion,
+ final short latestAllowedVersion
) {
- super(ApiKeys.TXN_OFFSET_COMMIT);
+ super(ApiKeys.TXN_OFFSET_COMMIT, oldestAllowedVersion,
latestAllowedVersion);
this.data = data;
this.isTransactionV2Enabled = isTransactionV2Enabled;
}
@@ -60,7 +62,12 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
final TxnOffsetCommitRequestData data,
final boolean isTransactionV2Enabled
) {
- return new Builder(data, isTransactionV2Enabled);
+ return new Builder(
+ data,
+ isTransactionV2Enabled,
+ ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(),
+ (short) 5
+ );
}
@Override
@@ -77,7 +84,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
private boolean groupMetadataSet() {
return !data.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID)
||
- data.generationId() !=
JoinGroupRequest.UNKNOWN_GENERATION_ID ||
+ data.generationIdOrMemberEpoch() !=
JoinGroupRequest.UNKNOWN_GENERATION_ID ||
data.groupInstanceId() != null;
}
diff --git
a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index 59a1f05e097..d2b91b3d930 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -30,7 +30,10 @@
// transaction V2 (KIP_890 part 2) is enabled, the TxnOffsetCommit request
will also include the function for a
// AddOffsetsToTxn call. If V2 is disabled, the client can't use
TxnOffsetCommit request version higher than 4 within
// a transaction.
- "validVersions": "0-5",
+ //
+ // Version 6 adds support for topic IDs and removes support for topic names
(KIP-1319).
+ "validVersions": "0-6",
+ "latestVersionUnstable": true,
"flexibleVersions": "3+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+",
"entityType": "transactionalId",
@@ -41,8 +44,8 @@
"about": "The current producer ID in use by the transactional ID." },
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+",
"about": "The current epoch associated with the producer ID." },
- { "name": "GenerationId", "type": "int32", "versions": "3+", "default":
"-1",
- "about": "The generation of the consumer." },
+ { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "3+",
"default": "-1",
+ "about": "The generation of the group if using the classic group
protocol or the member epoch if using the consumer protocol." },
{ "name": "MemberId", "type": "string", "versions": "3+", "default": "",
"about": "The member ID assigned by the group coordinator." },
{ "name": "GroupInstanceId", "type": "string", "versions": "3+",
@@ -50,8 +53,10 @@
"about": "The unique identifier of the consumer instance provided by end
user." },
{ "name": "Topics", "type" : "[]TxnOffsetCommitRequestTopic", "versions":
"0+",
"about": "Each topic that we want to commit offsets for.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
+ { "name": "Name", "type": "string", "versions": "0-5", "entityType":
"topicName", "ignorable": true,
"about": "The topic name." },
+ { "name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": true,
+ "about": "The topic ID." },
{ "name": "Partitions", "type": "[]TxnOffsetCommitRequestPartition",
"versions": "0+",
"about": "The partitions inside the topic that we want to commit
offsets for.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git
a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
index 9769ed2aa97..58b5c3ebbdc 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
@@ -26,15 +26,41 @@
// Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
//
// Version 5 is the same with version 3 (KIP-890).
- "validVersions": "0-5",
+ //
+ // Version 6 adds support for topic IDs and removes support for topic names.
It can also return
+ // GROUP_ID_NOT_FOUND, STALE_MEMBER_EPOCH and UNKNOWN_TOPIC_ID (KIP-1319).
+ "validVersions": "0-6",
"flexibleVersions": "3+",
+ // Supported errors:
+ // - GROUP_AUTHORIZATION_FAILED (version 0+)
+ // - TOPIC_AUTHORIZATION_FAILED (version 0+)
+ // - NOT_COORDINATOR (version 0+)
+ // - COORDINATOR_NOT_AVAILABLE (version 0+)
+ // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+ // - TRANSACTIONAL_ID_AUTHORIZATION_FAILED (version 0+)
+ // - INVALID_PRODUCER_ID_MAPPING (version 0+)
+ // - INVALID_PRODUCER_EPOCH (version 0+)
+ // - PRODUCER_FENCED (version 0+)
+ // - INVALID_TXN_STATE (version 0+)
+ // - UNSUPPORTED_FOR_MESSAGE_FORMAT (version 0+)
+ // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
+ // - OFFSET_METADATA_TOO_LARGE (version 0+)
+ // - ILLEGAL_GENERATION (version 3+)
+ // - UNKNOWN_MEMBER_ID (version 3+)
+ // - FENCED_INSTANCE_ID (version 3+)
+ // - TRANSACTION_ABORTABLE (version 4+)
+ // - GROUP_ID_NOT_FOUND (version 6+)
+ // - STALE_MEMBER_EPOCH (version 6+)
+ // - UNKNOWN_TOPIC_ID (version 6+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
{ "name": "Topics", "type": "[]TxnOffsetCommitResponseTopic", "versions":
"0+",
"about": "The responses for each topic.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
+ { "name": "Name", "type": "string", "versions": "0-5", "entityType":
"topicName", "ignorable": true,
"about": "The topic name." },
+ { "name": "TopicId", "type": "uuid", "versions": "6+", "ignorable": true,
+ "about": "The topic ID." },
{ "name": "Partitions", "type": "[]TxnOffsetCommitResponsePartition",
"versions": "0+",
"about": "The responses for each partition in the topic.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 60dd4069829..c1354460710 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -2227,7 +2227,7 @@ public class KafkaProducerTest {
TxnOffsetCommitRequestData data = ((TxnOffsetCommitRequest)
request).data();
return data.groupId().equals(groupId) &&
data.memberId().equals(memberId) &&
- data.generationId() == generationId &&
+ data.generationIdOrMemberEpoch() == generationId &&
data.groupInstanceId().equals(groupInstanceId);
}, txnOffsetsCommitResponse(Collections.singletonMap(
new TopicPartition("topic", 0), Errors.NONE)));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 9efed2f3c2d..798dfe54fb2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1249,7 +1249,7 @@ public class TransactionManagerTest {
assertEquals(consumerGroupId,
txnOffsetCommitRequest.data().groupId());
assertEquals(producerId,
txnOffsetCommitRequest.data().producerId());
assertEquals(epoch, txnOffsetCommitRequest.data().producerEpoch());
- return txnOffsetCommitRequest.data().generationId() !=
generationId;
+ return txnOffsetCommitRequest.data().generationIdOrMemberEpoch()
!= generationId;
}, new TxnOffsetCommitResponse(0, singletonMap(tp,
Errors.ILLEGAL_GENERATION)));
runUntil(transactionManager::hasError);
@@ -4475,7 +4475,7 @@ public class TransactionManagerTest {
assertEquals(producerEpoch,
txnOffsetCommitRequest.data().producerEpoch());
assertEquals(groupInstanceId,
txnOffsetCommitRequest.data().groupInstanceId());
assertEquals(memberId, txnOffsetCommitRequest.data().memberId());
- assertEquals(generationId,
txnOffsetCommitRequest.data().generationId());
+ assertEquals(generationId,
txnOffsetCommitRequest.data().generationIdOrMemberEpoch());
return true;
}, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 8df678ba3b3..c339db8f35e 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -455,92 +455,46 @@ public final class MessageTest {
testMessageRoundTrip(version, response, response);
}
- @Test
- public void testTxnOffsetCommitRequestVersions() throws Exception {
- String groupId = "groupId";
- String topicName = "topic";
- String metadata = "metadata";
- String txnId = "transactionalId";
- int producerId = 25;
- short producerEpoch = 10;
- String instanceId = "instance";
- String memberId = "member";
- int generationId = 1;
-
- int partition = 2;
- int offset = 100;
-
- testAllMessageRoundTrips(new TxnOffsetCommitRequestData()
- .setGroupId(groupId)
- .setTransactionalId(txnId)
- .setProducerId(producerId)
- .setProducerEpoch(producerEpoch)
- .setTopics(Collections.singletonList(
- new TxnOffsetCommitRequestTopic()
- .setName(topicName)
-
.setPartitions(Collections.singletonList(
- new
TxnOffsetCommitRequestPartition()
-
.setPartitionIndex(partition)
-
.setCommittedMetadata(metadata)
-
.setCommittedOffset(offset)
- )))));
-
- Supplier<TxnOffsetCommitRequestData> request =
- () -> new TxnOffsetCommitRequestData()
- .setGroupId(groupId)
- .setTransactionalId(txnId)
- .setProducerId(producerId)
- .setProducerEpoch(producerEpoch)
- .setGroupInstanceId(instanceId)
- .setMemberId(memberId)
- .setGenerationId(generationId)
- .setTopics(Collections.singletonList(
- new TxnOffsetCommitRequestTopic()
- .setName(topicName)
- .setPartitions(Collections.singletonList(
- new TxnOffsetCommitRequestPartition()
- .setPartitionIndex(partition)
- .setCommittedLeaderEpoch(10)
- .setCommittedMetadata(metadata)
- .setCommittedOffset(offset)
- ))));
-
- for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
- TxnOffsetCommitRequestData requestData = request.get();
- if (version < 2) {
-
requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
- }
-
- if (version < 3) {
- final short finalVersion = version;
- assertThrows(UnsupportedVersionException.class, () ->
testEquivalentMessageRoundTrip(finalVersion, requestData));
- requestData.setGroupInstanceId(null);
- assertThrows(UnsupportedVersionException.class, () ->
testEquivalentMessageRoundTrip(finalVersion, requestData));
- requestData.setMemberId("");
- assertThrows(UnsupportedVersionException.class, () ->
testEquivalentMessageRoundTrip(finalVersion, requestData));
- requestData.setGenerationId(-1);
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testTxnOffsetCommitRequestVersions(short version) throws
Exception {
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("groupId")
+ .setTransactionalId("transactionalId")
+ .setProducerId(25)
+ .setProducerEpoch((short) 10)
+ .setMemberId(version >= 3 ? "member" : "")
+ .setGenerationIdOrMemberEpoch(version >= 3 ? 1 : -1)
+ .setGroupInstanceId(version >= 3 ? "instance" : null)
+ .setTopics(singletonList(
+ new TxnOffsetCommitRequestTopic()
+ .setTopicId(version >= 6 ? Uuid.randomUuid() :
Uuid.ZERO_UUID)
+ .setName(version < 6 ? "topic" : "")
+ .setPartitions(singletonList(
+ new TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(2)
+ .setCommittedLeaderEpoch(version >= 2 ? 10 : -1)
+ .setCommittedMetadata("metadata")
+ .setCommittedOffset(100)))));
- testAllMessageRoundTripsFromVersion(version, requestData);
- }
+ testMessageRoundTrip(version, request, request);
}
- @Test
- public void testTxnOffsetCommitResponseVersions() throws Exception {
- testAllMessageRoundTrips(
- new TxnOffsetCommitResponseData()
- .setTopics(
- singletonList(
- new TxnOffsetCommitResponseTopic()
- .setName("topic")
- .setPartitions(singletonList(
- new TxnOffsetCommitResponsePartition()
- .setPartitionIndex(1)
-
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
- ))
- )
- )
- .setThrottleTimeMs(20));
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testTxnOffsetCommitResponseVersions(short version) throws
Exception {
+ TxnOffsetCommitResponseData response = new
TxnOffsetCommitResponseData()
+ .setThrottleTimeMs(20)
+ .setTopics(singletonList(
+ new TxnOffsetCommitResponseTopic()
+ .setTopicId(version >= 6 ? Uuid.randomUuid() :
Uuid.ZERO_UUID)
+ .setName(version < 6 ? "topic" : "")
+ .setPartitions(singletonList(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())))));
+
+ testMessageRoundTrip(version, response, response);
}
@ParameterizedTest
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f9f938847ea..f2f7cf33560 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2851,7 +2851,7 @@ public class RequestResponseTest {
if (version >= 3) {
data.setMemberId("member")
- .setGenerationId(2)
+ .setGenerationIdOrMemberEpoch(2)
.setGroupInstanceId("instance");
}
@@ -2871,7 +2871,7 @@ public class RequestResponseTest {
.setProducerId(21L)
.setProducerEpoch((short) 42)
.setMemberId("member")
- .setGenerationId(2)
+ .setGenerationIdOrMemberEpoch(2)
.setGroupInstanceId("instance")
.setTopics(TxnOffsetCommitRequest.getTopics(offsets));
return TxnOffsetCommitRequest.Builder.forTopicNames(data,
false).build();
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index b1143de96a3..6c37c59ede1 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -79,7 +79,7 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setMemberId(memberId)
- .setGenerationId(generationId)
+ .setGenerationIdOrMemberEpoch(generationId)
.setGroupInstanceId(groupInstanceId)
.setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
builderWithGroupMetadata =
TxnOffsetCommitRequest.Builder.forTopicNames(dataWithGroupMetadata, true);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c405b35705f..d38c8921dbd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2095,7 +2095,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData()
.setGroupId(txnOffsetCommitRequest.data.groupId)
.setMemberId(txnOffsetCommitRequest.data.memberId)
- .setGenerationId(txnOffsetCommitRequest.data.generationId)
+
.setGenerationIdOrMemberEpoch(txnOffsetCommitRequest.data.generationIdOrMemberEpoch)
.setGroupInstanceId(txnOffsetCommitRequest.data.groupInstanceId)
.setProducerEpoch(txnOffsetCommitRequest.data.producerEpoch)
.setProducerId(txnOffsetCommitRequest.data.producerId)
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index cd9afcc889b..6b041a329d0 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -267,7 +267,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
new TxnOffsetCommitRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
- .setGenerationId(generationId)
+ .setGenerationIdOrMemberEpoch(generationId)
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setTransactionalId(transactionalId)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 798d43892b1..f38a6146244 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1509,7 +1509,7 @@ class KafkaApisTest extends Logging {
val txnOffsetCommitRequest = new TxnOffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setProducerId(20)
.setProducerEpoch(30)
.setGroupInstanceId("instance-id")
@@ -1744,7 +1744,7 @@ class KafkaApisTest extends Logging {
}
@ParameterizedTest
- @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT,
enableUnstableLastVersion = false)
def
shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(version:
Short): Unit = {
val topic = "topic"
addTopicToMetadataCache(topic, numPartitions = 2)
diff --git
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index cc6cd2c957e..61ed91a4e7d 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -77,7 +77,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
createTopic(topic, 1)
- for (version <- 0 to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+ for (version <- 0 to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
// Verify that the TXN_OFFSET_COMMIT request is processed correctly when
member id is UNKNOWN_MEMBER_ID
// and generation id is UNKNOWN_GENERATION_ID under all api versions.
verifyTxnCommitAndFetch(
@@ -241,7 +241,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
createTopic(topic, 1)
- for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+ for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
val useTV2 = version >
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
// Initialize producer. Wait until the coordinator finishes loading.
diff --git
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
index a476271c04c..82263b89181 100644
--- a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -66,7 +66,7 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
createTopic(topic, 1)
- for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+ for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
val useTV2 = version >
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
// Initialize producer. Wait until the coordinator finishes loading.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 1aeedd4c94f..73fbc151243 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -522,7 +522,7 @@ public class OffsetMetadataManager {
try {
group = groupMetadataManager.group(request.groupId());
} catch (GroupIdNotFoundException ex) {
- if (request.generationId() < 0) {
+ if (request.generationIdOrMemberEpoch() < 0) {
// If the group does not exist and generation id is -1, the
request comes from
// either the admin client or a consumer which does not use
the group management
// facility. In this case, a so-called simple group is created
and the request
@@ -537,7 +537,7 @@ public class OffsetMetadataManager {
return group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
- request.generationId(),
+ request.generationIdOrMemberEpoch(),
true,
context.requestVersion()
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index e375872a125..c5ea4301e8b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -2876,7 +2876,7 @@ public class GroupCoordinatorServiceTest {
.setGroupId("foo")
.setTransactionalId("transactional-id")
.setMemberId("member-id")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName(TOPIC_NAME)
.setPartitions(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -2914,7 +2914,7 @@ public class GroupCoordinatorServiceTest {
.setGroupId(groupId)
.setTransactionalId("transactional-id")
.setMemberId("member-id")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName(TOPIC_NAME)
.setPartitions(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -2953,7 +2953,7 @@ public class GroupCoordinatorServiceTest {
.setProducerId(10L)
.setProducerEpoch((short) 5)
.setMemberId("member-id")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName(TOPIC_NAME)
.setPartitions(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -3007,7 +3007,7 @@ public class GroupCoordinatorServiceTest {
.setProducerId(10L)
.setProducerEpoch((short) 5)
.setMemberId("member-id")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName(TOPIC_NAME)
.setPartitions(List.of(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 69e0617350d..adb17e4b17a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -1695,11 +1695,11 @@ public class OffsetMetadataManagerTest {
));
// When client epoch (3) < assignment epoch (5), exception should be
thrown.
- request.setGenerationId(3);
+ request.setGenerationIdOrMemberEpoch(3);
assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(request));
// When client epoch (5) >= assignment epoch (5), commit should
succeed.
- request.setGenerationId(5);
+ request.setGenerationIdOrMemberEpoch(5);
assertDoesNotThrow(() -> context.commitTransactionalOffset(request));
CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord>
result = context.commitTransactionalOffset(request);
@@ -1742,7 +1742,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -1796,7 +1796,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -1842,7 +1842,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -1901,7 +1901,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(7)
+ .setGenerationIdOrMemberEpoch(7)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -1938,7 +1938,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(100)
+ .setGenerationIdOrMemberEpoch(100)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -1977,7 +1977,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(1)
+ .setGenerationIdOrMemberEpoch(1)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -2031,7 +2031,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -2060,7 +2060,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
@@ -2099,7 +2099,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
- .setGenerationId(100)
+ .setGenerationIdOrMemberEpoch(100)
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")