This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new faa8b4870f4 KAFKA-20533: Correcting error response for topic deletion 
during share fetch (#22170)
faa8b4870f4 is described below

commit faa8b4870f4355278a59bee23bd9f25a4e82b41c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Apr 28 22:13:49 2026 +0100

    KAFKA-20533: Correcting error response for topic deletion during share 
fetch (#22170)
    
    The PR fixes a suppressed NPE when topic name cannot be resolved during
    share fetch. The metrics update triggers NPE when topic name is null and
    the error response gets mapped to UNKNOWN_SERVER_ERROR. Correcting the
    behaviour to return the correct error code per partition response.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 11 +++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 47 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 855fa1faaaf..c405b35705f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4174,10 +4174,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       // record the bytes out metrics only when the response is being sent.
       response.data.responses.forEach { topicResponse =>
         topicResponse.partitions.forEach { data =>
-          // If the topic name was not known, we will have no bytes out.
-          if (topicResponse.topicId != null) {
-            val tp = new TopicIdPartition(topicResponse.topicId, new 
TopicPartition(topicIdNames.get(topicResponse.topicId), data.partitionIndex))
-            brokerTopicStats.updateBytesOut(tp.topic, false, false, 
ShareFetchResponse.recordsSize(data))
+          // If the topic name was not known, we will have no bytes out. This 
can happen if the topic
+          // was deleted and the fetch request was received, or if the topic 
id in the request was invalid.
+          // In both cases, the error code for the partition will be set 
accordingly, and we won't have
+          // a topic name to record metrics with.
+          val topicName = topicIdNames.get(topicResponse.topicId)
+          if (topicName != null) {
+            brokerTopicStats.updateBytesOut(topicName, false, false, 
ShareFetchResponse.recordsSize(data))
           }
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9331650e1a9..9956bf1203d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -5189,6 +5189,53 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.UNKNOWN_SERVER_ERROR.code, responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestTopicDeletedDuringFetch(): Unit = {
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    // Do NOT add the topic to metadata cache - simulating a deleted topic.
+    // topicIdNames will not contain this topic's mapping.
+    val memberId: String = Uuid.randomUuid().toString
+
+    val groupId = "group"
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), 
any(), any())).thenReturn(
+      // Send the topic name that corresponds to the context response 
considering session existed.
+      // This is to simulate the scenario where the topic gets deleted after 
the context is created
+      // and the subsequent fetch is received.
+      new ShareSessionContext(0, util.List.of(
+        new TopicIdPartition(topicId, partitionIndex, "foo")
+      ))
+    )
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId).
+      setShareSessionEpoch(0).
+      setTopics(new 
ShareFetchRequestData.FetchTopicCollection(util.List.of(new 
ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(new 
ShareFetchRequestData.FetchPartitionCollection(util.List.of(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)).iterator))).iterator))
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    val topicResponses = responseData.responses()
+    assertEquals(1, topicResponses.size())
+    val topicResponse = topicResponses.stream.findFirst.get
+    assertEquals(topicId, topicResponse.topicId)
+    assertEquals(1, topicResponse.partitions.size())
+    assertEquals(partitionIndex, 
topicResponse.partitions.get(0).partitionIndex)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, 
topicResponse.partitions.get(0).errorCode)
+  }
+
   @Test
   def testHandleShareFetchRequestErrorInReadingPartition(): Unit = {
     val topicName = "foo"

Reply via email to