This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 62db165d2af KAFKA-20444: [8/N] Extend TxnOffsetCommit integration
tests to v6 (KIP-1319) (#22255)
62db165d2af is described below
commit 62db165d2af99c489010688fcaa4addf4c398964
Author: David Jacot <[email protected]>
AuthorDate: Tue May 12 09:25:32 2026 +0200
KAFKA-20444: [8/N] Extend TxnOffsetCommit integration tests to v6
(KIP-1319) (#22255)
Extend the existing `TxnOffsetCommit` and `WriteTxnMarkers` integration
tests to cover v6 (KIP-1319), and thread the topic id through the
`commitTxnOffset` helper in `GroupCoordinatorBaseRequestTest`.
Three new error cases in
`TxnOffsetCommitRequestTest.testTxnOffsetCommit`:
- v6+ stale member epoch (new protocol) → `STALE_MEMBER_EPOCH`.
- v6+ unknown group id → `GROUP_ID_NOT_FOUND`.
- v6+ unknown topic id → `UNKNOWN_TOPIC_ID`.
Reviewers: Sean Quah <[email protected]>
---
.../server/GroupCoordinatorBaseRequestTest.scala | 5 +-
.../kafka/server/TxnOffsetCommitRequestTest.scala | 58 ++++++++++++++++++++--
.../kafka/server/WriteTxnMarkersRequestTest.scala | 5 +-
3 files changed, 60 insertions(+), 8 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 6b041a329d0..a2456a0aa9a 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -258,6 +258,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
producerEpoch: Short,
transactionalId: String,
topic: String,
+ topicId: Uuid,
partition: Int,
offset: Long,
expectedError: Errors,
@@ -273,6 +274,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
.setTransactionalId(transactionalId)
.setTopics(List(
new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setTopicId(topicId)
.setName(topic)
.setPartitions(List(
new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -286,7 +288,8 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
val expectedResponse = new TxnOffsetCommitResponseData()
.setTopics(List(
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
- .setName(topic)
+ .setTopicId(if (version >= 6) topicId else Uuid.ZERO_UUID)
+ .setName(if (version < 6) topic else "")
.setPartitions(List(
new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setPartitionIndex(partition)
diff --git
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index 61ed91a4e7d..ee774c05c8b 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -17,6 +17,7 @@
package kafka.server
import kafka.utils.TestUtils
+import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
@@ -75,13 +76,14 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
- createTopic(topic, 1)
+ val topicId = createTopic(topic, 1)
- for (version <- 0 to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
+ for (version <- 0 to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
// Verify that the TXN_OFFSET_COMMIT request is processed correctly when
member id is UNKNOWN_MEMBER_ID
// and generation id is UNKNOWN_GENERATION_ID under all api versions.
verifyTxnCommitAndFetch(
topic = topic,
+ topicId = topicId,
partition = partition,
transactionalId = transactionalId,
groupId = groupId,
@@ -98,6 +100,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
// must not be empty from version 3 onwards.
verifyTxnCommitAndFetch(
topic = topic,
+ topicId = topicId,
partition = partition,
transactionalId = transactionalId,
groupId = groupId,
@@ -111,6 +114,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
// Verify TXN_OFFSET_COMMIT request failed with incorrect memberId.
verifyTxnCommitAndFetch(
topic = topic,
+ topicId = topicId,
partition = partition,
transactionalId = transactionalId,
groupId = groupId,
@@ -122,8 +126,11 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
)
// Verify TXN_OFFSET_COMMIT request failed with incorrect generationId.
+ // Under the new consumer group protocol, v6+ returns
STALE_MEMBER_EPOCH
+ // directly while v0-5 maps it to ILLEGAL_GENERATION (KIP-1319).
verifyTxnCommitAndFetch(
topic = topic,
+ topicId = topicId,
partition = partition,
transactionalId = transactionalId,
groupId = groupId,
@@ -131,13 +138,51 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
generationId = 100,
offset = 200 + version,
version = version.toShort,
- expectedTxnCommitError = Errors.ILLEGAL_GENERATION
+ expectedTxnCommitError =
+ if (useNewProtocol && version >= 6) Errors.STALE_MEMBER_EPOCH
+ else Errors.ILLEGAL_GENERATION
)
+
+ // Verify TXN_OFFSET_COMMIT request failed with an unknown groupId.
+ // v6+ propagates GROUP_ID_NOT_FOUND directly while v0-5 maps it to
+ // ILLEGAL_GENERATION (KIP-1319). This applies to both protocols.
+ verifyTxnCommitAndFetch(
+ topic = topic,
+ topicId = topicId,
+ partition = partition,
+ transactionalId = transactionalId,
+ groupId = "unknown",
+ memberId = memberId,
+ generationId = memberEpoch,
+ offset = 200 + version,
+ version = version.toShort,
+ expectedTxnCommitError =
+ if (version >= 6) Errors.GROUP_ID_NOT_FOUND
+ else Errors.ILLEGAL_GENERATION
+ )
+
+ if (version >= 6) {
+ // Verify TXN_OFFSET_COMMIT request failed with UNKNOWN_TOPIC_ID for
an
+ // unknown topic id. Only v6+ carries topic IDs on the wire.
+ verifyTxnCommitAndFetch(
+ topic = topic,
+ topicId = Uuid.randomUuid(),
+ partition = partition,
+ transactionalId = transactionalId,
+ groupId = groupId,
+ memberId = memberId,
+ generationId = memberEpoch,
+ offset = 200 + version,
+ version = version.toShort,
+ expectedTxnCommitError = Errors.UNKNOWN_TOPIC_ID
+ )
+ }
} else {
// Verify that the TXN_OFFSET_COMMIT request failed when group
metadata is set under version 3.
assertThrows(classOf[UnsupportedVersionException], () =>
verifyTxnCommitAndFetch(
topic = topic,
+ topicId = topicId,
partition = partition,
transactionalId = transactionalId,
groupId = groupId,
@@ -154,6 +199,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
private def verifyTxnCommitAndFetch(
topic: String,
+ topicId: Uuid,
partition: Int,
transactionalId: String,
groupId: String,
@@ -195,6 +241,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
topic = topic,
+ topicId = topicId,
partition = partition,
offset = offset,
expectedError = expectedTxnCommitError,
@@ -239,9 +286,9 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
- createTopic(topic, 1)
+ val topicId = createTopic(topic, 1)
- for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
+ for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
val useTV2 = version >
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
// Initialize producer. Wait until the coordinator finishes loading.
@@ -300,6 +347,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
topic = topic,
+ topicId = topicId,
partition = partition,
offset = offset,
expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else
Errors.NONE,
diff --git
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
index 82263b89181..1bc57a2ff65 100644
--- a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -64,9 +64,9 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
- createTopic(topic, 1)
+ val topicId = createTopic(topic, 1)
- for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false)) {
+ for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
val useTV2 = version >
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
// Initialize producer. Wait until the coordinator finishes loading.
@@ -124,6 +124,7 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort
else producerIdAndEpoch.epoch,
transactionalId = transactionalId,
topic = topic,
+ topicId = topicId,
partition = partition,
offset = offset + version,
expectedError = Errors.NONE,