This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c4823daa00a KAFKA-20165 [2]: Integration tests to validate different
OffsetFetch response behaviour on topic deleted (#21509)
c4823daa00a is described below
commit c4823daa00a7af2f4e964ea816d005078866efcc
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Apr 14 12:48:23 2026 -0400
KAFKA-20165 [2]: Integration tests to validate different OffsetFetch
response behaviour on topic deleted (#21509)
Adding integration tests to show/validate the expected responses when
fetching offsets for a deleted topic, which changes depending on the
version (topic names vs topic Ids used).
Reviewers: David Jacot <[email protected]>, Lan Ding
<[email protected]>, David Jacot <[email protected]>
---
.../unit/kafka/server/OffsetFetchRequestTest.scala | 177 +++++++++++++++++++++
1 file changed, 177 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index ff228f245cb..5144ac5fb42 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -735,6 +735,183 @@ class OffsetFetchRequestTest(cluster: ClusterInstance)
extends GroupCoordinatorB
)
}
+ /**
+ * Helper to set up two topics (foo with 3 partitions, bar with 2
partitions),
+ * join a consumer group, and commit offsets to both topics.
+ */
+ private def setupTopicsJoinAndCommit(): (Uuid, Uuid, String, Int) = {
+ createOffsetsTopic()
+
+ // Create two topics.
+ val fooTopicId = createTopic(topic = "foo", numPartitions = 3)
+ val barTopicId = createTopic(topic = "bar", numPartitions = 2)
+
+ // Join the consumer group.
+ val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol =
true)
+
+ // Commit offsets for both topics.
+ for (partitionId <- 0 to 2) {
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "foo",
+ topicId = fooTopicId,
+ partition = partitionId,
+ offset = 100L + partitionId,
+ expectedError = Errors.NONE,
+ version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+ )
+ }
+ for (partitionId <- 0 to 1) {
+ commitOffset(
+ groupId = "grp",
+ memberId = memberId,
+ memberEpoch = memberEpoch,
+ topic = "bar",
+ topicId = barTopicId,
+ partition = partitionId,
+ offset = 200L + partitionId,
+ expectedError = Errors.NONE,
+ version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+ )
+ }
+
+ (fooTopicId, barTopicId, memberId, memberEpoch)
+ }
+
+ // Validate responses to OffsetFetch when the topic is deleted and topic IDs
used (version 10+)
+ // The expectation is that the response contains the deleted topic,
+ // with UNKNOWN_TOPIC_ID error at the partition level.
+ @ClusterTest
+ def testFetchOffsetWithDeletedTopicUsingTopicIds(): Unit = {
+ val (fooTopicId, barTopicId, memberId, memberEpoch) =
setupTopicsJoinAndCommit()
+
+ // Delete the bar topic.
+ deleteTopic("bar")
+
+ val expectedResponse = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setTopicId(fooTopicId)
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(101L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(2)
+ .setCommittedOffset(102L)
+ ).asJava),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setTopicId(barTopicId)
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(-1L)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(-1L)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+ ).asJava)
+ ).asJava)
+
+ // Wait for the deleted topic (bar) to return UNKNOWN_TOPIC_ID error for
its partitions.
+ // The undeleted topic (foo) should still return its committed offsets.
+ for (version <- 10 to
ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
+ TestUtils.waitUntilTrue(
+ () => {
+ expectedResponse == fetchOffsets(
+ group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp")
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch)
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setTopicId(fooTopicId)
+ .setPartitionIndexes(List[Integer](0, 1, 2).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setTopicId(barTopicId)
+ .setPartitionIndexes(List[Integer](0, 1).asJava)
+ ).asJava),
+ requireStable = false,
+ version = version.toShort
+ )
+ },
+ msg = s"Expected UNKNOWN_TOPIC_ID error for deleted topic partitions
on version $version"
+ )
+ }
+ }
+
+ // Validate responses to OffsetFetch when the topic is deleted and topic
names used (versions < 10)
+ // The expectation is that the response contains the deleted topic,
+ // without any error, and -1 as committed offset.
+ @ClusterTest
+ def testFetchOffsetWithDeletedTopicUsingTopicNames(): Unit = {
+ val (_, _, memberId, memberEpoch) = setupTopicsJoinAndCommit()
+
+ // Delete the bar topic.
+ deleteTopic("bar")
+
+ val expectedResponse = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("grp")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("bar")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(-1L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(-1L)
+ ).asJava),
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(101L),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(2)
+ .setCommittedOffset(102L)
+ ).asJava)
+ ).asJava)
+
+ // Wait for the deleted topic (bar) to return -1 offset with NONE error
for its partitions.
+ // The undeleted topic (foo) should still return its committed offsets.
+ for (version <- 1 to 9) {
+ TestUtils.waitUntilTrue(
+ () => {
+ expectedResponse == fetchOffsets(
+ group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId("grp")
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch)
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(List[Integer](0, 1, 2).asJava),
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("bar")
+ .setPartitionIndexes(List[Integer](0, 1).asJava)
+ ).asJava),
+ requireStable = false,
+ version = version.toShort
+ )
+ },
+ msg = s"Expected -1 offset for deleted topic partitions on version
$version"
+ )
+ }
+ }
+
@ClusterTest
def testGroupErrors(): Unit = {
val topicId = createTopic(