This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new baa064e4223 KAFKA-20444: [4/N] Prepare TxnOffsetCommitResponse for topic IDs (KIP-1319) (#22224)
baa064e4223 is described below

commit baa064e4223005c0bf78cd03c4bcf4d65b50bfa1
Author: David Jacot <[email protected]>
AuthorDate: Thu May 7 22:52:58 2026 +0200

    KAFKA-20444: [4/N] Prepare TxnOffsetCommitResponse for topic IDs (KIP-1319) (#22224)
    
    This patch prepares the `TxnOffsetCommit` classes for topic IDs at
    version 6 of the API:
    - Add `TxnOffsetCommitResponse.TopicIdBuilder` that keys topics by
      topic ID, alongside the existing `TopicNameBuilder`.
    - Wire topic IDs through `TxnOffsetCommitRequest.getErrorResponse(...)`.
    
    Reviewers: Lianet Magrans <[email protected]>, Sean Quah
     <[email protected]>
---
 .../common/requests/TxnOffsetCommitRequest.java    |  28 +-----
 .../common/requests/TxnOffsetCommitResponse.java   |  64 ++++++++++--
 .../requests/TxnOffsetCommitRequestTest.java       |  43 ++++++--
 .../requests/TxnOffsetCommitResponseTest.java      | 111 ++++++++++++++-------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   8 +-
 5 files changed, 174 insertions(+), 80 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index a9e67d74fae..fb17c422bce 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -23,8 +23,6 @@ import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
-import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
-import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
@@ -193,32 +191,9 @@ public class TxnOffsetCommitRequest extends 
AbstractRequest {
         return data;
     }
 
-    static List<TxnOffsetCommitResponseTopic> 
getErrorResponseTopics(List<TxnOffsetCommitRequestTopic> requestTopics,
-                                                                     Errors e) 
{
-        List<TxnOffsetCommitResponseTopic> responseTopicData = new 
ArrayList<>();
-        for (TxnOffsetCommitRequestTopic entry : requestTopics) {
-            List<TxnOffsetCommitResponsePartition> responsePartitions = new 
ArrayList<>();
-            for (TxnOffsetCommitRequestPartition requestPartition : 
entry.partitions()) {
-                responsePartitions.add(new TxnOffsetCommitResponsePartition()
-                                           
.setPartitionIndex(requestPartition.partitionIndex())
-                                           .setErrorCode(e.code()));
-            }
-            responseTopicData.add(new TxnOffsetCommitResponseTopic()
-                                      .setName(entry.name())
-                                      .setPartitions(responsePartitions)
-            );
-        }
-        return responseTopicData;
-    }
-
     @Override
     public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-        List<TxnOffsetCommitResponseTopic> responseTopicData =
-            getErrorResponseTopics(data.topics(), Errors.forException(e));
-
-        return new TxnOffsetCommitResponse(new TxnOffsetCommitResponseData()
-                                               
.setThrottleTimeMs(throttleTimeMs)
-                                               .setTopics(responseTopicData));
+        return new TxnOffsetCommitResponse(getErrorResponse(data, 
Errors.forException(e)).setThrottleTimeMs(throttleTimeMs));
     }
 
     @Override
@@ -233,6 +208,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest 
{
         TxnOffsetCommitResponseData response = new 
TxnOffsetCommitResponseData();
         request.topics().forEach(topic -> {
             TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic 
responseTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                .setTopicId(topic.topicId())
                 .setName(topic.name());
             response.topics().add(responseTopic);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index a01b5779b4e..a9f4a580041 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
@@ -50,8 +51,12 @@ import java.util.function.Function;
  */
 public class TxnOffsetCommitResponse extends AbstractResponse {
 
-    public static Builder newBuilder() {
-        return new TopicNameBuilder();
+    public static Builder newBuilder(boolean useTopicIds) {
+        if (useTopicIds) {
+            return new TopicIdBuilder();
+        } else {
+            return new TopicNameBuilder();
+        }
     }
 
     public abstract static class Builder {
@@ -62,19 +67,22 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
         );
 
         protected abstract TxnOffsetCommitResponseTopic get(
+            Uuid topicId,
             String topicName
         );
 
         protected abstract TxnOffsetCommitResponseTopic getOrCreate(
+            Uuid topicId,
             String topicName
         );
 
         public Builder addPartition(
+            Uuid topicId,
             String topicName,
             int partitionIndex,
             Errors error
         ) {
-            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreate(topicName);
+            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreate(topicId, topicName);
             topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
                 .setPartitionIndex(partitionIndex)
                 .setErrorCode(error.code()));
@@ -82,12 +90,13 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
         }
 
         public Builder addPartitions(
+            Uuid topicId,
             String topicName,
             List<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition> 
partitions,
             
Function<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition, Integer> 
partitionIndex,
             Errors error
         ) {
-            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreate(topicName);
+            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreate(topicId, topicName);
             partitions.forEach(partition ->
                 topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
                     .setPartitionIndex(partitionIndex.apply(partition))
@@ -105,7 +114,7 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
             } else {
                 // Otherwise, we have to merge them together.
                 newData.topics().forEach(newTopic -> {
-                    TxnOffsetCommitResponseTopic existingTopic = 
get(newTopic.name());
+                    TxnOffsetCommitResponseTopic existingTopic = 
get(newTopic.topicId(), newTopic.name());
                     if (existingTopic == null) {
                         // If no topic exists, we can directly copy the new 
topic data.
                         add(newTopic);
@@ -125,6 +134,43 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
         }
     }
 
+    public static class TopicIdBuilder extends Builder {
+        private final HashMap<Uuid, TxnOffsetCommitResponseTopic> byTopicId = 
new HashMap<>();
+
+        @Override
+        protected void add(TxnOffsetCommitResponseTopic topic) {
+            throwIfTopicIdIsNull(topic.topicId());
+            data.topics().add(topic);
+            byTopicId.put(topic.topicId(), topic);
+        }
+
+        @Override
+        protected TxnOffsetCommitResponseTopic get(Uuid topicId, String 
topicName) {
+            throwIfTopicIdIsNull(topicId);
+            return byTopicId.get(topicId);
+        }
+
+        @Override
+        protected TxnOffsetCommitResponseTopic getOrCreate(Uuid topicId, 
String topicName) {
+            throwIfTopicIdIsNull(topicId);
+            TxnOffsetCommitResponseTopic topic = byTopicId.get(topicId);
+            if (topic == null) {
+                topic = new TxnOffsetCommitResponseTopic()
+                    .setName(topicName)
+                    .setTopicId(topicId);
+                data.topics().add(topic);
+                byTopicId.put(topicId, topic);
+            }
+            return topic;
+        }
+
+        private static void throwIfTopicIdIsNull(Uuid topicId) {
+            if (topicId == null) {
+                throw new IllegalArgumentException("TopicId cannot be null.");
+            }
+        }
+    }
+
     public static class TopicNameBuilder extends Builder {
         private final HashMap<String, TxnOffsetCommitResponseTopic> 
byTopicName = new HashMap<>();
 
@@ -136,17 +182,19 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
         }
 
         @Override
-        protected TxnOffsetCommitResponseTopic get(String topicName) {
+        protected TxnOffsetCommitResponseTopic get(Uuid topicId, String 
topicName) {
             throwIfTopicNameIsNull(topicName);
             return byTopicName.get(topicName);
         }
 
         @Override
-        protected TxnOffsetCommitResponseTopic getOrCreate(String topicName) {
+        protected TxnOffsetCommitResponseTopic getOrCreate(Uuid topicId, 
String topicName) {
             throwIfTopicNameIsNull(topicName);
             TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
             if (topic == null) {
-                topic = new TxnOffsetCommitResponseTopic().setName(topicName);
+                topic = new TxnOffsetCommitResponseTopic()
+                    .setName(topicName)
+                    .setTopicId(topicId);
                 data.topics().add(topic);
                 byTopicName.put(topicName, topic);
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index 331fbcfba57..f18cf5d84d1 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -37,7 +37,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static 
org.apache.kafka.common.requests.TxnOffsetCommitRequest.getErrorResponse;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -133,22 +132,52 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
     @Test
     @Override
     public void testGetErrorResponse() {
-        TxnOffsetCommitResponseData expectedResponse = new 
TxnOffsetCommitResponseData()
+        var topicOneId = Uuid.randomUuid();
+        var topicTwoId = Uuid.randomUuid();
+
+        var data = new TxnOffsetCommitRequestData()
+            .setTransactionalId("transactionalId")
+            .setGroupId(groupId)
+            .setProducerId(10L)
+            .setProducerEpoch((short) 1)
+            .setTopics(List.of(
+                new TxnOffsetCommitRequestTopic()
+                    .setTopicId(topicOneId)
+                    .setName(topicOne)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(partitionOne)
+                            .setCommittedOffset(offset))),
+                new TxnOffsetCommitRequestTopic()
+                    .setTopicId(topicTwoId)
+                    .setName(topicTwo)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitRequestPartition()
+                            .setPartitionIndex(partitionTwo)
+                            .setCommittedOffset(offset)))));
+
+        var expectedResponseData = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicOneId)
                     .setName(topicOne)
                     .setPartitions(List.of(
                         new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
-                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
-                            .setPartitionIndex(partitionOne))),
+                            .setPartitionIndex(partitionOne)
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()))),
                 new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicTwoId)
                     .setName(topicTwo)
                     .setPartitions(List.of(
                         new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
-                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
-                            .setPartitionIndex(partitionTwo)))));
+                            .setPartitionIndex(partitionTwo)
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())))));
+
+        assertEquals(expectedResponseData, 
TxnOffsetCommitRequest.getErrorResponse(data, Errors.UNKNOWN_MEMBER_ID));
 
