This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 89f3888c871 KAFKA-20444: [9/N] Preserve topic-level structure in
TxnOffsetCommit response handling (KIP-1319) (#22265)
89f3888c871 is described below
commit 89f3888c8718da91967badef560cf34ebe8e09ed
Author: David Jacot <[email protected]>
AuthorDate: Wed May 27 15:03:58 2026 +0200
KAFKA-20444: [9/N] Preserve topic-level structure in TxnOffsetCommit
response handling (KIP-1319) (#22265)
`TxnOffsetCommitHandler.handleResponse` in `TransactionManager` folded
the response into a `Map<TopicPartition, Errors>` via
`TxnOffsetCommitResponse.errors()` before processing it. This loses the
response's topic-level structure, which v6+ (KIP-1319) needs in order to
resolve topic IDs back to names.
This patch switches the handler to iterate directly over
`response.data().topics()` and `responseTopic.partitions()`, so the
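per-topic structure is preserved. The error-handling if/else-if ladder is
unchanged; only the iteration shape is different: because a `break` now
exits only the inner partition loop, the topic loop checks
`result.isCompleted()` to stop after a terminal error. The now-unused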
`TxnOffsetCommitResponse.errors()` accessor is removed along with its
two test usages.
This is a pure refactor: at v0-5, the response topic always carries the
topic name, so `new TopicPartition(responseTopic.name(),
responsePartition.partitionIndex())` reproduces what `errors()` would
have returned. The v6+ wiring that resolves topic IDs back to names will
be added in a follow-up patch.
Reviewers: Lianet Magrans <[email protected]>
---
.../producer/internals/TransactionManager.java | 102 +++++++++++----------
.../common/requests/TxnOffsetCommitResponse.java | 11 ---
.../requests/TxnOffsetCommitRequestTest.java | 6 --
.../requests/TxnOffsetCommitResponseTest.java | 1 -
.../kafka/api/AuthorizerIntegrationTest.scala | 5 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 5 +-
6 files changed, 61 insertions(+), 69 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 3b0c183f3a3..2b10ceac6f3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -49,6 +49,7 @@ import
org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.RecordBatch;
@@ -1912,58 +1913,61 @@ public class TransactionManager {
public void handleResponse(AbstractResponse response) {
TxnOffsetCommitResponse txnOffsetCommitResponse =
(TxnOffsetCommitResponse) response;
boolean coordinatorReloaded = false;
- Map<TopicPartition, Errors> errors =
txnOffsetCommitResponse.errors();
- log.debug("Received TxnOffsetCommit response for consumer group
{}: {}", builder.data.groupId(),
- errors);
-
- for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
- TopicPartition topicPartition = entry.getKey();
- Errors error = entry.getValue();
- if (error == Errors.NONE) {
- pendingTxnOffsetCommits.remove(topicPartition);
- } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
- || error == Errors.NOT_COORDINATOR
- || error == Errors.REQUEST_TIMED_OUT) {
- if (!coordinatorReloaded) {
- coordinatorReloaded = true;
-
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP,
builder.data.groupId());
+ log.debug("Received TxnOffsetCommit response for consumer group
{}: {}",
+ builder.data.groupId(),
txnOffsetCommitResponse.data().topics());
+
+ for (TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic
responseTopic : txnOffsetCommitResponse.data().topics()) {
+ for
(TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition responsePartition
: responseTopic.partitions()) {
+ TopicPartition topicPartition = new
TopicPartition(responseTopic.name(), responsePartition.partitionIndex());
+ Errors error =
Errors.forCode(responsePartition.errorCode());
+ if (error == Errors.NONE) {
+ pendingTxnOffsetCommits.remove(topicPartition);
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+ || error == Errors.NOT_COORDINATOR
+ || error == Errors.REQUEST_TIMED_OUT) {
+ if (!coordinatorReloaded) {
+ coordinatorReloaded = true;
+
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP,
builder.data.groupId());
+ }
+ } else if (error.exception() instanceof
RetriableException) {
+ // The topic is unknown, the coordinator is loading,
or it is another retriable error;
+ // retry with the current coordinator.
+ } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
+ break;
+ } else if (error == Errors.FENCED_INSTANCE_ID ||
+ error == Errors.TRANSACTION_ABORTABLE) {
+ abortableError(error.exception());
+ break;
+ } else if (error == Errors.UNKNOWN_MEMBER_ID
+ || error == Errors.ILLEGAL_GENERATION
+ || error == Errors.GROUP_ID_NOT_FOUND
+ || error == Errors.STALE_MEMBER_EPOCH) {
+ // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are
returned by
+ // TxnOffsetCommit v6+. Older versions map them to
+ // ILLEGAL_GENERATION. All four indicate a consumer
group
+ // metadata mismatch and must abort the transaction.
+ abortableError(new CommitFailedException("Transaction
offset Commit failed " +
+ "due to consumer group metadata mismatch: " +
error.exception().getMessage()));
+ break;
+ } else if (error == Errors.INVALID_PRODUCER_EPOCH
+ || error == Errors.PRODUCER_FENCED) {
+ // We could still receive INVALID_PRODUCER_EPOCH from
old versioned transaction coordinator,
+ // just treat it the same as PRODUCE_FENCED.
+ fatalError(Errors.PRODUCER_FENCED.exception());
+ break;
+ } else if (error ==
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
+ || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
{
+ fatalError(error.exception());
+ break;
+ } else {
+ fatalError(new KafkaException("Unexpected error in
TxnOffsetCommitResponse: " + error.message()));
+ break;
}
- } else if (error.exception() instanceof RetriableException) {
- // If the topic is unknown, the coordinator is loading, or
is another retriable error, retry with the current coordinator
- continue;
- } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
- break;
- } else if (error == Errors.FENCED_INSTANCE_ID ||
- error == Errors.TRANSACTION_ABORTABLE) {
- abortableError(error.exception());
- break;
- } else if (error == Errors.UNKNOWN_MEMBER_ID
- || error == Errors.ILLEGAL_GENERATION
- || error == Errors.GROUP_ID_NOT_FOUND
- || error == Errors.STALE_MEMBER_EPOCH) {
- // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH are returned
by
- // TxnOffsetCommit v6+. Older versions map them to
- // ILLEGAL_GENERATION. All four indicate a consumer group
- // metadata mismatch and must abort the transaction.
- abortableError(new CommitFailedException("Transaction
offset Commit failed " +
- "due to consumer group metadata mismatch: " +
error.exception().getMessage()));
- break;
- } else if (error == Errors.INVALID_PRODUCER_EPOCH
- || error == Errors.PRODUCER_FENCED) {
- // We could still receive INVALID_PRODUCER_EPOCH from old
versioned transaction coordinator,
- // just treat it the same as PRODUCE_FENCED.
- fatalError(Errors.PRODUCER_FENCED.exception());
- break;
- } else if (error ==
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
- || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
- fatalError(error.exception());
- break;
- } else {
- fatalError(new KafkaException("Unexpected error in
TxnOffsetCommitResponse: " + error.message()));
- break;
}
+ // Stop processing further topics once the transaction has
reached a terminal state.
+ if (result.isCompleted()) break;
}
if (result.isCompleted()) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 20de83f9750..2a4939f652a 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -264,17 +264,6 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
Errors.forCode(partition.errorCode()))));
}
- public Map<TopicPartition, Errors> errors() {
- Map<TopicPartition, Errors> errorMap = new HashMap<>();
- for (TxnOffsetCommitResponseTopic topic : data.topics()) {
- for (TxnOffsetCommitResponsePartition partition :
topic.partitions()) {
- errorMap.put(new TopicPartition(topic.name(),
partition.partitionIndex()),
- Errors.forCode(partition.errorCode()));
- }
- }
- return errorMap;
- }
-
public static TxnOffsetCommitResponse parse(Readable readable, short
version) {
return new TxnOffsetCommitResponse(new
TxnOffsetCommitResponseData(readable, version));
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index f18cf5d84d1..649423c1779 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -90,11 +90,6 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, toVersion = 5)
public void testConstructor(short version) {
- var errorsMap = Map.of(
- new TopicPartition(topicOne, partitionOne), Errors.NOT_COORDINATOR,
- new TopicPartition(topicTwo, partitionTwo), Errors.NOT_COORDINATOR
- );
-
List<TxnOffsetCommitRequestTopic> expectedTopics = List.of(
new TxnOffsetCommitRequestTopic()
.setName(topicOne)
@@ -124,7 +119,6 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
var response = request.getErrorResponse(throttleTimeMs,
Errors.NOT_COORDINATOR.exception());
- assertEquals(errorsMap, response.errors());
assertEquals(Map.of(Errors.NOT_COORDINATOR, 2),
response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index d10eeab0ff6..cc78733d952 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -43,7 +43,6 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
public void testConstructorWithErrorResponse() {
TxnOffsetCommitResponse response = new
TxnOffsetCommitResponse(throttleTimeMs, errorsMap);
- assertEquals(errorsMap, response.errors());
assertEquals(expectedErrorCounts, response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d6c94d5baa4..f21e75c0eb8 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -156,7 +156,10 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) =>
resp.errors.get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)),
ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) =>
Errors.forCode(resp.data.errorCode)),
ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
- ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) =>
resp.errors.get(tp)),
+ ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) =>
resp.data.topics.asScala
+ .find(_.name == tp.topic)
+ .flatMap(_.partitions.asScala.find(_.partitionIndex ==
tp.partition).map(p => Errors.forCode(p.errorCode)))
+ .orNull),
ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) =>
Errors.forCode(resp.results.asScala.head.errorCode)),
ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) =>
resp.error.error),
ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) =>
Errors.forCode(resp.filterResults.asScala.head.errorCode)),
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 02d4ee8300a..6ab30fce2fd 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1360,7 +1360,10 @@ class KafkaApisTest extends Logging {
kafkaApis.handleTxnOffsetCommitRequest(request,
RequestLocal.withThreadConfinedCaching)
val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response.errors().get(invalidTopicPartition))
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response.data.topics.asScala
+ .find(_.name == invalidTopicPartition.topic)
+ .flatMap(_.partitions.asScala.find(_.partitionIndex ==
invalidTopicPartition.partition).map(p => Errors.forCode(p.errorCode)))
+ .orNull)
} finally {
kafkaApis.close()
}