This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new faa8b4870f4 KAFKA-20533: Correcting error response for topic deletion
during share fetch (#22170)
faa8b4870f4 is described below
commit faa8b4870f4355278a59bee23bd9f25a4e82b41c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Apr 28 22:13:49 2026 +0100
KAFKA-20533: Correcting error response for topic deletion during share
fetch (#22170)
The PR fixes a suppressed NPE when topic name cannot be resolved during
share fetch. The metrics update triggers NPE when topic name is null and
the error response gets mapped to UNKNOWN_SERVER_ERROR. Correcting the
behaviour to return the correct error code per partition response.
Reviewers: Andrew Schofield <[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 11 +++--
.../scala/unit/kafka/server/KafkaApisTest.scala | 47 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 855fa1faaaf..c405b35705f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4174,10 +4174,13 @@ class KafkaApis(val requestChannel: RequestChannel,
// record the bytes out metrics only when the response is being sent.
response.data.responses.forEach { topicResponse =>
topicResponse.partitions.forEach { data =>
- // If the topic name was not known, we will have no bytes out.
- if (topicResponse.topicId != null) {
- val tp = new TopicIdPartition(topicResponse.topicId, new
TopicPartition(topicIdNames.get(topicResponse.topicId), data.partitionIndex))
- brokerTopicStats.updateBytesOut(tp.topic, false, false,
ShareFetchResponse.recordsSize(data))
+ // If the topic name was not known, we will have no bytes out. This
can happen if the topic
+ // was deleted and the fetch request was received, or if the topic
id in the request was invalid.
+ // In both cases, the error code for the partition will be set
accordingly, and we won't have
+ // a topic name to record metrics with.
+ val topicName = topicIdNames.get(topicResponse.topicId)
+ if (topicName != null) {
+ brokerTopicStats.updateBytesOut(topicName, false, false,
ShareFetchResponse.recordsSize(data))
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9331650e1a9..9956bf1203d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -5189,6 +5189,53 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
}
+ @Test
+ def testHandleShareFetchRequestTopicDeletedDuringFetch(): Unit = {
+ val topicId = Uuid.randomUuid()
+ val partitionIndex = 0
+ metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+ // Do NOT add the topic to metadata cache - simulating a deleted topic.
+ // topicIdNames will not contain this topic's mapping.
+ val memberId: String = Uuid.randomUuid().toString
+
+ val groupId = "group"
+
+ when(sharePartitionManager.newContext(any(), any(), any(), any(), any(),
any(), any())).thenReturn(
+ // Send the topic name that corresponds to the context response
considering session existed.
+ // This is to simulate the scenario where the topic gets deleted after
the context is created
+ // and the subsequent fetch is received.
+ new ShareSessionContext(0, util.List.of(
+ new TopicIdPartition(topicId, partitionIndex, "foo")
+ ))
+ )
+
+ val shareFetchRequestData = new ShareFetchRequestData().
+ setGroupId(groupId).
+ setMemberId(memberId).
+ setShareSessionEpoch(0).
+ setTopics(new
ShareFetchRequestData.FetchTopicCollection(util.List.of(new
ShareFetchRequestData.FetchTopic().
+ setTopicId(topicId).
+ setPartitions(new
ShareFetchRequestData.FetchPartitionCollection(util.List.of(
+ new ShareFetchRequestData.FetchPartition()
+ .setPartitionIndex(partitionIndex)).iterator))).iterator))
+
+ val shareFetchRequest = new
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+ val request = buildRequest(shareFetchRequest)
+ kafkaApis = createKafkaApis()
+ kafkaApis.handleShareFetchRequest(request)
+ val response = verifyNoThrottling[ShareFetchResponse](request)
+ val responseData = response.data()
+
+ assertEquals(Errors.NONE.code, responseData.errorCode)
+ val topicResponses = responseData.responses()
+ assertEquals(1, topicResponses.size())
+ val topicResponse = topicResponses.stream.findFirst.get
+ assertEquals(topicId, topicResponse.topicId)
+ assertEquals(1, topicResponse.partitions.size())
+ assertEquals(partitionIndex,
topicResponse.partitions.get(0).partitionIndex)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
topicResponse.partitions.get(0).errorCode)
+ }
+
@Test
def testHandleShareFetchRequestErrorInReadingPartition(): Unit = {
val topicName = "foo"