-        assertEquals(expectedResponse, 
getErrorResponse(builderWithGroupMetadata.data, Errors.UNKNOWN_MEMBER_ID));
+        var request = TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
true, true).build();
+        var response = request.getErrorResponse(throttleTimeMs, 
Errors.UNKNOWN_MEMBER_ID.exception());
+        assertEquals(expectedResponseData.setThrottleTimeMs(throttleTimeMs), 
response.data());
     }
 
     @ParameterizedTest
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index 6dd950a5539..d10eeab0ff6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
 import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
@@ -23,6 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.MessageUtil;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.List;
 
@@ -31,6 +35,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest {
 
+    private final Uuid topicOneId = Uuid.randomUuid();
+    private final Uuid topicTwoId = Uuid.randomUuid();
+
     @Test
     @Override
     public void testConstructorWithErrorResponse() {
@@ -65,16 +72,21 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
         }
     }
 
-    @Test
-    public void testBuilderAddPartition() {
-        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
-        builder.addPartition(topicOne, partitionOne, errorOne);
-        builder.addPartition(topicOne, partitionTwo, errorTwo);
-        builder.addPartition(topicTwo, partitionOne, errorOne);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuilderAddPartition(boolean useTopicIds) {
+        var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+        var topicTwoIdOrZero = useTopicIds ? topicTwoId : Uuid.ZERO_UUID;
 
-        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+        var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+        builder.addPartition(topicOneIdOrZero, topicOne, partitionOne, 
errorOne);
+        builder.addPartition(topicOneIdOrZero, topicOne, partitionTwo, 
errorTwo);
+        builder.addPartition(topicTwoIdOrZero, topicTwo, partitionOne, 
errorOne);
+
+        var expected = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicOneIdOrZero)
                     .setName(topicOne)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
@@ -84,6 +96,7 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
                             .setPartitionIndex(partitionTwo)
                             .setErrorCode(errorTwo.code()))),
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicTwoIdOrZero)
                     .setName(topicTwo)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
