This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 20c2450e5bf MINOR: Reshape TxnOffsetCommitResponse.Builder (#22146)
20c2450e5bf is described below

commit 20c2450e5bf0f279953cbc142d694285058cb976
Author: David Jacot <[email protected]>
AuthorDate: Tue Apr 28 17:56:56 2026 +0200

    MINOR: Reshape TxnOffsetCommitResponse.Builder (#22146)
    
    This patch refactors `TxnOffsetCommitResponse.Builder` to mirror the
    class hierarchy used by `OffsetCommitResponse.Builder`: the `Builder`
    becomes abstract and the existing topic-name-keyed logic moves to a
    `TopicNameBuilder` subclass. A new `newBuilder()` factory replaces the
    only direct `new Builder()` call site in `KafkaApis`. Unit tests
    covering `addPartition`, `addPartitions`, `merge`, and the null-name
    guard are added in `TxnOffsetCommitResponseTest`.
    
    The change is behavior-preserving. It prepares the ground for a future
    `TopicIdBuilder` subclass that will be added when `TxnOffsetCommit`
    gains topic ID support.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../common/requests/TxnOffsetCommitResponse.java   |  78 ++++++++---
 .../requests/TxnOffsetCommitResponseTest.java      | 155 +++++++++++++++++++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 3 files changed, 201 insertions(+), 34 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index dea99cf2b07..5e556ad8de2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -49,33 +49,34 @@ import java.util.function.Function;
  */
 public class TxnOffsetCommitResponse extends AbstractResponse {
 
-    public static class Builder {
-        TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData();
-        HashMap<String, TxnOffsetCommitResponseTopic> byTopicName = new 
HashMap<>();
+    public static Builder newBuilder() {
+        return new TopicNameBuilder();
+    }
+
+    public abstract static class Builder {
+        protected TxnOffsetCommitResponseData data = new 
TxnOffsetCommitResponseData();
+
+        protected abstract void add(
+            TxnOffsetCommitResponseTopic topic
+        );
 
-        private TxnOffsetCommitResponseTopic getOrCreateTopic(
+        protected abstract TxnOffsetCommitResponseTopic get(
             String topicName
-        ) {
-            TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
-            if (topic == null) {
-                topic = new TxnOffsetCommitResponseTopic().setName(topicName);
-                data.topics().add(topic);
-                byTopicName.put(topicName, topic);
-            }
-            return topic;
-        }
+        );
+
+        protected abstract TxnOffsetCommitResponseTopic getOrCreate(
+            String topicName
+        );
 
         public Builder addPartition(
             String topicName,
             int partitionIndex,
             Errors error
         ) {
-            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
-
+            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreate(topicName);
             topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
                 .setPartitionIndex(partitionIndex)
                 .setErrorCode(error.code()));
-
             return this;
         }
 
@@ -85,14 +86,12 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
             Function<P, Integer> partitionIndex,
             Errors error
         ) {
-            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
-
+            final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreate(topicName);
             partitions.forEach(partition ->
                 topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
                     .setPartitionIndex(partitionIndex.apply(partition))
                     .setErrorCode(error.code()))
             );
-
             return this;
         }
 
@@ -105,11 +104,10 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
             } else {
                 // Otherwise, we have to merge them together.
                 newData.topics().forEach(newTopic -> {
-                    TxnOffsetCommitResponseTopic existingTopic = 
byTopicName.get(newTopic.name());
+                    TxnOffsetCommitResponseTopic existingTopic = 
get(newTopic.name());
                     if (existingTopic == null) {
                         // If no topic exists, we can directly copy the new 
topic data.
-                        data.topics().add(newTopic);
-                        byTopicName.put(newTopic.name(), newTopic);
+                        add(newTopic);
                     } else {
                         // Otherwise, we add the partitions to the existing 
one. Note we
                         // expect non-overlapping partitions here as we don't 
verify
@@ -118,7 +116,6 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
                     }
                 });
             }
-
             return this;
         }
 
@@ -127,6 +124,41 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
         }
     }
 
