This is an automated email from the ASF dual-hosted git repository.

squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38227ab7682 KAFKA-20551: Remove unnecessary generics from 
TxnOffsetCommitResponse, OffsetCommitResponse, and OffsetDeleteResponse (#22210)
38227ab7682 is described below

commit 38227ab768282c7ed3f3986e5e2acf46219d1642
Author: Jiayao Sun <[email protected]>
AuthorDate: Wed May 6 18:35:14 2026 +1200

    KAFKA-20551: Remove unnecessary generics from TxnOffsetCommitResponse, 
OffsetCommitResponse, and OffsetDeleteResponse (#22210)
    
    Simplify the `addPartitions` builder methods by replacing the generic
    type parameter `<P>` and the `Function` mapper with explicit request
    partition classes (e.g., `OffsetCommitRequestPartition`). This removes
    unnecessary abstraction that was only useful in test cases, and
    streamlines the API usages across both production and test code.
    
    Reviewers: Ken Huang <[email protected]>, Sean Quah
     <[email protected]>
---
 .../kafka/common/requests/OffsetCommitResponse.java |  7 ++++---
 .../kafka/common/requests/OffsetDeleteResponse.java |  7 ++++---
 .../common/requests/TxnOffsetCommitResponse.java    |  7 ++++---
 .../requests/TxnOffsetCommitResponseTest.java       |  3 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala    | 21 +++++++--------------
 5 files changed, 21 insertions(+), 24 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 521ffa1c2fd..483640b95c4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
@@ -165,11 +166,11 @@ public class OffsetCommitResponse extends 
AbstractResponse {
             return this;
         }
 
-        public <P> Builder addPartitions(
+        public Builder addPartitions(
             Uuid topicId,
             String topicName,
-            List<P> partitions,
-            Function<P, Integer> partitionIndex,
+            List<OffsetCommitRequestData.OffsetCommitRequestPartition> 
partitions,
+            Function<OffsetCommitRequestData.OffsetCommitRequestPartition, 
Integer> partitionIndex,
             Errors error
         ) {
             final OffsetCommitResponseTopic topicResponse = 
getOrCreate(topicId, topicName);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
index 0f3655d62c6..c51b547dd58 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
@@ -75,10 +76,10 @@ public class OffsetDeleteResponse extends AbstractResponse {
             return this;
         }
 
-        public <P> Builder addPartitions(
+        public Builder addPartitions(
             String topicName,
-            List<P> partitions,
-            Function<P, Integer> partitionIndex,
+            List<OffsetDeleteRequestData.OffsetDeleteRequestPartition> 
partitions,
+            Function<OffsetDeleteRequestData.OffsetDeleteRequestPartition, 
Integer> partitionIndex,
             Errors error
         ) {
             final OffsetDeleteResponseTopic topicResponse = 
getOrCreateTopic(topicName);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 5e556ad8de2..a01b5779b4e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
 import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
@@ -80,10 +81,10 @@ public class TxnOffsetCommitResponse extends 
AbstractResponse {
             return this;
         }
 
-        public <P> Builder addPartitions(
+        public Builder addPartitions(
             String topicName,
-            List<P> partitions,
-            Function<P, Integer> partitionIndex,
+            List<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition> 
partitions,
+            
Function<TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition, Integer> 
partitionIndex,
             Errors error
         ) {
             final TxnOffsetCommitResponseTopic topicResponse = 
getOrCreate(topicName);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index f3c0318de3c..6dd950a5539 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -96,7 +96,8 @@ public class TxnOffsetCommitResponseTest extends 
OffsetCommitResponseTest {
     @Test
     public void testBuilderAddPartitions() {
         TxnOffsetCommitResponse.Builder builder = 
TxnOffsetCommitResponse.newBuilder();
-        builder.addPartitions(topicOne, List.of(partitionOne, partitionTwo), p 
-> p, errorOne);
+        builder.addPartition(topicOne, partitionOne, errorOne);
+        builder.addPartition(topicOne, partitionTwo, errorOne);
 
         TxnOffsetCommitResponseData expected = new 
TxnOffsetCommitResponseData()
             .setTopics(List.of(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d38c8921dbd..93fe6449687 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -303,13 +303,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (useTopicIds && topic.name.isEmpty) {
           // If the topic name is undefined, it means that the topic id is 
unknown so we add
           // the topic and all its partitions to the response with 
UNKNOWN_TOPIC_ID.
-          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_ID)
+          responseBuilder.addPartitions(topic.topicId, topic.name, 
topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_ID)
         } else if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its 
partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
-          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-            topic.topicId, topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+          responseBuilder.addPartitions(topic.topicId, topic.name, 
topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
         } else {
           // For lower API versions, the topic id may not be included in the 
request.
           // In this case, we resolve the topic id from metadata cache to 
ensure that the topic exists.
@@ -321,8 +319,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (topic.topicId == Uuid.ZERO_UUID) {
             // If the topic is unknown, we add the topic and all its partitions
             // to the response with UNKNOWN_TOPIC_OR_PARTITION.
-            
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
-              Uuid.ZERO_UUID, topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            responseBuilder.addPartitions(Uuid.ZERO_UUID, topic.name, 
topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
           } else {
             // Otherwise, we check all partitions to ensure that they all 
exist.
             val topicWithValidPartitions = new 
OffsetCommitRequestData.OffsetCommitRequestTopic()
@@ -2063,13 +2060,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its 
partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
-          
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+          responseBuilder.addPartitions(topic.name, topic.partitions, 
_.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
         } else if (!metadataCache.contains(topic.name)) {
           // If the topic is unknown, we add the topic and all its partitions
           // to the response with UNKNOWN_TOPIC_OR_PARTITION.
-          
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          responseBuilder.addPartitions(topic.name, topic.partitions, 
_.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         } else {
           // Otherwise, we check all partitions to ensure that they all exist.
           val topicWithValidPartitions = new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name)
@@ -2384,13 +2379,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (!authorizedTopics.contains(topic.name)) {
           // If the topic is not authorized, we add the topic and all its 
partitions
           // to the response with TOPIC_AUTHORIZATION_FAILED.
-          
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+          responseBuilder.addPartitions(topic.name, topic.partitions, 
_.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
         } else if (!metadataCache.contains(topic.name)) {
           // If the topic is unknown, we add the topic and all its partitions
           // to the response with UNKNOWN_TOPIC_OR_PARTITION.
-          
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
-            topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          responseBuilder.addPartitions(topic.name, topic.partitions, 
_.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         } else {
           // Otherwise, we check all partitions to ensure that they all exist.
           val topicWithValidPartitions = new 
OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name)

Reply via email to