@@ -93,15 +106,27 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
         assertEquals(expected, builder.build().data());
     }
 
-    @Test
-    public void testBuilderAddPartitions() {
-        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
-        builder.addPartition(topicOne, partitionOne, errorOne);
-        builder.addPartition(topicOne, partitionTwo, errorOne);
-
-        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuilderAddPartitions(boolean useTopicIds) {
+        var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+
+        var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+        builder.addPartitions(
+            topicOneIdOrZero,
+            topicOne,
+            List.of(
+                new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition().setPartitionIndex(partitionOne),
+                new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition().setPartitionIndex(partitionTwo)
+            ),
+            
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition::partitionIndex,
+            errorOne
+        );
+
+        var expected = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicOneIdOrZero)
                     .setName(topicOne)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
@@ -114,47 +139,58 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
         assertEquals(expected, builder.build().data());
     }
 
-    @Test
-    public void testBuilderMergeIntoEmpty() {
-        TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuilderMergeIntoEmpty(boolean useTopicIds) {
+        var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+
+        var newData = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicOneIdOrZero)
                     .setName(topicOne)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
                             .setPartitionIndex(partitionOne)
                             .setErrorCode(errorOne.code())))));
 
-        TxnOffsetCommitResponse response = TxnOffsetCommitResponse.newBuilder()
+        var response = TxnOffsetCommitResponse.newBuilder(useTopicIds)
             .merge(newData)
             .build();
 
         assertEquals(newData, response.data());
     }
 