+    public static class TopicNameBuilder extends Builder {
+        private final HashMap<String, TxnOffsetCommitResponseTopic> 
byTopicName = new HashMap<>();
+
+        @Override
+        protected void add(TxnOffsetCommitResponseTopic topic) {
+            throwIfTopicNameIsNull(topic.name());
+            data.topics().add(topic);
+            byTopicName.put(topic.name(), topic);
+        }
+
+        @Override
+        protected TxnOffsetCommitResponseTopic get(String topicName) {
+            throwIfTopicNameIsNull(topicName);
+            return byTopicName.get(topicName);
+        }
+
+        @Override
+        protected TxnOffsetCommitResponseTopic getOrCreate(String topicName) {
+            throwIfTopicNameIsNull(topicName);
+            TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new TxnOffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        private static void throwIfTopicNameIsNull(String topicName) {
+            if (topicName == null) {
+                throw new IllegalArgumentException("TopicName cannot be 
null.");
+            }
+        }
+    }
+
     private final TxnOffsetCommitResponseData data;
 
     public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index bd6f98ed339..f3c0318de3c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -17,15 +17,17 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
+import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.MessageUtil;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest {
 
@@ -44,16 +46,15 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
     public void testParse() {
         TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData()
             .setThrottleTimeMs(throttleTimeMs)
-            .setTopics(Arrays.asList(
-                new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setPartitions(
-                    Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic().setPartitions(List.of(
+                    new TxnOffsetCommitResponsePartition()
                         .setPartitionIndex(partitionOne)
                         .setErrorCode(errorOne.code()))),
-                    new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setPartitions(
-                        Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
-                            .setPartitionIndex(partitionTwo)
-                            .setErrorCode(errorTwo.code())))
-                ));
+                new TxnOffsetCommitResponseTopic().setPartitions(List.of(
+                    new TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(partitionTwo)
+                        .setErrorCode(errorTwo.code())))));
 
         for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
             TxnOffsetCommitResponse response = TxnOffsetCommitResponse.parse(
@@ -64,4 +65,138 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
         }
     }
 
+    @Test
+    public void testBuilderAddPartition() {
+        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
+        builder.addPartition(topicOne, partitionOne, errorOne);
+        builder.addPartition(topicOne, partitionTwo, errorTwo);
+        builder.addPartition(topicTwo, partitionOne, errorOne);
+
+        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionOne)
+                            .setErrorCode(errorOne.code()),
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionTwo)
+                            .setErrorCode(errorTwo.code()))),
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicTwo)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionOne)
+                            .setErrorCode(errorOne.code())))));
+
+        assertEquals(expected, builder.build().data());
+    }
+
+    @Test
+    public void testBuilderAddPartitions() {
+        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
+        builder.addPartitions(topicOne, List.of(partitionOne, partitionTwo), p 
-> p, errorOne);
+
+        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionOne)
+                            .setErrorCode(errorOne.code()),
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionTwo)
+                            .setErrorCode(errorOne.code())))));
+
+        assertEquals(expected, builder.build().data());
+    }
+
+    @Test
+    public void testBuilderMergeIntoEmpty() {
+        TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionOne)
+                            .setErrorCode(errorOne.code())))));
+
+        TxnOffsetCommitResponse response = TxnOffsetCommitResponse.newBuilder()
+            .merge(newData)
+            .build();
+
+        assertEquals(newData, response.data());
+    }
+
+    @Test
+    public void testBuilderMergeAddsNewTopic() {
+        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
+        builder.addPartition(topicOne, partitionOne, errorOne);
+
+        TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicTwo)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionTwo)
+                            .setErrorCode(errorTwo.code())))));
+
+        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionOne)
+                            .setErrorCode(errorOne.code()))),
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicTwo)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionTwo)
+                            .setErrorCode(errorTwo.code())))));
+
+        assertEquals(expected, builder.merge(newData).build().data());
+    }
+
+    @Test
+    public void testBuilderMergeAppendsToExistingTopic() {
+        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
+        builder.addPartition(topicOne, partitionOne, errorOne);
+
+        TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionTwo)
+                            .setErrorCode(errorTwo.code())))));
+
+        TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
+            .setTopics(List.of(
+                new TxnOffsetCommitResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(List.of(
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionOne)
+                            .setErrorCode(errorOne.code()),
+                        new TxnOffsetCommitResponsePartition()
+                            .setPartitionIndex(partitionTwo)
+                            .setErrorCode(errorTwo.code())))));
+
+        assertEquals(expected, builder.merge(newData).build().data());
+    }
+
+    @Test
+    public void testTopicNameBuilderRejectsNullTopicName() {
+        TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
+        assertThrows(IllegalArgumentException.class,
+            () -> builder.addPartition(null, partitionOne, errorOne));
+    }
+
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e923563ad0..855fa1faaaf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2057,7 +2057,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         txnOffsetCommitRequest.data.topics.asScala
       )(_.name)
 
-      val responseBuilder = new TxnOffsetCommitResponse.Builder()
+      val responseBuilder = TxnOffsetCommitResponse.newBuilder()
       val authorizedTopicCommittedOffsets = new 
mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
       txnOffsetCommitRequest.data.topics.forEach { topic =>
         if (!authorizedTopics.contains(topic.name)) {

Reply via email to