This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c717e6e174f MINOR: Clean up OffsetCommit request/response validation 
tests in KafkaApisTest (#22250)
c717e6e174f is described below

commit c717e6e174ff4f552548e7cf6ffd39daae232ee3
Author: David Jacot <[email protected]>
AuthorDate: Mon May 11 09:23:47 2026 +0200

    MINOR: Clean up OffsetCommit request/response validation tests in 
KafkaApisTest (#22250)
    
    - Consolidate
    `testHandleOffsetCommitRequestTopicsAndPartitionsValidation`
      and
    `testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds`
      into a single `@ParameterizedTest @ApiKeyVersionsSource(apiKey =
    ApiKeys.OFFSET_COMMIT)`
      method that covers every API version, mirroring the shape of
      `testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation`.
    - Drop the unnecessary version gating on `topicId`/`name` in the
      source request data of `testHandleOffsetCommitRequest` and
      `testHandleOffsetCommitRequestFutureFailed`. Both fields can be set
      unconditionally; the wire schema filters at serialization time and
      the builder accepts requests that have both fields populated.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 194 +++------------------
 1 file changed, 28 insertions(+), 166 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index bcb6edb92a5..3dda1e6e921 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1052,8 +1052,8 @@ class KafkaApisTest extends Logging {
       .setMemberId("member")
       .setTopics(util.List.of(
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
-          .setName(if (version < 10) topicName else "")
+          .setTopicId(topicId)
+          .setName(topicName)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
@@ -1113,8 +1113,8 @@ class KafkaApisTest extends Logging {
       .setMemberId("member")
       .setTopics(util.List.of(
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setTopicId(if (version >= 10) topicId else Uuid.ZERO_UUID)
-          .setName(if (version < 10) topicName else "")
+          .setTopicId(topicId)
+          .setName(topicName)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
@@ -1162,13 +1162,15 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedOffsetCommitResponse, response.data)
   }
 
-  @Test
-  def 
testHandleOffsetCommitRequestTopicsAndPartitionsValidationWithTopicIds(): Unit 
= {
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(version: 
Short): Unit = {
     val fooId = Uuid.randomUuid()
     val barId = Uuid.randomUuid()
     val zarId = Uuid.randomUuid()
     val fooName = "foo"
     val barName = "bar"
+    val zarName = "zar"
     addTopicToMetadataCache(fooName, topicId = fooId, numPartitions = 2)
     addTopicToMetadataCache(barName, topicId = barId, numPartitions = 2)
 
@@ -1179,6 +1181,7 @@ class KafkaApisTest extends Logging {
         // foo exists but only has 2 partitions.
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
           .setTopicId(fooId)
+          .setName(fooName)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
@@ -1192,6 +1195,7 @@ class KafkaApisTest extends Logging {
         // bar exists.
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
           .setTopicId(barId)
+          .setName(barName)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
@@ -1202,6 +1206,7 @@ class KafkaApisTest extends Logging {
         // zar does not exist.
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
           .setTopicId(zarId)
+          .setName(zarName)
           .setPartitions(util.List.of(
             new OffsetCommitRequestData.OffsetCommitRequestPartition()
               .setPartitionIndex(0)
@@ -1210,7 +1215,9 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setCommittedOffset(70)))))
 
-    val requestChannelRequest = 
buildRequest(OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build())
+    val requestChannelRequest = buildRequest(
+      
OffsetCommitRequest.Builder.forTopicIdsOrNames(offsetCommitRequest).build(version)
+    )
 
     // This is the request expected by the group coordinator.
     val expectedOffsetCommitRequest = new OffsetCommitRequestData()
@@ -1255,159 +1262,8 @@ class KafkaApisTest extends Logging {
     val offsetCommitResponse = new OffsetCommitResponseData()
       .setTopics(util.List.of(
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setTopicId(fooId)
-          .setName(fooName)
-          .setPartitions(util.List.of(
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(0)
-              .setErrorCode(Errors.NONE.code),
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(1)
-              .setErrorCode(Errors.NONE.code))),
-        new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setTopicId(barId)
-          .setName(barName)
-          .setPartitions(util.List.of(
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(0)
-              .setErrorCode(Errors.NONE.code),
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(1)
-              .setErrorCode(Errors.NONE.code)))))
-
-    val expectedOffsetCommitResponse = new OffsetCommitResponseData()
-      .setTopics(util.List.of(
-        new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setTopicId(fooId)
-          .setPartitions(util.List.of(
-            // foo-2 is first because partitions failing the validation
-            // are put in the response first.
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(2)
-              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(0)
-              .setErrorCode(Errors.NONE.code),
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(1)
-              .setErrorCode(Errors.NONE.code))),
-        // zar is before bar because topics failing the validation are
-        // put in the response first.
-        new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setTopicId(zarId)
-          .setPartitions(util.List.of(
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(0)
-              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(1)
-              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))),
-        new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setTopicId(barId)
-          .setPartitions(util.List.of(
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(0)
-              .setErrorCode(Errors.NONE.code),
-            new OffsetCommitResponseData.OffsetCommitResponsePartition()
-              .setPartitionIndex(1)
-              .setErrorCode(Errors.NONE.code)))))
-
-    future.complete(offsetCommitResponse)
-    val response = 
verifyNoThrottling[OffsetCommitResponse](requestChannelRequest)
-    assertEquals(expectedOffsetCommitResponse, response.data)
-  }
-
-  @Test
-  def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {
-    val fooId = Uuid.randomUuid()
-    val barId = Uuid.randomUuid()
-    addTopicToMetadataCache("foo", numPartitions = 2, topicId = fooId)
-    addTopicToMetadataCache("bar", numPartitions = 2, topicId = barId)
-
-    val offsetCommitRequest = new OffsetCommitRequestData()
-      .setGroupId("group")
-      .setMemberId("member")
-      .setTopics(util.List.of(
-        // foo exists but only has 2 partitions.
-        new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setName("foo")
-          .setPartitions(util.List.of(
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(0)
-              .setCommittedOffset(10),
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(1)
-              .setCommittedOffset(20),
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(2)
-              .setCommittedOffset(30))),
-        // bar exists.
-        new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setName("bar")
-          .setPartitions(util.List.of(
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(0)
-              .setCommittedOffset(40),
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(1)
-              .setCommittedOffset(50))),
-        // zar does not exist.
-        new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setName("zar")
-          .setPartitions(util.List.of(
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(0)
-              .setCommittedOffset(60),
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(1)
-              .setCommittedOffset(70)))))
-
-    val requestChannelRequest = 
buildRequest(OffsetCommitRequest.Builder.forTopicNames(offsetCommitRequest).build())
-
-    // This is the request expected by the group coordinator.
-    val expectedOffsetCommitRequest = new OffsetCommitRequestData()
-      .setGroupId("group")
-      .setMemberId("member")
-      .setTopics(util.List.of(
-        // foo exists but only has 2 partitions.
-        new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setName("foo")
-          .setTopicId(fooId)
-          .setPartitions(util.List.of(
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(0)
-              .setCommittedOffset(10),
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(1)
-              .setCommittedOffset(20))),
-        new OffsetCommitRequestData.OffsetCommitRequestTopic()
-          .setName("bar")
-          .setTopicId(barId)
-          .setPartitions(util.List.of(
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(0)
-              .setCommittedOffset(40),
-            new OffsetCommitRequestData.OffsetCommitRequestPartition()
-              .setPartitionIndex(1)
-              .setCommittedOffset(50)))))
-
-    val future = new CompletableFuture[OffsetCommitResponseData]()
-    when(groupCoordinator.commitOffsets(
-      requestChannelRequest.context,
-      expectedOffsetCommitRequest,
-      RequestLocal.noCaching.bufferSupplier
-    )).thenReturn(future)
-    kafkaApis = createKafkaApis()
-    kafkaApis.handle(
-      requestChannelRequest,
-      RequestLocal.noCaching
-    )
-
-    // This is the response returned by the group coordinator.
-    val offsetCommitResponse = new OffsetCommitResponseData()
-      .setTopics(util.List.of(
-        new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) fooName else "")
           .setPartitions(util.List.of(
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1416,7 +1272,8 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setErrorCode(Errors.NONE.code))),
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName("bar")
+          .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) barName else "")
           .setPartitions(util.List.of(
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(0)
@@ -1425,10 +1282,13 @@ class KafkaApisTest extends Logging {
               .setPartitionIndex(1)
               .setErrorCode(Errors.NONE.code)))))
 
+    // For v10+, the unknown topic returns UNKNOWN_TOPIC_ID; for v0-9 it 
returns
+    // UNKNOWN_TOPIC_OR_PARTITION.
     val expectedOffsetCommitResponse = new OffsetCommitResponseData()
       .setTopics(util.List.of(
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName("foo")
+          .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) fooName else "")
           .setPartitions(util.List.of(
             // foo-2 is first because partitions failing the validation
             // are put in the response first.
@@ -1444,16 +1304,18 @@ class KafkaApisTest extends Logging {
         // zar is before bar because topics failing the validation are
         // put in the response first.
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName("zar")
+          .setTopicId(if (version >= 10) zarId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) zarName else "")
           .setPartitions(util.List.of(
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(0)
-              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
+              .setErrorCode(if (version >= 10) Errors.UNKNOWN_TOPIC_ID.code 
else Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(1)
-              .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
+              .setErrorCode(if (version >= 10) Errors.UNKNOWN_TOPIC_ID.code 
else Errors.UNKNOWN_TOPIC_OR_PARTITION.code))),
         new OffsetCommitResponseData.OffsetCommitResponseTopic()
-          .setName("bar")
+          .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
+          .setName(if (version < 10) barName else "")
           .setPartitions(util.List.of(
             new OffsetCommitResponseData.OffsetCommitResponsePartition()
               .setPartitionIndex(0)

Reply via email to