-    @Test
-    public void testBuilderMergeAddsNewTopic() {
-        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
-        builder.addPartition(topicOne, partitionOne, errorOne);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuilderMergeAddsNewTopic(boolean useTopicIds) {
+        var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
+        var topicTwoIdOrZero = useTopicIds ? topicTwoId : Uuid.ZERO_UUID;
+
+        var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+        builder.addPartition(topicOneIdOrZero, topicOne, partitionOne, 
errorOne);
 
-        TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+        var newData = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicTwoIdOrZero)
                     .setName(topicTwo)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
                             .setPartitionIndex(partitionTwo)
                             .setErrorCode(errorTwo.code())))));
 
-        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+        var expected = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicOneIdOrZero)
                     .setName(topicOne)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
                             .setPartitionIndex(partitionOne)
                             .setErrorCode(errorOne.code()))),
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicTwoIdOrZero)
                     .setName(topicTwo)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
@@ -164,23 +200,28 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
         assertEquals(expected, builder.merge(newData).build().data());
     }
 
-    @Test
-    public void testBuilderMergeAppendsToExistingTopic() {
-        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
-        builder.addPartition(topicOne, partitionOne, errorOne);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuilderMergeAppendsToExistingTopic(boolean useTopicIds) {
+        var topicOneIdOrZero = useTopicIds ? topicOneId : Uuid.ZERO_UUID;
 
-        TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+        var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
+        builder.addPartition(topicOneIdOrZero, topicOne, partitionOne, 
errorOne);
+
+        var newData = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicOneIdOrZero)
                     .setName(topicOne)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
                             .setPartitionIndex(partitionTwo)
                             .setErrorCode(errorTwo.code())))));
 
-        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+        var expected = new TxnOffsetCommitResponseData()
             .setTopics(List.of(
                 new TxnOffsetCommitResponseTopic()
+                    .setTopicId(topicOneIdOrZero)
                     .setName(topicOne)
                     .setPartitions(List.of(
                         new TxnOffsetCommitResponsePartition()
@@ -193,11 +234,11 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
         assertEquals(expected, builder.merge(newData).build().data());
     }
 
-    @Test
-    public void testTopicNameBuilderRejectsNullTopicName() {
-        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testBuilderRejectsNullKey(boolean useTopicIds) {
+        var builder = TxnOffsetCommitResponse.newBuilder(useTopicIds);
         assertThrows(IllegalArgumentException.class,
-            () -> builder.addPartition(null, partitionOne, errorOne));
+            () -> builder.addPartition(useTopicIds ? null : Uuid.ZERO_UUID, 
useTopicIds ? topicOne : null, partitionOne, errorOne));
     }
-
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 93fe6449687..1cdb7d258d9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2054,17 +2054,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         txnOffsetCommitRequest.data.topics.asScala
       )(_.name)
 
-      val responseBuilder = TxnOffsetCommitResponse.newBuilder()
+      val responseBuilder = TxnOffsetCommitResponse.newBuilder(false)
       val authorizedTopicCommittedOffsets = new 
mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
       txnOffsetCommitRequest.data.topics.forEach { topic =>
         if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its 
partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
-          responseBuilder.addPartitions(topic.name, topic.partitions, 
_.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+          responseBuilder.addPartitions(topic.topicId, topic.name, 
topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
         } else if (!metadataCache.contains(topic.name)) {
           // If the topic is unknown, we add the topic and all its partitions
           // to the response with UNKNOWN_TOPIC_OR_PARTITION.
-          responseBuilder.addPartitions(topic.name, topic.partitions, 
_.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          responseBuilder.addPartitions(topic.topicId, topic.name, 
topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         } else {
           // Otherwise, we check all partitions to ensure that they all exist.
           val topicWithValidPartitions = new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name)
@@ -2073,7 +2073,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             if (metadataCache.getLeaderAndIsr(topic.name, 
partition.partitionIndex).isPresent()) {
               topicWithValidPartitions.partitions.add(partition)
             } else {
-              responseBuilder.addPartition(topic.name, 
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              responseBuilder.addPartition(topic.topicId, topic.name, 
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
             }
           }
 

Reply via email to