This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 20c2450e5bf MINOR: Reshape TxnOffsetCommitResponse.Builder (#22146)
20c2450e5bf is described below
commit 20c2450e5bf0f279953cbc142d694285058cb976
Author: David Jacot <[email protected]>
AuthorDate: Tue Apr 28 17:56:56 2026 +0200
MINOR: Reshape TxnOffsetCommitResponse.Builder (#22146)
This patch refactors `TxnOffsetCommitResponse.Builder` to mirror the
class hierarchy used by `OffsetCommitResponse.Builder`: `Builder`
becomes abstract and the existing topic-name-keyed logic moves to a
`TopicNameBuilder` subclass. A new static `newBuilder()` factory returns
a `TopicNameBuilder`, and the only direct `new Builder()` call, in
`KafkaApis`, now goes through it. Unit tests covering `addPartition`,
`addPartitions`, `merge`, and the null-name guard are added in
`TxnOffsetCommitResponseTest`.
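The change is behavior-preserving for callers that pass non-null topic
names; `TopicNameBuilder` additionally rejects a null topic name with an
`IllegalArgumentException`. It prepares the ground for a future
`TopicIdBuilder` subclass that will be added when `TxnOffsetCommit`
gains topic ID support.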
Reviewers: Andrew Schofield <[email protected]>
---
.../common/requests/TxnOffsetCommitResponse.java | 78 ++++++++---
.../requests/TxnOffsetCommitResponseTest.java | 155 +++++++++++++++++++--
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
3 files changed, 201 insertions(+), 34 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index dea99cf2b07..5e556ad8de2 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -49,33 +49,34 @@ import java.util.function.Function;
*/
public class TxnOffsetCommitResponse extends AbstractResponse {
- public static class Builder {
- TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData();
- HashMap<String, TxnOffsetCommitResponseTopic> byTopicName = new
HashMap<>();
+ public static Builder newBuilder() {
+ return new TopicNameBuilder();
+ }
+
+ public abstract static class Builder {
+ protected TxnOffsetCommitResponseData data = new
TxnOffsetCommitResponseData();
+
+ protected abstract void add(
+ TxnOffsetCommitResponseTopic topic
+ );
- private TxnOffsetCommitResponseTopic getOrCreateTopic(
+ protected abstract TxnOffsetCommitResponseTopic get(
String topicName
- ) {
- TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
- if (topic == null) {
- topic = new TxnOffsetCommitResponseTopic().setName(topicName);
- data.topics().add(topic);
- byTopicName.put(topicName, topic);
- }
- return topic;
- }
+ );
+
+ protected abstract TxnOffsetCommitResponseTopic getOrCreate(
+ String topicName
+ );
public Builder addPartition(
String topicName,
int partitionIndex,
Errors error
) {
- final TxnOffsetCommitResponseTopic topicResponse =
getOrCreateTopic(topicName);
-
+ final TxnOffsetCommitResponseTopic topicResponse =
getOrCreate(topicName);
topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex)
.setErrorCode(error.code()));
-
return this;
}
@@ -85,14 +86,12 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
Function<P, Integer> partitionIndex,
Errors error
) {
- final TxnOffsetCommitResponseTopic topicResponse =
getOrCreateTopic(topicName);
-
+ final TxnOffsetCommitResponseTopic topicResponse =
getOrCreate(topicName);
partitions.forEach(partition ->
topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionIndex.apply(partition))
.setErrorCode(error.code()))
);
-
return this;
}
@@ -105,11 +104,10 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
} else {
// Otherwise, we have to merge them together.
newData.topics().forEach(newTopic -> {
- TxnOffsetCommitResponseTopic existingTopic =
byTopicName.get(newTopic.name());
+ TxnOffsetCommitResponseTopic existingTopic =
get(newTopic.name());
if (existingTopic == null) {
// If no topic exists, we can directly copy the new
topic data.
- data.topics().add(newTopic);
- byTopicName.put(newTopic.name(), newTopic);
+ add(newTopic);
} else {
// Otherwise, we add the partitions to the existing
one. Note we
// expect non-overlapping partitions here as we don't
verify
@@ -118,7 +116,6 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
}
});
}
-
return this;
}
@@ -127,6 +124,41 @@ public class TxnOffsetCommitResponse extends
AbstractResponse {
}
}
+ public static class TopicNameBuilder extends Builder {
+ private final HashMap<String, TxnOffsetCommitResponseTopic>
byTopicName = new HashMap<>();
+
+ @Override
+ protected void add(TxnOffsetCommitResponseTopic topic) {
+ throwIfTopicNameIsNull(topic.name());
+ data.topics().add(topic);
+ byTopicName.put(topic.name(), topic);
+ }
+
+ @Override
+ protected TxnOffsetCommitResponseTopic get(String topicName) {
+ throwIfTopicNameIsNull(topicName);
+ return byTopicName.get(topicName);
+ }
+
+ @Override
+ protected TxnOffsetCommitResponseTopic getOrCreate(String topicName) {
+ throwIfTopicNameIsNull(topicName);
+ TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName);
+ if (topic == null) {
+ topic = new TxnOffsetCommitResponseTopic().setName(topicName);
+ data.topics().add(topic);
+ byTopicName.put(topicName, topic);
+ }
+ return topic;
+ }
+
+ private static void throwIfTopicNameIsNull(String topicName) {
+ if (topicName == null) {
+ throw new IllegalArgumentException("TopicName cannot be
null.");
+ }
+ }
+ }
+
private final TxnOffsetCommitResponseData data;
public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
index bd6f98ed339..f3c0318de3c 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java
@@ -17,15 +17,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
+import
org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.MessageUtil;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest {
@@ -44,16 +46,15 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
public void testParse() {
TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData()
.setThrottleTimeMs(throttleTimeMs)
- .setTopics(Arrays.asList(
- new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setPartitions(
- Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic().setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionOne)
.setErrorCode(errorOne.code()))),
- new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setPartitions(
- Collections.singletonList(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
- .setPartitionIndex(partitionTwo)
- .setErrorCode(errorTwo.code())))
- ));
+ new TxnOffsetCommitResponseTopic().setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(errorTwo.code())))));
for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
TxnOffsetCommitResponse response = TxnOffsetCommitResponse.parse(
@@ -64,4 +65,138 @@ public class TxnOffsetCommitResponseTest extends
OffsetCommitResponseTest {
}
}
+ @Test
+ public void testBuilderAddPartition() {
+ TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
+ builder.addPartition(topicOne, partitionOne, errorOne);
+ builder.addPartition(topicOne, partitionTwo, errorTwo);
+ builder.addPartition(topicTwo, partitionOne, errorOne);
+
+ TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setErrorCode(errorOne.code()),
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(errorTwo.code()))),
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicTwo)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setErrorCode(errorOne.code())))));
+
+ assertEquals(expected, builder.build().data());
+ }
+
+ @Test
+ public void testBuilderAddPartitions() {
+ TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
+ builder.addPartitions(topicOne, List.of(partitionOne, partitionTwo), p
-> p, errorOne);
+
+ TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setErrorCode(errorOne.code()),
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(errorOne.code())))));
+
+ assertEquals(expected, builder.build().data());
+ }
+
+ @Test
+ public void testBuilderMergeIntoEmpty() {
+ TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setErrorCode(errorOne.code())))));
+
+ TxnOffsetCommitResponse response = TxnOffsetCommitResponse.newBuilder()
+ .merge(newData)
+ .build();
+
+ assertEquals(newData, response.data());
+ }
+
+ @Test
+ public void testBuilderMergeAddsNewTopic() {
+ TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
+ builder.addPartition(topicOne, partitionOne, errorOne);
+
+ TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicTwo)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(errorTwo.code())))));
+
+ TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setErrorCode(errorOne.code()))),
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicTwo)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(errorTwo.code())))));
+
+ assertEquals(expected, builder.merge(newData).build().data());
+ }
+
+ @Test
+ public void testBuilderMergeAppendsToExistingTopic() {
+ TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
+ builder.addPartition(topicOne, partitionOne, errorOne);
+
+ TxnOffsetCommitResponseData newData = new TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(errorTwo.code())))));
+
+ TxnOffsetCommitResponseData expected = new
TxnOffsetCommitResponseData()
+ .setTopics(List.of(
+ new TxnOffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(List.of(
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionOne)
+ .setErrorCode(errorOne.code()),
+ new TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(partitionTwo)
+ .setErrorCode(errorTwo.code())))));
+
+ assertEquals(expected, builder.merge(newData).build().data());
+ }
+
+ @Test
+ public void testTopicNameBuilderRejectsNullTopicName() {
+ TxnOffsetCommitResponse.Builder builder =
TxnOffsetCommitResponse.newBuilder();
+ assertThrows(IllegalArgumentException.class,
+ () -> builder.addPartition(null, partitionOne, errorOne));
+ }
+
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e923563ad0..855fa1faaaf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2057,7 +2057,7 @@ class KafkaApis(val requestChannel: RequestChannel,
txnOffsetCommitRequest.data.topics.asScala
)(_.name)
- val responseBuilder = new TxnOffsetCommitResponse.Builder()
+ val responseBuilder = TxnOffsetCommitResponse.newBuilder()
val authorizedTopicCommittedOffsets = new
mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
txnOffsetCommitRequest.data.topics.forEach { topic =>
if (!authorizedTopics.contains(topic.name)) {