This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new baa064e4223 KAFKA-20444: [4/N] Prepare TxnOffsetCommitResponse for
topic IDs (KIP-1319) (#22224)
baa064e4223 is described below
commit baa064e4223005c0bf78cd03c4bcf4d65b50bfa1
Author: David Jacot <[email protected]>
AuthorDate: Thu May 7 22:52:58 2026 +0200
KAFKA-20444: [4/N] Prepare TxnOffsetCommitResponse for topic IDs (KIP-1319)
(#22224)
This patch prepares the `TxnOffsetCommit` classes for topic IDs at
version 6 of the API:
- Add `TxnOffsetCommitResponse.TopicIdBuilder` that keys topics by
topic ID, alongside the existing `TopicNameBuilder`;
`TxnOffsetCommitResponse.newBuilder(boolean useTopicIds)` selects
between them (`KafkaApis` still passes `false`).
- Wire topic IDs through `TxnOffsetCommitRequest.getErrorResponse(...)`.
Reviewers: Lianet Magrans <[email protected]>, Sean Quah
<[email protected]>
---
.../common/requests/TxnOffsetCommitRequest.java | 28 +-----
.../common/requests/TxnOffsetCommitResponse.java | 64 ++++++++++--
.../requests/TxnOffsetCommitRequestTest.java | 43 ++++++--
.../requests/TxnOffsetCommitResponseTest.java | 111 ++++++++++++++-------
core/src/main/scala/kafka/server/KafkaApis.scala | 8 +-
5 files changed, 174 insertions(+), 80 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index a9e67d74fae..fb17c422bce 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -23,8 +23,6 @@ import
org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
-import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
-import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
@@ -193,32 +191,9 @@ public class TxnOffsetCommitRequest extends
AbstractRequest {
return data;
}
- static List<TxnOffsetCommitResponseTopic>
getErrorResponseTopics(List<TxnOffsetCommitRequestTopic> requestTopics,
- Errors e)
{
- List<TxnOffsetCommitResponseTopic> responseTopicData = new
ArrayList<>();
- for (TxnOffsetCommitRequestTopic entry : requestTopics) {
- List<TxnOffsetCommitResponsePartition> responsePartitions = new
ArrayList<>();
- for (TxnOffsetCommitRequestPartition requestPartition :
entry.partitions()) {
- responsePartitions.add(new TxnOffsetCommitResponsePartition()
-
.setPartitionIndex(requestPartition.partitionIndex())
- .setErrorCode(e.code()));
- }
- responseTopicData.add(new TxnOffsetCommitResponseTopic()
- .setName(entry.name())
- .setPartitions(responsePartitions)
- );
- }
- return responseTopicData;
- }
-
@Override
public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
- List<TxnOffsetCommitResponseTopic> responseTopicData =
- getErrorResponseTopics(data.topics(), Errors.forException(e));
-
- return new TxnOffsetCommitResponse(new TxnOffsetCommitResponseData()
-
.setThrottleTimeMs(throttleTimeMs)
- .setTopics(responseTopicData));
+ return new TxnOffsetCommitResponse(getErrorResponse(data,
Errors.forException(e)).setThrottleTimeMs(throttleTimeMs));
}
@Override
@@ -233,6 +208,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest
{
TxnOffsetCommitResponseData response = new
TxnOffsetCommitResponseData();
request.topics().forEach(topic -> {
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic
responseTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setTopicId(topic.topicId())
.setName(topic.name());
response.topics().add(responseTopic);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index a01b5779b4e..a9f4a580041 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
@@ -50,8 +51,12 @@ import java.util.function.Function;
*/
public class TxnOffsetCommitResponse extends AbstractResponse {
- public static Builder newBuilder() {
- return new TopicNameBuilder();
+ public static Builder newBuilder(boolean useTopicIds) {
+ if (useTopicIds) {
+ return new TopicIdBuilder();
+ } else {
+ return new TopicNameBuilder();
+ }
}
public abstract static class Builder {
@@ -62,19 +67,22 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
);
protected abstract TxnOffsetCommitResponseTopic get(
+ Uuid topicId,
String topicName
);
protected abstract TxnOffsetCommitResponseTopic getOrCreate(
+ Uuid topicId,
String topicName
);
public Builder addPartition(
+ Uuid topicId,
String topicName,
int partitionIndex,
Errors error
) {
- final TxnOffsetCommitResponseTopic topicResponse =
getOrCreate(topicName);
+ final TxnOffsetCommitResponseTopic topicResponse =
getOrCreate(topicId, topicName);
topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex)
.setErrorCode(error.code()));
@@ -82,12 +90,13 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
}
public Builder addPartitions(
+ Uuid topicId,
String topicName,
List<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition>
partitions,
Function<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition, Integer>
partitionIndex,
Errors error
) {
- final TxnOffsetCommitResponseTopic topicResponse =
getOrCreate(topicName);
+ final TxnOffsetCommitResponseTopic topicResponse =
getOrCreate(topicId, topicName);
partitions.forEach(partition ->
topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex.apply(partition))
@@ -105,7 +114,7 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
} else {
// Otherwise, we have to merge them together.
newData.topics().forEach(newTopic -> {
- TxnOffsetCommitResponseTopic existingTopic =
get(newTopic.name());
+ TxnOffsetCommitResponseTopic existingTopic =
get(newTopic.topicId(), newTopic.name());
if (existingTopic == null) {
// If no topic exists, we can directly copy the new
topic data.
add(newTopic);
@@ -125,6 +134,43 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
}
}
+ public static class TopicIdBuilder extends Builder {
+ private final HashMap<Uuid, TxnOffsetCommitResponseTopic> byTopicId =
new HashMap<>();
+
+ @Override
+ protected void add(TxnOffsetCommitResponseTopic topic) {
+ throwIfTopicIdIsNull(topic.topicId());
+ data.topics().add(topic);
+ byTopicId.put(topic.topicId(), topic);
+ }
+
+ @Override
+ protected TxnOffsetCommitResponseTopic get(Uuid topicId, String
topicName) {
+ throwIfTopicIdIsNull(topicId);
+ return byTopicId.get(topicId);
+ }
+
+ @Override
+ protected TxnOffsetCommitResponseTopic getOrCreate(Uuid topicId,
String topicName) {
+ throwIfTopicIdIsNull(topicId);
+ TxnOffsetCommitResponseTopic topic = byTopicId.get(topicId);
+ if (topic == null) {
+ topic = new TxnOffsetCommitResponseTopic()
+ .setName(topicName)
+ .setTopicId(topicId);
+ data.topics().add(topic);
+ byTopicId.put(topicId, topic);
+ }
+ return topic;
+ }
+
+ private static void throwIfTopicIdIsNull(Uuid topicId) {
+ if (topicId == null) {
+ throw new IllegalArgumentException("TopicId cannot be null.");
+ }
+ }
+ }
+
public static class TopicNameBuilder extends Builder {
private final HashMap<String, TxnOffsetCommitResponseTopic>
byTopicName = new HashMap<>();
@@ -136,17 +182,19 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
}
@Override
- protected TxnOffsetCommitResponseTopic get(String topicName) {
+ protected TxnOffsetCommitResponseTopic get(Uuid topicId, String
topicName) {
throwIfTopicNameIsNull(topicName);
return byTopicName.get(topicName);
}
@Override
- protected TxnOffsetCommitResponseTopic getOrCreate(String topicName) {
+ protected TxnOffsetCommitResponseTopic getOrCreate(Uuid topicId,
String topicName) {
throwIfTopicNameIsNull(topicName);
TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
if (topic == null) {
- topic = new TxnOffsetCommitResponseTopic().setName(topicName);
+ topic = new TxnOffsetCommitResponseTopic()
+ .setName(topicName)
+ .setTopicId(topicId);
data.topics().add(topic);
byTopicName.put(topicName, topic);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index 331fbcfba57..f18cf5d84d1 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -37,7 +37,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static
org.apache.kafka.common.requests.TxnOffsetCommitRequest.getErrorResponse;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -133,22 +132,52 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
@Test
@Override
public void testGetErrorResponse() {
- TxnOffsetCommitResponseData expectedResponse = new
TxnOffsetCommitResponseData()
+ var topicOneId = Uuid.randomUuid();
+ var topicTwoId = Uuid.randomUuid();
+
+ var data = new TxnOffsetCommitRequestData()
+ .setTransactionalId("transactionalId")
+ .setGroupId(groupId)
+ .setProducerId(10L)
+ .setProducerEpoch((short) 1)
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestTopic()
+ .setTopicId(topicOneId)
+ .setName(topicOne)
+ .setPartitions(List.of(
+ new TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(partitionOne)
+ .setCommittedOffset(offset))),
+ new TxnOffsetCommitRequestTopic()
+ .setTopicId(topicTwoId)
+ .setName(topicTwo)
+ .setPartitions(List.of(
+ new TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(partitionTwo)
+ .setCommittedOffset(offset)))));
+
+ var expectedResponseData = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setTopicId(topicOneId)
.setName(topicOne)
.setPartitions(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
- .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
- .setPartitionIndex(partitionOne))),
+ .setPartitionIndex(partitionOne)
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()))),
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setTopicId(topicTwoId)
.setName(topicTwo)
.setPartitions(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
- .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
- .setPartitionIndex(partitionTwo)))));
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())))));
+
+ assertEquals(expectedResponseData,
TxnOffsetCommitRequest.getErrorResponse(data, Errors.UNKNOWN_MEMBER_ID));
- assertEquals(expectedResponse,
getErrorResponse(builderWithGroupMetadata.data, Errors.UNKNOWN_MEMBER_ID));
+ var request = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data,
true, true).build();
+ var response = request.getErrorResponse(throttleTimeMs,
Errors.UNKNOWN_MEMBER_ID.exception());
+ assertEquals(expectedResponseData.setThrottleTimeMs(throttleTimeMs),
response.data());
}
@ParameterizedTest
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index 6dd950a5539..d10eeab0ff6 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
@@ -23,6 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.MessageUtil;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
@@ -31,6 +35,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest {
+ private final Uuid topicOneId = Uuid.randomUuid();
+ private final Uuid topicTwoId = Uuid.randomUuid();
+
@Test
@Override
public void testConstructorWithErrorResponse() {
@@ -65,16 +72,21 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
}
}
- @Test
- public void testBuilderAddPartition() {
- TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
- builder.addPartition(topicOne, partitionOne, errorOne);
- builder.addPartition(topicOne, partitionTwo, errorTwo);
- builder.addPartition(topicTwo, partitionOne, errorOne);
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBuilderAddPartition(boolean useTopicIds) {
+ var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+ var topicTwoIdOrZero = useTopicIds ? topicTwoId : Uuid.ZERO_UUID;
- TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+ builder.addPartition(topicOneIdOrZero, topicOne, partitionOne,
errorOne);
+ builder.addPartition(topicOneIdOrZero, topicOne, partitionTwo,
errorTwo);
+ builder.addPartition(topicTwoIdOrZero, topicTwo, partitionOne,
errorOne);
+
+ var expected = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicOneIdOrZero)
.setName(topicOne)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
@@ -84,6 +96,7 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
.setPartitionIndex(partitionTwo)
.setErrorCode(errorTwo.code()))),
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicTwoIdOrZero)
.setName(topicTwo)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
@@ -93,15 +106,27 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
assertEquals(expected, builder.build().data());
}
- @Test
- public void testBuilderAddPartitions() {
- TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
- builder.addPartition(topicOne, partitionOne, errorOne);
- builder.addPartition(topicOne, partitionTwo, errorOne);
-
- TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBuilderAddPartitions(boolean useTopicIds) {
+ var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+
+ var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+ builder.addPartitions(
+ topicOneIdOrZero,
+ topicOne,
+ List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition().setPartitionIndex(partitionOne),
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition().setPartitionIndex(partitionTwo)
+ ),
+
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition::partitionIndex,
+ errorOne
+ );
+
+ var expected = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicOneIdOrZero)
.setName(topicOne)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
@@ -114,47 +139,58 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
assertEquals(expected, builder.build().data());
}
- @Test
- public void testBuilderMergeIntoEmpty() {
- TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBuilderMergeIntoEmpty(boolean useTopicIds) {
+ var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+
+ var newData = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicOneIdOrZero)
.setName(topicOne)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionOne)
.setErrorCode(errorOne.code())))));
- TxnOffsetCommitResponse response = TxnOffsetCommitResponse.newBuilder()
+ var response = TxnOffsetCommitResponse.newBuilder(useTopicIds)
.merge(newData)
.build();
assertEquals(newData, response.data());
}
- @Test
- public void testBuilderMergeAddsNewTopic() {
- TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
- builder.addPartition(topicOne, partitionOne, errorOne);
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBuilderMergeAddsNewTopic(boolean useTopicIds) {
+ var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+ var topicTwoIdOrZero = useTopicIds ? topicTwoId : Uuid.ZERO_UUID;
+
+ var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+ builder.addPartition(topicOneIdOrZero, topicOne, partitionOne,
errorOne);
- TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+ var newData = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicTwoIdOrZero)
.setName(topicTwo)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionTwo)
.setErrorCode(errorTwo.code())))));
- TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ var expected = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicOneIdOrZero)
.setName(topicOne)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionOne)
.setErrorCode(errorOne.code()))),
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicTwoIdOrZero)
.setName(topicTwo)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
@@ -164,23 +200,28 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
assertEquals(expected, builder.merge(newData).build().data());
}
- @Test
- public void testBuilderMergeAppendsToExistingTopic() {
- TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
- builder.addPartition(topicOne, partitionOne, errorOne);
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBuilderMergeAppendsToExistingTopic(boolean useTopicIds) {
+ var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
- TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+ var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+ builder.addPartition(topicOneIdOrZero, topicOne, partitionOne,
errorOne);
+
+ var newData = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicOneIdOrZero)
.setName(topicOne)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionTwo)
.setErrorCode(errorTwo.code())))));
- TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ var expected = new TxnOffsetCommitResponseData()
.setTopics(List.of(
new TxnOffsetCommitResponseTopic()
+ .setTopicId(topicOneIdOrZero)
.setName(topicOne)
.setPartitions(List.of(
new TxnOffsetCommitResponsePartition()
@@ -193,11 +234,11 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
assertEquals(expected, builder.merge(newData).build().data());
}
- @Test
- public void testTopicNameBuilderRejectsNullTopicName() {
- TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBuilderRejectsNullKey(boolean useTopicIds) {
+ var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
assertThrows(IllegalArgumentException.class,
- () -> builder.addPartition(null, partitionOne, errorOne));
+ () -> builder.addPartition(useTopicIds ? null : Uuid.ZERO_UUID,
useTopicIds ? topicOne : null, partitionOne, errorOne));
}
-
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 93fe6449687..1cdb7d258d9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2054,17 +2054,17 @@ class KafkaApis(val requestChannel: RequestChannel,
txnOffsetCommitRequest.data.topics.asScala
)(_.name)
- val responseBuilder = TxnOffsetCommitResponse.newBuilder()
+ val responseBuilder = TxnOffsetCommitResponse.newBuilder(false)
val authorizedTopicCommittedOffsets = new
mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
txnOffsetCommitRequest.data.topics.forEach { topic =>
if (!authorizedTopics.contains(topic.name)) {
// If the topic is not authorized, we add the topic and all its
partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
- responseBuilder.addPartitions(topic.name, topic.partitions,
_.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+ responseBuilder.addPartitions(topic.topicId, topic.name,
topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
} else if (!metadataCache.contains(topic.name)) {
// If the topic is unknown, we add the topic and all its partitions
// to the response with UNKNOWN_TOPIC_OR_PARTITION.
- responseBuilder.addPartitions(topic.name, topic.partitions,
_.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ responseBuilder.addPartitions(topic.topicId, topic.name,
topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
// Otherwise, we check all partitions to ensure that they all exist.
val topicWithValidPartitions = new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name)
@@ -2073,7 +2073,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (metadataCache.getLeaderAndIsr(topic.name,
partition.partitionIndex).isPresent()) {
topicWithValidPartitions.partitions.add(partition)
} else {
- responseBuilder.addPartition(topic.name,
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ responseBuilder.addPartition(topic.topicId, topic.name,
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
}