This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7340eefc489 MINOR: Reshape TxnOffsetCommitRequest.Builder (#22147)
7340eefc489 is described below

commit 7340eefc4897aabb949c6da0d8f6737ee12d9ad4
Author: David Jacot <[email protected]>
AuthorDate: Tue Apr 28 17:57:49 2026 +0200

    MINOR: Reshape TxnOffsetCommitRequest.Builder (#22147)
    
    This patch refactors `TxnOffsetCommitRequest.Builder` to match the
    factory-method style used by `OffsetCommitRequest.Builder`: the public
    constructors are replaced by a private one plus a single static
    `forTopicNames(TxnOffsetCommitRequestData, boolean)` factory. The
    `getTopics(...)` helper is made public so callers can build the topic
    list themselves. All call sites in production and test code are migrated
    to build a `TxnOffsetCommitRequestData` and pass it to the factory.
    
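    The change is behavior-preserving for production code. On the test side,
    `RequestResponseTest` now calls `build(version)` for versions below 3,
    where it previously called the no-argument `build()`. The patch prepares
    the ground for a future `forTopicIdsOrNames(...)` factory that will be
    added when `TxnOffsetCommit` gains topic ID support.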
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../producer/internals/TransactionManager.java     | 21 ++++----
 .../common/requests/TxnOffsetCommitRequest.java    | 51 ++++--------------
 .../kafka/common/requests/RequestResponseTest.java | 60 +++++++++-------------
 .../requests/TxnOffsetCommitRequestTest.java       | 60 ++++++++++------------
 .../server/GroupCoordinatorBaseRequestTest.scala   |  5 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 35 +++++++------
 .../scala/unit/kafka/server/RequestQuotaTest.scala | 12 ++---
 7 files changed, 100 insertions(+), 144 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 64ee1cd79f6..6bfba3979d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.message.EndTxnRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.RecordBatch;
@@ -1248,17 +1249,17 @@ public class TransactionManager {
             pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
         }
 
+        final TxnOffsetCommitRequestData data = new 
TxnOffsetCommitRequestData()
+            .setTransactionalId(transactionalId)
+            .setGroupId(groupMetadata.groupId())
+            .setProducerId(producerIdAndEpoch.producerId)
+            .setProducerEpoch(producerIdAndEpoch.epoch)
+            .setMemberId(groupMetadata.memberId())
+            .setGenerationId(groupMetadata.generationId())
+            .setGroupInstanceId(groupMetadata.groupInstanceId().orElse(null))
+            
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
         final TxnOffsetCommitRequest.Builder builder =
-            new TxnOffsetCommitRequest.Builder(transactionalId,
-                groupMetadata.groupId(),
-                producerIdAndEpoch.producerId,
-                producerIdAndEpoch.epoch,
-                pendingTxnOffsetCommits,
-                groupMetadata.memberId(),
-                groupMetadata.generationId(),
-                groupMetadata.groupInstanceId(),
-                isTransactionV2Enabled()
-            );
+            TxnOffsetCommitRequest.Builder.forTopicNames(data, 
isTransactionV2Enabled());
         if (result == null) {
             // In this case, transaction V2 is in use.
             return new TxnOffsetCommitHandler(builder);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 0e1d9005809..c94e4332537 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -47,49 +47,20 @@ public class TxnOffsetCommitRequest extends AbstractRequest 
{
         public final TxnOffsetCommitRequestData data;
         public final boolean isTransactionV2Enabled;
 
-        public Builder(final String transactionalId,
-                       final String consumerGroupId,
-                       final long producerId,
-                       final short producerEpoch,
-                       final Map<TopicPartition, CommittedOffset> 
pendingTxnOffsetCommits,
-                       final boolean isTransactionV2Enabled) {
-            this(transactionalId,
-                consumerGroupId,
-                producerId,
-                producerEpoch,
-                pendingTxnOffsetCommits,
-                JoinGroupRequest.UNKNOWN_MEMBER_ID,
-                JoinGroupRequest.UNKNOWN_GENERATION_ID,
-                Optional.empty(),
-                isTransactionV2Enabled);
-        }
-
-        public Builder(final String transactionalId,
-                       final String consumerGroupId,
-                       final long producerId,
-                       final short producerEpoch,
-                       final Map<TopicPartition, CommittedOffset> 
pendingTxnOffsetCommits,
-                       final String memberId,
-                       final int generationId,
-                       final Optional<String> groupInstanceId,
-                       final boolean isTransactionV2Enabled) {
+        private Builder(
+            final TxnOffsetCommitRequestData data,
+            final boolean isTransactionV2Enabled
+        ) {
             super(ApiKeys.TXN_OFFSET_COMMIT);
+            this.data = data;
             this.isTransactionV2Enabled = isTransactionV2Enabled;
-            this.data = new TxnOffsetCommitRequestData()
-                    .setTransactionalId(transactionalId)
-                    .setGroupId(consumerGroupId)
-                    .setProducerId(producerId)
-                    .setProducerEpoch(producerEpoch)
-                    .setTopics(getTopics(pendingTxnOffsetCommits))
-                    .setMemberId(memberId)
-                    .setGenerationId(generationId)
-                    .setGroupInstanceId(groupInstanceId.orElse(null));
         }
 
-        public Builder(final TxnOffsetCommitRequestData data) {
-            super(ApiKeys.TXN_OFFSET_COMMIT);
-            this.data = data;
-            this.isTransactionV2Enabled = true;
+        public static Builder forTopicNames(
+            final TxnOffsetCommitRequestData data,
+            final boolean isTransactionV2Enabled
+        ) {
+            return new Builder(data, isTransactionV2Enabled);
         }
 
         @Override
@@ -136,7 +107,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest 
{
         return offsetMap;
     }
 
-    static List<TxnOffsetCommitRequestTopic> getTopics(Map<TopicPartition, 
CommittedOffset> pendingTxnOffsetCommits) {
+    public static List<TxnOffsetCommitRequestTopic> 
getTopics(Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
         Map<String, List<TxnOffsetCommitRequestPartition>> topicPartitionMap = 
new HashMap<>();
         for (Map.Entry<TopicPartition, CommittedOffset> entry : 
pendingTxnOffsetCommits.entrySet()) {
             TopicPartition topicPartition = entry.getKey();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a87a835fac0..b4f2e5c9945 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -237,6 +237,7 @@ import 
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.UnregisterBrokerRequestData;
 import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
@@ -2841,34 +2842,20 @@ public class RequestResponseTest {
         offsets.put(new TopicPartition("topic", 74),
                 new TxnOffsetCommitRequest.CommittedOffset(100, "blah", 
Optional.of(27)));
 
-        if (version < 3) {
-            return new TxnOffsetCommitRequest.Builder("transactionalId",
-                "groupId",
-                21L,
-                (short) 42,
-                offsets,
-                false).build();
-        } else if (version < 5) {
-            return new TxnOffsetCommitRequest.Builder("transactionalId",
-                "groupId",
-                21L,
-                (short) 42,
-                offsets,
-                "member",
-                2,
-                Optional.of("instance"),
-                false).build(version);
-        } else {
-            return new TxnOffsetCommitRequest.Builder("transactionalId",
-                "groupId",
-                21L,
-                (short) 42,
-                offsets,
-                "member",
-                2,
-                Optional.of("instance"),
-                true).build(version);
+        TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
+            .setTransactionalId("transactionalId")
+            .setGroupId("groupId")
+            .setProducerId(21L)
+            .setProducerEpoch((short) 42)
+            .setTopics(TxnOffsetCommitRequest.getTopics(offsets));
+
+        if (version >= 3) {
+            data.setMemberId("member")
+                .setGenerationId(2)
+                .setGroupInstanceId("instance");
         }
+
+        return TxnOffsetCommitRequest.Builder.forTopicNames(data, version >= 
5).build(version);
     }
 
     private TxnOffsetCommitRequest 
createTxnOffsetCommitRequestWithAutoDowngrade() {
@@ -2878,15 +2865,16 @@ public class RequestResponseTest {
         offsets.put(new TopicPartition("topic", 74),
             new TxnOffsetCommitRequest.CommittedOffset(100, "blah", 
Optional.of(27)));
 
-        return new TxnOffsetCommitRequest.Builder("transactionalId",
-            "groupId",
-            21L,
-            (short) 42,
-            offsets,
-            "member",
-            2,
-            Optional.of("instance"),
-            false).build();
+        TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
+            .setTransactionalId("transactionalId")
+            .setGroupId("groupId")
+            .setProducerId(21L)
+            .setProducerEpoch((short) 42)
+            .setMemberId("member")
+            .setGenerationId(2)
+            .setGroupInstanceId("instance")
+            .setTopics(TxnOffsetCommitRequest.getTopics(offsets));
+        return TxnOffsetCommitRequest.Builder.forTopicNames(data, 
false).build();
     }
 
     private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index 2f4063fd4c0..b1143de96a3 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
 import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
@@ -28,8 +29,6 @@ import 
org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,27 +64,25 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
         String transactionalId = "transactionalId";
         int producerId = 10;
         short producerEpoch = 1;
-        builder = new TxnOffsetCommitRequest.Builder(
-            transactionalId,
-            groupId,
-            producerId,
-            producerEpoch,
-            OFFSETS,
-            true
-        );
+        TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
+            .setTransactionalId(transactionalId)
+            .setGroupId(groupId)
+            .setProducerId(producerId)
+            .setProducerEpoch(producerEpoch)
+            .setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
+        builder = TxnOffsetCommitRequest.Builder.forTopicNames(data, true);
 
         int generationId = 5;
-        builderWithGroupMetadata = new TxnOffsetCommitRequest.Builder(
-            transactionalId,
-            groupId,
-            producerId,
-            producerEpoch,
-            OFFSETS,
-            memberId,
-            generationId,
-            Optional.of(groupInstanceId),
-            true
-        );
+        TxnOffsetCommitRequestData dataWithGroupMetadata = new 
TxnOffsetCommitRequestData()
+            .setTransactionalId(transactionalId)
+            .setGroupId(groupId)
+            .setProducerId(producerId)
+            .setProducerEpoch(producerEpoch)
+            .setMemberId(memberId)
+            .setGenerationId(generationId)
+            .setGroupInstanceId(groupInstanceId)
+            .setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
+        builderWithGroupMetadata = 
TxnOffsetCommitRequest.Builder.forTopicNames(dataWithGroupMetadata, true);
     }
 
     @Test
@@ -95,26 +92,23 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
         errorsMap.put(new TopicPartition(topicOne, partitionOne), 
Errors.NOT_COORDINATOR);
         errorsMap.put(new TopicPartition(topicTwo, partitionTwo), 
Errors.NOT_COORDINATOR);
 
-        List<TxnOffsetCommitRequestTopic> expectedTopics = Arrays.asList(
+        List<TxnOffsetCommitRequestTopic> expectedTopics = List.of(
             new TxnOffsetCommitRequestTopic()
                 .setName(topicOne)
-                .setPartitions(Collections.singletonList(
+                .setPartitions(List.of(
                     new TxnOffsetCommitRequestPartition()
                         .setPartitionIndex(partitionOne)
                         .setCommittedOffset(offset)
                         .setCommittedLeaderEpoch(leaderEpoch)
-                        .setCommittedMetadata(metadata)
-                )),
+                        .setCommittedMetadata(metadata))),
             new TxnOffsetCommitRequestTopic()
                 .setName(topicTwo)
-                .setPartitions(Collections.singletonList(
+                .setPartitions(List.of(
                     new TxnOffsetCommitRequestPartition()
                         .setPartitionIndex(partitionTwo)
                         .setCommittedOffset(offset)
                         .setCommittedLeaderEpoch(leaderEpoch)
-                        .setCommittedMetadata(metadata)
-                ))
-        );
+                        .setCommittedMetadata(metadata))));
 
         for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
             final TxnOffsetCommitRequest request;
@@ -130,7 +124,7 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
                 request.getErrorResponse(throttleTimeMs, 
Errors.NOT_COORDINATOR.exception());
 
             assertEquals(errorsMap, response.errors());
-            assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), 
response.errorCounts());
+            assertEquals(Map.of(Errors.NOT_COORDINATOR, 2), 
response.errorCounts());
             assertEquals(throttleTimeMs, response.throttleTimeMs());
         }
     }
@@ -139,16 +133,16 @@ public class TxnOffsetCommitRequestTest extends 
OffsetCommitRequestTest {
     @Override
     public void testGetErrorResponse() {
         TxnOffsetCommitResponseData expectedResponse = new 
TxnOffsetCommitResponseData()
-            .setTopics(Arrays.asList(
+            .setTopics(List.of(
                 new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                     .setName(topicOne)
-                    .setPartitions(Collections.singletonList(
+                    .setPartitions(List.of(
                         new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                             .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                             .setPartitionIndex(partitionOne))),
                 new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                     .setName(topicTwo)
-                    .setPartitions(Collections.singletonList(
+                    .setPartitions(List.of(
                         new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                             .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                             .setPartitionIndex(partitionTwo)))));
diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 3b2ac9ce3ee..50bad2dfcd3 100644
--- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -258,7 +258,7 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
      expectedError: Errors,
      version: Short = 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
   ): Unit = {
-    val request = new TxnOffsetCommitRequest.Builder(
+    val request = TxnOffsetCommitRequest.Builder.forTopicNames(
       new TxnOffsetCommitRequestData()
         .setGroupId(groupId)
         .setMemberId(memberId)
@@ -274,7 +274,8 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
                 .setPartitionIndex(partition)
                 .setCommittedOffset(offset)
             ).asJava)
-        ).asJava)
+        ).asJava),
+      true
     ).build(version)
 
     val expectedResponse = new TxnOffsetCommitResponseData()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3fdcdaf3e49..49d7fe11139 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1475,14 +1475,13 @@ class KafkaApisTest extends Logging {
 
       val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
       val partitionOffsetCommitData = new 
TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
-      val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
-        "txnId",
-        "groupId",
-        15L,
-        0.toShort,
-        util.Map.of(invalidTopicPartition, partitionOffsetCommitData),
-        true
-      ).build()
+      val data = new TxnOffsetCommitRequestData()
+        .setTransactionalId("txnId")
+        .setGroupId("groupId")
+        .setProducerId(15L)
+        .setProducerEpoch(0.toShort)
+        
.setTopics(TxnOffsetCommitRequest.getTopics(util.Map.of(invalidTopicPartition, 
partitionOffsetCommitData)))
+      val offsetCommitRequest = 
TxnOffsetCommitRequest.Builder.forTopicNames(data, true).build()
       val request = buildRequest(offsetCommitRequest)
       
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
@@ -1521,7 +1520,7 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(0)
               .setCommittedOffset(10)))))
 
-    val requestChannelRequest = buildRequest(new 
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+    val requestChannelRequest = 
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
 true).build())
 
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
     
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
@@ -1566,7 +1565,7 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(0)
               .setCommittedOffset(10)))))
 
-    val requestChannelRequest = buildRequest(new 
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+    val requestChannelRequest = 
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
 true).build())
 
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
     
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
@@ -1638,7 +1637,7 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setCommittedOffset(70)))))
 
-    val requestChannelRequest = buildRequest(new 
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+    val requestChannelRequest = 
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
 true).build())
 
     // This is the request expected by the group coordinator.
     val expectedTxnOffsetCommitRequest = new TxnOffsetCommitRequestData()
@@ -1757,12 +1756,14 @@ class KafkaApisTest extends Logging {
     val producerId = 15L
     val epoch = 0.toShort
 
-    val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
-      "txnId",
-      groupId,
-      producerId,
-      epoch,
-      util.Map.of(topicPartition, partitionOffsetCommitData),
+    val data = new TxnOffsetCommitRequestData()
+      .setTransactionalId("txnId")
+      .setGroupId(groupId)
+      .setProducerId(producerId)
+      .setProducerEpoch(epoch)
+      .setTopics(TxnOffsetCommitRequest.getTopics(util.Map.of(topicPartition, 
partitionOffsetCommitData)))
+    val offsetCommitRequest = TxnOffsetCommitRequest.Builder.forTopicNames(
+      data,
       version >= 
TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
     ).build(version)
     val request = buildRequest(offsetCommitRequest)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 25c6eba4dc8..d0bd0c9808f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -473,12 +473,12 @@ class RequestQuotaTest extends BaseRequestTest {
           new 
WriteTxnMarkersRequest.Builder(java.util.List.of[WriteTxnMarkersRequest.TxnMarkerEntry])
 
         case ApiKeys.TXN_OFFSET_COMMIT =>
-          new TxnOffsetCommitRequest.Builder(
-            "test-transactional-id",
-            "test-txn-group",
-            2,
-            0,
-            util.Map.of[TopicPartition, 
TxnOffsetCommitRequest.CommittedOffset],
+          TxnOffsetCommitRequest.Builder.forTopicNames(
+            new TxnOffsetCommitRequestData()
+              .setTransactionalId("test-transactional-id")
+              .setGroupId("test-txn-group")
+              .setProducerId(2)
+              .setProducerEpoch(0),
             true
           )
 

Reply via email to