This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7340eefc489 MINOR: Reshape TxnOffsetCommitRequest.Builder (#22147)
7340eefc489 is described below
commit 7340eefc4897aabb949c6da0d8f6737ee12d9ad4
Author: David Jacot <[email protected]>
AuthorDate: Tue Apr 28 17:57:49 2026 +0200
MINOR: Reshape TxnOffsetCommitRequest.Builder (#22147)
This patch refactors `TxnOffsetCommitRequest.Builder` to match the
factory-method style used by `OffsetCommitRequest.Builder`: the public
constructors are replaced by a private one plus a single static
`forTopicNames(TxnOffsetCommitRequestData, boolean)` factory. The
`getTopics(...)` helper is made public so callers can build the topic
list themselves. All call sites in production and test code are migrated
to build a `TxnOffsetCommitRequestData` and pass it to the factory.
The change is behavior-preserving. It prepares the ground for a future
`forTopicIdsOrNames(...)` factory that will be added when
`TxnOffsetCommit` gains topic ID support.
Reviewers: Andrew Schofield <[email protected]>
---
.../producer/internals/TransactionManager.java | 21 ++++----
.../common/requests/TxnOffsetCommitRequest.java | 51 ++++--------------
.../kafka/common/requests/RequestResponseTest.java | 60 +++++++++-------------
.../requests/TxnOffsetCommitRequestTest.java | 60 ++++++++++------------
.../server/GroupCoordinatorBaseRequestTest.scala | 5 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 35 +++++++------
.../scala/unit/kafka/server/RequestQuotaTest.scala | 12 ++---
7 files changed, 100 insertions(+), 144 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 64ee1cd79f6..6bfba3979d2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.RecordBatch;
@@ -1248,17 +1249,17 @@ public class TransactionManager {
pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
}
+ final TxnOffsetCommitRequestData data = new
TxnOffsetCommitRequestData()
+ .setTransactionalId(transactionalId)
+ .setGroupId(groupMetadata.groupId())
+ .setProducerId(producerIdAndEpoch.producerId)
+ .setProducerEpoch(producerIdAndEpoch.epoch)
+ .setMemberId(groupMetadata.memberId())
+ .setGenerationId(groupMetadata.generationId())
+ .setGroupInstanceId(groupMetadata.groupInstanceId().orElse(null))
+
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
final TxnOffsetCommitRequest.Builder builder =
- new TxnOffsetCommitRequest.Builder(transactionalId,
- groupMetadata.groupId(),
- producerIdAndEpoch.producerId,
- producerIdAndEpoch.epoch,
- pendingTxnOffsetCommits,
- groupMetadata.memberId(),
- groupMetadata.generationId(),
- groupMetadata.groupInstanceId(),
- isTransactionV2Enabled()
- );
+ TxnOffsetCommitRequest.Builder.forTopicNames(data,
isTransactionV2Enabled());
if (result == null) {
// In this case, transaction V2 is in use.
return new TxnOffsetCommitHandler(builder);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 0e1d9005809..c94e4332537 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -47,49 +47,20 @@ public class TxnOffsetCommitRequest extends AbstractRequest
{
public final TxnOffsetCommitRequestData data;
public final boolean isTransactionV2Enabled;
- public Builder(final String transactionalId,
- final String consumerGroupId,
- final long producerId,
- final short producerEpoch,
- final Map<TopicPartition, CommittedOffset>
pendingTxnOffsetCommits,
- final boolean isTransactionV2Enabled) {
- this(transactionalId,
- consumerGroupId,
- producerId,
- producerEpoch,
- pendingTxnOffsetCommits,
- JoinGroupRequest.UNKNOWN_MEMBER_ID,
- JoinGroupRequest.UNKNOWN_GENERATION_ID,
- Optional.empty(),
- isTransactionV2Enabled);
- }
-
- public Builder(final String transactionalId,
- final String consumerGroupId,
- final long producerId,
- final short producerEpoch,
- final Map<TopicPartition, CommittedOffset>
pendingTxnOffsetCommits,
- final String memberId,
- final int generationId,
- final Optional<String> groupInstanceId,
- final boolean isTransactionV2Enabled) {
+ private Builder(
+ final TxnOffsetCommitRequestData data,
+ final boolean isTransactionV2Enabled
+ ) {
super(ApiKeys.TXN_OFFSET_COMMIT);
+ this.data = data;
this.isTransactionV2Enabled = isTransactionV2Enabled;
- this.data = new TxnOffsetCommitRequestData()
- .setTransactionalId(transactionalId)
- .setGroupId(consumerGroupId)
- .setProducerId(producerId)
- .setProducerEpoch(producerEpoch)
- .setTopics(getTopics(pendingTxnOffsetCommits))
- .setMemberId(memberId)
- .setGenerationId(generationId)
- .setGroupInstanceId(groupInstanceId.orElse(null));
}
- public Builder(final TxnOffsetCommitRequestData data) {
- super(ApiKeys.TXN_OFFSET_COMMIT);
- this.data = data;
- this.isTransactionV2Enabled = true;
+ public static Builder forTopicNames(
+ final TxnOffsetCommitRequestData data,
+ final boolean isTransactionV2Enabled
+ ) {
+ return new Builder(data, isTransactionV2Enabled);
}
@Override
@@ -136,7 +107,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest
{
return offsetMap;
}
- static List<TxnOffsetCommitRequestTopic> getTopics(Map<TopicPartition,
CommittedOffset> pendingTxnOffsetCommits) {
+ public static List<TxnOffsetCommitRequestTopic>
getTopics(Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
Map<String, List<TxnOffsetCommitRequestPartition>> topicPartitionMap =
new HashMap<>();
for (Map.Entry<TopicPartition, CommittedOffset> entry :
pendingTxnOffsetCommits.entrySet()) {
TopicPartition topicPartition = entry.getKey();
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a87a835fac0..b4f2e5c9945 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -237,6 +237,7 @@ import
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
@@ -2841,34 +2842,20 @@ public class RequestResponseTest {
offsets.put(new TopicPartition("topic", 74),
new TxnOffsetCommitRequest.CommittedOffset(100, "blah",
Optional.of(27)));
- if (version < 3) {
- return new TxnOffsetCommitRequest.Builder("transactionalId",
- "groupId",
- 21L,
- (short) 42,
- offsets,
- false).build();
- } else if (version < 5) {
- return new TxnOffsetCommitRequest.Builder("transactionalId",
- "groupId",
- 21L,
- (short) 42,
- offsets,
- "member",
- 2,
- Optional.of("instance"),
- false).build(version);
- } else {
- return new TxnOffsetCommitRequest.Builder("transactionalId",
- "groupId",
- 21L,
- (short) 42,
- offsets,
- "member",
- 2,
- Optional.of("instance"),
- true).build(version);
+ TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
+ .setTransactionalId("transactionalId")
+ .setGroupId("groupId")
+ .setProducerId(21L)
+ .setProducerEpoch((short) 42)
+ .setTopics(TxnOffsetCommitRequest.getTopics(offsets));
+
+ if (version >= 3) {
+ data.setMemberId("member")
+ .setGenerationId(2)
+ .setGroupInstanceId("instance");
}
+
+ return TxnOffsetCommitRequest.Builder.forTopicNames(data, version >=
5).build(version);
}
private TxnOffsetCommitRequest
createTxnOffsetCommitRequestWithAutoDowngrade() {
@@ -2878,15 +2865,16 @@ public class RequestResponseTest {
offsets.put(new TopicPartition("topic", 74),
new TxnOffsetCommitRequest.CommittedOffset(100, "blah",
Optional.of(27)));
- return new TxnOffsetCommitRequest.Builder("transactionalId",
- "groupId",
- 21L,
- (short) 42,
- offsets,
- "member",
- 2,
- Optional.of("instance"),
- false).build();
+ TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
+ .setTransactionalId("transactionalId")
+ .setGroupId("groupId")
+ .setProducerId(21L)
+ .setProducerEpoch((short) 42)
+ .setMemberId("member")
+ .setGenerationId(2)
+ .setGroupInstanceId("instance")
+ .setTopics(TxnOffsetCommitRequest.getTopics(offsets));
+ return TxnOffsetCommitRequest.Builder.forTopicNames(data,
false).build();
}
private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
index 2f4063fd4c0..b1143de96a3 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
@@ -28,8 +29,6 @@ import
org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -65,27 +64,25 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
String transactionalId = "transactionalId";
int producerId = 10;
short producerEpoch = 1;
- builder = new TxnOffsetCommitRequest.Builder(
- transactionalId,
- groupId,
- producerId,
- producerEpoch,
- OFFSETS,
- true
- );
+ TxnOffsetCommitRequestData data = new TxnOffsetCommitRequestData()
+ .setTransactionalId(transactionalId)
+ .setGroupId(groupId)
+ .setProducerId(producerId)
+ .setProducerEpoch(producerEpoch)
+ .setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
+ builder = TxnOffsetCommitRequest.Builder.forTopicNames(data, true);
int generationId = 5;
- builderWithGroupMetadata = new TxnOffsetCommitRequest.Builder(
- transactionalId,
- groupId,
- producerId,
- producerEpoch,
- OFFSETS,
- memberId,
- generationId,
- Optional.of(groupInstanceId),
- true
- );
+ TxnOffsetCommitRequestData dataWithGroupMetadata = new
TxnOffsetCommitRequestData()
+ .setTransactionalId(transactionalId)
+ .setGroupId(groupId)
+ .setProducerId(producerId)
+ .setProducerEpoch(producerEpoch)
+ .setMemberId(memberId)
+ .setGenerationId(generationId)
+ .setGroupInstanceId(groupInstanceId)
+ .setTopics(TxnOffsetCommitRequest.getTopics(OFFSETS));
+ builderWithGroupMetadata =
TxnOffsetCommitRequest.Builder.forTopicNames(dataWithGroupMetadata, true);
}
@Test
@@ -95,26 +92,23 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
errorsMap.put(new TopicPartition(topicOne, partitionOne),
Errors.NOT_COORDINATOR);
errorsMap.put(new TopicPartition(topicTwo, partitionTwo),
Errors.NOT_COORDINATOR);
- List<TxnOffsetCommitRequestTopic> expectedTopics = Arrays.asList(
+ List<TxnOffsetCommitRequestTopic> expectedTopics = List.of(
new TxnOffsetCommitRequestTopic()
.setName(topicOne)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new TxnOffsetCommitRequestPartition()
.setPartitionIndex(partitionOne)
.setCommittedOffset(offset)
.setCommittedLeaderEpoch(leaderEpoch)
- .setCommittedMetadata(metadata)
- )),
+ .setCommittedMetadata(metadata))),
new TxnOffsetCommitRequestTopic()
.setName(topicTwo)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new TxnOffsetCommitRequestPartition()
.setPartitionIndex(partitionTwo)
.setCommittedOffset(offset)
.setCommittedLeaderEpoch(leaderEpoch)
- .setCommittedMetadata(metadata)
- ))
- );
+ .setCommittedMetadata(metadata))));
for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
final TxnOffsetCommitRequest request;
@@ -130,7 +124,7 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
request.getErrorResponse(throttleTimeMs,
Errors.NOT_COORDINATOR.exception());
assertEquals(errorsMap, response.errors());
- assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2),
response.errorCounts());
+ assertEquals(Map.of(Errors.NOT_COORDINATOR, 2),
response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
}
@@ -139,16 +133,16 @@ public class TxnOffsetCommitRequestTest extends
OffsetCommitRequestTest {
@Override
public void testGetErrorResponse() {
TxnOffsetCommitResponseData expectedResponse = new
TxnOffsetCommitResponseData()
- .setTopics(Arrays.asList(
+ .setTopics(List.of(
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
.setName(topicOne)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionOne))),
new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
.setName(topicTwo)
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionTwo)))));
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 3b2ac9ce3ee..50bad2dfcd3 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -258,7 +258,7 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
expectedError: Errors,
version: Short =
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
): Unit = {
- val request = new TxnOffsetCommitRequest.Builder(
+ val request = TxnOffsetCommitRequest.Builder.forTopicNames(
new TxnOffsetCommitRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
@@ -274,7 +274,8 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
.setPartitionIndex(partition)
.setCommittedOffset(offset)
).asJava)
- ).asJava)
+ ).asJava),
+ true
).build(version)
val expectedResponse = new TxnOffsetCommitResponseData()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3fdcdaf3e49..49d7fe11139 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1475,14 +1475,13 @@ class KafkaApisTest extends Logging {
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val partitionOffsetCommitData = new
TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
- val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
- "txnId",
- "groupId",
- 15L,
- 0.toShort,
- util.Map.of(invalidTopicPartition, partitionOffsetCommitData),
- true
- ).build()
+ val data = new TxnOffsetCommitRequestData()
+ .setTransactionalId("txnId")
+ .setGroupId("groupId")
+ .setProducerId(15L)
+ .setProducerEpoch(0.toShort)
+
.setTopics(TxnOffsetCommitRequest.getTopics(util.Map.of(invalidTopicPartition,
partitionOffsetCommitData)))
+ val offsetCommitRequest =
TxnOffsetCommitRequest.Builder.forTopicNames(data, true).build()
val request = buildRequest(offsetCommitRequest)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
@@ -1521,7 +1520,7 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(0)
.setCommittedOffset(10)))))
- val requestChannelRequest = buildRequest(new
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+ val requestChannelRequest =
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
true).build())
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
@@ -1566,7 +1565,7 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(0)
.setCommittedOffset(10)))))
- val requestChannelRequest = buildRequest(new
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+ val requestChannelRequest =
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
true).build())
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
when(txnCoordinator.partitionFor(txnOffsetCommitRequest.transactionalId)).thenReturn(0)
@@ -1638,7 +1637,7 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(1)
.setCommittedOffset(70)))))
- val requestChannelRequest = buildRequest(new
TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
+ val requestChannelRequest =
buildRequest(TxnOffsetCommitRequest.Builder.forTopicNames(txnOffsetCommitRequest,
true).build())
// This is the request expected by the group coordinator.
val expectedTxnOffsetCommitRequest = new TxnOffsetCommitRequestData()
@@ -1757,12 +1756,14 @@ class KafkaApisTest extends Logging {
val producerId = 15L
val epoch = 0.toShort
- val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
- "txnId",
- groupId,
- producerId,
- epoch,
- util.Map.of(topicPartition, partitionOffsetCommitData),
+ val data = new TxnOffsetCommitRequestData()
+ .setTransactionalId("txnId")
+ .setGroupId(groupId)
+ .setProducerId(producerId)
+ .setProducerEpoch(epoch)
+ .setTopics(TxnOffsetCommitRequest.getTopics(util.Map.of(topicPartition,
partitionOffsetCommitData)))
+ val offsetCommitRequest = TxnOffsetCommitRequest.Builder.forTopicNames(
+ data,
version >=
TxnOffsetCommitRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
).build(version)
val request = buildRequest(offsetCommitRequest)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 25c6eba4dc8..d0bd0c9808f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -473,12 +473,12 @@ class RequestQuotaTest extends BaseRequestTest {
new
WriteTxnMarkersRequest.Builder(java.util.List.of[WriteTxnMarkersRequest.TxnMarkerEntry])
case ApiKeys.TXN_OFFSET_COMMIT =>
- new TxnOffsetCommitRequest.Builder(
- "test-transactional-id",
- "test-txn-group",
- 2,
- 0,
- util.Map.of[TopicPartition,
TxnOffsetCommitRequest.CommittedOffset],
+ TxnOffsetCommitRequest.Builder.forTopicNames(
+ new TxnOffsetCommitRequestData()
+ .setTransactionalId("test-transactional-id")
+ .setGroupId("test-txn-group")
+ .setProducerId(2)
+ .setProducerEpoch(0),
true
)