This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2342c80dca4 KAFKA-20444: [3/N] Allow building TxnOffsetCommit v6
requests with topic IDs (KIP-1319) (#22215)
2342c80dca4 is described below
commit 2342c80dca4ccede3128ff0ed62d45d2340a3bdb
Author: David Jacot <[email protected]>
AuthorDate: Wed May 6 15:32:41 2026 +0200
KAFKA-20444: [3/N] Allow building TxnOffsetCommit v6 requests with topic
IDs (KIP-1319) (#22215)
This patch adds a `forTopicIdsOrNames(...)` factory to
`TxnOffsetCommitRequest.Builder` that allows building requests starting
from version 6 of the API. The existing `forTopicNames(...)` factory is
capped at version 5. `build(short version)` validates that the request
carries topic IDs starting from version 6 and topic names for versions 0
to 5. No call site uses the new factory yet — that lands in a follow-up
patch wiring up `TransactionManager`.
Reviewers: Sean Quah <[email protected]>
---
.../common/requests/TxnOffsetCommitRequest.java | 49 +++++-
.../kafka/common/requests/RequestResponseTest.java | 9 +-
.../requests/TxnOffsetCommitRequestTest.java | 165 ++++++++++++++++-----
3 files changed, 180 insertions(+), 43 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 626116c70a1..a9e67d74fae 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
@@ -63,17 +64,14 @@ public class TxnOffsetCommitRequest extends AbstractRequest
{
public static class Builder extends
AbstractRequest.Builder<TxnOffsetCommitRequest> {
public final TxnOffsetCommitRequestData data;
- public final boolean isTransactionV2Enabled;
private Builder(
final TxnOffsetCommitRequestData data,
- final boolean isTransactionV2Enabled,
final short oldestAllowedVersion,
final short latestAllowedVersion
) {
super(ApiKeys.TXN_OFFSET_COMMIT, oldestAllowedVersion,
latestAllowedVersion);
this.data = data;
- this.isTransactionV2Enabled = isTransactionV2Enabled;
}
public static Builder forTopicNames(
@@ -82,9 +80,22 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
) {
return new Builder(
data,
- isTransactionV2Enabled,
ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(),
- (short) 5
+ isTransactionV2Enabled ? (short) 5 :
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
+ );
+ }
+
+ public static Builder forTopicIdsOrNames(
+ final TxnOffsetCommitRequestData data,
+ final boolean isTransactionV2Enabled,
+ final boolean enableUnstableLastVersion
+ ) {
+ return new Builder(
+ data,
+ ApiKeys.TXN_OFFSET_COMMIT.oldestVersion(),
+ isTransactionV2Enabled
+ ?
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(enableUnstableLastVersion)
+ : LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
);
}
@@ -94,8 +105,20 @@ public class TxnOffsetCommitRequest extends AbstractRequest
{
throw new UnsupportedVersionException("Broker doesn't support
group metadata commit API on version " + version
+ ", minimum supported request version is 3 which requires
brokers to be on version 2.5 or above.");
}
- if (!isTransactionV2Enabled) {
- version = (short) Math.min(version,
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2);
+ if (version >= 6) {
+ for (TxnOffsetCommitRequestTopic topic : data.topics()) {
+ if (topic.topicId() == null ||
topic.topicId().equals(Uuid.ZERO_UUID)) {
+ throw new UnsupportedVersionException("The broker
TxnOffsetCommit api version " +
+ version + " does require usage of topic ids.");
+ }
+ }
+ } else {
+ for (TxnOffsetCommitRequestTopic topic : data.topics()) {
+ if (topic.name() == null || topic.name().isEmpty()) {
+ throw new UnsupportedVersionException("The broker
TxnOffsetCommit api version " +
+ version + " does require usage of topic names.");
+ }
+ }
}
return new TxnOffsetCommitRequest(data, version);
}
@@ -132,7 +155,16 @@ public class TxnOffsetCommitRequest extends
AbstractRequest {
return offsetMap;
}
- public static List<TxnOffsetCommitRequestTopic>
getTopics(Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
+ public static List<TxnOffsetCommitRequestTopic> getTopics(
+ Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits
+ ) {
+ return getTopics(pendingTxnOffsetCommits, Map.of());
+ }
+
+ public static List<TxnOffsetCommitRequestTopic> getTopics(
+ Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits,
+ Map<String, Uuid> topicIds
+ ) {
Map<String, List<TxnOffsetCommitRequestPartition>> topicPartitionMap =
new HashMap<>();
for (Map.Entry<TopicPartition, CommittedOffset> entry :
pendingTxnOffsetCommits.entrySet()) {
TopicPartition topicPartition = entry.getKey();
@@ -151,6 +183,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest
{
return topicPartitionMap.entrySet().stream()
.map(entry -> new TxnOffsetCommitRequestTopic()
.setName(entry.getKey())
+
.setTopicId(topicIds.getOrDefault(entry.getKey(), Uuid.ZERO_UUID))
.setPartitions(entry.getValue()))
.collect(Collectors.toList());
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f2f7cf33560..29bc1213c63 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2842,12 +2842,13 @@ public class RequestResponseTest {
offsets.put(new TopicPartition("topic", 74),
new TxnOffsetCommitRequest.CommittedOffset(100, "blah",
Optional.of(27)));
+ Map<String, Uuid> topicIds = Map.of("topic", Uuid.randomUuid());
TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
.setTransactionalId("transactionalId")
.setGroupId("groupId")
.setProducerId(21L)
.setProducerEpoch((short) 42)
- .setTopics(TxnOffsetCommitRequest.getTopics(offsets));
+ .setTopics(TxnOffsetCommitRequest.getTopics(offsets, topicIds));
if (version >= 3) {
data.setMemberId("member")
@@ -2855,7 +2856,11 @@ public class RequestResponseTest {
.setGroupInstanceId("instance");
}
- return TxnOffsetCommitRequest.Builder.forTopicNames(data, version >=
5).build(version);
+ if (version >= 6) {
+ return TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data,
true, true).build(version);
+ } else {
+ return TxnOffsetCommitRequest.Builder.forTopicNames(data, version
>= 5).build(version);
+ }
}
private TxnOffsetCommitRequest
createTxnOffsetCommitRequestWithAutoDowngrade() {
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index 6c37c59ede1..331fbcfba57 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
@@ -25,9 +26,11 @@ import
org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
import java.util.HashMap;
import java.util.List;
@@ -85,12 +88,13 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
builderWithGroupMetadata =
TxnOffsetCommitRequest.Builder.forTopicNames(dataWithGroupMetadata, true);
}
- @Test
- @Override
- public void testConstructor() {
- Map<TopicPartition, Errors> errorsMap = new HashMap<>();
- errorsMap.put(new TopicPartition(topicOne, partitionOne),
Errors.NOT_COORDINATOR);
- errorsMap.put(new TopicPartition(topicTwo, partitionTwo),
Errors.NOT_COORDINATOR);
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, toVersion = 5)
+ public void testConstructor(short version) {
+ var errorsMap = Map.of(
+ new TopicPartition(topicOne, partitionOne), Errors.NOT_COORDINATOR,
+ new TopicPartition(topicTwo, partitionTwo), Errors.NOT_COORDINATOR
+ );
List<TxnOffsetCommitRequestTopic> expectedTopics = List.of(
new TxnOffsetCommitRequestTopic()
@@ -110,23 +114,20 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
.setCommittedLeaderEpoch(leaderEpoch)
.setCommittedMetadata(metadata))));
- for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
- final TxnOffsetCommitRequest request;
- if (version < 3) {
- request = builder.build(version);
- } else {
- request = builderWithGroupMetadata.build(version);
- }
- assertEquals(OFFSETS, request.offsets());
- assertEquals(expectedTopics,
TxnOffsetCommitRequest.getTopics(request.offsets()));
-
- TxnOffsetCommitResponse response =
- request.getErrorResponse(throttleTimeMs,
Errors.NOT_COORDINATOR.exception());
-
- assertEquals(errorsMap, response.errors());
- assertEquals(Map.of(Errors.NOT_COORDINATOR, 2),
response.errorCounts());
- assertEquals(throttleTimeMs, response.throttleTimeMs());
+ final TxnOffsetCommitRequest request;
+ if (version < 3) {
+ request = builder.build(version);
+ } else {
+ request = builderWithGroupMetadata.build(version);
}
+ assertEquals(OFFSETS, request.offsets());
+ assertEquals(expectedTopics,
TxnOffsetCommitRequest.getTopics(request.offsets()));
+
+ var response = request.getErrorResponse(throttleTimeMs,
Errors.NOT_COORDINATOR.exception());
+
+ assertEquals(errorsMap, response.errors());
+ assertEquals(Map.of(Errors.NOT_COORDINATOR, 2),
response.errorCounts());
+ assertEquals(throttleTimeMs, response.throttleTimeMs());
}
@Test
@@ -150,17 +151,115 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
assertEquals(expectedResponse,
getErrorResponse(builderWithGroupMetadata.data, Errors.UNKNOWN_MEMBER_ID));
}
- @Test
- public void testVersionSupportForGroupMetadata() {
- for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
- assertDoesNotThrow(() -> builder.build(version));
- if (version >= 3) {
- assertDoesNotThrow(() ->
builderWithGroupMetadata.build(version));
- } else {
- assertEquals("Broker doesn't support group metadata commit API
on version " + version +
- ", minimum supported request version is 3 which requires
brokers to be on version 2.5 or above.",
- assertThrows(UnsupportedVersionException.class, () ->
builderWithGroupMetadata.build(version)).getMessage());
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT, toVersion = 5)
+ public void testVersionSupportForGroupMetadata(short version) {
+ assertDoesNotThrow(() -> builder.build(version));
+ if (version >= 3) {
+ assertDoesNotThrow(() -> builderWithGroupMetadata.build(version));
+ } else {
+ assertEquals("Broker doesn't support group metadata commit API on
version " + version +
+ ", minimum supported request version is 3 which requires
brokers to be on version 2.5 or above.",
+ assertThrows(UnsupportedVersionException.class, () ->
builderWithGroupMetadata.build(version)).getMessage());
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testForTopicIdsOrNamesWithTopicNameOnly(short version) {
+ var data = new TxnOffsetCommitRequestData()
+ .setTransactionalId("tx")
+ .setGroupId(groupId)
+ .setProducerId(1)
+ .setProducerEpoch((short) 0)
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestTopic()
+ .setName("foo")
+ .setPartitions(List.of(
+ new TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(0L)))));
+
+ if (version >= 6) {
+ assertThrows(UnsupportedVersionException.class,
+ () -> TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data,
true, true).build(version));
+ } else {
+ assertDoesNotThrow(
+ () -> TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data,
true, true).build(version));
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testForTopicIdsOrNamesWithTopicIdOnly(short version) {
+ var topicId = Uuid.randomUuid();
+ var data = new TxnOffsetCommitRequestData()
+ .setTransactionalId("tx")
+ .setGroupId(groupId)
+ .setProducerId(1)
+ .setProducerEpoch((short) 0)
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestTopic()
+ .setTopicId(topicId)
+ .setPartitions(List.of(
+ new TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(0L)))));
+
+ if (version >= 6) {
+ var request =
TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, true,
true).build(version);
+ assertEquals(data, request.data());
+ } else {
+ assertThrows(UnsupportedVersionException.class,
+ () -> TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data,
true, true).build(version));
}
}
+
+ @Test
+ public void
testForTopicNamesCapsAtTransactionV1WhenTransactionV2IsDisabled() {
+ var builder = TxnOffsetCommitRequest.Builder.forTopicNames(
+ new TxnOffsetCommitRequestData(),
+ false
+ );
+
assertEquals(TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2,
builder.latestAllowedVersion());
+ }
+
+ @Test
+ public void testForTopicNamesCapsAtV5WhenTransactionV2IsEnabled() {
+ var builder = TxnOffsetCommitRequest.Builder.forTopicNames(
+ new TxnOffsetCommitRequestData(),
+ true
+ );
+ assertEquals((short) 5, builder.latestAllowedVersion());
+ }
+
+ @Test
+ public void
testForTopicIdsOrNamesCapsAtTransactionV1WhenTransactionV2IsDisabled() {
+ var builder = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(
+ new TxnOffsetCommitRequestData(),
+ false,
+ true
+ );
+
assertEquals(TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2,
builder.latestAllowedVersion());
+ }
+
+ @Test
+ public void
testForTopicIdsOrNamesUsesLatestStableVersionWhenUnstableIsDisabled() {
+ var builder = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(
+ new TxnOffsetCommitRequestData(),
+ true,
+ false
+ );
+ assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(false),
builder.latestAllowedVersion());
+ }
+
+ @Test
+ public void
testForTopicIdsOrNamesUsesLatestUnstableVersionWhenUnstableIsEnabled() {
+ var builder = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(
+ new TxnOffsetCommitRequestData(),
+ true,
+ true
+ );
+ assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true),
builder.latestAllowedVersion());
+ }
}