This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7f5861817d2 KAFKA-20444: [6/N] Handle GROUP_ID_NOT_FOUND and
STALE_MEMBER_EPOCH in TransactionManager (KIP-1319) (#22239)
7f5861817d2 is described below
commit 7f5861817d2bed897890e256677b8a9b473849cd
Author: David Jacot <[email protected]>
AuthorDate: Sun May 10 18:20:00 2026 +0200
KAFKA-20444: [6/N] Handle GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH in
TransactionManager (KIP-1319) (#22239)
This patch updates `TxnOffsetCommitHandler` in `TransactionManager` to
handle the two new error codes alongside the existing
`ILLEGAL_GENERATION` and `UNKNOWN_MEMBER_ID` cases. All four are treated
as a consumer group metadata mismatch and abort the transaction with a
`CommitFailedException`, so behavior at v6+ is identical to v0-5.
Reviewers: Sean Quah <[email protected]>
---
.../producer/internals/TransactionManager.java | 8 ++++++-
.../producer/internals/TransactionManagerTest.java | 28 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 50e35d32096..3b0c183f3a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1940,7 +1940,13 @@ public class TransactionManager {
abortableError(error.exception());
break;
} else if (error == Errors.UNKNOWN_MEMBER_ID
- || error == Errors.ILLEGAL_GENERATION) {
+ || error == Errors.ILLEGAL_GENERATION
+ || error == Errors.GROUP_ID_NOT_FOUND
+ || error == Errors.STALE_MEMBER_EPOCH) {
+ // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are returned by
+ // TxnOffsetCommit v6+. Older versions map them to
+ // ILLEGAL_GENERATION. All four indicate a consumer group
+ // metadata mismatch and must abort the transaction.
abortableError(new CommitFailedException("Transaction offset Commit failed " +
"due to consumer group metadata mismatch: " +
error.exception().getMessage()));
break;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 798dfe54fb2..db76ee022b6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1260,6 +1260,34 @@ public class TransactionManagerTest {
assertAbortableError(CommitFailedException.class);
}
+ @ParameterizedTest
+ @EnumSource(value = Errors.class, names = {"GROUP_ID_NOT_FOUND", "STALE_MEMBER_EPOCH"})
+ public void testGroupMetadataMismatchErrorInTxnOffsetCommit(Errors error) {
+ // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH from TxnOffsetCommit (v6+)
+ // must abort the transaction with a CommitFailedException, matching the
+ // behavior for ILLEGAL_GENERATION returned by older broker versions.
+ final TopicPartition tp = new TopicPartition("foo", 0);
+
+ doInitTransactions();
+
+ transactionManager.beginTransaction();
+ TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+ Map.of(tp, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId));
+
+ prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
+ prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+ runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) != null);
+
+ prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, Map.of(tp, error));
+
+ runUntil(transactionManager::hasError);
+ assertInstanceOf(CommitFailedException.class, transactionManager.lastError());
+ assertTrue(sendOffsetsResult.isCompleted());
+ assertFalse(sendOffsetsResult.isSuccessful());
+ assertInstanceOf(CommitFailedException.class, sendOffsetsResult.error());
+ assertAbortableError(CommitFailedException.class);
+ }
+
@Test
public void testLookupCoordinatorOnDisconnectAfterSend() {
// This is called from the initTransactions method in the producer as the first order of business.