This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c4823daa00a KAFKA-20165 [2]: Integration tests to validate different OffsetFetch response behaviour on topic deleted (#21509)
c4823daa00a is described below

commit c4823daa00a7af2f4e964ea816d005078866efcc
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Apr 14 12:48:23 2026 -0400

    KAFKA-20165 [2]: Integration tests to validate different OffsetFetch response behaviour on topic deleted (#21509)
    
    Adding integration tests to show/validate the expected responses when
    fetching offsets for a deleted topic, which changes depending on the
    version (topic names vs topic Ids used).
    
    Reviewers: David Jacot <[email protected]>, Lan Ding
     <[email protected]>, David Jacot <[email protected]>
---
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 177 +++++++++++++++++++++
 1 file changed, 177 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index ff228f245cb..5144ac5fb42 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -735,6 +735,183 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     )
   }
 
+  /**
+   * Helper to set up two topics (foo with 3 partitions, bar with 2 partitions),
+   * join a consumer group, and commit offsets to both topics.
+   */
+  private def setupTopicsJoinAndCommit(): (Uuid, Uuid, String, Int) = {
+    createOffsetsTopic()
+
+    // Create two topics.
+    val fooTopicId = createTopic(topic = "foo", numPartitions = 3)
+    val barTopicId = createTopic(topic = "bar", numPartitions = 2)
+
+    // Join the consumer group.
+    val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol = true)
+
+    // Commit offsets for both topics.
+    for (partitionId <- 0 to 2) {
+      commitOffset(
+        groupId = "grp",
+        memberId = memberId,
+        memberEpoch = memberEpoch,
+        topic = "foo",
+        topicId = fooTopicId,
+        partition = partitionId,
+        offset = 100L + partitionId,
+        expectedError = Errors.NONE,
+        version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+      )
+    }
+    for (partitionId <- 0 to 1) {
+      commitOffset(
+        groupId = "grp",
+        memberId = memberId,
+        memberEpoch = memberEpoch,
+        topic = "bar",
+        topicId = barTopicId,
+        partition = partitionId,
+        offset = 200L + partitionId,
+        expectedError = Errors.NONE,
+        version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+      )
+    }
+
+    (fooTopicId, barTopicId, memberId, memberEpoch)
+  }
+
+  // Validate responses to OffsetFetch when the topic is deleted and topic IDs used (version 10+)
+  // The expectation is that the response contains the deleted topic,
+  // with UNKNOWN_TOPIC_ID error at the partition level.
+  @ClusterTest
+  def testFetchOffsetWithDeletedTopicUsingTopicIds(): Unit = {
+    val (fooTopicId, barTopicId, memberId, memberEpoch) = setupTopicsJoinAndCommit()
+
+    // Delete the bar topic.
+    deleteTopic("bar")
+
+    val expectedResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("grp")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setTopicId(fooTopicId)
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100L),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(101L),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(2)
+              .setCommittedOffset(102L)
+          ).asJava),
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setTopicId(barTopicId)
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(-1L)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(-1L)
+              .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+          ).asJava)
+      ).asJava)
+
+    // Wait for the deleted topic (bar) to return UNKNOWN_TOPIC_ID error for its partitions.
+    // The undeleted topic (foo) should still return its committed offsets.
+    for (version <- 10 to ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+      TestUtils.waitUntilTrue(
+        () => {
+          expectedResponse == fetchOffsets(
+            group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+              .setGroupId("grp")
+              .setMemberId(memberId)
+              .setMemberEpoch(memberEpoch)
+              .setTopics(List(
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                  .setTopicId(fooTopicId)
+                  .setPartitionIndexes(List[Integer](0, 1, 2).asJava),
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                  .setTopicId(barTopicId)
+                  .setPartitionIndexes(List[Integer](0, 1).asJava)
+              ).asJava),
+            requireStable = false,
+            version = version.toShort
+          )
+        },
+        msg = s"Expected UNKNOWN_TOPIC_ID error for deleted topic partitions on version $version"
+      )
+    }
+  }
+
+  // Validate responses to OffsetFetch when the topic is deleted and topic names used (versions < 10)
+  // The expectation is that the response contains the deleted topic,
+  // without any error, and -1 as committed offset.
+  @ClusterTest
+  def testFetchOffsetWithDeletedTopicUsingTopicNames(): Unit = {
+    val (_, _, memberId, memberEpoch) = setupTopicsJoinAndCommit()
+
+    // Delete the bar topic.
+    deleteTopic("bar")
+
+    val expectedResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("grp")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("bar")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(-1L),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(-1L)
+          ).asJava),
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100L),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(101L),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(2)
+              .setCommittedOffset(102L)
+          ).asJava)
+      ).asJava)
+
+    // Wait for the deleted topic (bar) to return -1 offset with NONE error for its partitions.
+    // The undeleted topic (foo) should still return its committed offsets.
+    for (version <- 1 to 9) {
+      TestUtils.waitUntilTrue(
+        () => {
+          expectedResponse == fetchOffsets(
+            group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+              .setGroupId("grp")
+              .setMemberId(memberId)
+              .setMemberEpoch(memberEpoch)
+              .setTopics(List(
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                  .setName("foo")
+                  .setPartitionIndexes(List[Integer](0, 1, 2).asJava),
+                new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                  .setName("bar")
+                  .setPartitionIndexes(List[Integer](0, 1).asJava)
+              ).asJava),
+            requireStable = false,
+            version = version.toShort
+          )
+        },
+        msg = s"Expected -1 offset for deleted topic partitions on version $version"
+      )
+    }
+  }
+
   @ClusterTest
   def testGroupErrors(): Unit = {
     val topicId = createTopic(

Reply via email to